Spark 调优之 Shuffle 调优
本节开始先解说 Shuffle 外围概念;而后针对HashShuffle、SortShuffle 进行调优;接下来对 map 端、reduce 端 调优;再针对 Spark 中的 数据歪斜 问题进行分析及调优;最初是 Spark 运行过程中的 故障排除。
本文首发于公众号【五分钟学大数据】,本公号专一于大数据技术,分享高质量大数据原创技术文章。
一、Shuffle 的外围概念
1. ShuffleMapStage 与 ResultStage
在划分 stage 时,最初一个 stage 称为 FinalStage,它实质上是一个 ResultStage 对象,后面的所有 stage 被称为 ShuffleMapStage。
ShuffleMapStage 的完结随同着 shuffle 文件的写磁盘。
ResultStage 基本上对应代码中的 action 算子,行将一个函数利用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行完结。
2. Shuffle 中的工作个数
咱们晓得,Spark Shuffle 分为 map 阶段和 reduce 阶段,或者称之为 ShuffleRead 阶段和 ShuffleWrite 阶段,那么对于一次 Shuffle,map 过程和 reduce 过程都会由若干个 task 来执行,那么 map task 和 reduce task 的数量是如何确定的呢?
假如 Spark 工作从 HDFS 中读取数据,那么 初始 RDD 分区个数由该文件的 split 个数决定 ,也就是 一个 split 对应生成的 RDD 的一个 partition,咱们假如初始 partition 个数为 N。
初始 RDD 通过一系列算子计算后(假如没有执行 repartition 和 coalesce 算子进行重分区,则分区个数不变,仍为 N,如果通过重分区算子,那么分区个数变为 M),咱们假如分区个数不变,当执行到 Shuffle 操作时,map 端的 task 个数和 partition 个数统一,即 map task 为 N 个。
reduce 端的 stage 默认取 spark.default.parallelism
这个配置项的值作为分区数,如果没有配置,则以 map 端的最初一个 RDD 的分区数作为其分区数(也就是 N),那么分区数就决定了 reduce 端的 task 的个数。
3. reduce 端数据的读取
依据 stage 的划分咱们晓得,map 端 task 和 reduce 端 task 不在雷同的 stage 中,map task 位于 ShuffleMapStage,reduce task 位于 ResultStage,map task 会先执行,那么后执行的 reduce task 如何晓得从哪里去拉取 map task 落盘后的数据呢?
reduce 端的数据拉取过程如下:
- map task 执行结束后会将计算状态以及磁盘小文件地位等信息封装到 MapStatus 对象中,而后由本过程中的 MapOutPutTrackerWorker 对象将 mapStatus 对象发送给 Driver 过程的 MapOutPutTrackerMaster 对象;
- 在 reduce task 开始执行之前会先让本过程中的 MapOutputTrackerWorker 向 Driver 过程中的 MapoutPutTrakcerMaster 动员申请,申请磁盘小文件地位信息;
- 当所有的 Map task 执行结束后,Driver 过程中的 MapOutPutTrackerMaster 就把握了所有的磁盘小文件的地位信息。此时 MapOutPutTrackerMaster 会通知 MapOutPutTrackerWorker 磁盘小文件的地位信息;
- 实现之前的操作之后,由 BlockTransforService 去 Executor0 所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过 48M(reduce task 每次最多拉取 48M 数据,将拉来的数据存储到 Executor 内存的 20% 内存中)。
二、HashShuffle 解析
以下的探讨都假如每个 Executor 有 1 个 cpu core。
1. 未经优化的 HashShuffleManager
shuffle write 阶段,次要就是在一个 stage 完结计算之后,为了下一个 stage 能够执行 shuffle 类的算子(比方 reduceByKey),而将每个 task 解决的数据按 key 进行“划分”。所谓“划分”,就是 对雷同的 key 执行 hash 算法 ,从而将雷同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于上游 stage 的一个 task。 在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。
下一个 stage 的 task 有多少个,以后 stage 的每个 task 就要创立多少份磁盘文件 。比方下一个 stage 总共有 100 个 task,那么以后 stage 的每个 task 都要创立 100 份磁盘文件。如果以后 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,那么每个 Executor 上总共就要创立 500 个磁盘文件,所有 Executor 上会创立 5000 个磁盘文件。由此可见, 未经优化的 shuffle write 操作所产生的磁盘文件的数量是极其惊人的。
shuffle read 阶段,通常就是一个 stage 刚开始时要做的事件。此时该 stage 的 每一个 task 就须要将上一个 stage 的计算结果中的所有雷同 key,从各个节点上通过网络都拉取到本人所在的节点上,而后进行 key 的聚合或连贯等操作。因为 shuffle write 的过程中,map task 给上游 stage 的每个 reduce task 都创立了一个磁盘文件,因而 shuffle read 的过程中,每个 reduce task 只有从上游 stage 的所有 map task 所在节点上,拉取属于本人的那一个磁盘文件即可。
shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个本人的 buffer 缓冲,每次都只能拉取与 buffer 缓冲雷同大小的数据,而后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最初将所有数据到拉取完,并失去最终的后果。
未优化的 HashShuffleManager 工作原理 如下图所示:
2. 优化后的 HashShuffleManager
为了优化 HashShuffleManager 咱们能够设置一个参数:spark.shuffle.consolidateFiles
,该参数默认值为 false,将其设置为 true 即可开启优化机制,通常来说,如果咱们应用 HashShuffleManager,那么都倡议开启这个选项。
开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为上游 stage 的每个 task 创立一个磁盘文件了,此时会呈现 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与上游 stage 的 task 数量是雷同的。一个 Executor 上有多少个 cpu core,就能够并行执行多少个 task。而第一批并行执行的每个 task 都会创立一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。
当 Executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包含其中的磁盘文件,也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因而,consolidate 机制容许不同的 task 复用同一批磁盘文件,这样就能够无效将多个 task 的磁盘文件进行肯定水平上的合并,从而大幅度缩小磁盘文件的数量,进而晋升 shuffle write 的性能。
假如第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 Executor(Executor CPU 个数为 1),每个 Executor 执行 5 个 task。那么本来应用未经优化的 HashShuffleManager 时,每个 Executor 会产生 500 个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。然而此时通过优化之后,每个 Executor 创立的磁盘文件的数量的计算公式为:cpu core 的数量 * 下一个 stage 的 task 数量
,也就是说,每个 Executor 此时只会创立 100 个磁盘文件,所有 Executor 只会创立 1000 个磁盘文件。
优化后的 HashShuffleManager 工作原理 如下图所示:
三、SortShuffle 解析
SortShuffleManager 的运行机制次要分成两种,一种是 一般运行机制 ,另一种是bypass 运行机制。当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold
参数的值时(默认为 200),就会启用 bypass 机制。
1. 一般运行机制
在该模式下,数据会先写入一个内存数据结构中 ,此时依据不同的 shuffle 算子,可能选用不同的数据结构。 如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存 ; 如果是 join 这种一般的 shuffle 算子,那么会选用 Array 数据结构,间接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。
在溢写到磁盘文件之前,会先依据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的模式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输入流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样能够缩小磁盘 IO 次数,晋升性能。
一个 task 将所有数据写入内存数据结构的过程中,会产生屡次磁盘溢写操作,也就会产生多个临时文件。最初会将之前所有的长期磁盘文件都进行合并,这就是 merge 过程,此时会将之前所有长期磁盘文件中的数据读取进去,而后顺次写入最终的磁盘文件之中。此外,因为一个 task 就只对应一个磁盘文件,也就意味着该 task 为上游 stage 的 task 筹备的数据都在这一个文件中,因而还会独自写一份 索引文件,其中标识了上游各个 task 的数据在文件中的 start offset 与 end offset。
SortShuffleManager 因为有一个磁盘文件 merge 的过程,因而大大减少了文件数量。比方第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。因为每个 task 最终只有一个磁盘文件,因而此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。
一般运行机制的 SortShuffleManager 工作原理 如下图所示:
2. bypass 运行机制
bypass 运行机制的触发条件如下:
- shuffle map task 数量小于
spark.shuffle.sort.bypassMergeThreshold=200
参数的值。 - 不是聚合类的 shuffle 算子。
此时,每个 task 会为每个上游 task 都创立一个长期磁盘文件,并将数据按 key 进行 hash 而后依据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最初,同样会将所有长期磁盘文件都合并成一个磁盘文件,并创立一个独自的索引文件。
该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是截然不同的,因为都要创立数量惊人的磁盘文件,只是在最初会做一个磁盘文件的合并而已。因而大量的最终磁盘文件,也让该机制绝对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
而该机制与一般 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大益处在于,shuffle write 过程中,不须要进行数据的排序操作,也就节俭掉了这部分的性能开销。
bypass 运行机制的 SortShuffleManager 工作原理 如下图所示:
四、map 和 reduce 端缓冲区大小
在 Spark 工作运行过程中,如果 shuffle 的 map 端解决的数据量比拟大,然而 map 端缓冲的大小是固定的,可能会呈现 map 端缓冲数据频繁 spill 溢写到磁盘文件中的状况,使得性能十分低下,通过调节 map 端缓冲的大小,能够防止频繁的磁盘 IO 操作,进而晋升 Spark 工作的整体性能。
map 端缓冲的默认配置是 32KB,如果每个 task 解决 640KB 的数据,那么会产生 640/32 = 20 次溢写,如果每个 task 解决 64000KB 的数据,即会产生 64000/32=2000 次溢写,这对于性能的影响是十分重大的。
map 端缓冲的配置办法:
val conf = new SparkConf()
.set("spark.shuffle.file.buffer", "64")
Spark Shuffle 过程中,shuffle reduce task 的 buffer 缓冲区大小决定了 reduce task 每次可能缓冲的数据量,也就是每次可能拉取的数据量,如果内存资源较为短缺,适当减少拉取数据缓冲区的大小,能够缩小拉取数据的次数,也就能够缩小网络传输的次数,进而晋升性能。
reduce 端数据拉取缓冲区的大小能够通过 spark.reducer.maxSizeInFlight
参数进行设置,默认为 48MB。该参数的设置办法如下:
reduce 端数据拉取缓冲区配置:
val conf = new SparkConf()
.set("spark.reducer.maxSizeInFlight", "96")
五、reduce 端重试次数和等待时间距离
Spark Shuffle 过程中,reduce task 拉取属于本人的数据时,如果因为网络异样等起因导致失败会主动进行重试。对于那些蕴含了特地耗时的 shuffle 操作的作业,倡议减少重试最大次数 (比方 60 次),以防止因为 JVM 的 full gc 或者网络不稳固等因素导致的数据拉取失败。 在实践中发现,对于针对超大数据量(数十亿~ 上百亿)的 shuffle 过程,调节该参数能够大幅度晋升稳定性。
reduce 端拉取数据重试次数能够通过 spark.shuffle.io.maxRetries
参数进行设置,该参数就代表了能够重试的最大次数。如果在指定次数之内拉取还是没有胜利,就可能会导致作业执行失败,默认为 3,该参数的设置办法如下:
reduce 端拉取数据重试次数配置:
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
Spark Shuffle 过程中,reduce task 拉取属于本人的数据时,如果因为网络异样等起因导致失败会主动进行重试,在一次失败后,会期待肯定的工夫距离再进行重试,能够通过加大距离时长(比方 60s),以减少 shuffle 操作的稳定性。
reduce 端拉取数据期待距离能够通过 spark.shuffle.io.retryWait
参数进行设置,默认值为 5s,该参数的设置办法如下:
reduce 端拉取数据期待距离配置:
val conf = new SparkConf()
.set("spark.shuffle.io.retryWait", "60s")
六、bypass 机制开启阈值
对于 SortShuffleManager,如果 shuffle reduce task 的数量小于某一阈值则 shuffle write 过程中不会进行排序操作,而是间接依照未经优化的 HashShuffleManager 的形式去写数据,然而最初会将每个 task 产生的所有长期磁盘文件都合并成一个文件,并会创立独自的索引文件。
当你 应用 SortShuffleManager 时,如果确实不须要排序操作,那么倡议将这个参数调大一些,大于 shuffle read task 的数量,那么此时 map-side 就不会进行排序了,缩小了排序的性能开销,然而这种形式下,仍然会产生大量的磁盘文件,因而 shuffle write 性能有待进步。
SortShuffleManager 排序操作阈值的设置能够通过 spark.shuffle.sort.bypassMergeThreshold
这一参数进行设置,默认值为 200,该参数的设置办法如下:
reduce 端拉取数据期待距离配置:
val conf = new SparkConf()
.set("spark.shuffle.sort.bypassMergeThreshold", "400")
数据歪斜
就是数据分到各个区的数量不太平均, 能够自定义分区器, 想怎么分就怎么分。
Spark 中的数据歪斜问题次要指 shuffle 过程中呈现的数据歪斜问题,是因为不同的 key 对应的数据量不同导致的不同 task 所解决的数据量不同的问题。
例如,reduced 端一共要解决 100 万条数据,第一个和第二个 task 别离被调配到了 1 万条数据,计算 5 分钟内实现,第三个 task 调配到了 98 万数据,此时第三个 task 可能须要 10 个小时实现,这使得整个 Spark 作业须要 10 个小时能力运行实现,这就是数据歪斜所带来的结果。
留神,要辨别开 数据歪斜 与数据适量 这两种状况,数据歪斜是指多数 task 被调配了绝大多数的数据,因而多数 task 运行迟缓;数据适量是指所有 task 被调配的数据量都很大,相差不多,所有 task 都运行迟缓。
数据歪斜的体现:
- Spark 作业的大部分 task 都执行迅速,只有无限的几个 task 执行的十分慢,此时可能呈现了数据歪斜,作业能够运行,然而运行得十分慢;
- Spark 作业的大部分 task 都执行迅速,然而有的 task 在运行过程中会忽然报出 OOM,重复执行几次都在某一个 task 报出 OOM 谬误,此时可能呈现了数据歪斜,作业无奈失常运行。
定位数据歪斜问题:
- 查阅代码中的 shuffle 算子,例如 reduceByKey、countByKey、groupByKey、join 等算子,依据代码逻辑判断此处是否会呈现数据歪斜;
- 查看 Spark 作业的 log 文件,log 文件对于谬误的记录会准确到代码的某一行,能够依据异样定位到的代码地位来明确谬误产生在第几个 stage,对应的 shuffle 算子是哪一个;
1. 预聚合原始数据
1. 防止 shuffle 过程
绝大多数状况下,Spark 作业的数据起源都是 Hive 表,这些 Hive 表根本都是通过 ETL 之后的昨天的数据。
为了防止数据歪斜,咱们能够思考防止 shuffle 过程,如果防止了 shuffle 过程,那么从根本上就打消了产生数据歪斜问题的可能。
如果 Spark 作业的数据来源于 Hive 表,那么能够先在 Hive 表中对数据进行聚合,例如依照 key 进行分组,将同一 key 对应的所有 value 用一种非凡的格局拼接到一个字符串里去,这样,一个 key 就只有一条数据了;之后,对一个 key 的所有 value 进行解决时,只须要进行 map 操作即可,无需再进行任何的 shuffle 操作。通过上述形式就防止了执行 shuffle 操作,也就不可能会产生任何的数据歪斜问题。
对于 Hive 表中数据的操作,不肯定是拼接成一个字符串,也能够是间接对 key 的每一条数据进行累计计算。
要辨别开,解决的数据量大和数据歪斜的区别。
2. 增大 key 粒度(减小数据歪斜可能性,增大每个 task 的数据量)
如果没有方法对每个 key 聚合进去一条数据,在特定场景下,能够思考扩充 key 的聚合粒度。
例如,目前有 10 万条用户数据,以后 key 的粒度是(省,城市,区,日期),当初咱们思考扩充粒度,将 key 的粒度扩充为(省,城市,日期),这样的话,key 的数量会缩小,key 之间的数据量差别也有可能会缩小,由此能够加重数据歪斜的景象和问题。(此办法只针对特定类型的数据无效,当利用场景不合适时,会减轻数据歪斜)
2. 预处理导致歪斜的 key
1. 过滤
如果在 Spark 作业中容许抛弃某些数据,那么能够思考将可能导致数据歪斜的 key 进行过滤,滤除可能导致数据歪斜的 key 对应的数据,这样,在 Spark 作业中就不会产生数据歪斜了。
2. 应用随机 key
当应用了相似于 groupByKey、reduceByKey 这样的算子时,能够思考应用随机 key 实现双重聚合,如下图所示:
首先,通过 map 算子给每个数据的 key 增加随机数前缀,对 key 进行打散,将原先一样的 key 变成不一样的 key,而后进行第一次聚合,这样就能够让本来被一个 task 解决的数据扩散到多个 task 下来做部分聚合;随后,去除掉每个 key 的前缀,再次进行聚合。
此办法对于由 groupByKey、reduceByKey 这类算子造成的数据歪斜有比拟好的成果,仅仅实用于聚合类的 shuffle 操作,适用范围绝对较窄。如果是 join 类的 shuffle 操作,还得用其余的解决方案。
此办法也是前几种计划没有比拟好的成果时要尝试的解决方案。
3. sample 采样对歪斜 key 独自进行 join
在 Spark 中,如果某个 RDD 只有一个 key,那么在 shuffle 过程中会默认将此 key 对应的数据打散,由不同的 reduce 端 task 进行解决。
所以当由单个 key 导致数据歪斜时,可有将产生数据歪斜的 key 独自提取进去,组成一个 RDD,而后用这个原本会导致歪斜的 key 组成的 RDD 和其余 RDD 独自 join,此时,依据 Spark 的运行机制,此 RDD 中的数据会在 shuffle 阶段被扩散到多个 task 中去进行 join 操作。
歪斜 key 独自 join 的流程如下图所示:
实用场景剖析:
对于 RDD 中的数据,能够将其转换为一个两头表,或者是间接应用 countByKey()的形式,看一下这个 RDD 中各个 key 对应的数据量,此时如果你发现整个 RDD 就一个 key 的数据量特地多,那么就能够思考应用这种办法。
当数据量十分大时,能够思考应用 sample 采样获取 10% 的数据,而后剖析这 10% 的数据中哪个 key 可能会导致数据歪斜,而后将这个 key 对应的数据独自提取进去。
不实用场景剖析:
如果一个 RDD 中导致数据歪斜的 key 很多,那么此计划不实用。
3. 进步 reduce 并行度
当计划一和计划二对于数据歪斜的解决没有很好的成果时,能够思考进步 shuffle 过程中的 reduce 端并行度,reduce 端并行度的进步就减少了 reduce 端 task 的数量,那么每个 task 调配到的数据量就会相应缩小,由此缓解数据歪斜问题。
1. reduce 端并行度的设置
在大部分的 shuffle 算子中,都能够传入一个并行度的设置参数,比方 reduceByKey(500),这个参数会决定 shuffle 过程中 reduce 端的并行度,在进行 shuffle 操作的时候,就会对应着创立指定数量的 reduce task。对于 Spark SQL 中的 shuffle 类语句,比方 group by、join 等,须要设置一个参数,即spark.sql.shuffle.partitions
,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。
减少 shuffle read task 的数量,能够让本来调配给一个 task 的多个 key 调配给多个 task,从而让每个 task 解决比原来更少的数据。
举例来说,如果本来有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是调配给一个 task 的,那么这个 task 就要解决 50 条数据。而减少了 shuffle read task 当前,每个 task 就调配到一个 key,即每个 task 就解决 10 条数据,那么天然每个 task 的执行工夫都会变短了。
2. reduce 端并行度设置存在的缺点
进步 reduce 端并行度并没有从根本上扭转数据歪斜的实质和问题(计划一和计划二从根本上防止了数据歪斜的产生),只是尽可能地去缓解和加重 shuffle reduce task 的数据压力,以及数据歪斜的问题,实用于有较多 key 对应的数据量都比拟大的状况。
该计划通常无奈彻底解决数据歪斜,因为如果呈现一些极其状况,比方某个 key 对应的数据量有 100 万,那么无论你的 task 数量减少到多少,这个对应着 100 万数据的 key 必定还是会调配到一个 task 中去解决,因而注定还是会产生数据歪斜的。所以这种计划只能说是在发现数据歪斜时尝试应用的一种伎俩,尝试去用最简略的办法缓解数据歪斜而已,或者是和其余计划联合起来应用。
在现实状况下,reduce 端并行度晋升后,会在肯定水平上加重数据歪斜的问题,甚至根本打消数据歪斜;然而,在一些状况下,只会让原来因为数据歪斜而运行迟缓的 task 运行速度稍有晋升,或者防止了某些 task 的 OOM 问题,然而,依然运行迟缓,此时,要及时放弃计划三,开始尝试前面的计划。
4. 应用 map join
失常状况下,join 操作都会执行 shuffle 过程,并且执行的是 reduce join,也就是先将所有雷同的 key 和对应的 value 汇聚到一个 reduce task 中,而后再进行 join。一般 join 的过程如下图所示:
一般的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将雷同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。然而如果一个 RDD 是比拟小的,则能够采纳播送小 RDD 全量数据 +map 算子来实现与 join 同样的成果,也就是 map join,此时就不会产生 shuffle 操作,也就不会产生数据歪斜。
留神:RDD 是并不能间接进行播送的,只能将 RDD 外部的数据通过 collect 拉取到 Driver 内存而后再进行播送。
1. 外围思路:
不应用 join 算子进行连贯操作,而应用 broadcast 变量与 map 类算子实现 join 操作,进而齐全躲避掉 shuffle 类的操作,彻底防止数据歪斜的产生和呈现。将较小 RDD 中的数据间接通过 collect 算子拉取到 Driver 端的内存中来,而后对其创立一个 broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 broadcast 变量中获取较小 RDD 的全量数据,与以后 RDD 的每一条数据依照连贯 key 进行比对,如果连贯 key 雷同的话,那么就将两个 RDD 的数据用你须要的形式连接起来。
根据上述思路,基本不会产生 shuffle 操作,从根本上杜绝了 join 操作可能导致的数据歪斜问题。
当 join 操作有数据歪斜问题并且其中一个 RDD 的数据量较小时,能够优先思考这种形式,成果十分好。
map join 的过程如下图所示:
2. 不实用场景剖析:
因为 Spark 的播送变量是在每个 Executor 中保留一个正本,如果两个 RDD 数据量都比拟大,那么如果将一个数据量比拟大的 RDD 做成播送变量,那么很有可能会造成内存溢出。
故障排除
1. 防止 OOM-out of memory
在 Shuffle 过程,reduce 端 task 并不是等到 map 端 task 将其数据全副写入磁盘后再去拉取,而是map 端写一点数据,reduce 端 task 就会拉取一小部分数据,而后立刻进行前面的聚合、算子函数的应用等操作。
reduce 端 task 可能拉取多少数据,由 reduce 拉取数据的缓冲区 buffer 来决定,因为 拉取过去的数据都是先放在 buffer 中,而后再进行后续的解决,buffer 的默认大小为 48MB。
reduce 端 task 会一边拉取一边计算,不肯定每次都会拉满 48MB 的数据,可能大多数时候拉取一部分数据就解决掉了。
尽管说增大 reduce 端缓冲区大小能够缩小拉取次数,晋升 Shuffle 性能,然而有时 map 端的数据量十分大,写出的速度十分快,此时 reduce 端的所有 task 在拉取的时候,有可能全副达到本人缓冲的最大极限值,即 48MB,此时,再加上 reduce 端执行的聚合函数的代码,可能会创立大量的对象,这可能会导致内存溢出,即OOM。
如果一旦呈现 reduce 端内存溢出的问题,咱们能够思考减小 reduce 端拉取数据缓冲区的大小,例如缩小为 12MB。
在理论生产环境中是呈现过这种问题的,这是典型的 以性能换执行的原理。reduce 端拉取数据的缓冲区减小,不容易导致 OOM,然而相应的,reudce 端的拉取次数减少,造成更多的网络传输开销,造成性能的降落。
留神,要保障工作可能运行,再思考性能的优化。
2. 防止 GC 导致的 shuffle 文件拉取失败
在 Spark 作业中,有时会呈现 shuffle file not found
的谬误,这是十分常见的一个报错,有时呈现这种谬误当前,抉择从新执行一遍,就不再报出这种谬误。
呈现上述问题可能的起因 是 Shuffle 操作中,前面 stage 的 task 想要去上一个 stage 的 task 所在的 Executor 拉取数据,后果对方正在执行 GC,执行 GC 会导致 Executor 内所有的工作现场全副进行 ,比方 BlockManager、基于 netty 的网络通信等,这就会导致前面的 task 拉取数据拉取了半天都没有拉取到,就会报出shuffle file not found
的谬误,而第二次再次执行就不会再呈现这种谬误。
能够通过调整 reduce 端拉取数据重试次数和 reduce 端拉取数据工夫距离这两个参数来对 Shuffle 性能进行调整,增大参数值,使得 reduce 端拉取数据的重试次数减少,并且每次失败后期待的工夫距离加长。
JVM GC 导致的 shuffle 文件拉取失败调整数据重试次数和 reduce 端拉取数据工夫距离:
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
.set("spark.shuffle.io.retryWait", "60s")
3. YARN-CLIENT 模式导致的网卡流量激增问题
在 YARN-client 模式下,Driver 启动在本地机器上,而 Driver 负责所有的任务调度,须要与 YARN 集群上的多个 Executor 进行频繁的通信。
假如有 100 个 Executor,1000 个 task,那么每个 Executor 调配到 10 个 task,之后,Driver 要频繁地跟 Executor 上运行的 1000 个 task 进行通信,通信数据十分多,并且通信品类特地高。这就导致有可能在 Spark 工作运行过程中,因为频繁大量的网络通讯,本地机器的网卡流量会激增。
留神,YARN-client 模式只会在测试环境中应用,而之所以应用 YARN-client 模式,是因为能够看到具体全面的 log 信息,通过查看 log,能够锁定程序中存在的问题,防止在生产环境下产生故障。
在生产环境下,应用的肯定是 YARN-cluster 模式。在 YARN-cluster 模式下,就不会造成本地机器网卡流量激增问题,如果 YARN-cluster 模式下存在网络通信的问题,须要运维团队进行解决。
4. YARN-CLUSTER 模式的 JVM 栈内存溢出无奈执行问题
当 Spark 作业中蕴含 SparkSQL 的内容时,可能会碰到 YARN-client 模式下能够运行,然而 YARN-cluster 模式下无奈提交运行(报出 OOM 谬误)的状况。
YARN-client 模式下,Driver 是运行在本地机器上的,Spark 应用的 JVM 的 PermGen 的配置,是本地机器上的 spark-class 文件,JVM 永恒代的大小是 128MB,这个是没有问题的,然而在YARN-cluster 模式下,Driver 运行在 YARN 集群的某个节点上,应用的是没有通过配置的默认设置,PermGen 永恒代大小为 82MB。
SparkSQL 的外部要进行很简单的 SQL 的语义解析、语法树转换等等,非常复杂,如果 sql 语句自身就非常复杂,那么很有可能会导致性能的损耗和内存的占用,特地是对 PermGen 的占用会比拟大。
所以,此时如果 PermGen 占用好过了 82MB,然而又小于 128MB,就会呈现 YARN-client 模式下能够运行,YARN-cluster 模式下无奈运行的状况。
解决上述问题的办法是减少 PermGen(永恒代)的容量 ,须要在spark-submit 脚本中对相干参数进行设置,设置办法如下:
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
通过上述办法就设置了 Driver 永恒代的大小,默认为 128MB,最大 256MB,这样就能够防止下面所说的问题。
5. 防止 SparkSQL JVM 栈内存溢出
当 SparkSQL 的 sql 语句有成千盈百的 or 关键字时,就可能会呈现 Driver 端的 JVM 栈内存溢出。
JVM 栈内存溢出基本上就是因为调用的办法层级过多,产生了大量的,十分深的,超出了 JVM 栈深度限度的递归。(咱们猜想 SparkSQL 有大量 or 语句的时候,在解析 SQL 时,例如转换为语法树或者进行执行打算的生成的时候,对于 or 的解决是递归,or 十分多时,会产生大量的递归)
此时,倡议将一条 sql 语句拆分为多条 sql 语句来执行,每条 sql 语句尽量保障 100 个以内的子句。依据理论的生产环境试验,一条 sql 语句的 or 关键字管制在 100 个以内,通常不会导致 JVM 栈内存溢出。
更多大数据好文,欢送关注公众号【五分钟学大数据】
–end–
文章举荐:
Spark 性能调优 -RDD 算子调优篇