关于大数据:大数据开发技术之Spark-Job物理执行解析

12次阅读

共计 10115 个字符,预计需要花费 26 分钟才能阅读完成。

一个简单 job 逻辑执行图:

代码贴在本章最初。给定这样一个简单数据依赖图,如何正当划分 stage,并未确定 task 的类型和个数?
一个直观想法是将前后关联的 RDDs 组成一个 stage,大数据培训每个箭头生成一个 task。对于两个 RDD 聚合成一个 RDD 的状况,这三个 RDD 组成一个 stage。这样尽管能够解决问题,但显然效率不高。除了效率问题,这个想法还有一个更重大的问题:大量两头数据须要存储。对于 task 来说,其执行后果要么要存到磁盘,要么存到内存,或者两者皆有。如果每个箭头都是 task 的话,每个 RDD 外面的数据都须要存起来,占用空间可想而知。
仔细观察一下逻辑执行图会发现:在每个地位 RDD 中,每个 partition 是独立的,也就是说在 RDD 外部,每个 partition 数据依赖各自不会互相烦扰。因而,一个大胆的想法是将整个流程图看成一个 stage,为最初一个 finalRDD 中的每个 partition 调配一个 task。图示如下:

所有的粗箭头组合成第一个 task,该 task 计算完结后顺便将 CoGroupedRDD 曾经计算失去的第二个和第三个 partition 存起来。之后第二个 task(细实线)只需计算两步,第三个 task(细线)也只须要计算两步,最初失去后果。
这个想法有两个不靠谱的中央:
• 第一个 task 太大,碰到 ShuffleDependency 后,不得不计算 shuffle 依赖的 RDDs 的所有 partitions,而且都在这一个 task 外面计算。
• 须要设计奇妙的算法来判断哪个 RDD 中的哪些 partition 须要 cache。而且 cache 会占用存储空间。

尽管这是个不靠谱的想法,但有一个可取之处,即 pipeline 思维:数据用的时候再算,而且数据是流到要计算的地位的。比方在第一个 task 中,从 FlatMappedValuesRDD 中的 partition 向前推算,只计算要用的(依赖的)RDDs 及 partitions。在第二个 task 中,从 CoGroupedRDD 到 FlatMappedValuesRDD 计算过程中,不须要存储两头后果(MappedValuesRDD 中 partition 的全副数据)。
更进一步,从 record 粒度来讲,如下图中,第一个 pattern 中先算 g(f(record1)),而后原始的 record1 和 f(record1) 都能够丢掉,而后再算 g(f(record2)),丢掉两头后果,最初算 g(f(record3))。对于第二个 pattern 中的 g,record1 进入 g 前面,实践上能够丢掉(除非被手动 cache)。其余 pattern 同理。

回到 stage 和 task 的划分问题,下面不靠谱的想法的次要问题是碰到 ShuffleDependency 后无奈进行 pipeline。那么只有在 ShuffleDependency 处断开,就只剩 NarrowDependency,而 NarrowDependency chain 是能够进行 pipeline 的。依照此思维,下面 ComplexJob 的划分图如下:

所以划分算法就是:从后往前推算,遇到 ShuffleDependency 就断开,遇到 NarrowDependency 就将其退出该 stage。每个 stage 外面 task 的数目由该 stage 最初一个 RDD 中的 partition 个数决定。
粗箭头示意 task。因为是从后往前推算,因而最初一个 stage 的 id 是 0,stage 1 和 stage 2 都是 stage 0 的 parents。如果 stage 最初要产生 result,那么该 stage 外面的 task 都是 ResultTask,否则都是 ShuffleMapTask。之所以称为 ShuffleMapTask 是因为其计算结果须要 shuffle 到下一个 stage,实质上相当于 MapReduce 中的 mapper。ResultTask 相当于 MapReduce 中的 reducer(如果须要从 parent stage 那里 shuffle 数据),也相当于一般 mapper(如果该 stage 没有 parent stage)。
还有一个问题:算法中提到 NarrowDependency chain 能够 pipeline,可是这里的 ComplexJob 只展现了 OneToOneDependency 和 RangeDependency 的 pipeline,一般 NarrowDependency 如何 pipeline?
回忆上一章外面 cartesian(otherRDD) 外面简单的 NarrowDependency,图示如下:

