Spark任务调度机制阐述

在生产环境下,Spark集群的部署形式个别为YARN-Cluster模式。 Driver线程次要是初始化SparkContext对象,筹备运行所需的上下文,而后一方面放弃与ApplicationMaster的RPC连贯,通过ApplicationMaster申请资源,另一方面依据用户业务逻辑开始调度工作,将工作下发到已有的闲暇Executor上。
当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor过程,Executor过程起来后,会向Driver反向注册,注册胜利后放弃与Driver的心跳,同时期待Driver散发工作,当散发的工作执行结束后,将工作状态上报给Driver。

  1. Spark任务调度概述

1.1 根底概念

当Driver起来后,Driver则会依据用户程序逻辑筹备工作,并依据Executor资源状况逐渐散发工作。在具体论述任务调度前,首先阐明下Spark里的几个概念。一个Spark应用程序包含Job、Stage以及Task三个概念:

job:以 action 办法为界,一个 action 触发一个 job

stage:它是 job 的子集,以 RDD 宽依赖为界,遇到宽依赖即划分 stage

task:它是 stage 的子集,以分区数来掂量,分区数多少,task 就有多少

1.2 任务调度

spark 工作从发动到执行可用下图示意

1.3 Client—>ResourceManage

(1). Client 端通过 spark-submit + 参数 发动工作,即向ResourceManage 提交 application,留神该 application 蕴含了一堆参数,如 Executor 数,Executor 内存,Driver 内存等;

(2). ResourceManage 须要先判断当初资源是否能满足该 application,如果满足,则响应该 application,如果不满足,报错;

(3). 如果资源满足,Client 端筹备 ApplicationMaster 的启动上下文,并交给 ResourceManage;

(4). 并且循环监控 application 的状态;

1.4 ResourceManage—>ApplicationMaster

(1). ResourceManage 找一个 worker 启动 ApplicationMaster;

(2). ApplicationMaster 向 ResourceManage 申请 Container;

(3). ResourceManage 收集可用资源,并通知 ApplicationMaster;

(4). ApplicationMaster 尝试在对应的 Container 上启动 Executor 过程;

1.5 ApplicationMaster-Driver

(1). 有了资源,ApplicationMaster 启动 Driver;

//Driver 线程次要是初始化 SparkContext 对象,筹备运行所需上下文,并放弃与 ApplicationMaster 的 RPC 连贯,通过 ApplicationMaster 申请资源

(2). Driver 启动胜利后,通知 ApplicationMaster;

1.6 Driver-Executor

(1). Executor 启动胜利后,反向注册到 Driver 上,并继续向 Driver 发送心跳;

(2). Driver 启动 task,分发给 Executor,并监控 task 状态;

(3). 当 Executor 工作执行结束后,将工作状态发送给 Driver;

spark 的外围就是资源申请和任务调度,次要通过 ApplicationMaster、Driver、Executor 来实现

spark 任务调度分为两层,一层是 stage 级的调度,一层是 task 级的调度

RDD 间的血缘关系,代表了计算的流程,形成了 有向无环图,即 DAG;

最初通过 action 触发 job 并调度执行;

DAGScheduler 负责 stage 级的调度,次要是将 DAG 切分成多个 stage,并将 stage 打包成 TaskSet 交给 TaskScheduler;

TaskScheduler 负责 task 级的调度,将 DAGScheduler 发过来的 TaskSet 依照指定的调度策略发送给 Executor;

SchedulerBackend 负责给 调度策略 提供可用资源,调度策略决定把 task 发送给哪个 Executor;【其中 SchedulerBackend 有多种实现,别离对接不同的资源管理零碎】

基于上述认知,再来看一张图

Driver 在启动过程中,除了初始化 SparkContext 外,也初始化了 DAGScheduler、TaskScheduler、 SchedulerBackend 3个调度对象,同时初始化了 HeartbeatReceiver 心跳接收器;

并且各个线程之间保留通信;

SchedulerBackend 向 ApplicationMaster 申请资源,并不间断地从 TaskScheduler 获取 task 并发送给 适合的 Executor;

HeartbeatReceiver 负责接管 Executor 心跳报文,监控 Executor 存活状态;

  1. Spark Stage级调度

Spark的任务调度是从DAG切割开始,次要是由DAGScheduler来实现。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是波及到Job提交的相干办法调用流程图。

1) Job由最终的RDD和Action办法封装而成;

2) SparkContext将Job交给DAGScheduler提交,它会依据RDD的血缘关系形成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD一直通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,能够进行pipeline式的计算。划分的Stages分两类,一类叫做ResultStage,为DAG最上游的Stage,由Action办法决定,另一类叫做ShuffleMapStage,为上游Stage筹备数据,上面看一个简略的例子WordCount。

Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile办法组成,依据RDD之间的依赖关系从RDD-3开始回溯搜寻,直到没有依赖的RDD-0,在回溯搜寻过程中,RDD-3依赖RDD-2,并且是宽依赖,所以在RDD-2和RDD-3之间划分Stage,RDD-3被划到最初一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,所以将RDD-0、RDD-1和RDD-2划分到同一个Stage,造成pipeline操作,。即ShuffleMapStage中,理论执行的时候,数据记录会零打碎敲地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜寻(Depth First Search)算法。

