关于spark:Spark参数调优

3次阅读

共计 11833 个字符,预计需要花费 30 分钟才能阅读完成。

Spark 参数调优

参考 spark 官网文档:http://spark.apache.org/docs/…

Shuffle 相干

Shuffle 操作大略是对 Spark 性能影响最大的步骤之一(因为可能波及到排序,磁盘 IO,网络 IO 等泛滥 CPU 或 IO 密集的操作),这也是为什么在 Spark 1.1 的代码中对整个 Shuffle 框架代码进行了重构,将 Shuffle 相干读写操作形象封装到 Pluggable 的 Shuffle Manager 中,便于试验和实现不同的 Shuffle 功能模块。例如为了解决 Hash Based 的 Shuffle Manager 在文件读写效率方面的问题而实现的 Sort Base 的 Shuffle Manager。

spark.shuffle.manager

用来配置所应用的 Shuffle Manager,目前可选的 Shuffle Manager 包含默认的 org.apache.spark.shuffle.sort.HashShuffleManager(配置参数值为 hash)和新的 org.apache.spark.shuffle.sort.SortShuffleManager(配置参数值为 sort)。

这两个 ShuffleManager 如何抉择呢,首先须要理解他们在实现形式上的区别。

HashShuffleManager,故名思义也就是在 Shuffle 的过程中写数据时不做排序操作,只是将数据依据 Hash 的后果,将各个 Reduce 分区的数据写到各自的磁盘文件中。带来的问题就是如果 Reduce 分区的数量比拟大的话,将会产生大量的磁盘文件。如果文件数量特地微小,对文件读写的性能会带来比拟大的影响,此外因为同时关上的文件句柄数量泛滥,序列化,以及压缩等操作须要调配的长期内存空间也可能会迅速收缩到无奈承受的境地,对内存的应用和 GC 带来很大的压力,在 Executor 内存比拟小的状况下尤为突出,例如 Spark on Yarn 模式。

SortShuffleManager,是 1.1 版本之后实现的一个试验性(也就是一些性能和接口还在开发演变中)的 ShuffleManager,它在写入分区数据的时候,首先会依据理论状况对数据采纳不同的形式进行排序操作,底线是至多依照 Reduce 分区 Partition 进行排序,这样来至于同一个 Map 工作 Shuffle 到不同的 Reduce 分区中去的所有数据都能够写入到同一个内部磁盘文件中去,用简略的 Offset 标记不同 Reduce 分区的数据在这个文件中的偏移量。这样一个 Map 工作就只须要生成一个 shuffle 文件,从而防止了上述 HashShuffleManager 可能遇到的文件数量微小的问题

两者的性能比拟,取决于内存,排序,文件操作等因素的综合影响。

对于不须要进行排序的 Shuffle 操作来说,如 repartition 等,如果文件数量不是特地微小,HashShuffleManager 面临的内存问题不大,而 SortShuffleManager 须要额定的依据 Partition 进行排序,显然 HashShuffleManager 的效率会更高。

而对于原本就须要在 Map 端进行排序的 Shuffle 操作来说,如 ReduceByKey 等,应用 HashShuffleManager 尽管在写数据时不排序,但在其它的步骤中依然须要排序,而 SortShuffleManager 则能够将写数据和排序两个工作合并在一起执行,因而即便不思考 HashShuffleManager 的内存应用问题,SortShuffleManager 仍旧可能更快。

spark.shuffle.sort.bypassMergeThreshold

这个参数仅实用于 SortShuffleManager,如前所述,SortShuffleManager 在解决不须要排序的 Shuffle 操作时,因为排序带来性能的降落。这个参数决定了在这种状况下,当 Reduce 分区的数量小于多少的时候,在 SortShuffleManager 外部不应用 Merge Sort 的形式解决数据,而是与 Hash Shuffle 相似,间接将分区文件写入独自的文件,不同的是,在最初一步还是会将这些文件合并成一个独自的文件。这样通过去除 Sort 步骤来放慢处理速度,代价是须要并发关上多个文件,所以内存消耗量减少,实质上是绝对 HashShuffleMananger 一个折衷方案。这个参数的默认值是 200 个分区,如果内存 GC 问题重大,能够升高这个值。