通过算法划分后后果如下:

图中粗箭头展现了第一个 ResultTask,其余的 task 依此类推。因为该 stage 的 task 间接输入 result,所以这个图蕴含 6 个 ResultTasks。与 OneToOneDependency 不同的是这里每个人 ResultTask 须要计算 3 个 RDD,读取两个 data block,而整个读取和计算这三个 RDD 的过程在一个 task 外面实现。当计算 CartesianRDD 中的 partition 时,须要从两个 RDD 获取 records,因为都在一个 task 外面,不须要 shuffle。这个图阐明:不论是 1:1 还是 N:1 的 NarrowDependency,只有是 NarrowDependency chain,就能够进行 pipeline,生成的 task 个数与该 stage 最初一个 RDD 的 partition 个数雷同。
物理图的执行生成了 stage 和 task 当前,下一个问题就是 task 如何执行来生成最初的 result?
回到 ComplexJob 物理执行图,如果依照 MapReduce 的逻辑,从前到后执行,map() 产生两头数据 map outpus,通过 partition 后放到本地磁盘上。再通过 shuffle-sort-aggregate 后生成 reduce inputs,最初 reduce() 执行失去 result。执行流程如下:

整个执行流程没有问题,但不能间接套用在 Spark 的物理执行图上,因为 MapReduce 流程图简略、固定,而且没有 pipeline。
回忆 pipeline 的思维是 数据用的时候再算,而且数据是流到要计算的地位的。Result 产生的中央的就是要计算的地位,要确定“须要计算的数据”,咱们能够从后往前推,须要哪个 partition 就计算哪个 partition,如果 partition 外面没有数据,就持续向前推,造成 computing chain。这样推下去,后果就是:须要首先计算出每个 stage 最右边的 RDD 中的某些 partition。
对于没有 parent stage 的 stage,该 stage 最右边的 RDD 是能够立刻计算的,而且每计算出一个 record 后便能够流入 f 或 g(见后面图中的 patterns)。如果 f 中的 record 关系是 1:1 的,那么 f(record1) 计算结果能够立刻顺着 computing chain 流入 g 中。如果 f 的 record 关系是 N:1,record1 进入 f() 后也能够被回收。总结一下,computing chain 从后到前建设,而理论计算出的数据从前到后流动,而且计算出的第一个 record 流动到不能再流动后,再计算下一个 record。这样,尽管是要计算后续 RDD 的 partition 中的 records,但并不是要求以后 RDD 的 partition 中所有 records 计算失去后再整体向后流动。
对于有 parent stage 的 stage,先等着所有 parent stages 中 final RDD 中数据计算好,而后通过 shuffle 后,问题就又回到了计算“没有 parent stage 的 stage”。
代码实现:每个 RDD 蕴含的 getDependency() 负责确立 RDD 的数据依赖,compute() 办法负责接管 parent RDDs 或者 data block 流入的 records,进行计算,而后输入 record。常常能够在 RDD 中看到这样的代码 firstParent[T].iterator(split, context).map(f)。firstParent 示意该 RDD 依赖的第一个 parent RDD,iterator() 示意 parentRDD 中的 records 是一个一个流入该 RDD 的,map(f) 示意每流入一个 recod 就对其进行 f(record) 操作,输入 record。为了对立接口,这段 compute() 依然返回一个 iterator,来迭代 map(f) 输入的 records。

总结一下:整个 computing chain 依据数据依赖关系自后向前建设,遇到 ShuffleDependency 后造成 stage。在每个 stage 中,每个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records 一个个 fetch 过去。
如果要本人设计一个 RDD,那么须要留神的是 compute() 只负责定义 parent RDDs => output records 的计算逻辑,具体依赖哪些 parent RDDs 由 getDependency() 定义,具体依赖 parent RDD 中的哪些 partitions 由 dependency.getParents() 定义。
例如,在 CartesianRDD 中,
// RDD x = (RDD a).cartesian(RDD b)

// 定义 RDD x 应该蕴含多少个 partition,每个 partition 是什么类型

