学而后知不足


  • 首页

  • 分类

  • 归档

  • 标签

dubbo学习-spring schema扩展

发表于 2017-06-02   |   分类于 dubbo

dubbo对spring的标签的扩展

<dubbo:application name="${dubbo.application.name}" owner="whx" organization="whx-inc"/>
<dubbo:registry protocol="zookeeper"
                address="${dubbo.registry.address}"
                group="${dubbo.registry.group}"
                file="${dubbo.registry.file}"/>

<dubbo:annotation package="com.whx"/>

<dubbo:consumer check="false" retries="0"/>

<dubbo:provider threads="600" threadpool="fixed" retries="0" loadbalance="roundrobin" accesslog="true"/>

<dubbo:protocol name="dubbo" port="${dubbo.protocol.dubbo.port}" charset="utf-8"/>

<dubbo:protocol name="hessian" port="${jetty.port}" server="jetty" accesslog="true"/>
<dubbo:protocol name="rest" port="${dubbo.protocol.rest.port}"
   keepalive="true" server="netty" 
   extension="com.whx.common.rest.FastJsonProvider,com.whx.common.rest.JsonExceptionMapper" />

<context:property-placeholder
        ignore-resource-not-found="false"
        location="classpath*:application.properties,classpath*:redis.properties,classpath*:dubbo.properties,classpath*:kestrel.properties,classpath*:db.properties" />
<dubbo:monitor address="${dubbo.monitor.address}" />

这些注解其实都是对spring标签的扩展,由spring的api扩展出来,方便我们不去读取xml,直接通过标签的方式加载为bean对象

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

static {
    Version.checkDuplicate(DubboNamespaceHandler.class);
}

public void init() {
    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}

}

spring schema扩展实现

1.编写Bean类,此为业务类
public class People {

    private String id;
    private String name;
    private Integer age;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "People{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
2.编写xsd文件 xsd文件中包含了Bean类的属性映射
<?xml version="1.0" encoding="UTF-8"?>
    <xsd:schema
        xmlns="http://whx4j8.github.io/people"
        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
        xmlns:beans="http://www.springframework.org/schema/beans"
        targetNamespace="http://whx4j8.github.io/people"
        elementFormDefault="qualified"
        attributeFormDefault="unqualified">

    <xsd:import namespace="http://www.springframework.org/schema/beans"/>
    <xsd:element name="people">
        <xsd:complexType>
            <xsd:complexContent>
                <xsd:extension base="beans:identifiedType">
                    <xsd:attribute name="name" type="xsd:string"/>
                    <xsd:attribute name="age" type="xsd:int"/>
                </xsd:extension>
            </xsd:complexContent>
        </xsd:complexType>
    </xsd:element>

</xsd:schema>

element name=”people” 代表people,spring会通过其内的配置项xsd:attribute在xml提示字段,类似xml提示

xmlns targetNamespace 代表对应的唯一命名空间

一般将xsd放到META-INF下

3.编写NamespaceHandler 和 spring.schemas 映射配置文件

registerBeanDefinitionParser(“people”, new PeopleBeanDefinitionParser());

当读取到people这个节点名时,使用PeopleBeanDefinitionParser类去解析,而不是仅仅将字段注入属性中。

public class MyNamespaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
        registerBeanDefinitionParser("people", new PeopleBeanDefinitionParser());
    }

}

PeopleBeanDefinitionParser通过反射将xml中配置的值注入到生成的bean对象中。

public class PeopleBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {

    @Override
    protected Class<?> getBeanClass(Element element) {
        return People.class;
    }


    @Override
    protected void doParse(Element element, BeanDefinitionBuilder builder) {
        String name = element.getAttribute("name");
        String age = element.getAttribute("age");
        String id = element.getAttribute("id");

        if (!StringUtils.isEmpty(name)){
            builder.addPropertyValue("name",name);
        }

        if (!StringUtils.isEmpty(age)){
            builder.addPropertyValue("age",age);
        }

        if (!StringUtils.isEmpty(id)){
            builder.addPropertyValue("id",id);
        }

    }
}
4.spring.handlers 和 spring.schemas

现在xml规范有了,xml解析器也有了,Bean对象也有了。

spring.handlers文件

http\://whx4j8.github.io/people=com.whx.spring.MyNamespaceHandler
告诉spring当使用http\://whx4j8.github.io/people使用com.whx.spring.MyNamespaceHandler去配置解析

spring.schemas文件

http\://whx4j8.github.io/people.xsd=META-INF/people.xsd
告诉spring去加载那些xsd文件
5.application.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:whx="http://whx4j8.github.io/people"
       xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://whx4j8.github.io/people
                http://whx4j8.github.io/people.xsd">

    <whx:people id="people" name="whx" age="22"></whx:people>
</beans>

xmlns:whx=”http://whx4j8.github.io/people“ 为该schema命名为whx,在下面的xml直接使用whx即可