spark.shuffle.consolidateFiles

这个配置参数仅实用于 HashShuffleMananger 的实现,同样是为了解决生成过多文件的问题,采纳的形式是在不同批次运行的 Map 工作之间重用 Shuffle 输入文件,也就是说合并的是不同批次的 Map 工作的输入数据,然而每个 Map 工作所须要的文件还是取决于 Reduce 分区的数量,因而,它并不缩小同时关上的输入文件的数量,因而对内存使用量的缩小并没有帮忙。只是 HashShuffleManager 里的一个折中的解决方案。

须要留神的是,这部分的代码实现只管原理上说很简略,然而波及到底层具体的文件系统的实现和限度等因素,例如在并发拜访等方面,须要解决的细节很多,因而始终存在着这样那样的 bug 或者问题,导致在例如 EXT3 上应用时,特定状况下性能反而可能降落,因而从 Spark 0.8 的代码开始,始终到 Spark 1.1 的代码为止也还没有被标记为 Stable,不是默认采纳的形式。此外因为并不缩小同时关上的输入文件的数量,因而对性能具体能带来多大的改善也取决于具体的文件数量的状况。所以即便你面临着 Shuffle 文件数量微小的问题,这个配置参数是否应用,在什么版本中能够应用,也最好还是理论测试当前再决定。

spark.shuffle.spill

shuffle 的过程中,如果波及到排序,聚合等操作,势必会须要在内存中保护一些数据结构,进而占用额定的内存。如果内存不够用怎么办,那只有两条路能够走,一就是 out of memory 出错了,二就是将局部数据长期写到内部存储设备中去,最初再合并到最终的 Shuffle 输入文件中去。

这里 spark.shuffle.spill 决定是否 Spill 到内部存储设备(默认关上), 如果你的内存足够应用,或者数据集足够小,当然也就不须要 Spill,毕竟 Spill 带来了额定的磁盘操作。

spark.shuffle.memoryFraction / spark.shuffle.safetyFraction

在启用 Spill 的状况下,spark.shuffle.memoryFraction(1.1 后默认为 0.2)决定了当 Shuffle 过程中应用的内存达到总内存多少比例的时候开始 Spill。

通过 spark.shuffle.memoryFraction 能够调整 Spill 的触发条件,即 Shuffle 占用内存的大小,进而调整 Spill 的频率和 GC 的行为。总的来说,如果 Spill 太过频繁,能够适当减少 spark.shuffle.memoryFraction 的大小,减少用于 Shuffle 的内存,缩小 Spill 的次数。当然这样一来为了防止内存溢出,对应的可能须要缩小 RDD cache 占用的内存,即减小 spark.storage.memoryFraction 的值,这样 RDD cache 的容量缩小,有可能带来性能影响,因而须要综合思考。

因为 Shuffle 数据的大小是估算进去的,一来为了升高开销,并不是每减少一个数据项都残缺的估算一次,二来估算也会有误差,所以理论暂用的内存可能比估算值要大,这里 spark.shuffle.safetyFraction(默认为 0.8)用来作为一个保险系数,升高理论 Shuffle 应用的内存阀值,减少肯定的缓冲,升高理论内存占用超过用户配置值的概率。

spark.shuffle.spill.compress / spark.shuffle.compress

这两个配置参数都是用来设置 Shuffle 过程中是否应用压缩算法对 Shuffle 数据进行压缩,前者针对 Spill 的两头数据,后者针对最终的 shuffle 输入文件,默认都是 True

