Spark简介
Apache Spark是用于大规模数据处理的对立剖析引擎,基于内存计算,进步了在大数据环境下数据处理的实时性,同时保障了高容错性和高可伸缩性,容许用户将Spark部署在大量硬件之上,造成集群。
Spark源码从1.x的40w行倒退到当初的超过100w行,有1400多位大牛奉献了代码。整个Spark框架源码是一个微小的工程。上面咱们一起来看下spark的底层执行原理。
Spark运行流程
具体运行流程如下:
- SparkContext 向资源管理器注册并向资源管理器申请运行Executor
- 资源管理器调配Executor,而后资源管理器启动Executor
- Executor 发送心跳至资源管理器
- SparkContext 构建DAG有向无环图
- 将DAG分解成Stage(TaskSet)
- 把Stage发送给TaskScheduler
- Executor 向 SparkContext 申请 Task
- TaskScheduler 将 Task 发送给 Executor 运行
- 同时 SparkContext 将利用程序代码发放给 Executor
- Task 在 Executor 上运行,运行结束开释所有资源
1. 从代码角度看DAG图的构建
Val lines1 = sc.textFile(inputPath1).map(...).map(...)Val lines2 = sc.textFile(inputPath2).map(...)Val lines3 = sc.textFile(inputPath3)Val dtinone1 = lines2.union(lines3)Val dtinone = lines1.join(dtinone1)dtinone.saveAsTextFile(...)dtinone.filter(...).foreach(...)
上述代码的DAG图如下所示:
Spark内核会在须要计算产生的时刻绘制一张对于计算门路的有向无环图,也就是如上图所示的DAG。
Spark 的计算产生在RDD的Action操作,而对Action之前的所有Transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。
2. 将DAG划分为Stage外围算法
一个Application能够有多个job多个Stage:
Spark Application中能够因为不同的Action触发泛滥的job,一个Application中能够有很多的job,每个job是由一个或者多个Stage形成的,前面的Stage依赖于后面的Stage,也就是说只有后面依赖的Stage计算结束后,前面的Stage才会运行。
划分根据:
Stage划分的根据就是宽依赖,像reduceByKey,groupByKey等算子,会导致宽依赖的产生。
回顾下宽窄依赖的划分准则:
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖。即一对一或者多对一的关系,可了解为独生子女。 常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned)等。
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(波及到shuffle)。即一对多的关系,可了解为超生。 常见的宽依赖有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned)等。
外围算法:回溯算法
从后往前回溯/反向解析,遇到窄依赖退出本Stage,遇见宽依赖进行Stage切分。
Spark内核会从触发Action操作的那个RDD开始从后往前推,首先会为最初一个RDD创立一个Stage,而后持续倒推,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创立一个新的Stage,那个RDD就是新的Stage的最初一个RDD。
而后顺次类推,持续倒推,依据窄依赖或者宽依赖进行Stage的划分,直到所有的RDD全副遍历实现为止。
3. 将DAG划分为Stage分析
一个Spark程序能够有多个DAG(有几个Action,就有几个DAG,上图最初只有一个Action(图中未体现),那么就是一个DAG)。
一个DAG能够有多个Stage(依据宽依赖/shuffle进行划分)。
同一个Stage能够有多个Task并行执行(task数=分区数,如上图,Stage1 中有三个分区P1、P2、P3,对应的也有三个 Task)。
能够看到这个DAG中只reduceByKey操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage。
同时咱们能够留神到,在图中Stage1中,从textFile到flatMap到map都是窄依赖,这几步操作能够造成一个流水线操作,通过flatMap操作生成的partition能够不必期待整个RDD计算完结,而是持续进行map操作,这样大大提高了计算的效率。
4. 提交Stages
调度阶段的提交,最终会被转换成一个工作集的提交,DAGScheduler通过TaskScheduler接口提交工作集,这个工作集最终会触发TaskScheduler构建一个TaskSetManager的实例来治理这个工作集的生命周期,对于DAGScheduler来说,提交调度阶段的工作到此就实现了。
而TaskScheduler的具体实现则会在失去计算资源的时候,进一步通过TaskSetManager调度具体的工作到对应的Executor节点上进行运算。
5. 监控Job、Task、Executor
- DAGScheduler监控Job与Task:
要保障相互依赖的作业调度阶段可能失去顺利的调度执行,DAGScheduler须要监控以后作业调度阶段乃至工作的实现状况。
这通过对外裸露一系列的回调函数来实现的,对于TaskScheduler来说,这些回调函数次要包含工作的开始完结失败、工作集的失败,DAGScheduler依据这些工作的生命周期信息进一步保护作业和调度阶段的状态信息。
- DAGScheduler监控Executor的生命状态:
TaskScheduler通过回调函数告诉DAGScheduler具体的Executor的生命状态,如果某一个Executor解体了,则对应的调度阶段工作集的ShuffleMapTask的输入后果也将标记为不可用,这将导致对应工作集状态的变更,进而从新执行相干计算工作,以获取失落的相干数据。
6. 获取工作执行后果
- 后果DAGScheduler:
一个具体的工作在Executor中执行结束后,其后果须要以某种模式返回给DAGScheduler,依据工作类型的不同,工作后果的返回形式也不同。
- 两种后果,两头后果与最终后果:
对于FinalStage所对应的工作,返回给DAGScheduler的是运算后果自身。
而对于两头调度阶段对应的工作ShuffleMapTask,返回给DAGScheduler的是一个MapStatus里的相干存储信息,而非后果自身,这些存储地位信息将作为下一个调度阶段的工作获取输出数据的根据。
- 两种类型,DirectTaskResult与IndirectTaskResult:
依据工作后果大小的不同,ResultTask返回的后果又分为两类:
如果后果足够小,则间接放在DirectTaskResult对象内中。
如果超过特定尺寸则在Executor端会将DirectTaskResult先序列化,再把序列化的后果作为一个数据块寄存在BlockManager中,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并通过BlockManager最终获得对应的DirectTaskResult。
7. 任务调度总体诠释
一张图阐明工作总体调度:
Spark运行架构特点
1. Executor过程专属
每个Application获取专属的Executor过程,该过程在Application期间始终驻留,并以多线程形式运行Tasks。
Spark Application不能跨应用程序共享数据,除非将数据写入到内部存储系统。如图所示:
2. 反对多种资源管理器
Spark与资源管理器无关,只有可能获取Executor过程,并能放弃互相通信就能够了。
Spark反对资源管理器蕴含: Standalone、On Mesos、On YARN、Or On EC2。如图所示:
3. Job提交就近准则
提交SparkContext的Client应该凑近Worker节点(运行Executor的节点),最好是在同一个Rack(机架)里,因为Spark Application运行过程中SparkContext和Executor之间有大量的信息替换;
如果想在近程集群中运行,最好应用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。
如图所示:
4. 挪动程序而非挪动数据的准则执行
挪动程序而非挪动数据的准则执行,Task采纳了数据本地性和揣测执行的优化机制。
要害办法:taskIdToLocations、getPreferedLocations。
如图所示: