上一篇内容讲了资源参数优化,本篇持续阐明 spark driver 以及 spark shuffle 相干的参数调优。
扩大 spark driver
动静资源分配
在 Facebook,Spark 集群启用了动静资源分配(Dynamic Executor Allocation),以便更好的应用集群资源,而且在 Facebook 外部,Spark 是运行在多租户的集群上,所以这个也是十分适合的。比方典型的配置如下:
- spark.dynamicAllocation.enabled = true
- spark.dynamicAllocation.executorIdleTimeout = 2m
- spark.dynamicAllocation.minExecutors = 1
- spark.dynamicAllocation.maxExecutors = 2000
点击增加图片形容(最多 60 个字)编辑
多线程事件处理
在 Spark 2.3 版本之前,事件处理是单线程的架构,也就是说,事件队列外面的事件得一个一个解决。如果你的作业很大,并且有很多 tasks,很可能会导致事件处理呈现提早,进一步导致作业性能呈现问题,甚至使以后作业失败。为了解决这个问题,SPARK-18838 这个 ISSUE 引入了多线程事件处理架构,每个事件都有其独自的单线程 executor service 去解决,这样就能够大大减少事件处理延时的问题。另外,因为每类事件都有独自的事件队列,所以会减少 Driver 端的内存占用。
点击增加图片形容(最多 60 个字)编辑
更好的 Fetch 失败解决
在 Spark 2.3 版本之前,如果 Spark 探测到 fetch failure,那么它会把产生这个 shuffle 文件的 Executor 移除掉。然而如果这个 Executor 所在的机器有很多 Executor,而且是因为这台机器挂掉导致 fetch failure,那么会导致很多的 fetch 重试,这种解决机制很低下。SPARK-19753 这个 ISSUE 使得 Spark 能够把上述场景所有 Executor 的 shuffle 文件移除,也就是不再去重试就晓得 shuffle 文件不可用。
点击增加图片形容(最多 60 个字)编辑
另外,Spark 最大 Fetch 重试次数也能够通过 spark.max.fetch.failures.per.stage 参数配置。
FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,而后由 executor 通过 statusUpdate 传到 driver 端,理论的解决会在 DAGScheduler.handleTaskCompletion,它会从新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。
RPC 服务线程调优
当 Spark 同时运行大量的 tasks 时,Driver 很容易呈现 OOM,这是因为在 Driver 端的 Netty 服务器上产生大量 RPC 的申请积压,咱们能够通过加大 RPC 服务的线程数解决 OOM 问题,比方 spark.rpc.io.serverThreads = 64。
spark shuffle 相干的参数调优
spark.shuffle.file.buffer
默认值:32k
参数阐明: 该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优倡议: 如果作业可用的内存资源较为短缺的话,能够适当减少这个参数的大小(比方 64k),从而缩小 shuffle write 过程中溢写磁盘文件的次数,也就能够缩小磁盘 IO 次数,进而晋升性能。在实践中发现,正当调节该参数,性能会有 1%~5% 的晋升。
spark.reducer.maxSizeInFlight
默认值:48m
参数阐明: 该参数用于设置 shuffle read task 的 buffer 缓冲大小,而这个 buffer 缓冲决定了每次可能拉取多少数据。
调优倡议: 如果作业可用的内存资源较为短缺的话,能够适当减少这个参数的大小(比方 96m),从而缩小拉取数据的次数,也就能够缩小网络传输的次数,进而晋升性能。在实践中发现,正当调节该参数,性能会有 1%~5% 的晋升。
spark.shuffle.io.maxRetries
默认值:3
参数阐明:shuffle read task 从 shuffle write task 所在节点拉取属于本人的数据时,如果因为网络异样导致拉取失败,是会主动进行重试的。该参数就代表了能够重试的最大次数。如果在指定次数之内拉取还是没有胜利,就可能会导致作业执行失败。
调优倡议: 对于那些蕴含了特地耗时的 shuffle 操作的作业,倡议减少重试最大次数(比方 60 次),以防止因为 JVM 的 full gc 或者网络不稳固等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~ 上百亿)的 shuffle 过程,调节该参数能够大幅度晋升稳定性。
spark.shuffle.io.retryWait
默认值:5s
参数阐明: 具体解释同上,该参数代表了每次重试拉取数据的期待距离,默认是 5s。
调优倡议: 倡议加大距离时长(比方 60s),以减少 shuffle 操作的稳定性。
spark.shuffle.memoryFraction
默认值:0.2
参数阐明: 该参数代表了 Executor 内存中,调配给 shuffle read task 进行聚合操作的内存比例,默认是 20%。
调优倡议: 如果内存短缺,而且很少应用长久化操作,倡议调高这个比例,给 shuffle read 的聚合操作更多内存,以防止因为内存不足导致聚合过程中频繁读写磁盘。在实践中发现,正当调节该参数能够将性能晋升 10% 左右。
spark.shuffle.manager
默认值:sort
参数阐明: 该参数用于设置 ShuffleManager 的类型。Spark 1.5 当前,有三个可选项:hash、sort 和 tungsten-sort。HashShuffleManager 是 Spark 1.2 以前的默认选项,然而 Spark 1.2 以及之后的版本默认都是 SortShuffleManager 了。tungsten-sort 与 sort 相似,然而应用了 tungsten 打算中的堆外内存管理机制,内存应用效率更高。
调优倡议: 因为 SortShuffleManager 默认会对数据进行排序,因而如果你的业务逻辑中须要该排序机制的话,则应用默认的 SortShuffleManager 就能够;而如果你的业务逻辑不须要对数据进行排序,那么倡议参考前面的几个参数调优,通过 bypass 机制或优化的 HashShuffleManager 来防止排序操作,同时提供较好的磁盘读写性能。这里要留神的是,tungsten-sort 要慎用,因为之前发现了一些相应的 bug。
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数阐明: 当 ShuffleManager 为 SortShuffleManager 时,如果 shuffle read task 的数量小于这个阈值(默认是 200),则 shuffle write 过程中不会进行排序操作,而是间接依照未经优化的 HashShuffleManager 的形式去写数据,然而最初会将每个 task 产生的所有长期磁盘文件都合并成一个文件,并会创立独自的索引文件。
调优倡议: 当你应用 SortShuffleManager 时,如果确实不须要排序操作,那么倡议将这个参数调大一些,大于 shuffle read task 的数量。那么此时就会主动启用 bypass 机制,map-side 就不会进行排序了,缩小了排序的性能开销。然而这种形式下,仍然会产生大量的磁盘文件,因而 shuffle write 性能有待进步。
spark.shuffle.consolidateFiles
默认值:false
参数阐明: 如果应用 HashShuffleManager,该参数无效。如果设置为 true,那么就会开启 consolidate 机制,会大幅度合并 shuffle write 的输入文件,对于 shuffle read task 数量特地多的状况下,这种办法能够极大地缩小磁盘 IO 开销,晋升性能。
调优倡议: 如果确实不须要 SortShuffleManager 的排序机制,那么除了应用 bypass 机制,还能够尝试将 spark.shffle.manager 参数手动指定为 hash,应用 HashShuffleManager,同时开启 consolidate 机制。在实践中尝试过,发现其性能比开启了 bypass 机制的 SortShuffleManager 要高出 10%~30%。
spark.reducer.maxBlocksInFlightPerAddress
默认值:Int.MaxValue(2 的 31 次方 -1)
限度了每个主机每次 reduce 能够被多少台近程主机拉取文件块,调低这个参数能够无效加重 node manager 的负载。
spark.reducer.maxReqsInFlight
默认值:Int.MaxValue(2 的 31 次方 -1)
限度近程机器拉取本机器文件块的申请数,随着集群增大,须要对此做出限度。否则可能会使本机负载过大而挂掉。。
spark.reducer.maxReqSizeShuffleToMem
默认值:Long.MaxValue
shuffle 申请的文件块大小 超过这个参数值,就会被强行落盘,避免一大堆并发申请把内存占满。
spark.shuffle.compress
默认压缩 true
是否压缩 map 输入文件
spark.shuffle.spill.compress
默认:true
shuffle 过程中溢出的文件是否压缩,应用 spark.io.compression.codec 压缩。
理解更多