6.使用
public static void main(String[] args){

    ApplicationContext ctx = new ClassPathXmlApplicationContext("application.xml");
    People people = (People) ctx.getBean("people");
    System.out.println(people.toString());

}

输出People{id=’people’, name=’whx’, age=22}

参考

dubbo源码解析

关于dubbo monitor的扩展-监控报表

发表于 2017-05-23   |   分类于 分布式服务

服务化

服务化的优点

1.代码统一化,业务逻辑封装在服务内,同样的业务逻辑调用服务,对于上层代码量轻,易于维护

2.资源控制,数据库链接资源等等。mysql、es等等链接也是重要的资源,将需要维护的数据库的资源控制在服务内,若压力较大,横向扩容

3.业务拆分,减轻同一数据库的压力,便于项目灵活变动,不同服务可能使用不同的数据库

服务化的缺点

1.数据一致性,跨服务的数据一致性:分布式事物、tcc事物、消息事物等等,一般业务的难以维护

2.日志的查找,多个服务之间的日志,难以查找,需要介入服务治理工具,日志跟踪、日志收集、日志聚合查询等等,参考dapper

3.服务本身的预警、分析、降级等等

dubbo monitor

dubbo为阿里的一个开源的rpc框架,以不再维护,后期当当网进行过二次开发dubbox,大多数互联网公司都在使用该框架。

dubbo monitor为dubbo的一个监控工具,可以在上monitor 看到接口的TPS、并发、响应时间,可惜的是没有对monitor进行报表汇总工作。

当流量激增的时候,调用方容易出现超时的情况,超时较多接口失败,影响用户体验。接口没做并发处理、幂等处理,还容易造成数据重复提交、脏读等情况。

provider压力较大的情况,容易出现线程池用光,调用方新的请求不能响应,系统吞吐降低。

dubbo monitor本身收集各个接口的调用、执行数据。却没有进行预警、和报表处理

改造

2种思路

1.接入dubbo monitor对其添加报警功能
2.接入一个新的中间轻量级服务,对其monitor添加数据分析、报表、报警功能

elastic_search压测

发表于 2017-05-08   |   分类于 测试

多机压测 elastic search

1.压测对象

resin3

压测目标:分页查询用户全量订单接口
1.2台resin http服务  (测试环境 resin3 默认配置,未调优)
2.2台dubbo 服务 (测试环境 4g内存 provider 600线程)
3.5台elastic_search集群 ssd硬盘 (将上线的es)

jetty8

压测目标:分页查询用户全量订单接口
1.2台resin http服务  (测试环境 jetty8 默认配置,未调优)
2.2台dubbo 服务 (测试环境 4g内存 provider 600线程)
3.5台elastic_search集群 ssd硬盘 (将上线的es)

2.压测工具

siege 2.7.0

3.压测方式

单台虚拟机性能较弱,能够tcp链接数和Linux进程数量有限。采用多机压测,统计平均结果的方式

每台压测机上的脚本

siege -c $1 -r $2 --log=/data/pressure_test/data.txt --header "Cookie: cookie登录的标识" -f /data/pressure_test/url.txt
$1     启动的线程数量
$2    每个线程请求的数量
/data/pressure_test/data.txt        压测结果数据文件
/data/pressure_test/url.txt        将要压测的url

压测脚本部署在5台压测机器上,由中控机控制并发访问

*.143
*.144
*.145
*.107
*.28    

中控机启动脚本,使用后台启动,保证每台压测上脚本并发执行

ips=`cat /root/pressure_test/ips`
for ip in $ips
do
    {
        ssh $ip "sh /data/pressure_test/pressure_test.sh $1 $2 > temp.log"
    }&
done
wait

4.resin3与jetty8下的ES查询压测对比

QPS

横坐标为同时请求的并发数量,纵坐标为接口实际每秒查询率QPS

响应率

横坐标为同时请求的并发数量,纵坐标为接口实际响应率

响应时间

横坐标为同时请求的并发数量,纵坐标为接口实际平均响应时间

resin在默认配置下,2台在3200并发下为极限性能,qps达到2k,然后急速下滑,响应率下降,响应时间突增。

jetty在默认配置下,2台在3600-5200并发为基本可以维持3k左右qps,响应率100%,响应可以维持在1s内。在5600并发下为极限并发,开始出现响应率下降,响应时间突增的现象。

一次简单的压测

发表于 2017-03-17   |   分类于 测试

对http服务关键接口压测

对测试环境http服务中关键接口压测
以下测试数据为测试环境数据、非生产环境真实数据

压测目标

测试环境单点resin 单点dubbo mongoDB
以订单列表接口为压测对象

压测工具

siege 2.7.0

测试用例

${thread_num}       线程并发访问数量
${request_num}     每个线程请求接口次数

thread_num   200 400 600 800 1000 1200 1400 1600 1800 2000
request_num 50

