一文详解Spark-Shuffle

8次阅读

共计 3194 个字符,预计需要花费 8 分钟才能阅读完成。

前言

Spark Shuffle 是大众讨论的比较多的话题了。它是 Spark 任务执行过程中最为重要的过程之一。那么什么是 Shuffle 呢?

Shuffle 一般被翻译成数据混洗,是类 MapReduce 分布式计算框架独有的机制,也是这类分布式计算框架最重要的执行机制。接下来会按照两个层面来谈谈 Shuffle 机制。分别为:

  • 逻辑层面
  • 物理层面

逻辑层面主要是从 RDD 的血缘出发,从 DAG 的角度来讲解 Shuffle,另外也会说明 Spark 容错机制。
物理层面是从执行角度来剖析 Shuffle 是如何发生的

1.RDD 血缘与 Spark 容错

从血缘角度出发就需先了解 DAG,DAG 被称之为有向无环图 。在 DAG 中,最初的 RDD 被成为基础 RDD,在基础 RDD 之上使用算子的过程中后续生成 RDD 被成为一个个子 RDD,它们之间存在依赖关系。无论哪个 RDD 出现问题,都可以由这种依赖关系重新计算而成。这种依赖关系就被成为 RDD 血缘。血缘的表现方式主要分为 宽依赖与窄依赖

1.1 窄依赖与宽依赖

窄依赖的标准定义是 :子 RDD 中的分区与父 RDD 中的分区只存在一对一的映射关系。
宽依赖的标准定义是:子 RDD 中分区与父 RDD 中分区存在一对多的映射关系。

从实际算子来说,map,filter,union 等就是窄依赖,而 groupByKey,reduceByKey 就是典型的宽依赖。

宽依赖还有个名字,叫 shuffle 依赖,也就是说宽依赖必然会发生在 shuffle 操作,shuffle 也是划分 stage 的重要依据。而窄依赖由于不需要发生 shuffle,所有计算都是在分区所在节点完成,类似于 MR 中的 ChainMapper。所以说,在如果在程序中选取的算子形成了宽依赖,那么就必然会触发 shuffle。

所以当 RDD 在 shuffle 过程中某个分区出现了故障,只需要找到当前对应的 Stage,而这个 Stage 必然是某个 shuffle 算子所进行划分的,找到了这个算子,就离定位错误原因越来越近了。

如上图所示,如果 P1_0 分区发生故障,那么按照依赖关系,则需要 P0_0 与 P0_1 的分区重算,P0_0 与 P0_1 没有持久化,就会不断回溯,直到找到存在的父分区为止。至于为什么要持久化,原因就是当计算逻辑复杂时,就会引发依赖链过长,如果其中的某个 RDD 发生了问题。若没有进行持久化,Spark 则会根据 RDD 血缘关系进行重头开始计算。重算显然对我们是代价极高的,所以用户可以在计算过程中,适当的调用 RDD 的 checkpoint 方法,保存好当前算好的中间结果,这样依赖关系链就会大大的缩短。因为 checkpoint 其实是会切断血缘的。这就是 RDD 的血缘机制即 RDD 的容错机制。

而 Spark 的容错机制则是主要分为资源管理平台的容错和 Spark 应用的容错。

1.2 Spark 的容错机制

Spark 的应用是基于资源管理平台运行的,所以资源管理平台的容错也是 Spark 容错的一部分,如 Yarn 的 ResourceManager HA 机制。在 Spark 应用执行的过程中,可能会遇到以下几种失败情况:

  • Driver 出错
  • Executor 出错
  • Task 出错

Dirver 执行失败是 Spark 应用最严重的一种情况,因为它标记着整个作业的执行失败,需要开发人员手动重启 Driver。而 Executor 报错通常是所在的 Worker 出错,这时 Driver 就会将执行失败的 Task 调度到另一个 Executor 继续执行,重新执行的 Task 会根据 RDD 的依赖关系继续计算,并将报错的 Executor 从可用的 Executor 列表中移除。
Spark 会对执行失败的 Task 进行重试,重试 3 次后若依然出错,则整个作业就会失败。而在这个过程中,数据恢复和重试都依赖于 RDD 血缘机制。

