学习的时候没有太过注意 Shuffle 这个概念,以至于还认为是不是漏掉了什么知识点,前面看了一些帖子才发现 Shuffle 原来是 map() 办法执行完结到 reduce()办法执行这么一大段过程 ….
小声 BB:本质上该过程蕴含许多环节,不晓得为啥就统称 Shuffle 了 ……
1. Mapper 的写出
图见意现 — 间接上图:
须要明确的点:
- 在环形缓冲区触发溢写时,进行的排序是 先依据每个键值对的分区进行排序,而后再依据 key 进行排序
- 每次写出实际上包含两个文件,一个是 meta 数据写出造成的 index 文件,另一个则是排序后的 data 文件
- 在溢写时产生一次排序,为疾速排序算法;在最初合并为一个整体文件时,因为各小文件已有序,间接归并排序即可
- Reducer 从 Mapper 节点的最终文件 file.out 读取数据,依据 index 文件只读取对应分区的数据
2. Mapper 的分区
分区的作用是将 Mapper 阶段输入的后果分为不同的区域,并传送给不同的 ReduceTask,因而 分区数和 Reduce 的并行度是互相决定的,即 Num Of Partitions = Num Of ReduceTask
设置 ReduceTask 数量:job.setNumReduceTasks(n)
分区形式:
- 默认形式:hash(key) % Num of ReduceTask — 键值哈希取模
- 自定义分区类:定义类继承 Partitioner 类,并重写 getPartition()办法
留神:
- 在没有设置 ReduceTask 数目的状况下,MR 默认只调配一个 ReduceTask,即不论自定义分区办法须要分几个区,最初的后果都只会进入这一个 Reducer,最终生成一个后果文件;
- 如果设置了 ReduceTask 数量大于 1,但该数量小于自定义的分区数,则运行 MR 程序会报错,因为会产生一些 Mapper 的后果数据没有 Reducer 来获取的状况
- 如果设置的分区数 < ReduceTask 的数量,那么程序会失常执行,最初也会生成和 ReduceTask 数相等的后果文件,但其中有 (ReduceTask 数 – 分区数) 个空文件,因为这部分的 Reducer 没有输出
3. Mapper 的排序
- 比较简单,溢写时疾速排序,合并时归并排序;排序规范为对 key 进行字典排序
- 可自定义排序 (在自定义 Bean 对象时应用),须要让自定义的 Bean 类实现WritableComparable 接口并重写 compareTo 办法,实际上和 Java 中的 Comparable 根本一样
4. Reducer 的读入
须要明确的点:
- 一个 ReduceTask 只会读取每个 Mapper 后果的同一个分区内的数据
- 拷贝到 Reducer 节点先是保留在内存中,如果内存不够就刷写到磁盘
- 所有数据拷贝实现后,对内存和磁盘 (如果产生了刷写) 进行 Merge && Sort,这里排序也是默认依据 Key 进行字典排序,目标是让数据中 key 雷同的 KV 对排列在一起,不便 reduce 办法的调用。【reduce 办法调用前会将所有 key 雷同的 value 封装为 iterator 迭代器对象,对于一个 key 调用一次 reduce 办法】
5. Reducer 的排序
Reduce 端的排序同样也是利用于应用了自定义 Bean 对象的场景,通过自定义的排序形式让 Reducer 把咱们指定的 keys 判断为同样的 key(在默认 reduce 排序中会被断定为非同 key)
留神!留神!留神!此处有坑!!!
- Reducer 端的自定义排序须要 自定义排序类,继承 WritableComparator 并重写 compare 办法,并在 Driver 中关联咱们自定义的排序类
- Mapper 端的自定排序须要在咱们定义 Bean 对象的时候 让 Bean 对象继承 WritableComparable 类并重写办法 CompareTo