siege -c ${thread_num} -r ${request_num} --log=/data/test_data/data.txt --header "${header}" orderlist

QPS

可以看出当并发达到1600时候,qps达到峰值2113.之后随着并发的增加,qps逐渐降低

并且出现siege aborted due to excessive socket failure,该问题是建立tcp过程中无法拿到临时端口导致无法建立链接

查看uc-rest测试机tcp链接状态

netstat -n | awk ‘/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}’

TIME_WAIT 11186

CLOSE_WAIT 9

ESTABLISHED 108

大量TIME_WAIT 链接未释放,占用临时端口,新请求无法建立

响应率

在并发1600,响应率瞬间降低至80%,大量未来响应请求出现

响应时间

响应时间在并发1200之后开始剧增

jenkins集成maven cobertur jacoco

发表于 2016-09-29

jenkins集成maven

安装maven环境

1.在本地安装maven环境,maven是一个jar包管理工具。去官网http://maven.apache.org/download.cgi 下载apache-maven-3.3.9-bin.zip,解压后放到当前用户的目录下。

/Users/wanghongxing/code/tool/apache-maven-3.3.9             

2.进入到该目录后,测试环境是否成功。执行命令查看环境

./bin/mvn -v

显示java和maven的version和path,则代表成功。不成功多半是java环境变量没有配置好

➜  apache-maven-3.3.9 ./bin/mvn -v
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00)
Maven home: /Users/wanghongxing/code/tool/apache-maven-3.3.9
Java version: 1.8.0_91, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.11.6", arch: "x86_64", family: "mac"
jenkins集成maven环境

1.配置jenkins中的maven工具
在 系统管理-Global Tool Configuration
配置jdk、maven、git工具,git一般是配置好的不用管,maven和jdk要配置好路径,注意不要点自动安装。如下图

2.点击新建,选择构建maven项目

选择github project


点击保存

执行构建,如果失败 查看本次构建的Console Output,看日志提示哪里执行时报错了。

jenkins集成cobertur

看这篇博客介绍的很全,照着配置就行了。http://www.voidcn.com/blog/javalover_yao/article/p-1907365.html

jenkins集成jacoco

这篇博客,同理
http://zhaozhiming.github.io/blog/2012/11/30/use-jacoco-in-jenkins-and-sonar-part-2/

高并发下接口的并发问题

发表于 2016-07-21   |   分类于 线上bug解决

事故

前些天上线的扫码送会员活动。
场景:用户登录账号之后,扫二维码,送七天黄金会员,限制每个帐号只能领取一个
有恶意用户刷接口,在高并发下越过限制。

原因

领取会员流程:
    1.后端先生成卡卷,将卡号放到消息队列中
    2.用户扫码请求领取会员接口
        2-1).先检查用户是否已经领取过该活动会员
        2-2).领取过return “该帐号已领取”的标示  
        2-3).没领取从消息队列中拿取一张卡号
        2-4).激活卡
        2-5).更新用户本次活动为已经激活

这个流程在一般环境下是没有问题的,在高并发下就不行了。

            2-1)        2-2)         2-3)       2-4)      2-5)

线程a                                                   -->

线程b                                      -->

线程c                                    -->

高并发下模拟几个线程同时请求

现在的rpc服务,除去极其敏感性数据的操作,其它数据的接口基本都没有做数据一致性控制。

其实做了控制也不能解决这个问题。再来说这个问题,高并发下因为线程a已经执行完激活卡的操作,用户的会员已经建立权益。但这时候线程a还没有执行到2-5,还没更新用户的领取卡卷的状态,这时候,又有一个这个用户的领取卡卷请求过来。2-1的check 操作并不能阻止这个请求,同样的再次领取卡卷并且激活,导致线程a在的执行在2-1到2-5之间都会有其它的线程越过检查。

解决

解决这种并发问题无非是两种,悲观锁和乐观锁。
悲观锁阻塞,乐观锁快速响应失败。

                优点                        缺点

悲观锁        可以响应重复请求,幂等            高并发下请求堆积


乐观锁        高并发下没有大量线程阻塞        不可重复响应,不幂等

考虑并发量比较大,采用的乐观锁实现。对流程进行加锁。

2种实现方式:redis和mysql,考虑下在不修改原表的情况下,使用redis的SETNX的api

实现:

    2-0).活动-帐号形成key,SETNX(key)成功返回1,失败返回0
          只有返回1,才能进行后续流程,将并发控制交给redis,redis是线程模型没有并发问题
    2-1).先检查用户是否已经领取过该活动会员
    2-2).领取过return “该帐号已领取”的标示  
    2-3).没领取从消息队列中拿取一张卡号
    2-4).激活卡
    2-5).更新用户本次活动为已经激活
    2-6).将删除活动-帐号形成的key


        2-0)    2-1)   2-2)     2-3)  2-4)  2-5)   2-6)

