Part I: Map/Reduce input and output
添加代码:
common_map.go: doMap()
common_reduce.go: doReduce()
完成后测试:
cd "<project_position>/src/mapreduce"go test -run Sequential
我们需要事先掌握如下:
- 保存读取json文件
- 文件输入输出.
- defer语句, 用于释放资源.
common_map.go: doMap()
要干什么?
- doMap()管理一个Map Task.
- 调用mapF()将输入文件的内容变成一个
[]KeyValue
. - 用划分函数将mapF()的输出划分到
nReduce
个中间文件. - 每个中间文件命由
<JobName>-<mapTask>-<reduceTask>
三部分组成.
代码构思:
- 创建
nReduce
个json文件, 将其句柄放入jsonArr
这个数组中, 其类型为[][]KeyValue
. - 读取文件将其内容转为String, 输入mapF()得到输出kvList.
- 对于kvList中每个kv, 用
index = ihash(kv.Key) % nReduce
来将其加入数组jsonArr[index]
中. - 把
jsonArr
每一个数组jsonArr[i]
保存到文件jobName-mapTask-i
中(调用reduceName(..)
). - 中间文件的格式由自己设计, 只要
doReduce()
能够正常decode就好.
实现:
func doMap( jobName string, // the name of the MapReduce job mapTask int, // which map task this is inFile string, nReduce int, // the number of reduce task that will be run ("R" in the paper) mapF func(filename string, contents string) []KeyValue,) { contentsByte, err := ioutil.ReadFile(inFile) if err != nil { log.Fatal(err) } contents := string(contentsByte) kvList := mapF(inFile, contents) jsonArr := make([][]KeyValue, nReduce) for _, kv := range kvList { index := ihash(kv.Key) % nReduce // Key or Value ? jsonArr[index] = append(jsonArr[index], kv) } // Save as json file. for i := 0; i < nReduce; i++ { filename := reduceName(jobName, mapTask, i) file, err := os.Create(filename) if err != nil { log.Fatal(err) } enc := json.NewEncoder(file) ok := enc.Encode(&jsonArr[i]) if ok != nil { log.Fatal(ok) } file.Close() }}
common_reduce.go: doReduce()
要干什么?
doReduce()
管理一个reduce任务r
.- 从每一个MapTask的
ReduceTask
编号为r
的中间文件中(共nMap
个), 读取kv对, 转为String. - 将kv对按照key排序.
- 对每个不同的key调用
reduceF()
, 将输出结果写入磁盘
调用sort
包来排序, 教程.
实现
func doReduce( jobName string, // the name of the whole MapReduce job reduceTask int, // which reduce task this is outFile string, // write the output here nMap int, // the number of map tasks that were run ("M" in the paper) reduceF func(key string, values []string) string,) { table := make(map[string][]string) // Read nMap immediate files. for i := 0; i < nMap; i++ { file, err := os.Open(reduceName(jobName, i, reduceTask)) if err != nil { log.Fatal(err) } enc := json.NewDecoder(file) var tmp []KeyValue decErr := enc.Decode(&tmp) if decErr != nil { log.Fatal(decErr) } for _, kv := range tmp { table[kv.Key] = append(table[kv.Key], kv.Value) } file.Close() } // Sort them by Key sortedKeyArr := make([]string, 0) for k := range table { sortedKeyArr = append(sortedKeyArr, k) } sort.Strings(sortedKeyArr) outputFile, err := os.Create(outFile) if err != nil { log.Fatal(err) } defer outputFile.Close() outputJSON := json.NewEncoder(outputFile) for i := 0; i < len(sortedKeyArr); i++ { key := sortedKeyArr[i] val := table[key] kv := KeyValue{key, reduceF(key, val)} encErr := outputJSON.Encode(kv) if encErr != nil { log.Fatal(encErr) } }}
可通过测试.