乐趣区

MIT6824Lab1P1

Part I: Map/Reduce input and output

添加代码:

  • common_map.go: doMap()
  • common_reduce.go: doReduce()

完成后测试:

cd "<project_position>/src/mapreduce"
go test -run Sequential

我们需要事先掌握如下:

  • 保存读取 json 文件
  • 文件输入输出.
  • defer 语句, 用于释放资源.

common_map.go: doMap()

要干什么?

  1. doMap()管理一个 Map Task.
  2. 调用 mapF()将输入文件的内容变成一个[]KeyValue.
  3. 用划分函数将 mapF()的输出划分到 nReduce 个中间文件.
  4. 每个中间文件命由 <JobName>-<mapTask>-<reduceTask> 三部分组成.

代码构思:

  1. 创建 nReduce 个 json 文件, 将其句柄放入 jsonArr 这个数组中, 其类型为[][]KeyValue.
  2. 读取文件将其内容转为 String, 输入 mapF()得到输出 kvList.
  3. 对于 kvList 中每个 kv, 用 index = ihash(kv.Key) % nReduce 来将其加入数组 jsonArr[index] 中.
  4. jsonArr 每一个数组 jsonArr[i] 保存到文件 jobName-mapTask-i 中(调用reduceName(..)).
  5. 中间文件的格式由自己设计, 只要 doReduce() 能够正常 decode 就好.

实现:

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,) {contentsByte, err := ioutil.ReadFile(inFile)
    if err != nil {log.Fatal(err)
    }

    contents := string(contentsByte)

    kvList := mapF(inFile, contents)

    jsonArr := make([][]KeyValue, nReduce)

    for _, kv := range kvList {index := ihash(kv.Key) % nReduce // Key or Value ?
        jsonArr[index] = append(jsonArr[index], kv)

    }

    // Save as json file.
    for i := 0; i < nReduce; i++ {filename := reduceName(jobName, mapTask, i)

        file, err := os.Create(filename)
        if err != nil {log.Fatal(err)
        }

        enc := json.NewEncoder(file)
        ok := enc.Encode(&jsonArr[i]) 
        if ok != nil {log.Fatal(ok)
        }
        file.Close()}
}

common_reduce.go: doReduce()

要干什么?

  1. doReduce()管理一个 reduce 任务r.
  2. 从每一个 MapTask 的 ReduceTask 编号为 r 的中间文件中 (共nMap 个), 读取 kv 对, 转为 String.
  3. 将 kv 对按照 key 排序.
  4. 对每个不同的 key 调用reduceF(), 将输出结果写入磁盘

调用 sort 包来排序, 教程.

实现

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {table := make(map[string][]string)

    // Read nMap immediate files.
    for i := 0; i < nMap; i++ {file, err := os.Open(reduceName(jobName, i, reduceTask))
        if err != nil {log.Fatal(err)
        }

        enc := json.NewDecoder(file)
        var tmp []KeyValue
        decErr := enc.Decode(&tmp)
        if decErr != nil {log.Fatal(decErr)
        }

        for _, kv := range tmp {table[kv.Key] = append(table[kv.Key], kv.Value)
        }

        file.Close()}

    // Sort them by Key
    sortedKeyArr := make([]string, 0)
    for k := range table {sortedKeyArr = append(sortedKeyArr, k)

    }
    sort.Strings(sortedKeyArr)

    outputFile, err := os.Create(outFile)

    if err != nil {log.Fatal(err)

    }
    defer outputFile.Close()

    outputJSON := json.NewEncoder(outputFile)
    for i := 0; i < len(sortedKeyArr); i++ {key := sortedKeyArr[i]
        val := table[key]
        kv := KeyValue{key, reduceF(key, val)}
        encErr := outputJSON.Encode(kv)
        if encErr != nil {log.Fatal(encErr)
        }
    }
}

可通过测试.

退出移动版