乐趣区

关于spark:Spark的jobstage和task的机制论述

Spark 任务调度机制阐述

在生产环境下,Spark 集群的部署形式个别为 YARN-Cluster 模式。Driver 线程次要是初始化 SparkContext 对象,筹备运行所需的上下文,而后一方面放弃与 ApplicationMaster 的 RPC 连贯,通过 ApplicationMaster 申请资源,另一方面依据用户业务逻辑开始调度工作,将工作下发到已有的闲暇 Executor 上。
当 ResourceManager 向 ApplicationMaster 返回 Container 资源时,ApplicationMaster 就尝试在对应的 Container 上启动 Executor 过程,Executor 过程起来后,会向 Driver 反向注册,注册胜利后放弃与 Driver 的心跳,同时期待 Driver 散发工作,当散发的工作执行结束后,将工作状态上报给 Driver。

  1. Spark 任务调度概述

1.1 根底概念

当 Driver 起来后,Driver 则会依据用户程序逻辑筹备工作,并依据 Executor 资源状况逐渐散发工作。在具体论述任务调度前,首先阐明下 Spark 里的几个概念。一个 Spark 应用程序包含 Job、Stage 以及 Task 三个概念:

job:以 action 办法为界,一个 action 触发一个 job

stage:它是 job 的子集,以 RDD 宽依赖为界,遇到宽依赖即划分 stage

task:它是 stage 的子集,以分区数来掂量,分区数多少,task 就有多少

1.2 任务调度

spark 工作从发动到执行可用下图示意

1.3 Client—>ResourceManage

(1). Client 端通过 spark-submit + 参数 发动工作,即向 ResourceManage 提交 application,留神该 application 蕴含了一堆参数,如 Executor 数,Executor 内存,Driver 内存等;

(2). ResourceManage 须要先判断当初资源是否能满足该 application,如果满足,则响应该 application,如果不满足,报错;

(3). 如果资源满足,Client 端筹备 ApplicationMaster 的启动上下文,并交给 ResourceManage;

(4). 并且循环监控 application 的状态;

1.4 ResourceManage—>ApplicationMaster

(1). ResourceManage 找一个 worker 启动 ApplicationMaster;

(2). ApplicationMaster 向 ResourceManage 申请 Container;

(3). ResourceManage 收集可用资源,并通知 ApplicationMaster;

(4). ApplicationMaster 尝试在对应的 Container 上启动 Executor 过程;

1.5 ApplicationMaster-Driver

(1). 有了资源,ApplicationMaster 启动 Driver;

//Driver 线程次要是初始化 SparkContext 对象,筹备运行所需上下文,并放弃与 ApplicationMaster 的 RPC 连贯,通过 ApplicationMaster 申请资源

(2). Driver 启动胜利后,通知 ApplicationMaster;

1.6 Driver-Executor

(1). Executor 启动胜利后,反向注册到 Driver 上,并继续向 Driver 发送心跳;

(2). Driver 启动 task,分发给 Executor,并监控 task 状态;

(3). 当 Executor 工作执行结束后,将工作状态发送给 Driver;

spark 的外围就是资源申请和任务调度,次要通过 ApplicationMaster、Driver、Executor 来实现

spark 任务调度分为两层,一层是 stage 级的调度,一层是 task 级的调度

RDD 间的血缘关系,代表了计算的流程,形成了 有向无环图,即 DAG;

最初通过 action 触发 job 并调度执行;

DAGScheduler 负责 stage 级的调度,次要是将 DAG 切分成多个 stage,并将 stage 打包成 TaskSet 交给 TaskScheduler;

TaskScheduler 负责 task 级的调度,将 DAGScheduler 发过来的 TaskSet 依照指定的调度策略发送给 Executor;

SchedulerBackend 负责给 调度策略 提供可用资源,调度策略决定把 task 发送给哪个 Executor;【其中 SchedulerBackend 有多种实现,别离对接不同的资源管理零碎】

基于上述认知,再来看一张图

Driver 在启动过程中,除了初始化 SparkContext 外,也初始化了 DAGScheduler、TaskScheduler、SchedulerBackend 3 个调度对象,同时初始化了 HeartbeatReceiver 心跳接收器;

并且各个线程之间保留通信;

SchedulerBackend 向 ApplicationMaster 申请资源,并不间断地从 TaskScheduler 获取 task 并发送给 适合的 Executor;

HeartbeatReceiver 负责接管 Executor 心跳报文,监控 Executor 存活状态;

  1. Spark Stage 级调度

Spark 的任务调度是从 DAG 切割开始,次要是由 DAGScheduler 来实现。当遇到一个 Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交,下图是波及到 Job 提交的相干办法调用流程图。

