共计 2960 个字符,预计需要花费 8 分钟才能阅读完成。
MapReduce 详细工作流程之 Map 阶段
如上图所示
- 首先有一个 200M 的待处理文件
- 切片:在客户端提交之前,根据参数配置,进行任务规划,将文件按 128M 每块进行切片
- 提交:提交可以提交到本地工作环境或者 Yarn 工作环境,本地只需要提交切片信息和 xml 配置文件,Yarn 环境还需要提交 jar 包;本地环境一般只作为测试用
- 提交时会将每个任务封装为一个 job 交给 Yarn 来处理(详细见后边的 Yarn 工作流程介绍),计算出 MapTask 数量(等于切片数量),每个 MapTask 并行执行
MapTask 中执行 Mapper 的 map 方法,此方法需要 k 和 v 作为输入参数,所以会首先获取 kv 值;
- 首先调用 InputFormat 方法,默认为 TextInputFormat 方法,在此方法调用 createRecoderReader 方法,将每个块文件封装为 k,v 键值对,传递给 map 方法
- map 方法首先进行一系列的逻辑操作,执行完成后最后进行写操作
map 方法如果直接写给 reduce 的话,相当于直接操作磁盘,太多的 IO 操作,使得效率太低,所以在 map 和 reduce 中间还有一个 shuffle 操作
- map 处理完成相关的逻辑操作之后,首先通过 outputCollector 向环形缓冲区写入数据,环形缓冲区主要两部分,一部分写入文件的元数据信息,另一部分写入文件的真实内容
- 环形缓冲区的默认大小是 100M,当缓冲的容量达到默认大小的 80% 时,进行 反向 溢写
- 在溢写之前会将缓冲区的数据按照指定的分区规则进行分区和排序,之所以反向溢写是因为这样就可以边接收数据边往磁盘溢写数据
- 在分区和排序之后,溢写到磁盘,可能发生多次溢写,溢写到多个文件
- 对所有溢写到磁盘的文件进行归并排序
在 9 到 10 步之间还可以有一个 Combine 合并操作,意义是对每个 MapTask 的输出进行局部汇总,以减少网络传输量
- Map 阶段的进程数比 Reduce 阶段要多,所以放在 Map 阶段处理效率更高
- Map 阶段合并之后,传递给 Reduce 的数据就会少很多
- 但是 Combiner 能够应用的前提是不能影响最终的业务逻辑,而且 Combiner 的输出 kv 要和 Reduce 的输入 kv 类型对应起来
整个 MapTask 分为 Read 阶段,Map 阶段,Collect 阶段,溢写(spill)阶段和 combine 阶段
- Read 阶段:MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value
- Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value
- Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中
- Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个 临时文件 。需要 注意 的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作
MapReduce 详细工作流程之 Reduce 阶段
如上图所示
- 所有的 MapTask 任务完成后,启动相应数量的 ReduceTask(和分区数量相同),并告知 ReduceTask 处理数据的范围
- ReduceTask 会将 MapTask 处理完的数据拷贝一份到磁盘中,并合并文件和归并排序
- 最后将数据传给 reduce 进行处理,一次读取一组数据
- 最后通过 OutputFormat 输出
整个 ReduceTask 分为 Copy 阶段,Merge 阶段,Sort 阶段(Merge 和 Sort 可以合并为一个),Reduce 阶段。
- Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
- Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多
- Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可
- Reduce 阶段:reduce()函数将计算结果写到 HDFS 上
Shuffle 机制
Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。shuffle 流程详解如下:
- MapTask 收集 map()方法输出的 kv 对,放到环形缓冲区中
- 从环形缓冲区不断溢出到本地磁盘文件,可能会溢出多个文件
- 多个溢出文件会被合并成大的溢出文件
- 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
- ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
- ReduceTask 将取到的来自同一个分区不同 MapTask 的结果文件进行归并排序
- 合并成大文件后,shuffle 过程也就结束了,进入 reduce 方法
Yarn 工作机制
job 提交全过程
- MR 程序提交到客户端所在的节点,YarnRunner 向 ResourceManager 申请一个 Application
- RM 将该 Application 的资源路径和作业 id 返回给 YarnRunner
- YarnRunner 将运行 job 所需资源提交到 HDFS 上
- 程序资源提交完毕后,申请运行 mrAppMaster
- RM 将用户的请求初始化成一个 Task
- 其中一个 NodeManager 领取到 Task 任务
- 该 NodeManager 创建容器 Container,并产生 MRAppmaster
- Container 从 HDFS 上拷贝资源到本地
- MRAppmaster 向 RM 申请运行 MapTask 资源
- RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器
- MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序
- MrAppMaster 等待所有 MapTask 运行完毕后,向 RM 申请容器,运行 ReduceTask
- ReduceTask 向 MapTask 获取相应分区的数据
- 程序运行完毕后,MR 会向 RM 申请注销自己
进度和状态更新:
YARN 中的任务将其进度和状态 (包括 counter) 返回给应用管理器, 客户端每秒 (通过 mapreduce.client.progressmonitor.pollinterval 设置) 向应用管理器请求进度更新, 展示给用户
作业完成:
除了向应用管理器请求作业进度外, 客户端每 5 秒都会通过调用 waitForCompletion()来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后, 应用管理器和 Container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查