乐趣区

MIT6824Lab1P3

Part III: Distributing MapReduce tasks

要干什么?

前面的都是串行执行 MRTask, 这次我们要用多线程模拟分布式环境, 来进行分布式 MR.
具体的任务是: 完善schedule.go: schedule():

  1. 从 registerChan 读取已注册 worker, 它会返回一个包含 worker 的 rpc 地址的字符串.
  2. 给每一个 worker 分配一系列任务
  3. 等待所有 Task 完成后, 返回
  4. schedule()应该使用全部 worker, 有一些 worker 可能在 schedule() 执行时才启动.
  5. schedul()通过 Worker.DoTask() 让 worker 执行任务.

前置条件

因为设计到并发编程, 所以我们可能要用到:

  • goroutine
  • channel
  • go 的 RPC 库, 用来和 Worker 通信
  • sync.WaitGroup
  • Go 的 race detector.
  • select语句, 用来检查超时

我们还要了解如下文件:

  • mapreduce/common_rpc.go
  • mapreduce/master.go
  • mapreduce/worker.go

代码构思

为每个 worker 分配若干个 task

这是作者最初的思路:

  • 创建布尔数组追踪每个 job 是否完成
  • 对每个 worker, 用 goroutine 调用 call()来分配 Task
  • 对于每个 call(), 设定 timeout, 如果 timeout 内返回 true, 则标记该 Task 完成; 否则重新分配该 Task 给另一个 worker.
  • 如果所有 Task 完成, 则 break.
  • 注意: registerChan返回的是已注册的 worker 的 RPC 地址,

不等 y 于 空闲的 worker! 要自己管理这些 worker!

这里笔者把问题想复杂了, 导致代码一团糟, 且出现很多多线程 bug. 实验 P3 前提是分布式无差错环境,不用考虑容错。

为每个 task 分配一个 worker

参考了这篇博客。
一个重要的思路: 每个 worker 完成 task 后, 将其名字放入 registerChan, 日后再用.
一个小坑: 最后一个 goroutine 中把名字放入 chan, 这时没人来取它了,会导致阻塞。
通过把:

registerChan <- workerName // 阻塞, 会导致任务完成但 goroutine 阻塞不返回

改为:

// 最后一个 task 时会阻塞但是没问题, 主线程退出,它也就结束了。go func(){registerChan <- workerName}

代码:

RPCMethodName := "Worker.DoTask"
var wg sync.WaitGroup
// For each task, assign it to a worker.
// Not for each worker , assign many tasks to it.
for i := 0; i < ntasks; i++ {
    workerName := <-registerChan
    go func(TaskIndex int, waitG *sync.WaitGroup) {waitG.Add(1)
        defer waitG.Done()

        args := DoTaskArgs{
            JobName:       jobName,
            File:          mapFiles[TaskIndex],
            Phase:         phase,
            TaskNumber:    TaskIndex,
            NumOtherPhase: nOther,
        }
        call(workerName, RPCMethodName, args, nil)
        
        // For the last task, the goroutine will block. But it will be killed while main exiting.
        go func() {registerChan <- workerName}()

        return
    }(i, &wg)
}

wg.Wait()

可以通过测试.

退出移动版