线程a   ->1                                          

线程b   ->0                                   
         <-    
线程c     ->0                            
         <-

只有线程a已经执行过2-6,才能线程b进入流程,但是这时候用户已经为领取过卡卷状态            
线程a                                                    ->

线程b  ->1 用户卡卷已经更新过

线程c  ->0                                            

浏览器302重定向之迷

发表于 2016-07-19   |   分类于 线上bug解决

事故

晚上9点左右忽然爆出线上支付宝wap拉不起来。
场景:h5页面点击确认支付,不能拉起支付,
但是直接拿到确认支付的请求,放在浏览器中可以拉起支付。

排查原因

先看了预支付请求对接部门的返回,正常的结果。

不知道原因,但是看到不能拉起支付的请求,是同时请求了支付两次,而且302跳转不正常。
显示异常

XMLHttpRequest cannot load ****支付url**** 
'control-allow-origin' header is present on the requested resource

查了下,是js跨域的异常,这时估计是什么原因导致浏览器302重定向失败了。也不知道为啥。

正常的支付流程

    request                                      response

1.前端请求支付链接请求                        
                                            302响应,带着对接部门的支付链接
2.浏览器拿到302,请求对接部门支付链接        
                                            302响应,带着支付宝wap的支付链接
3.浏览器拿到302,请求支付宝wap的支付链接
                                            返回支付页面

现在流程2就爆出跨域问题了。。
怀疑同时请求两次导致的,直接将支付链接扔到浏览器上,查看浏览器请求,一次支付请求,重定向正常,拉起支付。

点击确认支付,同时请求两次支付请求,重定向异常,显示跨域异常,不能拉起支付。

找到问题所在。

总结

现在还不知道什么原因导致的,同时请求相同的302浏览器重定向失败。
明明好好查下。

mit 6.824 mapreduce part1

发表于 2016-07-08   |   分类于 分布式服务

最近忙着工作上的事,看书看论文的时间也少了,周五啦放松下,把最近做的分布式课程梳理下。

之前一直在做麻省理工大学的分布式课程mit 6.824
虽然一直做,但也只是做了个lab1中的前两个part

关于map reduce ,论文太多了。
google的官方论文

part1 单机版顺序map reduce

mit给了部分map reduce执行的代码,包括测试用例,单机的多机的,任务失败的,不得不说,看看人家写的代码很有帮助,自己写的golang实在是搓。

1.doMap 函数

doMap函数:根绝master调用的输入文件,调用用户定义的map函数,然后将结果分区写到不同的out put file上。

如何分区,这个一开始没写好,一直测试不过,因为一次map reduce任务,有m个map任务,map任务取决于hdfs上block,n个reduce任务,这个取决于如何分区,这里没给分区函数,但测试用例初始化master的时候,file的个数决定map的个数,reduce的个数取决于给定的数量。

使用很简单的分区方式,对key进行hash后/reduce个数n取余
也就是hash(key)%nReduce得到分区文件

结果存储没啥好说的,一开始的建议是直接将输出的key value以json的形势存储文件,比较期待后边的倒排索引如何实现。

2.doReduce 函数

doReduce函数:根据master的调用,拿到属于该reduce任务的分区文件,也就是hash(key)%nReduce,得到该分区下的所有的map函数产生的数据文件,读取合并排序后调用reduce函数,得到output key value ,json格式写文件。完活

3.merge 函数

merge函数是mit给的,没有细看,大概就是把每个reduce,按照reduce的分区顺序合并,写入最后的结果文件中。

整体把map reduce的单机流程理清,最重要的还是如何实现分布式啦,如何处理map或者reduce函数的失败,hdfs的失败。

part2 单机的word count

根据刚才已经实现的map reduce框架,自己写map函数和reduce函数,有10多个文件,和测试代码,很简单与预计结果相同啦

part3 分布式map reduce

功能

这一部分模拟的是线程和rpc模拟分布式的map reduce的调度。

master线程主要功能:
1.map reduce 任务的调度
2.worker的注册

worker线程主要功能:执行map或者reduce任务

1.首先进程先启动master线程,初始化一个master对象,master对象中主要包含mapreduce任务需要的输入文件、reduce个数、worker极其状态等。然后会启动一个rpc服务,等待worker线程的注册。
2.初始化n个worker线程启动,并且调用注册master方法。
3.master根据已注册的可用的worker调度map和reduce任务

大体流程如上述。part3就是完成这个调度。

实现

第一步 master的数据结构

master数据结构

type Master struct {
    sync.Mutex

    address         string
    registerChannel chan string
    doneChannel     chan bool
    workers         []string // protected by the mutex

    // Per-task information
    jobName string   // Name of currently executing job
    files   []string // Input files
    nReduce int      // Number of reduce partitions

    shutdown chan struct{}
    l        net.Listener
    stats    []int        // 0:可用,1:不可用

}

