Part III: Distributing MapReduce tasks
要干什么?
前面的都是串行执行 MRTask, 这次我们要用多线程模拟分布式环境, 来进行分布式 MR.
具体的任务是: 完善schedule.go: schedule()
:
- 从 registerChan 读取已注册 worker, 它会返回一个包含 worker 的 rpc 地址的字符串.
- 给每一个 worker 分配一系列任务
- 等待所有 Task 完成后, 返回
- schedule()应该使用全部 worker, 有一些 worker 可能在 schedule() 执行时才启动.
- schedul()通过
Worker.DoTask()
让 worker 执行任务.
前置条件
因为设计到并发编程, 所以我们可能要用到:
goroutine
channel
- go 的 RPC 库, 用来和 Worker 通信
sync.WaitGroup
- Go 的 race detector.
-
select
语句, 用来检查超时
我们还要了解如下文件:
mapreduce/common_rpc.go
mapreduce/master.go
mapreduce/worker.go
代码构思
为每个 worker 分配若干个 task
这是作者最初的思路:
- 创建布尔数组追踪每个 job 是否完成
- 对每个 worker, 用 goroutine 调用 call()来分配 Task
- 对于每个 call(), 设定 timeout, 如果 timeout 内返回 true, 则标记该 Task 完成; 否则重新分配该 Task 给另一个 worker.
- 如果所有 Task 完成, 则 break.
- 注意:
registerChan
返回的是已注册的 worker 的 RPC 地址,
不等 y 于 空闲的 worker
! 要自己管理这些 worker!
这里笔者把问题想复杂了, 导致代码一团糟, 且出现很多多线程 bug. 实验 P3 前提是分布式无差错环境,不用考虑容错。
为每个 task 分配一个 worker
参考了这篇博客。
一个重要的思路: 每个 worker 完成 task 后, 将其名字放入 registerChan
, 日后再用.
一个小坑: 最后一个 goroutine 中把名字放入 chan, 这时没人来取它了,会导致阻塞。
通过把:
registerChan <- workerName // 阻塞, 会导致任务完成但 goroutine 阻塞不返回
改为:
// 最后一个 task 时会阻塞但是没问题, 主线程退出,它也就结束了。go func(){registerChan <- workerName}
代码:
RPCMethodName := "Worker.DoTask"
var wg sync.WaitGroup
// For each task, assign it to a worker.
// Not for each worker , assign many tasks to it.
for i := 0; i < ntasks; i++ {
workerName := <-registerChan
go func(TaskIndex int, waitG *sync.WaitGroup) {waitG.Add(1)
defer waitG.Done()
args := DoTaskArgs{
JobName: jobName,
File: mapFiles[TaskIndex],
Phase: phase,
TaskNumber: TaskIndex,
NumOtherPhase: nOther,
}
call(workerName, RPCMethodName, args, nil)
// For the last task, the goroutine will block. But it will be killed while main exiting.
go func() {registerChan <- workerName}()
return
}(i, &wg)
}
wg.Wait()
可以通过测试.