实践上说,spark.shuffle.compress 设置为 True 通常都是正当的,因为如果应用千兆以下的网卡,网络带宽往往最容易成为瓶颈。此外,目前的 Spark 任务调度实现中,以 Shuffle 划分 Stage,下一个 Stage 的工作是要期待上一个 Stage 的工作全副实现当前能力开始执行,所以 shuffle 数据的传输和 CPU 计算工作之间通常不会重叠,这样 Shuffle 数据传输量的大小和所需的工夫就间接影响到了整个工作的实现速度。然而压缩也是要耗费大量的 CPU 资源的,所以关上压缩选项会减少 Map 工作的执行工夫,因而如果在 CPU 负载的影响远大于磁盘和网络带宽的影响的场合下,也可能将 spark.shuffle.compress 设置为 False 才是最佳的计划

对于 spark.shuffle.spill.compress 而言,状况相似,然而 spill 数据不会被发送到网络中,仅仅是长期写入本地磁盘,而且在一个工作中同时须要执行压缩和解压缩两个步骤,所以对 CPU 负载的影响会更大一些,而磁盘带宽(如果标配 12HDD 的话)可能往往不会成为 Spark 利用的次要问题,所以这个参数相对而言,或者更有机会须要设置为 False。

总之,Shuffle 过程中数据是否应该压缩,取决于 CPU/DISK/NETWORK 的理论能力和负载,应该综合思考。

Storage 相干配置参数

spark.local.dir

这个看起来很简略,就是 Spark 用于写两头数据,如 RDD Cache,Shuffle,Spill 等数据的地位,那么有什么能够留神的呢。

首先,最根本的当然是咱们能够配置多个门路(用逗号分隔)到多个磁盘上减少整体 IO 带宽,这个大家都晓得。

其次,目前的实现中,Spark 是通过对文件名采纳 hash 算法散布到多个门路下的目录中去,如果你的存储设备有快有慢,比方 SSD+HDD 混合应用,那么你能够通过在 SSD 上配置更多的目录门路来增大它被 Spark 应用的比例,从而更好地利用 SSD 的 IO 带宽能力。当然这只是一种变通的办法,终极解决方案还是应该像目前 HDFS 的实现方向一样,让 Spark 可能感知具体的存储设备类型,针对性的应用。

须要留神的是,在 Spark 1.0 当前,SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) 参数会笼罩这个配置。比方 Spark On YARN 的时候,Spark Executor 的本地门路依赖于 Yarn 的配置,而不取决于这个参数。

spark.executor.memory

Executor 内存的大小,和性能自身当然并没有间接的关系,然而简直所有运行时性能相干的内容都或多或少间接和内存大小相干。这个参数最终会被设置到 Executor 的 JVM 的 heap 尺寸上,对应的就是 Xmx 和 Xms 的值

实践上 Executor 内存当然是多多益善,然而理论受机器配置,以及运行环境,资源共享,JVM GC 效率等因素的影响,还是有可能须要为它设置一个正当的大小。多大算正当,要看理论状况

Executor 的内存基本上是 Executor 外部所有工作共享的,而每个 Executor 上能够反对的工作的数量取决于 Executor 所治理的 CPU Core 资源的多少,因而你须要理解每个工作的数据规模的大小,从而推算出每个 Executor 大抵须要多少内存即可满足根本的需要。

如何晓得每个工作所需内存的大小呢,这个很难对立的掂量,因为除了数据集自身的开销,还包含算法所需各种长期内存空间的应用,而依据具体的代码算法等不同,长期内存空间的开销也不同。然而数据集自身的大小,对最终所需内存的大小还是有肯定的参考意义的。

通常来说每个分区的数据集在内存中的大小,可能是其在磁盘上源数据大小的若干倍(不思考源数据压缩,Java 对象绝对于原始裸数据也还要算上用于治理数据的数据结构的额定开销),须要精确的晓得大小的话,能够将 RDD cache 在内存中,从 BlockManager 的 Log 输入能够看到每个 Cache 分区的大小(其实也是估算进去的,并不齐全精确)

如:BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134 (size: 495.3 MB)

反过来说,如果你的 Executor 的数量和内存大小受机器物理配置影响绝对固定,那么你就须要正当布局每个分区工作的数据规模,例如采纳更多的分区,用减少工作数量(进而须要更多的批次来运算所有的工作)的形式来减小每个工作所需解决的数据大小。

