MapReduce与Yarn-的详细工作流程分析

45次阅读

共计 2960 个字符,预计需要花费 8 分钟才能阅读完成。

MapReduce 详细工作流程之 Map 阶段

如上图所示

  1. 首先有一个 200M 的待处理文件
  2. 切片:在客户端提交之前,根据参数配置,进行任务规划,将文件按 128M 每块进行切片
  3. 提交:提交可以提交到本地工作环境或者 Yarn 工作环境,本地只需要提交切片信息和 xml 配置文件,Yarn 环境还需要提交 jar 包;本地环境一般只作为测试用
  4. 提交时会将每个任务封装为一个 job 交给 Yarn 来处理(详细见后边的 Yarn 工作流程介绍),计算出 MapTask 数量(等于切片数量),每个 MapTask 并行执行
  5. MapTask 中执行 Mapper 的 map 方法,此方法需要 k 和 v 作为输入参数,所以会首先获取 kv 值;

    • 首先调用 InputFormat 方法,默认为 TextInputFormat 方法,在此方法调用 createRecoderReader 方法,将每个块文件封装为 k,v 键值对,传递给 map 方法
  6. map 方法首先进行一系列的逻辑操作,执行完成后最后进行写操作
  7. map 方法如果直接写给 reduce 的话,相当于直接操作磁盘,太多的 IO 操作,使得效率太低,所以在 map 和 reduce 中间还有一个 shuffle 操作

    • map 处理完成相关的逻辑操作之后,首先通过 outputCollector 向环形缓冲区写入数据,环形缓冲区主要两部分,一部分写入文件的元数据信息,另一部分写入文件的真实内容
    • 环形缓冲区的默认大小是 100M,当缓冲的容量达到默认大小的 80% 时,进行 反向 溢写
  8. 在溢写之前会将缓冲区的数据按照指定的分区规则进行分区和排序,之所以反向溢写是因为这样就可以边接收数据边往磁盘溢写数据
  9. 在分区和排序之后,溢写到磁盘,可能发生多次溢写,溢写到多个文件
  10. 对所有溢写到磁盘的文件进行归并排序
  11. 在 9 到 10 步之间还可以有一个 Combine 合并操作,意义是对每个 MapTask 的输出进行局部汇总,以减少网络传输量

    • Map 阶段的进程数比 Reduce 阶段要多,所以放在 Map 阶段处理效率更高
    • Map 阶段合并之后,传递给 Reduce 的数据就会少很多
    • 但是 Combiner 能够应用的前提是不能影响最终的业务逻辑,而且 Combiner 的输出 kv 要和 Reduce 的输入 kv 类型对应起来

整个 MapTask 分为 Read 阶段,Map 阶段,Collect 阶段,溢写(spill)阶段和 combine 阶段

  • Read 阶段:MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value
  • Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value
  • Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中
  • Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个 临时文件 。需要 注意 的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作

MapReduce 详细工作流程之 Reduce 阶段

如上图所示

  1. 所有的 MapTask 任务完成后,启动相应数量的 ReduceTask(和分区数量相同),并告知 ReduceTask 处理数据的范围
  2. ReduceTask 会将 MapTask 处理完的数据拷贝一份到磁盘中,并合并文件和归并排序
  3. 最后将数据传给 reduce 进行处理,一次读取一组数据
  4. 最后通过 OutputFormat 输出

整个 ReduceTask 分为 Copy 阶段,Merge 阶段,Sort 阶段(Merge 和 Sort 可以合并为一个),Reduce 阶段。

  • Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
  • Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多
  • Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可
  • Reduce 阶段:reduce()函数将计算结果写到 HDFS 上

Shuffle 机制

Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。shuffle 流程详解如下:

  1. MapTask 收集 map()方法输出的 kv 对,放到环形缓冲区中
  2. 从环形缓冲区不断溢出到本地磁盘文件,可能会溢出多个文件
  3. 多个溢出文件会被合并成大的溢出文件
  4. 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
  5. ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
  6. ReduceTask 将取到的来自同一个分区不同 MapTask 的结果文件进行归并排序
  7. 合并成大文件后,shuffle 过程也就结束了,进入 reduce 方法

Yarn 工作机制

job 提交全过程

  1. MR 程序提交到客户端所在的节点,YarnRunner 向 ResourceManager 申请一个 Application
  2. RM 将该 Application 的资源路径和作业 id 返回给 YarnRunner
  3. YarnRunner 将运行 job 所需资源提交到 HDFS 上
  4. 程序资源提交完毕后,申请运行 mrAppMaster
  5. RM 将用户的请求初始化成一个 Task
  6. 其中一个 NodeManager 领取到 Task 任务
  7. 该 NodeManager 创建容器 Container,并产生 MRAppmaster
  8. Container 从 HDFS 上拷贝资源到本地
  9. MRAppmaster 向 RM 申请运行 MapTask 资源
  10. RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器
  11. MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序
  12. MrAppMaster 等待所有 MapTask 运行完毕后,向 RM 申请容器,运行 ReduceTask
  13. ReduceTask 向 MapTask 获取相应分区的数据
  14. 程序运行完毕后,MR 会向 RM 申请注销自己

进度和状态更新:

YARN 中的任务将其进度和状态 (包括 counter) 返回给应用管理器, 客户端每秒 (通过 mapreduce.client.progressmonitor.pollinterval 设置) 向应用管理器请求进度更新, 展示给用户

作业完成:

除了向应用管理器请求作业进度外, 客户端每 5 秒都会通过调用 waitForCompletion()来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后, 应用管理器和 Container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查

正文完
 0