Part I: Map/Reduce input and output

Code to add:

  • common_map.go: doMap()
  • common_reduce.go: doReduce()

Once done, run the tests:

cd "<project_position>/src/mapreduce"
go test -run Sequential

Prerequisites:

  • Saving and reading JSON files.
  • File input and output.
  • The defer statement, used to release resources.
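The three prerequisites above can be tried out together in one small program. This is a minimal sketch rather than lab code: KeyValue is redeclared so the file compiles on its own, and saveJSON, loadJSON and roundTrip are hypothetical helper names introduced only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// KeyValue mirrors the lab's type of the same name; it is redeclared here
// only so that this sketch is self-contained.
type KeyValue struct {
	Key   string
	Value string
}

// saveJSON (hypothetical helper) writes kvs to path as one JSON array.
func saveJSON(path string, kvs []KeyValue) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// The deferred function runs when saveJSON returns, on every path after
	// this point, and reports a Close error if nothing else failed.
	defer func() {
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()
	return json.NewEncoder(f).Encode(kvs)
}

// loadJSON (hypothetical helper) reads back a file written by saveJSON.
func loadJSON(path string) ([]KeyValue, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close() // released even if decoding fails
	var kvs []KeyValue
	err = json.NewDecoder(f).Decode(&kvs)
	return kvs, err
}

// roundTrip saves kvs into a temporary directory and loads them again.
func roundTrip(kvs []KeyValue) ([]KeyValue, error) {
	dir, err := os.MkdirTemp("", "kvdemo")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir) // clean up the temporary directory
	path := filepath.Join(dir, "kv.json")
	if err := saveJSON(path, kvs); err != nil {
		return nil, err
	}
	return loadJSON(path)
}

func main() {
	kvs, err := roundTrip([]KeyValue{{"a", "1"}, {"b", "2"}})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(kvs)
}
```

Note that a deferred call does not run if the program exits through log.Fatal, which is why the helpers return errors and only main decides to exit.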

common_map.go: doMap()

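What does it need to do?

  1. doMap() manages one map task.
  2. It calls mapF() to turn the contents of the input file into a []KeyValue.
  3. It uses a partitioning function to split the output of mapF() across nReduce intermediate files.
  4. Each intermediate file name consists of three parts: <JobName>-<mapTask>-<reduceTask>.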

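Design:

  1. Create jsonArr, a [][]KeyValue with nReduce buckets, one bucket per intermediate JSON file.
  2. Read the input file, convert its contents to a string, and pass it to mapF() to get the output kvList.
  3. For each kv in kvList, compute index = ihash(kv.Key) % nReduce and append kv to jsonArr[index].
  4. Save each bucket jsonArr[i] to the file jobName-mapTask-i (the name comes from calling reduceName(..)).
  5. The format of the intermediate files is up to you, as long as doReduce() can decode it.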
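Because the intermediate format is a free choice, here is one alternative to storing a whole slice per file: write each KeyValue as its own JSON value and decode in a loop until io.EOF. This is a sketch under stated assumptions, not what the implementation in this post does; writeKVs, readKVs and encodeDecode are hypothetical names, and a bytes.Buffer stands in for the intermediate file.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
)

// KeyValue is redeclared here so the sketch compiles on its own.
type KeyValue struct {
	Key   string
	Value string
}

// writeKVs (hypothetical helper) encodes each pair as a separate JSON value.
// json.Encoder appends a newline after every value it writes.
func writeKVs(w io.Writer, kvs []KeyValue) error {
	enc := json.NewEncoder(w)
	for _, kv := range kvs {
		if err := enc.Encode(&kv); err != nil {
			return err
		}
	}
	return nil
}

// readKVs (hypothetical helper) decodes values one by one until the stream
// ends; Decode reports a clean end of input as io.EOF.
func readKVs(r io.Reader) ([]KeyValue, error) {
	dec := json.NewDecoder(r)
	var kvs []KeyValue
	for {
		var kv KeyValue
		err := dec.Decode(&kv)
		if err == io.EOF {
			return kvs, nil
		}
		if err != nil {
			return nil, err
		}
		kvs = append(kvs, kv)
	}
}

// encodeDecode runs both directions through an in-memory buffer, which
// stands in for an intermediate file in this sketch.
func encodeDecode(kvs []KeyValue) ([]KeyValue, error) {
	var buf bytes.Buffer
	if err := writeKVs(&buf, kvs); err != nil {
		return nil, err
	}
	return readKVs(&buf)
}

func main() {
	kvs, err := encodeDecode([]KeyValue{{"a", "1"}, {"b", "2"}})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(kvs)
}
```

Both helpers take io.Writer and io.Reader, so an *os.File can be passed in directly. With this format the writer can emit pairs as they are produced instead of collecting each bucket in memory first.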

Implementation:

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    // Read the whole input file and hand it to the user-supplied map function.
    contentsByte, err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal(err)
    }
    contents := string(contentsByte)
    kvList := mapF(inFile, contents)

    // Partition by Key (not Value), so equal keys land in the same reduce task.
    jsonArr := make([][]KeyValue, nReduce)
    for _, kv := range kvList {
        index := ihash(kv.Key) % nReduce
        jsonArr[index] = append(jsonArr[index], kv)
    }

    // Save each bucket as one JSON file.
    for i := 0; i < nReduce; i++ {
        filename := reduceName(jobName, mapTask, i)
        file, err := os.Create(filename)
        if err != nil {
            log.Fatal(err)
        }
        enc := json.NewEncoder(file)
        if err := enc.Encode(&jsonArr[i]); err != nil {
            log.Fatal(err)
        }
        file.Close()
    }
}

common_reduce.go: doReduce()

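What does it need to do?

  1. doReduce() manages one reduce task r.
  2. From the intermediate file numbered r of each map task (nMap files in total), it reads and decodes the key/value pairs.
  3. It sorts the key/value pairs by key.
  4. It calls reduceF() once for each distinct key and writes the output to disk.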

Use the sort package to do the sorting.
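As an illustration of sorting with the sort package, the sketch below sorts a []KeyValue by key and then walks each run of equal keys, calling the reduce function once per distinct key. It is a different route from the implementation in this post, which collects values in a map and sorts only the keys with sort.Strings. The names reduceSorted and countValues are hypothetical, and KeyValue is redeclared so the file compiles on its own.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// KeyValue is redeclared here so the sketch is self-contained.
type KeyValue struct {
	Key   string
	Value string
}

// reduceSorted (hypothetical helper) sorts kvs in place by key, then calls
// reduceF once for every run of equal keys. The caller's slice is reordered.
func reduceSorted(kvs []KeyValue, reduceF func(key string, values []string) string) []KeyValue {
	// SliceStable keeps the original relative order of values that share a key.
	sort.SliceStable(kvs, func(i, j int) bool { return kvs[i].Key < kvs[j].Key })

	var out []KeyValue
	for i := 0; i < len(kvs); {
		// Collect the values of the run kvs[i..j) that shares kvs[i].Key.
		j := i
		var values []string
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			values = append(values, kvs[j].Value)
			j++
		}
		out = append(out, KeyValue{kvs[i].Key, reduceF(kvs[i].Key, values)})
		i = j
	}
	return out
}

// countValues is a sample reduce function: it returns how many values a key has.
func countValues(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kvs := []KeyValue{{"b", "1"}, {"a", "1"}, {"b", "1"}}
	fmt.Println(reduceSorted(kvs, countValues))
}
```

Either approach produces output in key order; the map-based version avoids sorting every pair, while the sorted-run version needs no extra map.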

Implementation:

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    // table collects every value seen for each key.
    table := make(map[string][]string)

    // Read the nMap intermediate files that belong to this reduce task.
    for i := 0; i < nMap; i++ {
        file, err := os.Open(reduceName(jobName, i, reduceTask))
        if err != nil {
            log.Fatal(err)
        }
        dec := json.NewDecoder(file)
        var tmp []KeyValue
        if err := dec.Decode(&tmp); err != nil {
            log.Fatal(err)
        }
        for _, kv := range tmp {
            table[kv.Key] = append(table[kv.Key], kv.Value)
        }
        file.Close()
    }

    // Sort the distinct keys.
    sortedKeyArr := make([]string, 0, len(table))
    for k := range table {
        sortedKeyArr = append(sortedKeyArr, k)
    }
    sort.Strings(sortedKeyArr)

    // Call reduceF once per key, in sorted order, and write each result as JSON.
    outputFile, err := os.Create(outFile)
    if err != nil {
        log.Fatal(err)
    }
    defer outputFile.Close()
    outputJSON := json.NewEncoder(outputFile)
    for _, key := range sortedKeyArr {
        kv := KeyValue{key, reduceF(key, table[key])}
        if err := outputJSON.Encode(kv); err != nil {
            log.Fatal(err)
        }
    }
}

This passes the tests.