spark.storage.memoryFraction

如后面所说 spark.executor.memory 决定了每个 Executor 可用内存的大小,而 spark.storage.memoryFraction 则决定了在这部分内存中有多少能够用于 Memory Store 治理 RDD Cache 数据,剩下的内存用来保障工作运行时各种其它内存空间的须要。

spark.executor.memory 默认值为 0.6,官网文档倡议这个比值不要超过 JVM Old Gen 区域的比值。这也很容易了解,因为 RDD Cache 数据通常都是长期驻留内存的,实践上也就是说最终会被转移到 Old Gen 区域(如果该 RDD 还没有被删除的话),如果这部分数据容许的尺寸太大,势必把 Old Gen 区域占满,造成频繁的 FULL GC。

如何调整这个比值,取决于你的利用对数据的应用模式和数据的规模,粗略的来说,如果频繁产生 Full GC,能够思考升高这个比值,这样 RDD Cache 可用的内存空间缩小(剩下的局部 Cache 数据就须要通过 Disk Store 写到磁盘上了),会带来肯定的性能损失,然而腾出更多的内存空间用于执行工作,缩小 Full GC 产生的次数,反而可能改善程序运行的整体性能

spark.streaming.blockInterval

这个参数用来设置 Spark Streaming 里 Stream Receiver 生成 Block 的工夫距离,默认为 200ms。具体的行为表现是具体的 Receiver 所接管的数据,每隔这里设定的工夫距离,就从 Buffer 中生成一个 StreamBlock 放进队列,期待进一步被存储到 BlockManager 中供后续计算过程应用。实践上来说,为了每个 Streaming Batch 距离里的数据是平均的,这个工夫距离当然应该能被 Batch 的间隔时间长度所整除。总体来说,如果内存大小够用,Streaming 的数据来得及解决,这个 blockInterval 工夫距离的影响不大,当然,如果数据 Cache Level 是 Memory+Ser,即做了序列化解决,那么 BlockInterval 的大小会影响序列化后数据块的大小,对于 Java 的 GC 的行为会有一些影响。

此外 spark.streaming.blockQueueSize 决定了在 StreamBlock 被存储到 BlockMananger 之前,队列中最多能够包容多少个 StreamBlock。默认为 10,因为这个队列 Poll 的工夫距离是 100ms,所以如果 CPU 不是特地忙碌的话,基本上应该没有问题。

压缩和序列化相干

spark.serializer

默认为 org.apache.spark.serializer.JavaSerializer, 可选 org.apache.spark.serializer.KryoSerializer, 实际上只有是 org.apache.spark.serializer 的子类就能够了, 不过如果只是利用, 大略你不会本人去实现一个的。

序列化对于 spark 利用的性能来说, 还是有很大影响的, 在特定的数据格式的状况下,KryoSerializer 的性能能够达到 JavaSerializer 的 10 倍以上, 当然放到整个 Spark 程序中来考量, 比重就没有那么大了, 然而以 Wordcount 为例,通常也很容易达到 30% 以上的性能晋升。而对于一些 Int 之类的根本类型数据,性能的晋升就简直能够疏忽了。KryoSerializer 依赖 Twitter 的 Chill 库来实现,绝对于 JavaSerializer,次要的问题在于不是所有的 Java Serializable 对象都能反对。

须要留神的是,这里可配的 Serializer 针对的对象是 Shuffle 数据,以及 RDD Cache 等场合,而 Spark Task 的序列化是通过 spark.closure.serializer 来配置,然而目前只反对 JavaSerializer,所以等于没法配置啦。

spark.rdd.compress

这个参数决定了 RDD Cache 的过程中,RDD 数据在序列化之后是否进一步进行压缩再贮存到内存或磁盘上。当然是为了进一步减小 Cache 数据的尺寸,对于 Cache 在磁盘上而言,相对大小大略没有太大关系,次要是思考 Disk 的 IO 带宽。而对于 Cache 在内存中,那次要就是思考尺寸的影响,是否可能 Cache 更多的数据,是否能减小 Cache 数据对 GC 造成的压力等。