1) Job 由最终的 RDD 和 Action 办法封装而成;

2) SparkContext 将 Job 交给 DAGScheduler 提交,它会依据 RDD 的血缘关系形成的 DAG 进行切分,将一个 Job 划分为若干 Stages,具体划分策略是,由最终的 RDD 一直通过依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之间被划分到同一个 Stage 中,能够进行 pipeline 式的计算。划分的 Stages 分两类,一类叫做 ResultStage,为 DAG 最上游的 Stage,由 Action 办法决定,另一类叫做 ShuffleMapStage,为上游 Stage 筹备数据,上面看一个简略的例子 WordCount。

Job 由 saveAsTextFile 触发,该 Job 由 RDD- 3 和 saveAsTextFile 办法组成,依据 RDD 之间的依赖关系从 RDD- 3 开始回溯搜寻,直到没有依赖的 RDD-0,在回溯搜寻过程中,RDD- 3 依赖 RDD-2,并且是宽依赖,所以在 RDD- 2 和 RDD- 3 之间划分 Stage,RDD- 3 被划到最初一个 Stage,即 ResultStage 中,RDD- 2 依赖 RDD-1,RDD- 1 依赖 RDD-0,这些依赖都是窄依赖,所以将 RDD-0、RDD- 1 和 RDD- 2 划分到同一个 Stage,造成 pipeline 操作,。即 ShuffleMapStage 中,理论执行的时候,数据记录会零打碎敲地执行 RDD- 0 到 RDD- 2 的转化。不难看出,其本质上是一个深度优先搜寻(Depth First Search)算法。

一个 Stage 是否被提交,须要判断它的父 Stage 是否执行,只有在父 Stage 执行结束能力提交以后 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage 提交时会将 Task 信息(分区信息以及办法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个 Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 失落或者 Task 因为 Fetch 失败才须要从新提交失败的 Stage 以调度运行失败的工作,其余类型的 Task 失败会在 TaskScheduler 的调度过程中重试。

相对来说 DAGScheduler 做的事件较为简单,仅仅是在 Stage 层面上划分 DAG,提交 Stage 并监控相干状态信息。TaskScheduler 则绝对较为简单,上面具体论述其细节。

  1. Spark Task 级调度

Spark Task 的调度是由 TaskScheduler 来实现,由前文可知,DAGScheduler 将 Stage 打包到交给 TaskScheTaskSetduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 退出到调度队列中,TaskSetManager 构造如下图所示。

TaskSetManager 负责监控治理同一个 Stage 中的 Tasks,TaskScheduler 就是以 TaskSetManager 为单元来调度工作。

后面也提到,TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道,接管 Executor 的注册信息,并保护 Executor 的状态,所以说 SchedulerBackend 是管“食粮”的,同时它在启动后会定期地去“询问”TaskScheduler 有没有工作要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler 在 SchedulerBackend“问”它的时候,会从调度队列中依照指定的调度策略抉择 TaskSetManager 去调度运行,大抵办法调用流程如下图所示:

上图中,将 TaskSetManager 退出 rootPool 调度池中之后,调用 SchedulerBackend 的 riviveOffers 办法给 driverEndpoint 发送 ReviveOffer 音讯;driverEndpoint 收到 ReviveOffer 音讯后调用 makeOffers 办法,过滤出沉闷状态的 Executor(这些 Executor 都是工作启动时反向注册到 Driver 的 Executor),而后将 Executor 封装成 WorkerOffer 对象;筹备好计算资源(WorkerOffer)后,taskScheduler 基于这些资源调用 resourceOffer 在 Executor 上调配 task。

3.1 调度策略

TaskScheduler 反对两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。在 TaskScheduler 初始化过程中会实例化 rootPool,示意树的根节点,是 Pool 类型。

(1) FIFO 调度策略

如果是采纳 FIFO 调度策略,则间接简略地将 TaskSetManager 依照先来先到的形式入队,出队时间接拿出最先进队的 TaskSetManager,其树结构如下图所示,TaskSetManager 保留在一个 FIFO 队列中。

(2) FAIR 调度策略

FAIR 调度策略的树结构如下图所示:

FAIR 模式中有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待调配的 TaskSetMagager。

在 FAIR 模式中,须要先对子 Pool 进行排序,再对子 Pool 外面的 TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质,因而应用雷同的排序算法。

排序过程的比拟是基于 Fair-share 来比拟的,每个要排序的对象蕴含三个属性: runningTasks 值(正在运行的 Task 数)、minShare 值、weight 值,比拟时会综合考量 runningTasks 值,minShare 值以及 weight 值。

留神,minShare、weight 的值均在偏心调度配置文件 fairscheduler.xml 中被指定,调度池在构建阶段会读取此文件的相干配置。

1) 如果 A 对象的 runningTasks 大于它的 minShare,B 对象的 runningTasks 小于它的 minShare,那么 B 排在 A 后面;(runningTasks 比 minShare 小的先执行)