一个并发锁,名称,注册管道,完成管道,workers,workers的状态,任务名,输入文件,reduce个数。基本能用到的就是这些。

第二步 handle register event

首先master和worker是在不同的线程中并行执行
master线程(部分代码)

mr.jobName = jobName
mr.files = files
mr.nReduce = nreduce

fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

schedule(mapPhase)
schedule(reducePhase)
finish()
mr.merge()

fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

mr.doneChannel <- true

worker线程(部分代码)

wk := new(Worker)
wk.name = me
wk.Map = MapFunc
wk.Reduce = ReduceFunc
wk.nRPC = nRPC
rpcs := rpc.NewServer()
rpcs.Register(wk)
os.Remove(me) // only needed for "unix"
l, e := net.Listen("unix", me)
if e != nil {
    log.Fatal("RunWorker: worker ", me, " error: ", e)
}
wk.l = l
wk.register(MasterAddress)

也就是说在master调用schedule(mapPhase)时候,我们并不能知道worker知否已经注册到了master。

先来看下wokrer调用master的register的rpc服务

func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
    mr.Lock()
    defer mr.Unlock()
    debug("Register: worker %s\n", args.Worker)
    mr.workers = append(mr.workers, args.Worker)
    go func() {
        mr.registerChannel <- args.Worker
    }()
    return nil
}

在master.workers中添加一个新的worker,并且写入mr.registerChannel
所以在schedule方法中,我们只要监听这个event即可

func handleRegisterEvent(mr *Master,callback *Callback){

    go func(mr *Master){
        for true{
            <- mr.registerChannel                //wait 新的注册事件
            mr.Lock()
            mr.stats = append(mr.stats,0)            //初始化可用状态
            callback.callbackChannel <- "register"        //通知callback
            mr.Unlock()
        }

    }(mr)

}    

起一个线程专门wait worker的 register事件,然后通知callback,callback在获取可用worker时候用到

第三步 获取可用worker

在获取可用的worker时
先去master中找有没有可用的worker,状态为0的worker,如果有直接返回,没有则等待callback通知。
callback的写入事件包涵两种:

1.worker注册事件
2.worker完成任务事件

map reduce任务量可能很大,但是worker却只有几个。当worker都在使用中,等着分配worker的map reduce任务,只能等待。
当有新的可用的worker才能被调度。

/**
    获取空闲的worker
 */
func idleWorker(mr *Master,callback *Callback) (idleWorker string,index int){

    //fmt.Printf("before getidle worker : %s stats : %s\n",mr.workers, mr.stats)
    for true {
        idleWorker,index = getIdleWorker(mr)

        if idleWorker == "" {                        //等待新事件
            event := <- callback.callbackChannel
            if event == "register" {
                //fmt.Println("handle new register worker event")
            }else {
                //fmt.Println("handle new done worker event")
            }
        } else{                                //获取到woker
            break;
        }

    }
    //fmt.Printf("after getidle worker : %s stats %d\n:",mr.workers, mr.stats)
    return idleWorker,index
}

/**
    获取可用状态的worker
 */
func getIdleWorker(mr *Master)(idleWorker string,index int){
    mr.Lock()
    defer mr.Unlock()
    for i,_ := range mr.stats {
        if(mr.stats[i] != 1 ){                    //有可用的worker
            idleWorker = mr.workers[i]
            index = i
        }
    }
    return
}
第四步 调度主流程

基本的获取worker的操作在前两步中已经有了。

1.handle register event

2.获取到可用的worker

3.调用worker的doMap rpc,告诉worker之行map/reduce任务

4.完成任务通知

这里说明一下因为并不能知道当前是否有任务在等待worker的完成,如果直接callback.callbackChannel <- “done”这样写入造成会造成阻塞,当worker够用没有线程在等待callback事件时,这个线程则会阻塞在这里,无法完成任务。所以使用Select非阻塞的方式,如果没有线程在等待callback事件的写入,则不通知。

5.等待所有任务完成

callback := &Callback {
    name : "callback",
    callbackChannel : make(chan string),
}

workdone := make(chan string)

handleRegisterEvent(mr, callback)    //单独handle注册事件

for i:=0 ; i<ntasks ; i++ {                    //先分配map任务给当前空闲的worker

    go func(nowNumTask int) {
        w,index := idleWorker(mr,callback)        //阻塞,等待可用worker

        mr.stats[index] = 1

        var taskArgs *DoTaskArgs
        if phase == mapPhase{
            taskArgs = &DoTaskArgs{
                JobName : mr.jobName,
                File : mr.files[nowNumTask],
                Phase:mapPhase,
                TaskNumber:nowNumTask,
                NumOtherPhase:nios,
            }
        }else {
            taskArgs = &DoTaskArgs{
                JobName : mr.jobName,
                Phase:reducePhase,
                TaskNumber:nowNumTask,
                NumOtherPhase:nios,
            }
        }

        reply := new(struct{})

        ok := call(w,"Worker.DoTask",taskArgs,reply)
        if ok == false {
            fmt.Println("do " + phase + " error")
        }

        mr.stats[index] = 0        //完成任务

        select {
        case callback.callbackChannel <- "done":
        default :
        }

        workdone <- "work done"

    }(i)

}