这两者,前者通常不会是次要问题,尤其是在 RDD Cache 自身的目标就是谋求速度,缩小重算步骤,用 IO 换 CPU 的状况下。而后者,GC 问题当然是须要考量的,数据量小,占用空间少,GC 的问题大略会加重,然而是否真的须要走到 RDD Cache 压缩这一步,或者用其它形式来解决可能更加无效。

所以这个值默认是敞开的,然而如果在磁盘 IO 确实成为问题或者 GC 问题真的没有其它更好的解决办法的时候,能够思考启用 RDD 压缩。

spark.broadcast.compress

是否对 Broadcast 的数据进行压缩,默认值为 True。

Broadcast 机制是用来缩小运行每个 Task 时,所须要发送给 TASK 的 RDD 所应用到的相干数据的尺寸,一个 Executor 只须要在第一个 Task 启动时,取得一份 Broadcast 数据,之后的 Task 都从本地的 BlockManager 中获取相干数据。在 1.1 最新版本的代码中,RDD 自身也改为以 Broadcast 的模式发送给 Executor(之前的实现 RDD 自身是随每个工作发送的),因而基本上不太须要显式的决定哪些数据须要 broadcast 了。

因为 Broadcast 的数据须要通过网络发送,而在 Executor 端又须要存储在本地 BlockMananger 中,加上最新的实现,默认 RDD 通过 Boradcast 机制发送,因而大大增加了 Broadcast 变量的比重,所以通过压缩减小尺寸,来缩小网络传输开销和内存占用,通常都是有利于进步整体性能的。

什么状况可能不压缩更好呢,大抵上集体感觉同样还是在网络带宽和内存不是问题的时候,如果 Driver 端 CPU 资源很成问题(毕竟压缩的动作根本都在 Driver 端执行),那或者有调整的必要。

spark.io.compression.codec[]

RDD Cache 和 Shuffle 数据压缩所采纳的算法 Codec,默认值已经是应用 LZF 作为默认 Codec,最近因为 LZF 的内存开销的问题,默认的 Codec 曾经改为 Snappy。

LZF 和 Snappy 相比拟,前者压缩率比拟高(当然要看具体数据内容了,通常要高 20% 左右),然而除了内存问题以外,CPU 代价也大一些(大略也差 20%~50%?)

在用于 Shuffle 数据的场合下,内存方面,应该次要是在应用 HashShuffleManager 的时候有可能成为问题,因为如果 Reduce 分区数量微小,须要同时关上大量的压缩数据流用于写文件,进而在 Codec 方面须要大量的 buffer。然而如果应用 SortShuffleManager,因为 shuffle 文件数量大大减少,不会产生大量的压缩数据流,所以内存开销大略不会成为次要问题。

剩下的就是 CPU 和压缩率的衡量取舍,和后面一样,取决于 CPU/ 网络 / 磁盘的能力和负载,集体认为 CPU 通常更容易成为瓶颈。所以要调整性能,要不不压缩,要不应用 Snappy 可能性大一些?

对于 RDD Cache 的场合来说,绝大多数场合都是内存操作或者本地 IO,所以 CPU 负载的问题可能比 IO 的问题更加突出,这也是为什么 spark.rdd.compress 自身默认为不压缩,如果要压缩,大略也是 Snappy 适合一些?

schedule 调度相干

调度相干的参数设置,大多数内容都很直白,其实毋庸过多的额定解释,不过基于这些参数的罕用性(大略会是你针对本人的集群第一步就会配置的参数),这里多少就其外部机制做一些解释。

spark.cores.max

