共计 1949 个字符,预计需要花费 5 分钟才能阅读完成。
MapReduce Scheduling
Scheduler
并行
Map
任务splitting and sharding data
Map
任务相互独立
将数据从
Map
传输到Reduce
- 相同
key
的Map
输出会分配给同一个Reduce
任务 - 利用了
partition
函数,比如hash(key) % number_of_reducers
- 相同
并行
Reduce
任务Reduce
任务相互独立
实现存储
- 数据通常会有三个副本位于三个不同的服务器上
Map Input
: 来自分布式文件系统Map Output
:Map
节点的本地磁盘(本地文件系统)- 中间数据对外部用户不可见,也不必写到分布式文件系统上
Reduce Input
: 远程磁盘(本地文件系统)Reduce Output
: 分布式文件系统
理论上,Reduce
阶段只能在所有 Map
阶段结束之后启动 (未结束的Map
任务可能产生新的 key/value
对,对应该 key
的Reduce
任务需要等待 Map
完成)。这种两个阶段之间的隔离操作叫做barrier
。
事实上部分 Reduce
任务是可以提早开始的。MapReduce
中也是这样实现的。但是这种操作不利于我们理解 MapReduce
范式,所以我们先忽略这件事。
Barrier
不成立的原因之一,是在 Map
阶段和 Reduce
阶段之间存在 Shuffle
阶段。Shuffle
可以和 Map
并行执行。
PS. 推荐两篇文章《MapReduce: 详解 Shuffle 过程》《MapReduce 的 shuffle 过程详解(分片、分区、合并、归并)》,对这段 shuffle 的梳理实在是妙。大致解释一下:
Map 任务的结果不会立刻写入磁盘,而是写到一个叫 环形内存缓冲区 的地方(这个操作叫
spill
)。spill
的时候,会根据 key 进行分区(partition)
。缓冲区默认最大是100M
,当写入达到阈值(默认是80%
) 的时候,会启动一个线程将缓冲区文件写到磁盘临时文件。而这个线程会执行一个排序(sort)
和一个合并(combine)
操作。整个 spill 执行完之后,会对所有临时文件进行归并(merge)
。merge
时会继续进行sort
和combine
来减少最终输出大小。上面这段流程就是
map
端的shuffle
操作,里面的combine
是可选的,部分情况下其实执行的是reduce
。
所以,spill
时首先进行 partition
,然后partition
内sort
、combine
,最后写出到磁盘。而 combine
可以是 reduce
,所以Map
和Reduce
之间不存在Barrier
。
YARN
YARN = Yet Another Resource Negotiator
. YARN
是从Hadoop 2.x
开始引入的资源调度器。
YARN
将每个服务器看成一组 容器(container)
。Container = some CPU + some memory
。每个容器可以执行一个任务
如果服务器有 4 个 CPU 和 4GB 内存,而每个容器中有一个 CPU 和 1GB 的 RAM。那么这个服务器有 4 个容器,可以运行四个任务。
YARN 有三个主要部分:
Resource Manager
资源管理器RM
Resource Manager
是全局进程- 负责调度
Node Manager
节点管理器NM
Node Manager
在每个server
都有一个- 作为守护进程和运行特定服务器进程(比如,任务监控)
Application Master
应用管理AM
- 应用级别
per-application(job)
- 负责
container
与Resource Manager
、Node Manager
之间协商通信 - 与
Node Manager
通信,检测任务挂起和重新调度
- 应用级别
YARN
分配container
两台服务器 A、B:每个服务器有一个 Node Manager
在运行
两个任务 1、2:每个任务有一个Application Master
全局有一个 Resource Manager
在运行
Timeline:
sequence | environment | action |
---|---|---|
0 | 开始时,Job2(App2) 刚刚运行结束,Job1(App1) 即将启动 | N/A |
1 | Job1(App1) 即将启动 | Application Master1(AM_1) 通知 Resource Manager(RM) <App1 即将启动,需要分配一个container > |
2 | RM 收到 AM_1 的消息,但无可分配的container | RM 将 AM_1 消息放入队列挂起,随后 Node Manager B(NM_B) 向RM 发送消息 <container 空闲 > |
3 | RM 收到 NM_2 的消息 | RM 通知 AM_1 , node B 有空闲container |
4 | AM_1 收到 RM 消息 | AM_1 通知 NM_B 执行Job1 |
实际运行中,每个任务会申请多个 container,Resource Manager 会根据申请的顺序分配 container