后面曾经讲了每个action算子,都会进行stage切分,而后把每个stage依据分区创立TaskSet,依据资源运行Task,那这个stage就运行完结了。
那stage怎么晓得本人各个分区曾经运行完结了,stage与stage间接数据是怎么传递的,每个具备依赖的stage是怎么执行的,是本章的重点。
这里用之前的例子持续解说。
MapOutputTrackerMaster
MapOutputTracker用于跟踪map工作的输入状态,在Drvier中的MapOutputTracker叫做MapOutputTrackerMaster,在Worker中的MapOutputTracker叫做MapOutputTrackerWorker。
在stage进行切分的时候,每个ShuffleMapStage都会把他的shuffleId以及本人的分区数量,注册到MapOutputTrackerMaster。MapOutputTrackerMaster中保护着一个map,这个map的key就是shuffleId,他的value是ShuffleStatus。所以咱们之前的例子,就会有0到3的shuffleId注册到MapOutputTrackerMaster的map中。
每个ShuffleStatus依据分区数,都会生成等同长度的类型为MapStatus数组mapStatuses,MapStatus示意ShuffleMapTask返回给TaskScheduler的执行后果。因为当初还没运行,所以数组mapStatuses是空的。这里只画了ShuffleStatus0,其余的ShuffleStatus也是一样的构造。
waitingStages
这里补充一下TaskSet提交对于waitingStages的局部,因为waitingStages的作用是在这个篇幅中。
stage切分完结后,开始提交stage。此时stage4发现他有父类stage0和stage3,于是就先提交父类。
stage0并没有其余父类,于是他就提交给TaskSchedulerImpl。
stage3有两个父类stage1和stage2,于是就先提交父类。
stage1并没有其余父类,于是他就提交给TaskSchedulerImpl,这个时候,stage0和stage1都曾经提交了。
stage2并没有其余父类,于是他就提交给TaskSchedulerImpl,这个时候,stage0和stage1、stage2都曾经提交了。
stage3的两个父类stage1和stage2都解决了,于是他就退出到退出到waitingStages。
stage4的两个父类stage0和stage3都解决了,于是他就退出到退出到waitingStages。waitingStages里的stage并没有理论提交,所以此时是stage0和stage1、stage2开始运行,stage3、stage4期待运行。
执行流程
Executor执行完task后,通过DriverEndpointRfe把发消息给Drvier的DriverEndpointRef,告知以后的task运行完结。
Drvier收到音讯后,发现状态是FINISHED,依据shuffleId和分区,更新MapOutputTrackerMaster的shuffleStatuses里的mapStatuses数组。咱们假如是分区1实现了,此时mapStatuses[1]就有后果数据了,而后_numAvailableOutputs就加1。
如果_numAvailableOutputs和分区数量numPartitions相等,即都等于4,阐明这个stage就实现了。如果执行失败,就会把还没有实现的分区从新提交。
当这个stage0胜利后,发现waitingStages中有本人的子stage4,就把stage4从waitingStages拿进去,开始尝试提交,后果他发现stage4另外一个父stage3还没执行,于是就持续退出到waitingStages。
持续下面的流程,当stage1和stage2都执行完后,就会从waitingStages中拿出stage3继续执行,当stage3执行完后,stage4的两个父类都执行完了,所以stage4此时才开始执行。
从这里能够看出,具备依赖关系的会等依赖的先执行,是串行的,如果没有依赖关系的,会并发执行,这个是并行的。
MapOutputTrackerWorker
下面的stage3依赖着stage1和stage2,那stage3开始执行的时候,是须要stage1和stage2的执行后果的。
Executor的MapOutputTrackerWorker保护着map,key是shuffleId,value是MapStatus数组,对应着下面的mapStatuses。
假如stage3拿stage1的时候,发现他并没有存储stage1的数据,于是就持有MapOutputTrackerMasterEndpoint援用的trackerEndpoint给MapOutputTrackerMasterEndpoint发送GetMapOutputStatuses音讯。
MapOutputTrackerMasterEndpoint收到音讯后,就把音讯发送给MapOutputTrackerMaster的阻塞队列mapOutputRequests。MapOutputTrackerMaster里有一个线程池,保护着线程来解决这个阻塞队列。当线程发送阻塞队列有音讯后,就会开始解决这个音讯,并把后果返回给申请方。
Executor拿到数据后,就开始对数据进行计算。