一个Stage是否被提交,须要判断它的父Stage是否执行,只有在父Stage执行结束能力提交以后Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及办法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor失落或者Task因为Fetch失败才须要从新提交失败的Stage以调度运行失败的工作,其余类型的Task失败会在TaskScheduler的调度过程中重试。

相对来说DAGScheduler做的事件较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相干状态信息。TaskScheduler则绝对较为简单,上面具体论述其细节。

  1. Spark Task级调度

Spark Task的调度是由TaskScheduler来实现,由前文可知,DAGScheduler将Stage打包到交给TaskScheTaskSetduler,TaskScheduler会将TaskSet封装为TaskSetManager退出到调度队列中,TaskSetManager构造如下图所示。

TaskSetManager负责监控治理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度工作。

后面也提到,TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接管Executor的注册信息,并保护Executor的状态,所以说SchedulerBackend是管“食粮”的,同时它在启动后会定期地去“询问”TaskScheduler有没有工作要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中依照指定的调度策略抉择TaskSetManager去调度运行,大抵办法调用流程如下图所示:

上图中,将TaskSetManager退出rootPool调度池中之后,调用SchedulerBackend的riviveOffers办法给driverEndpoint发送ReviveOffer音讯;driverEndpoint收到ReviveOffer音讯后调用makeOffers办法,过滤出沉闷状态的Executor(这些Executor都是工作启动时反向注册到Driver的Executor),而后将Executor封装成WorkerOffer对象;筹备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上调配task。

3.1 调度策略

TaskScheduler反对两种调度策略,一种是FIFO,也是默认的调度策略,另一种是FAIR。在TaskScheduler初始化过程中会实例化rootPool,示意树的根节点,是Pool类型。

(1) FIFO调度策略

如果是采纳FIFO调度策略,则间接简略地将TaskSetManager依照先来先到的形式入队,出队时间接拿出最先进队的TaskSetManager,其树结构如下图所示,TaskSetManager保留在一个FIFO队列中。

(2) FAIR调度策略

FAIR调度策略的树结构如下图所示:

FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待调配的TaskSetMagager。

在FAIR模式中,须要先对子Pool进行排序,再对子Pool外面的TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,因而应用雷同的排序算法。

排序过程的比拟是基于Fair-share来比拟的,每个要排序的对象蕴含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比拟时会综合考量runningTasks值,minShare值以及weight值。

留神,minShare、weight的值均在偏心调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相干配置。

1) 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A后面;(runningTasks比minShare小的先执行)

2) 如果A、B对象的runningTasks都小于它们的minShare,那么就比拟runningTasks与minShare的比值(minShare使用率),谁小谁排后面;(minShare使用率低的先执行)

3) 如果A、B对象的runningTasks都大于它们的minShare,那么就比拟runningTasks与weight的比值(权重使用率),谁小谁排后面。(权重使用率低的先执行)

4) 如果上述比拟均相等,则比拟名字。

整体上来说就是通过minShare和weight这两个参数管制比拟过程,能够做到让minShare使用率和权重使用率少(理论运行task比例较少)的先运行。

FAIR模式排序实现后,所有的TaskSetManager被放入一个ArrayBuffer里,之后顺次被取出并发送给Executor执行。

从调度队列中拿到TaskSetManager后,因为TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager依照肯定的规定一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。

3.2 本地化调度

DAGScheduler切割Job,划分Stage, 通过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks 确定每个须要计算的 task 的preferredLocations,通过调用getPreferrdeLocations()失去partition 的优先地位,因为一个partition对应一个Task,此partition的优先地位就是task的优先地位,对于要提交到TaskScheduler的TaskSet中的每一个Task,该task优先地位与其对应的partition对应的优先地位统一。

从调度队列中拿到TaskSetManager后,那么接下来的工作就是TaskSetManager依照肯定的规定一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。后面也提到,TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task。

依据每个Task的优先地位,确定Task的Locality级别,Locality一共有五种,优先级由高到低程序:

在调度执行时,Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,然而该本地性级别对应的所有节点都没有闲暇资源而启动失败,此时并不会马上升高本地性级别启动而是在某个工夫长度内再次以X本地性级别来启动该task,若超过限时工夫则降级启动,去尝试下一个本地性级别,顺次类推。

能够通过调大每个类别的最大容忍延迟时间,在期待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在肯定水平上提到了运行性能。

3.3 失败重试与黑名单机制

除了抉择适合的Task调度运行外,还须要监控Task的执行状态,后面也提到,与内部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则通知TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并告诉到该TaskSetManager,这样TaskSetManager就晓得Task的失败与胜利状态,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败。

在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会应用黑名单机制,防止它被调度到上一次失败的节点上,起到肯定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”工夫,“拉黑”工夫是指这段时间内不要再往这个节点上调度这个Task了。

4. 总结

本图有助于了解job,stage,task工作的原理。Spark通用运行流程图,体现了根本的Spark应用程序在部署中的根本提交流程。

流程依照如下的外围步骤进行工作的:

1) 工作提交后,都会先启动Driver程序;

2) 随后Driver向集群管理器注册应用程序;

3) 之后集群管理器依据此工作的配置文件调配Executor并启动;

4) Driver开始执行main函数,Spark查问为懒执行,当执行到Action算子时开始反向推算,依据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,Taskset中有多个Task,查找可用资源Executor进行调度;

5) 依据本地化准则,Task会被散发到指定的Executor去执行,在工作执行的过程中,Executor也会一直与Driver进行通信,报告工作运行状况。

关键词:大数据培训