Overview

This time we deal with worker failures.

  • The master assigns tasks by calling call() in common_rpc.go. When a timeout occurs, call() returns false, and the master must reassign that task to another worker.
  • A false return from call() in common_rpc.go does not necessarily mean the task failed: the worker may simply be slow and hit the timeout. As a result, several workers may actually be executing the same task, even though from the master's point of view there is only one.
  • In the paper, MapReduce relies on GFS to make a task's output complete and atomic (either all of its output files appear, or none of them do). This lab does not implement that guarantee; it simply makes the worker crash.

Questions

How should we adjust the code of schedule() to cope with worker failures?

schedule() should return once all tasks have completed, so how do we track the state of each task?

Unlike in the paper, the failures that can occur in the lab are (grouped by phase):

  • A worker crashes while executing a map task: reassign the task.
  • A map task finishes and its intermediate output is already written to local disk, but the mapper becomes unreachable. Because the lab simulates the distributed setting with multiple threads, everything is visible, so these intermediate files remain accessible.
  • A reduce task fails while executing: just reassign it.
  • The reducer fails after its reduce task has finished: the output is still accessible, so this case needs no handling.
  • A worker fails while writing results to disk (intermediate files or the job result). There is no GFS here; the worker simply crashes, so this is equivalent to failing during task execution.

So the only case we need to handle is: what to do when a task execution fails.
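Before touching schedule(), the shape of the fix is simply "retry the task on another worker until call() succeeds". A minimal sketch of that idea (runTask is a hypothetical helper name of mine; DoTaskArgs, call() and registerChan are the lab's, used the same way as in the code further below):

// Sketch only: keep handing one task to whichever worker is free until
// call() reports success. runTask is a hypothetical name, not lab code.
func runTask(args DoTaskArgs, registerChan chan string) {
    for {
        worker := <-registerChan // take any registered worker
        if call(worker, "Worker.DoTask", args, nil) {
            go func() { registerChan <- worker }() // hand the worker back for later tasks
            return                                 // this task is done
        }
        // call() returned false (timeout or crash): try another worker.
    }
}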

Implementation

Let's first look at test_test.go:

func TestOneFailure(t *testing.T) {
    mr := setup()
    // Start 2 workers that fail after 10 tasks
    go RunWorker(mr.address, port("worker"+strconv.Itoa(0)),
        MapFunc, ReduceFunc, 10, nil)
    go RunWorker(mr.address, port("worker"+strconv.Itoa(1)),
        MapFunc, ReduceFunc, -1, nil)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

func TestManyFailures(t *testing.T) {
    mr := setup()
    i := 0
    done := false
    for !done {
        select {
        case done = <-mr.doneChannel:
            check(t, mr.files)
            cleanup(mr)
            break
        default:
            // Start 2 workers each sec. The workers fail after 10 tasks
            w := port("worker" + strconv.Itoa(i))
            go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
            i++
            w = port("worker" + strconv.Itoa(i))
            go RunWorker(mr.address, w, MapFunc, ReduceFunc, 10, nil)
            i++
            time.Sleep(1 * time.Second)
        }
    }
}

TestOneFailure() makes worker0 die after 10 RPCs (worker1, started with -1, keeps running),

while TestManyFailures() starts 2 new workers every second, each of which crashes after 10 RPCs, to simulate frequent failures.
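The numeric argument to RunWorker (10 here, -1 for "never fail") is this RPC budget. Very roughly, and only as a sketch of the idea rather than the lab's actual worker.go (the names below are made up):

// Illustrative sketch of the failure simulation (names are illustrative, not
// the real worker.go): each accepted connection consumes one unit of budget,
// and at zero the worker stops serving, so the master's call() times out.
for budget := nRPC; budget != 0; budget-- { // budget == -1 means "never fail"
    conn, err := listener.Accept()
    if err != nil {
        break
    }
    go rpcServer.ServeConn(conn) // serves Worker.DoTask, Worker.Shutdown, ...
}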

The first test produces the following error:

Schedule: mapPhase done
Schedule: 10 reducePhase tasks (20 I/Os)
/var/tmp/824-1000/mr28342-worker1: given reducePhase task #0 on file 824-mrinput-0.txt (nios: 20)
/var/tmp/824-1000/mr28342-worker1: reducePhase task #0 done
Schedule: reducePhase done
Master: RPC /var/tmp/824-1000/mr28342-worker0 shutdown error
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
2019/09/29 15:38:28 Merge: open mrtmp.test-res-1: no such file or directory

We can see that right after the master reports the worker0 shutdown error, it starts calling Merge().
That message comes from KillWorkers() in master.go, whose key part is:

var reply ShutdownReply
ok := call(w, "Worker.Shutdown", new(struct{}), &reply)
if ok == false {
    fmt.Printf("Master: RPC %s shutdown error\n", w)
} else {
    ntasks = append(ntasks, reply.Ntasks)
}

When call() returns false here, shutting down the worker failed; when it succeeds, reply.Ntasks carries the number of tasks that worker completed.

Debugging shows that during the reduce phase, apart from the single task assigned to worker1, every iteration pulls worker0's RPC address out of registerChan.

In the Part III implementation, call() is invoked without checking its return value, so even when call() detects a timeout, the goroutine just returns normally and puts worker0's address back into registerChan; worker0's name therefore keeps occupying registerChan.
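My Part III task goroutine looked roughly like this (a reconstruction for illustration, using the same names as the code below), which is exactly the behaviour described above:

// Part III style (reconstruction): the return value of call() is ignored, so
// even after a timeout the worker's address goes straight back into
// registerChan, and worker0 keeps being handed out.
go func(TaskIndex int) {
    defer wg.Done()
    args := DoTaskArgs{
        JobName:       jobName,
        File:          mapFiles[TaskIndex],
        Phase:         phase,
        TaskNumber:    TaskIndex,
        NumOtherPhase: nOther,
    }
    call(workerName, "Worker.DoTask", args, nil) // false on timeout -- never checked
    go func() { registerChan <- workerName }()   // re-register the worker regardless
}(i)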

Implementation 1

The code below uses a []bool to track task completion, plus a goroutine and a channel to receive the indices of finished tasks.
Implementation 1 does not reassign tasks, so it cannot pass the tests.

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    // ...
    var (
        wg            sync.WaitGroup
        RPCMethodName = "Worker.DoTask"
        TaskTracker   = make([]bool, ntasks) // should be protected by mutex
        inform        = make(chan int)       // get index of task done.
    )
    // ------------------------------------------------------------------ init
    for i := 0; i < ntasks; i++ {
        TaskTracker[i] = false
    }
    // ------------------------------------------------------------------ tracker.
    go func(tracker *[]bool, informChan chan int) {
        NumTaskDone := 0
        for {
            i := <-informChan // get index of task finished.
            if !TaskTracker[i] {
                TaskTracker[i] = true
                NumTaskDone++
                log.Println("Task i = ", i, " done.")
            }
            if NumTaskDone >= ntasks {
                break
            }
        }
        log.Println("All task done.")
    }(&TaskTracker, inform)

    for i := 0; i < ntasks; i++ {
        if !TaskTracker[i] {
            workerName := <-registerChan
            go func(TaskIndex int, waitG *sync.WaitGroup, InformChan chan int) {
                waitG.Add(1)
                defer waitG.Done()
                args := DoTaskArgs{
                    JobName:       jobName,
                    File:          mapFiles[TaskIndex],
                    Phase:         phase,
                    TaskNumber:    TaskIndex,
                    NumOtherPhase: nOther,
                }
                isDone := call(workerName, RPCMethodName, args, nil)
                if isDone {
                    go func() {
                        registerChan <- workerName
                        InformChan <- TaskIndex // ---------------------- New
                    }()
                }
                return
            }(i, &wg, inform)
        }
    }
    wg.Wait()
    fmt.Printf("Schedule: %v done\n", phase)
}

Implementation 2

Building on Implementation 1, this adds the ability to reassign tasks.

The idea is as follows:

No mutex is used. main spawns two goroutines, a dispatcher and a tracker; together with the per-task controller goroutines they form an MPSC queue over unbuffered channels, through which TaskTracker is accessed.
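The pattern in isolation looks like this (a minimal sketch with hypothetical names, not the schedule() code itself): only the single consumer goroutine ever touches the shared slice, and producers reach it through a channel, so channel ownership stands in for the mutex.

// MPSC sketch (hypothetical names): many producers, one consumer that owns
// the state; sending over doneCh replaces locking a mutex around taskDone.
doneCh := make(chan int) // producers send indices of finished tasks
go func() {              // single consumer: the only goroutine touching taskDone
    taskDone := make([]bool, ntasks)
    for count := 0; count < ntasks; {
        if i := <-doneCh; !taskDone[i] {
            taskDone[i] = true
            count++
        }
    }
}()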

The code:

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var nOther int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        nOther = nReduce
    case reducePhase:
        ntasks = nReduce
        nOther = len(mapFiles)
    }
    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //
    var (
        wg            sync.WaitGroup
        RPCMethodName = "Worker.DoTask"
        TaskTracker   = make([]bool, ntasks) // should be protected by mutex
        TaskDoneChan  = make(chan int)       // get index of task done.
        ToDoChan      = make(chan int)
        NumTaskDone   = 0
    )
    // init
    for i := 0; i < ntasks; i++ {
        TaskTracker[i] = false
    }
    // --------------------------------------------------------------- tracker.
    go func() {
        for {
            i := <-TaskDoneChan // get index of task finished.
            TaskState := TaskTracker[i]
            if TaskState == false {
                TaskTracker[i] = true
                NumTaskDone++
                log.Println("Task i = ", i, " done.")
            }
            if NumTaskDone >= ntasks {
                break
            }
        }
        log.Println("All task done.")
    }()
    // --------------------------------------------------------------- Dispatcher.
    go func() {
        for {
            i := <-ToDoChan
            // We should really be locking this read.
            TaskState := TaskTracker[i]
            if TaskState == false {
                workerName := <-registerChan
                // Controller.
                go func(TaskIndex int, waitG *sync.WaitGroup) {
                    waitG.Add(1)
                    defer waitG.Done()
                    //log.Println("Assign task", i, " to ", workerName)
                    args := DoTaskArgs{
                        JobName:       jobName,
                        File:          mapFiles[TaskIndex],
                        Phase:         phase,
                        TaskNumber:    TaskIndex,
                        NumOtherPhase: nOther,
                    }
                    isDone := call(workerName, RPCMethodName, args, nil)
                    if isDone {
                        go func() {
                            registerChan <- workerName
                            TaskDoneChan <- TaskIndex
                        }()
                        // The tracker goroutine marks TaskTracker[TaskIndex] as done.
                    } else { // The worker may have crashed: resubmit the task.
                        go func() {
                            ToDoChan <- i
                        }()
                    }
                    return
                }(i, &wg)
            }
        }
    }()

    for i := 0; i < ntasks; i++ {
        ToDoChan <- i
    }
    // In case of main exit before dispatcher and tracker start working.
    time.Sleep(time.Duration(100) * time.Millisecond)
    wg.Wait()
    fmt.Printf("Schedule: %v done\n", phase)
}

It passes the tests:

/var/tmp/824-1000/mr10741-master: Map/Reduce task completed
PASS
ok      mapreduce    3.255s