Overview
In this part we deal with worker failures.

- The master assigns tasks by calling `call()` in `common_rpc.go`. When a timeout occurs, the function returns `false`, and the master has to reassign that task to another worker.
- `call()` returning `false` does not necessarily mean the task failed: the worker may simply be running slowly and tripping the timeout. As a consequence, several workers may in fact be executing the same task at once (from the master's point of view there is still only one).
- In addition, MapReduce relies on GFS to make a task's output complete and atomic (either all output files appear, or none do). The lab does not implement that guarantee; it simply makes the worker crash.
Problem
How should we change the code of schedule() to cope with worker failures?

schedule() returns once all tasks have completed, so how do we keep track of each task's state?

Unlike the paper, the failures that can show up in the lab are (grouped by phase):

- A worker crashes while executing a map task: reassign the task.
- A map task has finished and its intermediate output is already on local disk, but the mapper becomes unreachable. Since the lab simulates a distributed setting with multiple threads in one process, everything is visible, so those intermediate files remain accessible.
- A reduce task fails: simply reassign it.
- A reduce task has finished and then the reducer fails. Its output is still accessible, so this case needs no handling.
- A worker fails while writing results (intermediate files or the job result) to disk. There is no GFS here; the worker simply crashes, so this is equivalent to failing during task execution.

So the only case we need to handle is: what to do when a task execution fails.
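Before looking at the implementations below, here is a minimal sketch of one common way to answer both questions at once, assuming the lab's `call()` and `DoTaskArgs` plus the `ntasks`/`nOther` values and parameters that `schedule()` already has: give every task its own goroutine that retries until some worker succeeds (failure handling), and let a `sync.WaitGroup` tell `schedule()` when all of them are done (state tracking). The implementations that follow take a channel-based route instead.

```go
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
	wg.Add(1)
	go func(taskIdx int) {
		defer wg.Done()
		file := ""
		if phase == mapPhase {
			file = mapFiles[taskIdx] // only map tasks read an input file
		}
		args := DoTaskArgs{
			JobName:       jobName,
			File:          file,
			Phase:         phase,
			TaskNumber:    taskIdx,
			NumOtherPhase: nOther,
		}
		for {
			worker := <-registerChan
			if call(worker, "Worker.DoTask", args, nil) {
				go func() { registerChan <- worker }() // return the worker to the pool
				return                                 // this task is done
			}
			// call() returned false: retry the same task on another worker.
		}
	}(i)
}
wg.Wait() // schedule() returns only after all ntasks tasks have succeeded
```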
Implementation
Let's first look at the `test_test.go` file:
```go
func TestOneFailure(t *testing.T) {
	mr := setup()
	// Start 2 workers that fail after 10 tasks
	go RunWorker(mr.address, port("worker"+strconv.Itoa(0)),
		MapFunc, ReduceFunc, 10, nil)
	go RunWorker(mr.address, port("worker"+strconv.Itoa(1)),
		MapFunc, ReduceFunc, -1, nil)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}

func TestManyFailures(t *testing.T) {
	mr := setup()
	i := 0
	done := false
	for !done {
		select {
		case done = <-mr.doneChannel:
			check(t, mr.files)
			cleanup(mr)
			break
		default:
			// Start 2 workers each sec. The workers fail after 10 tasks
			w := port("worker" + strconv.Itoa(i))
			go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
			i++
			w = port("worker" + strconv.Itoa(i))
			go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
			i++
			time.Sleep(1 * time.Second)
		}
	}
}
```
TestOneFailure() makes worker0 die after 10 RPCs, while TestManyFailures() starts two new workers every second, each of which also dies after 10 RPCs, to simulate frequent failures.
The first test produces the following error:
```
Schedule: mapPhase done
Schedule: 10 reducePhase tasks (20 I/Os)
/var/tmp/824-1000/mr28342-worker1: given reducePhase task #0 on file 824-mrinput-0.txt (nios: 20)
/var/tmp/824-1000/mr28342-worker1: reducePhase task #0 done
Schedule: reducePhase done
Master: RPC /var/tmp/824-1000/mr28342-worker0 shutdown error
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
2019/09/29 15:38:28 Merge: open mrtmp.test-res-1: no such file or directory
```
We can see that after the master reports the worker0 shutdown error, it goes straight on to call Merge(). That message comes from KillWorkers() in master.go, whose key part is:
```go
var reply ShutdownReply
ok := call(w, "Worker.Shutdown", new(struct{}), &reply)
if ok == false {
	fmt.Printf("Master: RPC %s shutdown error\n", w)
} else {
	ntasks = append(ntasks, reply.Ntasks)
}
```
When call() returns false, the worker failed to shut down cleanly; otherwise the reply carries the number of tasks that worker completed.
Debugging shows that in the reduce phase, apart from the one task handed to worker1, every iteration reads worker0's RPC address from registerChan. In the Part III implementation, call() is invoked without handling its return value, so even when call() hits a timeout it simply returns and puts worker0's address back onto registerChan. As a result, worker0's name keeps hogging registerChan.
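For reference, this is a hedged reconstruction of the kind of Part III loop being described (the actual P3 code is not shown in this post): the result of `call()` is discarded and the worker's address is always pushed back onto `registerChan`, so a dead worker0 keeps re-entering the pool while the live worker1 is busy.

```go
// Reconstructed Part III-style loop (illustrative only): no failure handling.
for i := 0; i < ntasks; i++ {
	workerName := <-registerChan
	wg.Add(1)
	go func(taskIdx int, workerName string) {
		defer wg.Done()
		args := DoTaskArgs{
			JobName:       jobName,
			File:          mapFiles[taskIdx],
			Phase:         phase,
			TaskNumber:    taskIdx,
			NumOtherPhase: nOther,
		}
		call(workerName, "Worker.DoTask", args, nil) // return value ignored
		go func() { registerChan <- workerName }()   // worker0 re-enters the pool even if it is dead
	}(i, workerName)
}
wg.Wait()
```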
Implementation 1
The code below tracks task completion with a []bool and uses a goroutine plus a channel to receive the indices of finished tasks. Implementation 1 does not yet reassign failed tasks, so it cannot pass the tests.
```go
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	// ... (ntasks / nOther setup from the original skeleton, omitted here)
	var (
		wg            sync.WaitGroup
		RPCMethodName = "Worker.DoTask"
		TaskTracker   = make([]bool, ntasks) // should be protected by mutex
		inform        = make(chan int)       // receives the index of each finished task
	)

	// ------------------------------------------------------------------ init
	for i := 0; i < ntasks; i++ {
		TaskTracker[i] = false
	}

	// ------------------------------------------------------------------ tracker
	go func(tracker *[]bool, informChan chan int) {
		NumTaskDone := 0
		for {
			i := <-informChan // get index of the finished task
			if !TaskTracker[i] {
				TaskTracker[i] = true
				NumTaskDone++
				log.Println("Task i = ", i, " done.")
			}
			if NumTaskDone >= ntasks {
				break
			}
		}
		log.Println("All task done.")
	}(&TaskTracker, inform)

	for i := 0; i < ntasks; i++ {
		if !TaskTracker[i] {
			workerName := <-registerChan
			wg.Add(1) // add before spawning so wg.Wait() cannot return early
			go func(TaskIndex int, waitG *sync.WaitGroup, InformChan chan int) {
				defer waitG.Done()
				args := DoTaskArgs{
					JobName:       jobName,
					File:          mapFiles[TaskIndex],
					Phase:         phase,
					TaskNumber:    TaskIndex,
					NumOtherPhase: nOther,
				}
				isDone := call(workerName, RPCMethodName, args, nil)
				if isDone {
					go func() {
						registerChan <- workerName
						InformChan <- TaskIndex // ---------------------- New
					}()
				}
			}(i, &wg, inform)
		}
	}
	wg.Wait()
	fmt.Printf("Schedule: %v done\n", phase)
}
```
Implementation 2
This builds on Implementation 1 and adds task reassignment.

The idea is as follows:

No mutex is needed. main spawns two goroutines, a dispatcher and a tracker; together with the per-task Controller goroutines they form MPSC queues over synchronous channels, and all access to TaskTracker goes through them.
Code:
```go
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var nOther int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		nOther = nReduce
	case reducePhase:
		ntasks = nReduce
		nOther = len(mapFiles)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

	// All ntasks tasks have to be scheduled on workers. Once all tasks
	// have completed successfully, schedule() should return.
	//
	// Your code here (Part III, Part IV).
	//
	var (
		wg            sync.WaitGroup
		RPCMethodName = "Worker.DoTask"
		TaskTracker   = make([]bool, ntasks) // should be protected by mutex
		TaskDoneChan  = make(chan int)       // receives the index of each finished task
		ToDoChan      = make(chan int)       // receives the index of each task to (re)assign
		NumTaskDone   = 0
	)

	// init
	for i := 0; i < ntasks; i++ {
		TaskTracker[i] = false
	}

	// --------------------------------------------------------------- tracker
	go func() {
		for {
			i := <-TaskDoneChan // get index of the finished task
			if !TaskTracker[i] {
				TaskTracker[i] = true
				NumTaskDone++
				log.Println("Task i = ", i, " done.")
			}
			if NumTaskDone >= ntasks {
				break
			}
		}
		log.Println("All task done.")
	}()

	// --------------------------------------------------------------- dispatcher
	go func() {
		for {
			i := <-ToDoChan
			// This read of TaskTracker really ought to be locked.
			if !TaskTracker[i] {
				workerName := <-registerChan
				wg.Add(1) // add before spawning so wg.Wait() sees this task
				// Controller.
				go func(TaskIndex int, waitG *sync.WaitGroup) {
					defer waitG.Done()
					args := DoTaskArgs{
						JobName:       jobName,
						File:          mapFiles[TaskIndex],
						Phase:         phase,
						TaskNumber:    TaskIndex,
						NumOtherPhase: nOther,
					}
					isDone := call(workerName, RPCMethodName, args, nil)
					if isDone {
						// Return the worker to the pool and report the task as done.
						go func() {
							registerChan <- workerName
							TaskDoneChan <- TaskIndex
						}()
					} else {
						// The worker may have crashed: put the task back for reassignment.
						go func() {
							ToDoChan <- TaskIndex
						}()
					}
				}(i, &wg)
			}
		}
	}()

	for i := 0; i < ntasks; i++ {
		ToDoChan <- i
	}
	// In case schedule() reaches wg.Wait() before the dispatcher and tracker start working.
	time.Sleep(100 * time.Millisecond)
	wg.Wait()
	fmt.Printf("Schedule: %v done\n", phase)
}
```
The tests pass:

```
/var/tmp/824-1000/mr10741-master: Map/Reduce task completed
PASS
ok  	mapreduce	3.255s
```