一个集群最重要的参数之一,当然就是 CPU 计算资源的数量。spark.cores.max 这个参数决定了在 Standalone 和 Mesos 模式下,一个 Spark 应用程序所能申请的 CPU Core 的数量。如果你没有并发跑多个 Spark 应用程序的需要,那么能够不须要设置这个参数,默认会应用 spark.deploy.defaultCores 的值(而 spark.deploy.defaultCores 的值默认为 Int.Max,也就是不限度的意思)从而应用程序能够应用所有以后能够取得的 CPU 资源。

针对这个参数须要留神的是,这个参数对 Yarn 模式不起作用,YARN 模式下,资源由 Yarn 对立调度治理,一个利用启动时所申请的 CPU 资源的数量由另外两个间接配置 Executor 的数量和每个 Executor 中 core 数量的参数决定。(历史起因造成,不同运行模式下的一些启动参数集体认为还有待进一步整合)

此外,在 Standalone 模式等后盾调配 CPU 资源时,目前的实现中,在 spark.cores.max 容许的范畴内,基本上是优先从每个 Worker 中申请所能失去的最大数量的 CPU core 给每个 Executor,因而如果人工限度了所申请的 Max Core 的数量小于 Standalone 和 Mesos 模式所治理的 CPU 数量,可能产生利用只运行在集群中局部节点上的状况(因为局部节点所能提供的最大 CPU 资源数量曾经满足利用的要求),而不是均匀散布在集群中。通常这不会是太大的问题,然而如果波及数据本地性的场合,有可能就会带来肯定的必须进行近程数据读取的状况产生。实践上,这个问题能够通过两种路径解决:一是 Standalone 和 Mesos 的资源管理模块主动依据节点资源状况,平均调配和启动 Executor,二是和 Yarn 模式一样,容许用户指定和限度每个 Executor 的 Core 的数量。社区中有一个 PR 试图走第二种路径来解决相似的问题,不过截至我写下这篇文档为止(2014.8),还没有被 Merge。

spark.task.cpus

这个参数在字面上的意思就是调配给每个工作的 CPU 的数量,默认为 1。实际上,这个参数并不能真的管制每个工作理论运行时所应用的 CPU 的数量,比方你能够通过在工作外部创立额定的工作线程来应用更多的 CPU(至多目前为止,未来工作的执行环境是否能通过 LXC 等技术来管制还不好说)。它所施展的作用,只是在作业调度时,每调配出一个工作时,对已应用的 CPU 资源进行计数。也就是说只是实践上用来统计资源的应用状况,便于安顿调度。因而,如果你冀望通过批改这个参数来放慢工作的运行,那还是连忙换个思路吧。这个参数的意义,集体感觉还是在你真的在工作外部本人通过任何伎俩,占用了更多的 CPU 资源时,让调度行为更加精确的一个辅助伎俩。

spark.scheduler.mode

这个参数决定了单个 Spark 利用外部调度的时候应用 FIFO 模式还是 Fair 模式。是的,你没有看错,这个参数只治理一个 Spark 利用外部的多个没有依赖关系的 Job 作业的调度策略。

如果你须要的是多个 Spark 利用之间的调度策略,那么在 Standalone 模式下,这取决于每个利用所申请和取得的 CPU 资源的数量(临时没有取得资源的利用就 Pending 在那里了),基本上就是 FIFO 模式的,谁先申请和取得资源,谁就占用资源直到实现。而在 Yarn 模式下,则多个 Spark 利用间的调度策略由 Yarn 本人的策略配置文件所决定。

那么这个外部的调度逻辑有什么用呢?如果你的 Spark 利用是通过服务的模式,为多个用户提交作业的话,那么能够通过配置 Fair 模式相干参数来调整不同用户作业的调度和资源分配优先级。

spark.locality.wait

spark.locality.wait 和 spark.locality.wait.process,spark.locality.wait.node, spark.locality.wait.rack 这几个参数影响了任务分配时的本地性策略的相干细节。