2.Spark Shuffle

很多算子都会引起 RDD 中的数据进行重分区,新的分区被创建,旧的分区被合并或者打碎,在重分区过程中,如果数据发生了跨节点移动,就被称为 Shuffle。Spark 对 Shuffle 的实现方式有两种:Hash Shuffle 与 Sort-based Shuffle,这其实是一个优化的过程。在较老的版本中,Spark Shuffle 的方式可以通过 spark.shuffle.manager 配置项进行配置,而在最新的版本中,已经移除了该配置项,统一称为 Sort-based Shuffle。

2.1 Hash Shuffle

在 Spark 1.6.3 之前,Hash Shuffle 都是 Spark Shuffle 的解决方案之一。Shuffle 的过程一般分为两个部分:Shuffle Write 和 Shuffle Fetch,前者是 Map 任务划分分区,输出中间结果,而后者则是 Reduce 任务获取到的这些中间结果。Hash Shuffle 的过程如图下所示:

图中,Shuffle Write 发生在一个节点上,执行 shuffle 任务的 CPU 核数为 1,可以同时执行两个任务,每个任务输出的分区数与 Reducer 数相同,即为 3。每个分区都有一个缓冲区(bucket)用来接收结果,每个缓冲区的大小由配置 spark.shuffle.file.buffer.kb 决定。这样每个缓冲区写满后,就会输出到一个文件段中。而 Reducer 就会去相应的节点拉取文件。

这样设计起来其实是不复杂的。但问题也很明显,主要有两个:

  • 生成的文件个数太大。理论上,每个 Shuffle 任务输出会产生 R 个文件(由 Reduce 个数决定),而 Shuffle 任务的个数往往是由 Map 任务个数 M 决定的,所以总共会生成 M * R 个中间结果文件,而在大型作业中,若是 M 和 R 都是很大的数字的话,就会出现文件句柄数突破操作系统的限制。
  • 缓冲区占用内存空间过大。单节点在执行 Shuffle 任务时缓存区大小消耗(spark.shuffle.file.buffer.kb) × m × R , m 为该节点运行的 shuffle 个数,如果一个核可以执行一个任务,那么 m 就与 cpu 核数相等。这对于有 32,64 核的服务器来说都是不小的内存开销。

所有为了解决第一个问题,Spark 引入了 Flie Consolidation 机制,指通过共同输出文件以降低文件数,如下图所示:

  • 每当 Shuffle 输出时,同一个 CPU 核心处理的 Map 任务的中间结果会输出到同分区的一个文件中,然后 Reducer 只需要一次性将整个文件拿到即可。这样,Shuffle 产生的文件数为 C(CPU 核数)* R。Spark 的 FileConsolidation 机制默认开启,可以通过 spark.shuffle.consolidateFiles 配置项进行配置。

2.2 Sort-based Shuffle

即便是引入了 FlieConsolidation 后,还是无法根本解决中间文件数太大的问题,这时候 Sort-based Shuffle 才算是真正的引入进来。如图所示:

  • 每个 Map 任务会最后只输出两个文件(其中一个是索引文件),其中间过程采用 MapReduce 一样的归并排序,但是会用索引文件记录每个分区的偏移量,输出完成后,Reducer 会根据索引文件得到属于自己的分区,这种情况下,shuffle 产生的中间结果文件为 2 * M(M 为 Map 任务数)。
  • 在基于排序的 Shuffle 中,Spark 还提供了一种折中方案——Bypass Sort-based Shuffle,当 Reduce 任务小于 spark.shuffle.sort.bypassMergeThreshold 配置(默认 200)时,Spark Shuffle 开始按照 Hash Shuffle 的方式处理数据,而不用进行归并排序,只是在 Shuffle Write 步骤的最后,将其合并为 1 个文件,并生成索引文件。这样实际上还是会生成大量的中间文件,只是最后合并为 1 个文件并省去排序所带来的开销,该方案的准确说法是 Hash Shuffle 的 Shuffle Fetch 优化版。
正文完
 0