override def getPartitions: Array[Partition] = {

// create the cross product split

val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)

for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {

  val idx = s1.index * numPartitionsInRdd2 + s2.index

  array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)

}

array

}

// 定义 RDD x 中的每个 partition 怎么计算失去

override def compute(split: Partition, context: TaskContext) = {

val currSplit = split.asInstanceOf[CartesianPartition]

// s1 示意 RDD x 中的 partition 依赖 RDD a 中的 partitions(这里只依赖一个)// s2 示意 RDD x 中的 partition 依赖 RDD b 中的 partitions(这里只依赖一个)for (x <- rdd1.iterator(currSplit.s1, context);

     y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)

}

// 定义 RDD x 中的 partition i 依赖于哪些 RDD 中的哪些 partitions

//

// 这里 RDD x 依赖于 RDD a,同时依赖于 RDD b,都是 NarrowDependency

// 对于第一个依赖,RDD x 中的 partition i 依赖于 RDD a 中的

// 第 List(i / numPartitionsInRdd2) 个 partition

// 对于第二个依赖,RDD x 中的 partition i 依赖于 RDD b 中的

// 第 List(id % numPartitionsInRdd2) 个 partition

override def getDependencies: Seq[Dependency[_]] = List(

new NarrowDependency(rdd1) {def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)

},

new NarrowDependency(rdd2) {def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)

}

)

生成 Job 后面介绍了逻辑和物理执行图的生成原理,那么,怎么触发 job 的生成?曾经介绍了 task,那么 job 是什么?
下表列出了能够触发执行图生成的典型 action(),其中第二列是 processPartition(),定义如何计算 partition 中的 records 失去 result。第三列是 resultHandler(),定义如何对从各个 partition 收集来的 results 进行计算来失去最终后果。

Action

finalRDD(records)=>result

compute(results)

reduce(func)

(record1, record2) => result, (result, record i) => result

(result1, result 2) => result, (result, result i) => result

collect()

Array[records] => result

Array[result]

count()

count(records) => result

sum(result)

foreach(f)

f(records) => result

Array[result]

take(n)

record (i result

Array[result]

first()

record 1 => result

Array[result]

takeSample()

selected records => result

Array[result]

takeOrdered(n, [ordering])

TopN(records) => result

TopN(results)

saveAsHadoopFile(path)

records => write(records)

null

countByKey()

(K, V) => Map(K, count(K))

(Map, Map) => Map(K, count(K))

用户的 driver 程序中一旦呈现 action(),就会生成一个 job,比方 foreach() 会调用 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f)),向 DAGScheduler 提交 job。如果 driver 程序前面还有 action(),那么其余 action() 也会生成 job 提交。所以,driver 有多少个 action(),就会生成多少个 job。这就是 Spark 称 driver 程序为 application(可能蕴含多个 job)而不是 job 的起因。
每一个 job 蕴含 n 个 stage,最初一个 stage 产生 result。比方,第一章的 GroupByTest 例子中存在两个 job,一共产生了两组 result。在提交 job 过程中,DAGScheduler 会首先划分 stage,而后先提交无 parent stage 的 stages,并在提交过程中确定该 stage 的 task 个数及类型,并提交具体的 task。无 parent stage 的 stage 提交完后,依赖该 stage 的 stage 才可能提交。从 stage 和 task 的执行角度来讲,一个 stage 的 parent stages 执行完后,该 stage 能力执行。
提交 job 的实现细节 上面简略剖析下 job 的生成和提交代码,提交过程在 Architecture 那一章也会有图文并茂的剖析:

  1. rdd.action() 会调用 DAGScheduler.runJob(rdd, processPartition, resultHandler) 来生成 job。
  2. runJob() 会首先通过 rdd.getPartitions() 来失去 finalRDD 中应该存在的 partition 的个数和类型:Array[Partition]。而后依据 partition 个数 new 进去未来要持有 result 的数组 ArrayResult。
  3. 最初调用 DAGScheduler 的 runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler) 来提交 job。cleanedFunc 是 processParittion 通过闭包清理后的后果,这样能够被序列化后传递给不同节点的 task。
  4. DAGScheduler 的 runJob 持续调用 submitJob(rdd, func, partitions, allowLocal, resultHandler) 来提交 job。
  5. submitJob() 首先失去一个 jobId,而后再次包装 func,向 DAGSchedulerEventProcessActor 发送 JobSubmitted 信息,该 actor 收到信息后进一步调用 dagScheduler.handleJobSubmitted() 来解决提交的 job。之所以这么麻烦,是为了合乎事件驱动模型。
  6. handleJobSubmmitted() 首先调用 finalStage = newStage() 来划分 stage,而后 submitStage(finalStage)。因为 finalStage 可能有 parent stages,理论先提交 parent stages,等到他们执行完,finalStage 须要再次提交执行。再次提交由 handleJobSubmmitted() 最初的 submitWaitingStages() 负责。