for i:=0 ; i < ntasks ; i++ {            //等待所有worker完成
    <- workdone
}

总结

part3 实际需要考虑的就是多线程的同步、通知等问题。有时间可以看下yarn的调度方式。

使用flume,伪分布式集群配置

发表于 2016-06-18   |   分类于 分布式服务

伪分布式搭建

flume主要有0.9和1.6两个版本,这里使用0.9版本

1.下载flume-0.9.4-cdh4.1.5
2.解压后进入到项目目录
3.执行bin/flume master,查看master可用http://host:35871/
4.另启一个终端执行bin/flume node,查看agent可用http://master:35862/
5.在http://host:35871/上,点config,提交一个配置
    configure node :    master(我机器的主机名)
    source         :    tail("/home/hadoop/flume_file/sourcefile")
    sink           :    text("/home/hadoop/flume_file/sinkfile")
6.回到配置主页面状态显示SUCCEEDED
7.现在向sourcefile追加内容,echo 'hello world' >> sourcefile
8.查看同步的sinkfile,显示master [INFO Sat Jun 18 17:35:01 CST 2016] { tailSrcFile : sourcefile } hello world

这样使用flume中的agent,没有使用到collector,在本机器同步数据。

下篇测试多机部署数据同步。

下面关于flume中的master、node介绍是从官方文档翻译的,水平很有限,如有问题请见谅并指出https://github.com/whx4J8/whx4j8.github.io

非分布式模式

flume有两种模式:flume master 和 flume node

master控制节点的数据流,他是一个单一的逻辑实体,拥有全局的状态数据,并控制flume节点的数据流和监控flume 节点。flume 节点作为事件流的数据路径,它们可以是事件数据的来源、管道、和消费者。节点周期性的与master发送一个心跳,并获取它们的数据流配置。

flume master

master可以通过下面的命令启动

flume master

然后master开始运行,你可以使用浏览器访问,localhost:35871。此网页显示与master 联系的所有的flume node的状态。并且显示当前所分配的每个node的配置,当你启动这个flume 节点运行时,状态和配置表应该是空的。
该网页包含四个表,这些表中的信息代表当前flume中的全局状态。

节点状态表
节点配置表
物理/逻辑节点映射表
命令历史表

master节点状态表包含所有与master接触的flume节点的名称,和它们当前的配置版本(最初是无),它们的状态。每个flume节点的名称与Linux主机名相同。

master节点配置表包含节点的逻辑名称,和分配给他的版本号,和他的source和sinks的规范。最初,这表是空的,但是在你更改值之后,你可以查看网页查看更新。有两组列,用户输入的version/source/sink。

master物理/逻辑节点映射表包含了逻辑节点到它们的物理节点的映射。

master命令历史记录表包含命令的状态。在一般的情况下,命令修改master全局状态。命令在master上被处理,并且分配一个唯一的命令标示号,每个命令有一个状态(成功、失败、挂起),一个命令行和一个尝试执行的消息。

flume node

要启动flume node , 在另一个终端执行命令

flume node_nowatch

检查flume node,访问localhost:35862。每个节点在一个表上显示自己的数据,包括关于节点的诊断、统计数据、数据流和正在运行所在机器的系统度量。如果你有多个flume程序运行在一台机器上,他会自动的递增端口号,并尝试绑定下一个端口号,记录最终的选定的端口。

如果node已经运行,你应该重新刷新master 状态页,确保node与master联通。如果你新加入一个节点,你应该在主节点状态表中看到一个新的节点,在逻辑节点映射表中有一个新的条目,将逻辑节点关联物理节点。

通过flume master 配置 node

node节点可以与master接触,master可以得到她们的配置。使你可以不需要登录到远程机器修改配置后重启守护进程。你可以快速更改节点以前的数据流。

在master的页面,点击config连接。你有两种形式。这些都是web接口配置node 数据流。当flume node 连接到master,将被通知数据流version已经改变了,然后重新实例化,并且激活配置。

对于这个例子,可以在 configure a single node 中输入值,并且提交。

刷新master的页面,注意version、source、sink的改变。状态更新位active,他已经准备接受控制台数据了。
在master,一个节点可以有几种状态

hello:一个新的node实例连接到master
idle:一个节点刚刚完成配置或者没有配置
configuring:一个节点已经收到一个配置,并且正在激活配置
active:一个节点正在积极的从源中提取数据,并且讲数据推到sink中
lost:节点与master失去联系
decommissioned:一个节点被故意退役从集群中
error:异常

常见的配置

