mit 6.824 mapreduce part1

最近忙着工作上的事,看书看论文的时间也少了,周五啦放松下,把最近做的分布式课程梳理下。

之前一直在做麻省理工大学的分布式课程mit 6.824
虽然一直做,但也只是做了个lab1中的前两个part

关于map reduce ,论文太多了。
google的官方论文

part1 单机版顺序map reduce

mit给了部分map reduce执行的代码,包括测试用例,单机的多机的,任务失败的,不得不说,看看人家写的代码很有帮助,自己写的golang实在是搓。

1.doMap 函数

doMap函数:根绝master调用的输入文件,调用用户定义的map函数,然后将结果分区写到不同的out put file上。

如何分区,这个一开始没写好,一直测试不过,因为一次map reduce任务,有m个map任务,map任务取决于hdfs上block,n个reduce任务,这个取决于如何分区,这里没给分区函数,但测试用例初始化master的时候,file的个数决定map的个数,reduce的个数取决于给定的数量。

使用很简单的分区方式,对key进行hash后/reduce个数n取余
也就是hash(key)%nReduce得到分区文件

结果存储没啥好说的,一开始的建议是直接将输出的key value以json的形势存储文件,比较期待后边的倒排索引如何实现。

2.doReduce 函数

doReduce函数:根据master的调用,拿到属于该reduce任务的分区文件,也就是hash(key)%nReduce,得到该分区下的所有的map函数产生的数据文件,读取合并排序后调用reduce函数,得到output key value ,json格式写文件。完活

3.merge 函数

merge函数是mit给的,没有细看,大概就是把每个reduce,按照reduce的分区顺序合并,写入最后的结果文件中。

整体把map reduce的单机流程理清,最重要的还是如何实现分布式啦,如何处理map或者reduce函数的失败,hdfs的失败。

part2 单机的word count

根据刚才已经实现的map reduce框架,自己写map函数和reduce函数,有10多个文件,和测试代码,很简单与预计结果相同啦

part3 分布式map reduce

功能

这一部分模拟的是线程和rpc模拟分布式的map reduce的调度。

master线程主要功能:
1.map reduce 任务的调度
2.worker的注册

worker线程主要功能:执行map或者reduce任务

1.首先进程先启动master线程,初始化一个master对象,master对象中主要包含mapreduce任务需要的输入文件、reduce个数、worker极其状态等。然后会启动一个rpc服务,等待worker线程的注册。
2.初始化n个worker线程启动,并且调用注册master方法。
3.master根据已注册的可用的worker调度map和reduce任务

大体流程如上述。part3就是完成这个调度。

实现

第一步 master的数据结构

master数据结构

type Master struct {
    sync.Mutex

    address         string
    registerChannel chan string
    doneChannel     chan bool
    workers         []string // protected by the mutex

    // Per-task information
    jobName string   // Name of currently executing job
    files   []string // Input files
    nReduce int      // Number of reduce partitions

    shutdown chan struct{}
    l        net.Listener
    stats    []int        // 0:可用,1:不可用

}

一个并发锁,名称,注册管道,完成管道,workers,workers的状态,任务名,输入文件,reduce个数。基本能用到的就是这些。

第二步 handle register event

首先master和worker是在不同的线程中并行执行
master线程(部分代码)

mr.jobName = jobName
mr.files = files
mr.nReduce = nreduce

fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

schedule(mapPhase)
schedule(reducePhase)
finish()
mr.merge()

fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

mr.doneChannel <- true

worker线程(部分代码)

wk := new(Worker)
wk.name = me
wk.Map = MapFunc
wk.Reduce = ReduceFunc
wk.nRPC = nRPC
rpcs := rpc.NewServer()
rpcs.Register(wk)
os.Remove(me) // only needed for "unix"
l, e := net.Listen("unix", me)
if e != nil {
    log.Fatal("RunWorker: worker ", me, " error: ", e)
}
wk.l = l
wk.register(MasterAddress)

也就是说在master调用schedule(mapPhase)时候,我们并不能知道worker知否已经注册到了master。

先来看下wokrer调用master的register的rpc服务

func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
    mr.Lock()
    defer mr.Unlock()
    debug("Register: worker %s\n", args.Worker)
    mr.workers = append(mr.workers, args.Worker)
    go func() {
        mr.registerChannel <- args.Worker
    }()
    return nil
}