剖析一下 newStage() 如何划分 stage:

  1. 该办法在 new Stage() 的时候会调用 finalRDD 的 getParentStages()。
  2. getParentStages() 从 finalRDD 登程,反向 visit 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 退出到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage。
  3. 一个 ShuffleMapStage(不是最初造成 result 的 stage)造成后,会将该 stage 最初一个 RDD 注册到 MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程须要 MapOutputTrackerMaster 来批示 ShuffleMapTask 输入数据的地位。

剖析一下 submitStage(stage) 如何提交 stage 和 task:

  1. 先确定该 stage 的 missingParentStages,应用 getMissingParentStages(stage)。如果 parentStages 都可能曾经执行过了,那么就为空了。
  2. 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将本人退出到 waitingStages 外面,等到 parent stages 执行完结后,会触发提交 waitingStages 外面的 stage。
  3. 如果 missingParentStages 为空,阐明该 stage 能够立刻执行,那么就调用 submitMissingTasks(stage, jobId) 来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 进去与该 stage 最初一个 RDD 的 partition 数雷同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 进去与 stage 最初一个 RDD 的 partition 个数雷同的 ResultTasks。一个 stage 外面的 task 组成一个 TaskSet,最初调用 taskScheduler.submitTasks(taskSet) 来提交一整个 taskSet。
  4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 外面,每一个 taskSet 被包装成 manager: TaskSetMananger,而后交给 schedulableBuilder.addTaskSetManager(manager)。schedulableBuilder 能够是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最初一步是告诉 backend.reviveOffers() 去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。
  5. SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子类,backend.reviveOffers() 其实是向 DriverActor 发送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的时候,会启动 DriverActor。DriverActor 收到 ReviveOffers 音讯后,会调用 launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) 来 launch tasks。scheduler 就是 TaskSchedulerImpl。scheduler.resourceOffers() 从 FIFO 或者 Fair 调度器那里取得排序后的 TaskSetManager,并通过 TaskSchedulerImpl.resourceOffer(),思考 locality 等因素来确定 task 的全副信息 TaskDescription。调度细节这里暂不探讨。
  6. DriverActor 中的 launchTasks() 将每个 task 序列化,如果序列化大小不超过 Akka 的 akkaFrameSize,那么间接将 task 送到 executor 那里执行 executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))。

Discussion 至此,咱们探讨了:
• driver 程序如何触发 job 的生成
• 如何从逻辑执行图失去物理执行图
• pipeline 思维与实现
• 生成与提交 job 的理论代码

还有很多中央没有深刻探讨,如:
• 连贯 stage 的 shuffle 过程
• task 运行过程及运行地位

从逻辑执行图的建设,到将其转换成物理执行图的过程很经典,过程中的 dependency 划分,pipeline,stage 宰割,task 生成 都是井井有条,有理有据的。
ComplexJob 的源代码:
package internals

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.HashPartitioner

object complexJob {

def main(args: Array[String]) {

val sc = new SparkContext("local", "ComplexJob test")
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'),
  (3, 'c'), (4, 'd'),
  (5, 'e'), (3, 'f'),
  (2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)
val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))
val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
  (3, "C"), (4, "D"))
val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))
val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)
val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)
result.foreachWith(i => i)((x, i) => println("[result" + i + "]" + x))
println(result.toDebugString)

}

}

正文完
 0