配置 node name source sink
1 host tail(“/home/hadoop/flume_file/sourcefile”) console

通过master改变系统中的不同节点的配置,从二获取从各种来源获取数据。

Sink 介绍

flume有各种各样的来源,产生或接受新的事件消息进入系统。你能限制这些消息到控制台,如你所料。flume为不同的目的地提供各种各样的事件sink。
许多事件的目的地,可能是本地硬盘,hdfs,控制台,转发网络接口。你使用sink作为一个接口,发送事件。

通过指定中心的配置,并提交master,可以将不同的source连接到不同的sink上。

flume event sinks 事件接受器

null                              没有,事件被删除
console[("format")]               控制台,格式可选
text("txtfile"[,"format"])        文件
dfs("dfsfile")                    分布式文件系统
syslogTcp("host",port)            同步tcp端口

分层flume节点:agent 和 collector

如果通过网络发送事件是容易的、高效的、可靠的将是非常美好的。但是现实中,从分布式机器中和网络中收集数据,将极大的增加可能发生的事故和种类。底线是通过负责的策略保证可靠性。

flume 简化了这些问题,通过提供一个预定义的拓扑性和可靠性。只需要你为node节点设置一个角色。一个简单的flume node 的拓扑结构将node分为两个role。一个是flume agent 和flume collector。agent位于正在产生日志的机器上。例如你可以指定一个flume agent配置为syslogtcp为source,并且配置syslog 生成服务器发送日志到指定的本地端口。flume agent 会有一个agentSink作为他的sink配置,并且发送到collector。

collector集群侦听来自多个agent的数据记录,最终写到hdfs。
为了演示伪分布式模式下的新的sinks,你将实例化一个flume node在本地。为此,你需要在开始时配置一些额外的参数。下面的命令行启动一个物理节点名为collector

flume node_nowatch -n collector

在master的页面上,你最中会看到两个节点,host和collector。flume node节点状态网页。在localhost:35862 localhost:35863。

端口绑定依赖实例化顺序,第一个物理节点初始化在35862、第二个初始化在35863

接着,collector承担起收集器的角色,通过使用聚集多配置的方式,数据从控制台发送到collector中。将agent使用agentsink,一个高可靠的网络sink。collector 节点的source配置为collectorSource同时他的source配置为console

host : console | agentSink("localhost",35853) ;
collector : collectorSource(35853) | console ;

当你在终端输入,事件被发送到collector。现在当消息到达collector时,大概有一定的延迟15s左右,这实际上是一个可配置的默认的设置的值,但测试时同步的延迟没有这么高。

理解flume

发表于 2016-06-16   |   分类于 分布式服务

线上使用flume同步日志到hdfs和es,一直没有机会看,现在来看下flume日志同步工具的大体设计

架构

flume 是一套分布的高可用的日志传输组件,有效的移动大量的数据。
flume日志设计简单、坚固、灵活。主要描述一个面向流的数据流,数据流描述一个单一的数据流的传输和处理,从数据的生成到最终目的。
由逻辑节点组成,可以将接受的事件转换或者聚合。逻辑节点节点链接在一起形成数据流。
控制这些的是master节点,一个单独的服务。记录着flume所有的物理和逻辑节点。master分配逻辑节点,并且负责所有的逻辑节点配置更新,反过来逻辑节点定期的联系master节点,所以它们可以共享信息,并且检查更新它们的配置信息

典型的flume,由一组逻辑节点部署,分为三层。代理节点通常安装在产生日志的机器上,作为数据与flume的接触点,并且将数据转发到collector层,然后存储到最后的存储层。代理层可以监听系统log,或者由web服务、Hadoop产生的日志。代理产生的数据流发送给collector,collector然后讲数据汇聚成较大的流写入hdfs

逻辑节点是一个非常灵活的抽象。每个逻辑节点都有2个组件,一个source源和一个sink接收器。source告诉逻辑节点那里收集数据,然后sink告诉逻辑节点发送数据到哪里。逻辑节点唯一的区别就是如何配置source和sink。它们还可以对数据进行简单的处理。在前面的例子中,collector 和 agent 运行在相同节点的软件。master分配一个配置给每个逻辑节点,一个节点的所有组件在运行时被动态分配,flume服务无需重启任何java进程的情况下它们的配置可以被修改很多次。

source sink optional decorator 是强大的源语。flume保证每个流数据的属性,活着计算事件元数据,甚至产生新的事件插入到流数据中。一个逻辑节点同样可以发送数据流到多个逻辑节点。这允许多个流,每个字流可以配置不同。有可能一个流是一个集合路径,可靠的传送一个持久性存储的数据,另一个流计算轻量级的分析,提供一个报警系统。

可靠性

可靠性,在不丢失数据的过程中继续传递事件。大型分布式系统可以在许多方面遭受故障,硬件、网络、内存、软件崩溃、运行缓慢。flume将容错作为核心设计。保持运行和收集数据,即使许多组件都失败了。