2) 如果 A、B 对象的 runningTasks 都小于它们的 minShare,那么就比拟 runningTasks 与 minShare 的比值(minShare 使用率),谁小谁排后面;(minShare 使用率低的先执行)

3) 如果 A、B 对象的 runningTasks 都大于它们的 minShare,那么就比拟 runningTasks 与 weight 的比值(权重使用率),谁小谁排后面。(权重使用率低的先执行)

4) 如果上述比拟均相等,则比拟名字。

整体上来说就是通过 minShare 和 weight 这两个参数管制比拟过程,能够做到让 minShare 使用率和权重使用率少(理论运行 task 比例较少)的先运行。

FAIR 模式排序实现后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后顺次被取出并发送给 Executor 执行。

从调度队列中拿到 TaskSetManager 后,因为 TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理调度这些 Task,那么接下来的工作就是 TaskSetManager 依照肯定的规定一个个取出 Task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到 Executor 上执行。

3.2 本地化调度

DAGScheduler 切割 Job,划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的 tasks,submitStage 会调用 submitMissingTasks,submitMissingTasks 确定每个须要计算的 task 的 preferredLocations,通过调用 getPreferrdeLocations() 失去 partition 的优先地位,因为一个 partition 对应一个 Task,此 partition 的优先地位就是 task 的优先地位,对于要提交到 TaskScheduler 的 TaskSet 中的每一个 Task,该 task 优先地位与其对应的 partition 对应的优先地位统一。

从调度队列中拿到 TaskSetManager 后,那么接下来的工作就是 TaskSetManager 依照肯定的规定一个个取出 task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到 Executor 上执行。后面也提到,TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理调度这些 Task。

依据每个 Task 的优先地位,确定 Task 的 Locality 级别,Locality 一共有五种,优先级由高到低程序:

在调度执行时,Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动,当一个 task 以 X 本地性级别启动,然而该本地性级别对应的所有节点都没有闲暇资源而启动失败,此时并不会马上升高本地性级别启动而是在某个工夫长度内再次以 X 本地性级别来启动该 task,若超过限时工夫则降级启动,去尝试下一个本地性级别,顺次类推。

能够通过调大每个类别的最大容忍延迟时间,在期待阶段对应的 Executor 可能就会有相应的资源去执行此 task,这就在在肯定水平上提到了运行性能。

3.3 失败重试与黑名单机制

除了抉择适合的 Task 调度运行外,还须要监控 Task 的执行状态,后面也提到,与内部打交道的是 SchedulerBackend,Task 被提交到 Executor 启动执行后,Executor 会将执行状态上报给 SchedulerBackend,SchedulerBackend 则通知 TaskScheduler,TaskScheduler 找到该 Task 对应的 TaskSetManager,并告诉到该 TaskSetManager,这样 TaskSetManager 就晓得 Task 的失败与胜利状态,对于失败的 Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中,否则整个 Application 失败。

在记录 Task 失败次数过程中,会记录它上一次失败所在的 Executor Id 和 Host,这样下次再调度这个 Task 时,会应用黑名单机制,防止它被调度到上一次失败的节点上,起到肯定的容错作用。黑名单记录 Task 上一次失败所在的 Executor Id 和 Host,以及其对应的“拉黑”工夫,“拉黑”工夫是指这段时间内不要再往这个节点上调度这个 Task 了。

4. 总结

本图有助于了解 job,stage,task 工作的原理。Spark 通用运行流程图,体现了根本的 Spark 应用程序在部署中的根本提交流程。

流程依照如下的外围步骤进行工作的:

1) 工作提交后,都会先启动 Driver 程序;

2) 随后 Driver 向集群管理器注册应用程序;

3) 之后集群管理器依据此工作的配置文件调配 Executor 并启动;

4) Driver 开始执行 main 函数,Spark 查问为懒执行,当执行到 Action 算子时开始反向推算,依据宽依赖进行 Stage 的划分,随后每一个 Stage 对应一个 Taskset,Taskset 中有多个 Task,查找可用资源 Executor 进行调度;

5) 依据本地化准则,Task 会被散发到指定的 Executor 去执行,在工作执行的过程中,Executor 也会一直与 Driver 进行通信,报告工作运行状况。

关键词:大数据培训

退出移动版