上一篇内容讲了资源参数优化,本篇持续阐明spark driver以及spark shuffle相干的参数调优。

扩大spark driver

动静资源分配

在 Facebook,Spark 集群启用了动静资源分配(Dynamic Executor Allocation),以便更好的应用集群资源,而且在 Facebook 外部,Spark 是运行在多租户的集群上,所以这个也是十分适合的。比方典型的配置如下:

  • spark.dynamicAllocation.enabled = true
  • spark.dynamicAllocation.executorIdleTimeout = 2m
  • spark.dynamicAllocation.minExecutors = 1
  • spark.dynamicAllocation.maxExecutors = 2000

点击增加图片形容(最多60个字)编辑

多线程事件处理

在 Spark 2.3 版本之前,事件处理是单线程的架构,也就是说,事件队列外面的事件得一个一个解决。如果你的作业很大,并且有很多 tasks,很可能会导致事件处理呈现提早,进一步导致作业性能呈现问题,甚至使以后作业失败。为了解决这个问题,SPARK-18838 这个 ISSUE 引入了多线程事件处理架构,每个事件都有其独自的单线程 executor service 去解决,这样就能够大大减少事件处理延时的问题。另外,因为每类事件都有独自的事件队列,所以会减少 Driver 端的内存占用。

点击增加图片形容(最多60个字)编辑

更好的 Fetch 失败解决

在 Spark 2.3 版本之前,如果 Spark 探测到 fetch failure,那么它会把产生这个 shuffle 文件的 Executor 移除掉。然而如果这个 Executor 所在的机器有很多 Executor,而且是因为这台机器挂掉导致 fetch failure,那么会导致很多的 fetch 重试,这种解决机制很低下。SPARK-19753 这个 ISSUE 使得 Spark 能够把上述场景所有 Executor 的 shuffle 文件移除,也就是不再去重试就晓得 shuffle 文件不可用。

点击增加图片形容(最多60个字)编辑

另外,Spark 最大 Fetch 重试次数也能够通过 spark.max.fetch.failures.per.stage 参数配置。

FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,而后由 executor 通过 statusUpdate 传到 driver 端,理论的解决会在 DAGScheduler.handleTaskCompletion,它会从新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。

RPC 服务线程调优

当 Spark 同时运行大量的 tasks 时,Driver 很容易呈现 OOM,这是因为在 Driver 端的 Netty 服务器上产生大量 RPC 的申请积压,咱们能够通过加大 RPC 服务的线程数解决 OOM 问题,比方 spark.rpc.io.serverThreads = 64。

spark shuffle相干的参数调优

spark.shuffle.file.buffer

默认值:32k

参数阐明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。

调优倡议:如果作业可用的内存资源较为短缺的话,能够适当减少这个参数的大小(比方64k),从而缩小shuffle write过程中溢写磁盘文件的次数,也就能够缩小磁盘IO次数,进而晋升性能。在实践中发现,正当调节该参数,性能会有1%~5%的晋升。

spark.reducer.maxSizeInFlight

默认值:48m

参数阐明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次可能拉取多少数据。

调优倡议:如果作业可用的内存资源较为短缺的话,能够适当减少这个参数的大小(比方96m),从而缩小拉取数据的次数,也就能够缩小网络传输的次数,进而晋升性能。在实践中发现,正当调节该参数,性能会有1%~5%的晋升。

spark.shuffle.io.maxRetries

默认值:3

参数阐明:shuffle read task从shuffle write task所在节点拉取属于本人的数据时,如果因为网络异样导致拉取失败,是会主动进行重试的。该参数就代表了能够重试的最大次数。如果在指定次数之内拉取还是没有胜利,就可能会导致作业执行失败。

调优倡议:对于那些蕴含了特地耗时的shuffle操作的作业,倡议减少重试最大次数(比方60次),以防止因为JVM的full gc或者网络不稳固等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度晋升稳定性。

spark.shuffle.io.retryWait

默认值:5s

参数阐明:具体解释同上,该参数代表了每次重试拉取数据的期待距离,默认是5s。

调优倡议:倡议加大距离时长(比方60s),以减少shuffle操作的稳定性。

spark.shuffle.memoryFraction

默认值:0.2

参数阐明:该参数代表了Executor内存中,调配给shuffle read task进行聚合操作的内存比例,默认是20%。

调优倡议:如果内存短缺,而且很少应用长久化操作,倡议调高这个比例,给shuffle read的聚合操作更多内存,以防止因为内存不足导致聚合过程中频繁读写磁盘。在实践中发现,正当调节该参数能够将性能晋升10%左右。

spark.shuffle.manager

默认值:sort

参数阐明:该参数用于设置ShuffleManager的类型。Spark 1.5当前,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,然而Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort相似,然而应用了tungsten打算中的堆外内存管理机制,内存应用效率更高。

调优倡议:因为SortShuffleManager默认会对数据进行排序,因而如果你的业务逻辑中须要该排序机制的话,则应用默认的SortShuffleManager就能够;而如果你的业务逻辑不须要对数据进行排序,那么倡议参考前面的几个参数调优,通过bypass机制或优化的HashShuffleManager来防止排序操作,同时提供较好的磁盘读写性能。这里要留神的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数阐明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是间接依照未经优化的HashShuffleManager的形式去写数据,然而最初会将每个task产生的所有长期磁盘文件都合并成一个文件,并会创立独自的索引文件。

调优倡议:当你应用SortShuffleManager时,如果确实不须要排序操作,那么倡议将这个参数调大一些,大于shuffle read task的数量。那么此时就会主动启用bypass机制,map-side就不会进行排序了,缩小了排序的性能开销。然而这种形式下,仍然会产生大量的磁盘文件,因而shuffle write性能有待进步。

spark.shuffle.consolidateFiles

默认值:false

参数阐明:如果应用HashShuffleManager,该参数无效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输入文件,对于shuffle read task数量特地多的状况下,这种办法能够极大地缩小磁盘IO开销,晋升性能。

调优倡议:如果确实不须要SortShuffleManager的排序机制,那么除了应用bypass机制,还能够尝试将spark.shffle.manager参数手动指定为hash,应用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

spark.reducer.maxBlocksInFlightPerAddress

默认值:Int.MaxValue(2的31次方-1)

限度了每个主机每次reduce能够被多少台近程主机拉取文件块,调低这个参数能够无效加重node manager的负载。

spark.reducer.maxReqsInFlight

默认值:Int.MaxValue(2的31次方-1)

限度近程机器拉取本机器文件块的申请数,随着集群增大,须要对此做出限度。否则可能会使本机负载过大而挂掉。。

spark.reducer.maxReqSizeShuffleToMem

默认值:Long.MaxValue

shuffle申请的文件块大小 超过这个参数值,就会被强行落盘,避免一大堆并发申请把内存占满。

spark.shuffle.compress

默认压缩 true

是否压缩map输入文件

spark.shuffle.spill.compress

默认:true

shuffle过程中溢出的文件是否压缩,应用spark.io.compression.codec压缩。

理解更多