在master.workers中添加一个新的worker,并且写入mr.registerChannel
所以在schedule方法中,我们只要监听这个event即可

func handleRegisterEvent(mr *Master,callback *Callback){

    go func(mr *Master){
        for true{
            <- mr.registerChannel                //wait 新的注册事件
            mr.Lock()
            mr.stats = append(mr.stats,0)            //初始化可用状态
            callback.callbackChannel <- "register"        //通知callback
            mr.Unlock()
        }

    }(mr)

}    

起一个线程专门wait worker的 register事件,然后通知callback,callback在获取可用worker时候用到

第三步 获取可用worker

在获取可用的worker时
先去master中找有没有可用的worker,状态为0的worker,如果有直接返回,没有则等待callback通知。
callback的写入事件包涵两种:

1.worker注册事件
2.worker完成任务事件

map reduce任务量可能很大,但是worker却只有几个。当worker都在使用中,等着分配worker的map reduce任务,只能等待。
当有新的可用的worker才能被调度。

/**
    获取空闲的worker
 */
func idleWorker(mr *Master,callback *Callback) (idleWorker string,index int){

    //fmt.Printf("before getidle worker : %s stats : %s\n",mr.workers, mr.stats)
    for true {
        idleWorker,index = getIdleWorker(mr)

        if idleWorker == "" {                        //等待新事件
            event := <- callback.callbackChannel
            if event == "register" {
                //fmt.Println("handle new register worker event")
            }else {
                //fmt.Println("handle new done worker event")
            }
        } else{                                //获取到woker
            break;
        }

    }
    //fmt.Printf("after getidle worker : %s stats %d\n:",mr.workers, mr.stats)
    return idleWorker,index
}

/**
    获取可用状态的worker
 */
func getIdleWorker(mr *Master)(idleWorker string,index int){
    mr.Lock()
    defer mr.Unlock()
    for i,_ := range mr.stats {
        if(mr.stats[i] != 1 ){                    //有可用的worker
            idleWorker = mr.workers[i]
            index = i
        }
    }
    return
}
第四步 调度主流程

基本的获取worker的操作在前两步中已经有了。

1.handle register event

2.获取到可用的worker

3.调用worker的doMap rpc,告诉worker之行map/reduce任务

4.完成任务通知

这里说明一下因为并不能知道当前是否有任务在等待worker的完成,如果直接callback.callbackChannel <- “done”这样写入造成会造成阻塞,当worker够用没有线程在等待callback事件时,这个线程则会阻塞在这里,无法完成任务。所以使用Select非阻塞的方式,如果没有线程在等待callback事件的写入,则不通知。

5.等待所有任务完成

callback := &Callback {
    name : "callback",
    callbackChannel : make(chan string),
}

workdone := make(chan string)

handleRegisterEvent(mr, callback)    //单独handle注册事件

for i:=0 ; i<ntasks ; i++ {                    //先分配map任务给当前空闲的worker

    go func(nowNumTask int) {
        w,index := idleWorker(mr,callback)        //阻塞,等待可用worker

        mr.stats[index] = 1

        var taskArgs *DoTaskArgs
        if phase == mapPhase{
            taskArgs = &DoTaskArgs{
                JobName : mr.jobName,
                File : mr.files[nowNumTask],
                Phase:mapPhase,
                TaskNumber:nowNumTask,
                NumOtherPhase:nios,
            }
        }else {
            taskArgs = &DoTaskArgs{
                JobName : mr.jobName,
                Phase:reducePhase,
                TaskNumber:nowNumTask,
                NumOtherPhase:nios,
            }
        }

        reply := new(struct{})

        ok := call(w,"Worker.DoTask",taskArgs,reply)
        if ok == false {
            fmt.Println("do " + phase + " error")
        }

        mr.stats[index] = 0        //完成任务

        select {
        case callback.callbackChannel <- "done":
        default :
        }

        workdone <- "work done"

    }(i)

}

for i:=0 ; i < ntasks ; i++ {            //等待所有worker完成
    <- workdone
}

总结

part3 实际需要考虑的就是多线程的同步、通知等问题。有时间可以看下yarn的调度方式。