最近忙着工作上的事,看书看论文的时间也少了,周五啦放松下,把最近做的分布式课程梳理下。
之前一直在做麻省理工大学的分布式课程mit 6.824
虽然一直做,但也只是做了个lab1中的前两个part
关于map reduce ,论文太多了。
google的官方论文
part1 单机版顺序map reduce
mit给了部分map reduce执行的代码,包括测试用例,单机的多机的,任务失败的,不得不说,看看人家写的代码很有帮助,自己写的golang实在是搓。
1.doMap 函数
doMap函数:根绝master调用的输入文件,调用用户定义的map函数,然后将结果分区写到不同的out put file上。
如何分区,这个一开始没写好,一直测试不过,因为一次map reduce任务,有m个map任务,map任务取决于hdfs上block,n个reduce任务,这个取决于如何分区,这里没给分区函数,但测试用例初始化master的时候,file的个数决定map的个数,reduce的个数取决于给定的数量。
使用很简单的分区方式,对key进行hash后/reduce个数n取余
也就是hash(key)%nReduce得到分区文件
结果存储没啥好说的,一开始的建议是直接将输出的key value以json的形势存储文件,比较期待后边的倒排索引如何实现。
2.doReduce 函数
doReduce函数:根据master的调用,拿到属于该reduce任务的分区文件,也就是hash(key)%nReduce,得到该分区下的所有的map函数产生的数据文件,读取合并排序后调用reduce函数,得到output key value ,json格式写文件。完活
3.merge 函数
merge函数是mit给的,没有细看,大概就是把每个reduce,按照reduce的分区顺序合并,写入最后的结果文件中。
整体把map reduce的单机流程理清,最重要的还是如何实现分布式啦,如何处理map或者reduce函数的失败,hdfs的失败。
part2 单机的word count
根据刚才已经实现的map reduce框架,自己写map函数和reduce函数,有10多个文件,和测试代码,很简单与预计结果相同啦
part3 分布式map reduce
功能
这一部分模拟的是线程和rpc模拟分布式的map reduce的调度。
master线程主要功能:
1.map reduce 任务的调度
2.worker的注册
worker线程主要功能:执行map或者reduce任务
1.首先进程先启动master线程,初始化一个master对象,master对象中主要包含mapreduce任务需要的输入文件、reduce个数、worker极其状态等。然后会启动一个rpc服务,等待worker线程的注册。
2.初始化n个worker线程启动,并且调用注册master方法。
3.master根据已注册的可用的worker调度map和reduce任务
大体流程如上述。part3就是完成这个调度。
实现
第一步 master的数据结构
master数据结构
type Master struct {
sync.Mutex
address string
registerChannel chan string
doneChannel chan bool
workers []string // protected by the mutex
// Per-task information
jobName string // Name of currently executing job
files []string // Input files
nReduce int // Number of reduce partitions
shutdown chan struct{}
l net.Listener
stats []int // 0:可用,1:不可用
}
一个并发锁,名称,注册管道,完成管道,workers,workers的状态,任务名,输入文件,reduce个数。基本能用到的就是这些。
第二步 handle register event
首先master和worker是在不同的线程中并行执行
master线程(部分代码)
mr.jobName = jobName
mr.files = files
mr.nReduce = nreduce
fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)
schedule(mapPhase)
schedule(reducePhase)
finish()
mr.merge()
fmt.Printf("%s: Map/Reduce task completed\n", mr.address)
mr.doneChannel <- true
worker线程(部分代码)
wk := new(Worker)
wk.name = me
wk.Map = MapFunc
wk.Reduce = ReduceFunc
wk.nRPC = nRPC
rpcs := rpc.NewServer()
rpcs.Register(wk)
os.Remove(me) // only needed for "unix"
l, e := net.Listen("unix", me)
if e != nil {
log.Fatal("RunWorker: worker ", me, " error: ", e)
}
wk.l = l
wk.register(MasterAddress)
也就是说在master调用schedule(mapPhase)时候,我们并不能知道worker知否已经注册到了master。
先来看下wokrer调用master的register的rpc服务
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
mr.Lock()
defer mr.Unlock()
debug("Register: worker %s\n", args.Worker)
mr.workers = append(mr.workers, args.Worker)
go func() {
mr.registerChannel <- args.Worker
}()
return nil
}
在master.workers中添加一个新的worker,并且写入mr.registerChannel
所以在schedule方法中,我们只要监听这个event即可
func handleRegisterEvent(mr *Master,callback *Callback){
go func(mr *Master){
for true{
<- mr.registerChannel //wait 新的注册事件
mr.Lock()
mr.stats = append(mr.stats,0) //初始化可用状态
callback.callbackChannel <- "register" //通知callback
mr.Unlock()
}
}(mr)
}
起一个线程专门wait worker的 register事件,然后通知callback,callback在获取可用worker时候用到
第三步 获取可用worker
在获取可用的worker时
先去master中找有没有可用的worker,状态为0的worker,如果有直接返回,没有则等待callback通知。
callback的写入事件包涵两种:
1.worker注册事件
2.worker完成任务事件
map reduce任务量可能很大,但是worker却只有几个。当worker都在使用中,等着分配worker的map reduce任务,只能等待。
当有新的可用的worker才能被调度。
/**
获取空闲的worker
*/
func idleWorker(mr *Master,callback *Callback) (idleWorker string,index int){
//fmt.Printf("before getidle worker : %s stats : %s\n",mr.workers, mr.stats)
for true {
idleWorker,index = getIdleWorker(mr)
if idleWorker == "" { //等待新事件
event := <- callback.callbackChannel
if event == "register" {
//fmt.Println("handle new register worker event")
}else {
//fmt.Println("handle new done worker event")
}
} else{ //获取到woker
break;
}
}
//fmt.Printf("after getidle worker : %s stats %d\n:",mr.workers, mr.stats)
return idleWorker,index
}
/**
获取可用状态的worker
*/
func getIdleWorker(mr *Master)(idleWorker string,index int){
mr.Lock()
defer mr.Unlock()
for i,_ := range mr.stats {
if(mr.stats[i] != 1 ){ //有可用的worker
idleWorker = mr.workers[i]
index = i
}
}
return
}
第四步 调度主流程
基本的获取worker的操作在前两步中已经有了。
1.handle register event
2.获取到可用的worker
3.调用worker的doMap rpc,告诉worker之行map/reduce任务
4.完成任务通知
这里说明一下因为并不能知道当前是否有任务在等待worker的完成,如果直接callback.callbackChannel <- “done”这样写入造成会造成阻塞,当worker够用没有线程在等待callback事件时,这个线程则会阻塞在这里,无法完成任务。所以使用Select非阻塞的方式,如果没有线程在等待callback事件的写入,则不通知。
5.等待所有任务完成
callback := &Callback {
name : "callback",
callbackChannel : make(chan string),
}
workdone := make(chan string)
handleRegisterEvent(mr, callback) //单独handle注册事件
for i:=0 ; i<ntasks ; i++ { //先分配map任务给当前空闲的worker
go func(nowNumTask int) {
w,index := idleWorker(mr,callback) //阻塞,等待可用worker
mr.stats[index] = 1
var taskArgs *DoTaskArgs
if phase == mapPhase{
taskArgs = &DoTaskArgs{
JobName : mr.jobName,
File : mr.files[nowNumTask],
Phase:mapPhase,
TaskNumber:nowNumTask,
NumOtherPhase:nios,
}
}else {
taskArgs = &DoTaskArgs{
JobName : mr.jobName,
Phase:reducePhase,
TaskNumber:nowNumTask,
NumOtherPhase:nios,
}
}
reply := new(struct{})
ok := call(w,"Worker.DoTask",taskArgs,reply)
if ok == false {
fmt.Println("do " + phase + " error")
}
mr.stats[index] = 0 //完成任务
select {
case callback.callbackChannel <- "done":
default :
}
workdone <- "work done"
}(i)
}
for i:=0 ; i < ntasks ; i++ { //等待所有worker完成
<- workdone
}
总结
part3 实际需要考虑的就是多线程的同步、通知等问题。有时间可以看下yarn的调度方式。