flume保证的一个agent所接受到的数据只要保证agent node运行最后都会执行到collector,数据可以可靠的传输到最终目的地。
但是,可靠的交付需要相当多的资源,往往一些数据需要更强大的保证。因此flume指定三个支持的可靠级别
1.端到端
2.存储故障
3.最大的努力

可靠性说明

虽然flume 及其容忍机器、网络、软件故障,从来没有任何这样的100%可靠性。如果flume所安装在的机器都发生了不可逆转的破坏,水槽数据的副本都丢失了,就无法恢复了

end-to-end 端到端级别的可靠性保证一旦flume一旦接受一个事件,该事件使她的断点,只要agent接受到event,将会被保持很长的时间。首先agent先写事件到硬盘的write-ahead log。如果代理崩溃或者重新启动,该事件不会被丢失。然后事件根据流程成功的发送到结束位置。一个确认ack被发送回原agent,以便他知道不需要将事件存储在磁盘上。这个可靠性水平可以承受任何数量的下游代理的故障。

store on failure 失败存储级别,需要节点将数据发送给下游节点的时候返回一个ack包,保证成功。如果发送节点发现失败,他将数据存储到节点所在的硬盘上,直到下游节点的修复,或者选择另一个下游的目的地。虽然这是有效的,如果上游发送数据进程挂了,数据可能丢失。

best-effort 最低的保证级别,数据从上游发送到下游节点没有任何的确认和重试交付。如果节点失败,他们发送的任何数据都会丢失。

消除单点
尽管使用端到端级别的可靠性水平,一个flume 流如果没有任何节点处理事件也是不可能有进展的。数据将会继续被agents收集,但是代理可能无法找到一个合适数据流下游,直到找到一个合适的在线节点。为了保持高可用性,flume允许没有任何用户的干预下入,若是发送到下游失败,则就去找另外的下游节点。flume的master进程也是可以被复制的高可用的,这意味着数据层和控制层都可以容忍一些故障。

扩展性

可扩展性是通过向系统中添加更多的资源来提高系统性能的能力。flume的目标是横向扩展,逐步添加更年多的机器,系统增加吞吐量能力。在flume中的一个关键性能指标是进入系统的数量或大小,并被交付。当负荷增加时,很简单,讲更多的资源添加到系统中,以增加更多的负荷量

有三个独立的组件组成的flume需要有不同的方法增加可扩展性:collector层,master,storage层

collector层需要能够扩展一处理大量的来自大量的代理节点的数据。通过在collector层添加更多的机器。可以增加系统的数量和最大的可用吞吐量。

一个单独的collector通常可以处理许多agent节点,因为每个单独的agent产生的日志,与collector的全部带宽相比,只是非常小的一部分。因此flume平衡流分配到不同的collector节点中。flume采用随机算法均匀的分配到collector list。这会自动传播负载,而且还能保持collector失败的情况下,传播负载。

由于系统中的节点增加,master节点在控制路径上的传输可能成为瓶颈。flume的master还可以通过更多的机器来支持横向扩展,虽然只是少数的商品服务器可以提供大良的节点安装。flume的master的状态保持同步和完全复制,这确保了他是容错和高度可扩展的。

可管理性

可管理性是控制数据流、监控节点、修改配置,控制输出到大型系统的一个能力。手动管理源点到终点是繁琐的容易出错的,另一个主要的痛点,又可能成千上万的日志生成应用程序和服务。他是一个重要集中管理点,监测和改变数据流,以及动态处理不同条件或问题的能力。

flume 的master是管理全局数据流状态的一点。痛殴master,用户可以监控流程和配置属性。master要求自动响应系统的比阿花,如负载不平衡,部分失败或者重新配置的硬件。

你可以利用master重新配置节点。虽然本博客描述的是传统的三层部署的例子。节点的灵活性允许任意节点的拓扑。你可以通过主界面提交一个采用数据流规范编写的小脚本重新配置节点。

你可以通过两个接口管理master:web接口或者flume的shell脚本。web接口提供交互式式的系统的状态,shell提供人工编写的脚本或机器生成的脚本管理。

功能扩展性

可扩展性是添加性功能到系统中大的能力。可以通过增加链接到现有存储层或者数据平台的flume。通过简单的接口就可以实现,功能设计到简单组件的分离,流程规范语言,和一个简单而灵活的数据模型。

flume提供特别的可靠性和资源性能的组件。一般来自从文件系统中的文件,系统日志和syslog-ng或者标准输出的一个过程。同样有很多的事件输出目的地。虽然hdfs是主要的输出地,但是事件也可以被发送到本地,或者监控和报警等渠道。

123
whx4J8

whx4J8

github.com/whx4J8

24 日志
12 分类
14 标签
© 2017 whx4J8
由 Hexo 强力驱动
主题 - NexT.Mist