Spark 中工作的解决须要思考所波及的数据的本地性的场合,根本就两种,一是数据的起源是 HadoopRDD; 二是 RDD 的数据起源来自于 RDD Cache(即由 CacheManager 从 BlockManager 中读取,或者 Streaming 数据源 RDD)。其它状况下,如果不波及 shuffle 操作的 RDD,不形成划分 Stage 和 Task 的基准,不存在判断 Locality 本地性的问题,而如果是 ShuffleRDD,其本地性始终为 No Prefer,因而其实也无所谓 Locality。

在现实的状况下,工作当然是调配在能够从本地读取数据的节点上时(同一个 JVM 外部或同一台物理机器外部)的运行时性能最佳。然而每个工作的执行速度无奈精确预计,所以很难在当时取得全局最优的执行策略,当 Spark 利用失去一个计算资源的时候,如果没有能够满足最佳本地性需要的工作能够运行时,是退而求其次,运行一个本地性条件稍差一点的工作呢,还是持续期待下一个可用的计算资源已冀望它能更好的匹配工作的本地性呢?

这几个参数一起决定了 Spark 任务调度在失去分配任务时,抉择临时不分配任务,而是期待取得满足过程外部 / 节点外部 / 机架外部这样的不同档次的本地性资源的最长等待时间。默认都是 3000 毫秒。

基本上,如果你的工作数量较大和单个工作运行工夫比拟长的状况下,单个工作是否在数据本地运行,代价区别可能比较显著,如果数据本地性不现实,那么调大这些参数对于性能优化可能会有肯定的益处。反之如果期待的代价超过带来的收益,那就不要思考了。

特地值得注意的是:在解决利用刚启动后提交的第一批工作时,因为当作业调度模块开始工作时,解决工作的 Executors 可能还没有齐全注册结束,因而一部分的工作会被搁置到 No Prefer 的队列中,这部分工作的优先级仅次于数据本地性满足 Process 级别的工作,从而被优先调配到非本地节点执行,如果确实没有 Executors 在对应的节点上运行,或者确实是 No Prefer 的工作(如 shuffleRDD),这样做的确是比拟优化的抉择,然而这里的理论状况只是这部分 Executors 还没来得及注册上而已。这种状况下,即便加大本节中这几个参数的数值也没有帮忙。针对这个状况,有一些曾经实现的和正在进行中的 PR 通过例如动静调整 No Prefer 队列,监控节点注册比例等等形式试图来给出更加智能的解决方案。不过,你也能够依据本身集群的启动状况,通过在创立 SparkContext 之后,被动 Sleep 几秒的形式来简略的解决这个问题。

spark.speculation

spark.speculation 以及 spark.speculation.interval, spark.speculation.quantile, spark.speculation.multiplier 等参数调整 Speculation 行为的具体细节,Speculation 是在任务调度的时候,如果没有适宜以后本地性要求的工作可供运行,将跑得慢的工作在闲暇计算资源上再度调度的行为,这些参数调整这些行为的频率和判断指标,默认是不应用 Speculation 的。

通常来说很难正确的判断是否须要 Speculation,能真正施展 Speculation 用途的场合,往往是某些节点因为运行环境起因,比方 CPU 资源因为某种原因被占用,磁盘损坏导致 IO 迟缓造成工作执行速度异样的状况,当然前提是你的分区工作不存在仅能被执行一次,或者不能同时执行多个拷贝等状况。Speculation 工作参照的指标通常是其它工作的执行工夫,而理论的工作可能因为分区数据尺寸不平均,原本就会有工夫差别,加上肯定的调度和 IO 的随机性,所以如果一致性指标定得过严,Speculation 可能并不能真的发现问题,反而减少了不必要的工作开销,定得过宽,大略又根本相当于没用。

集体感觉,如果你的集群规模比拟大,运行环境简单,确实可能常常产生执行异样,加上数据分区尺寸差别不大,为了程序运行工夫的稳定性,那么能够思考认真调整这些参数。否则还是思考如何排除造成工作执行速度异样的因数比拟靠铺一些。

当然,我没有理论在很大规模的集群上运行过 Spark,所以如果认识有些偏颇,还请有理论教训的 XD 斧正。

正文完
 0