后面曾经讲了每个 action 算子,都会进行 stage 切分,而后把每个 stage 依据分区创立 TaskSet,依据资源运行 Task,那这个 stage 就运行完结了。
那 stage 怎么晓得本人各个分区曾经运行完结了,stage 与 stage 间接数据是怎么传递的,每个具备依赖的 stage 是怎么执行的,是本章的重点。
这里用之前的例子持续解说。
MapOutputTrackerMaster
MapOutputTracker 用于跟踪 map 工作的输入状态,在 Drvier 中的 MapOutputTracker 叫做 MapOutputTrackerMaster,在 Worker 中的 MapOutputTracker 叫做 MapOutputTrackerWorker。
在 stage 进行切分的时候,每个 ShuffleMapStage 都会把他的 shuffleId 以及本人的分区数量,注册到 MapOutputTrackerMaster。MapOutputTrackerMaster 中保护着一个 map,这个 map 的 key 就是 shuffleId,他的 value 是 ShuffleStatus。所以咱们之前的例子,就会有 0 到 3 的 shuffleId 注册到 MapOutputTrackerMaster 的 map 中。
每个 ShuffleStatus 依据分区数,都会生成等同长度的类型为 MapStatus 数组 mapStatuses,MapStatus 示意 ShuffleMapTask 返回给 TaskScheduler 的执行后果。因为当初还没运行,所以数组 mapStatuses 是空的。这里只画了 ShuffleStatus0,其余的 ShuffleStatus 也是一样的构造。
waitingStages
这里补充一下 TaskSet 提交对于 waitingStages 的局部,因为 waitingStages 的作用是在这个篇幅中。
stage 切分完结后,开始提交 stage。此时 stage4 发现他有父类 stage0 和 stage3,于是就先提交父类。
stage0 并没有其余父类,于是他就提交给 TaskSchedulerImpl。
stage3 有两个父类 stage1 和 stage2,于是就先提交父类。
stage1 并没有其余父类,于是他就提交给 TaskSchedulerImpl,这个时候,stage0 和 stage1 都曾经提交了。
stage2 并没有其余父类,于是他就提交给 TaskSchedulerImpl,这个时候,stage0 和 stage1、stage2 都曾经提交了。
stage3 的两个父类 stage1 和 stage2 都解决了,于是他就退出到退出到 waitingStages。
stage4 的两个父类 stage0 和 stage3 都解决了,于是他就退出到退出到 waitingStages。waitingStages 里的 stage 并没有理论提交,所以此时是 stage0 和 stage1、stage2 开始运行,stage3、stage4 期待运行。
执行流程
Executor 执行完 task 后,通过 DriverEndpointRfe 把发消息给 Drvier 的 DriverEndpointRef,告知以后的 task 运行完结。
Drvier 收到音讯后,发现状态是 FINISHED,依据 shuffleId 和分区,更新 MapOutputTrackerMaster 的 shuffleStatuses 里的 mapStatuses 数组。咱们假如是分区 1 实现了,此时 mapStatuses[1] 就有后果数据了,而后_numAvailableOutputs 就加 1。
如果_numAvailableOutputs 和分区数量 numPartitions 相等,即都等于 4,阐明这个 stage 就实现了。如果执行失败,就会把还没有实现的分区从新提交。
当这个 stage0 胜利后,发现 waitingStages 中有本人的子 stage4,就把 stage4 从 waitingStages 拿进去,开始尝试提交,后果他发现 stage4 另外一个父 stage3 还没执行,于是就持续退出到 waitingStages。
持续下面的流程,当 stage1 和 stage2 都执行完后,就会从 waitingStages 中拿出 stage3 继续执行,当 stage3 执行完后,stage4 的两个父类都执行完了,所以 stage4 此时才开始执行。
从这里能够看出,具备依赖关系的会等依赖的先执行,是串行的,如果没有依赖关系的,会并发执行,这个是并行的。
MapOutputTrackerWorker
下面的 stage3 依赖着 stage1 和 stage2,那 stage3 开始执行的时候,是须要 stage1 和 stage2 的执行后果的。
Executor 的 MapOutputTrackerWorker 保护着 map,key 是 shuffleId,value 是 MapStatus 数组,对应着下面的 mapStatuses。
假如 stage3 拿 stage1 的时候,发现他并没有存储 stage1 的数据,于是就持有 MapOutputTrackerMasterEndpoint 援用的 trackerEndpoint 给 MapOutputTrackerMasterEndpoint 发送 GetMapOutputStatuses 音讯。
MapOutputTrackerMasterEndpoint 收到音讯后,就把音讯发送给 MapOutputTrackerMaster 的阻塞队列 mapOutputRequests。MapOutputTrackerMaster 里有一个线程池,保护着线程来解决这个阻塞队列。当线程发送阻塞队列有音讯后,就会开始解决这个音讯,并把后果返回给申请方。
Executor 拿到数据后,就开始对数据进行计算。