共计 13417 个字符,预计需要花费 34 分钟才能阅读完成。
Spark 调优
spark 调优常见伎俩,在生产中经常会遇到各种各样的问题,有事先起因,有事中起因,也有不标准起因,spark 调优总结下来能够从上面几个点来调优。
1. 调配更多的资源
调配更多的资源:它是性能优化调优的王道,就是减少和调配更多的资源,这对于性能和速度上的晋升是不言而喻的,基本上,在肯定范畴之内,减少资源与性能的晋升,是成正比的;写完了一个简单的 spark 作业之后,进行性能调优的时候,首先第一步,就是要来调节最优的资源配置;在这个根底之上,如果说你的 spark 作业,可能调配的资源达到了你的能力范畴的顶端之后,无奈再调配更多的资源了,公司资源无限;那么才是思考去做前面的这些性能调优的点。相干问题:(1)调配哪些资源?(2)在哪里能够设置这些资源?(3)分析为什么调配这些资源之后,性能能够失去晋升?
1.1 调配哪些资源
executor-memory、executor-cores、driver-memory
1.2 在哪里能够设置这些资源
在理论的生产环境中,提交 spark 工作时,应用 spark-submit shell 脚本,在外面调整对应的参数。提交工作的脚本:
spark-submit \
--master spark://node1:7077 \
--class com.hoult.WordCount \
--num-executors 3 \ 配置 executor 的数量
--driver-memory 1g \ 配置 driver 的内存(影响不大)--executor-memory 1g \ 配置每一个 executor 的内存大小
--executor-cores 3 \ 配置每一个 executor 的 cpu 个数
/export/servers/wordcount.jar
1.2 参数调节到多大,算是最大
- ==Standalone 模式 ==
先计算出公司 spark 集群上的所有资源 每台节点的内存大小和 cpu 核数,比方:一共有 20 台 worker 节点,每台节点 8g 内存,10 个 cpu。理论工作在给定资源的时候,能够给 20 个 executor、每个 executor 的内存 8g、每个 executor 的应用的 cpu 个数 10。
- ==Yarn 模式 ==
先计算出 yarn 集群的所有大小,比方一共 500g 内存,100 个 cpu;这个时候能够调配的最大资源,比方给定 50 个 executor、每个 executor 的内存大小 10g, 每个 executor 应用的 cpu 个数为 2。
- 应用准则
在资源比拟短缺的状况下,尽可能的应用更多的计算资源,尽量去调节到最大的大小
1.3 为什么调大资源当前性能能够晋升
--executor-memory
--total-executor-cores
2. 进步并行度
2.1 Spark 的并行度指的是什么
spark 作业中,各个 stage 的 task 的数量,也就代表了 spark 作业在各个阶段 stage 的并行度!当调配完所能调配的最大资源了,而后对应资源去调节程序的并行度,如果并行度没有与资源相匹配,那么导致你调配上来的资源都节约掉了。同时并行运行,还能够让每个 task 要解决的数量变少(很简略的原理。正当设置并行度,能够充分利用集群资源,缩小每个 task 解决数据量,而减少性能放慢运行速度。)
2.2 如何进步并行度
2.2.1 能够设置 task 的数量
至多设置成与 spark Application 的总 cpu core 数量雷同。最现实状况,150 个 core,调配 150task,一起运行,差不多同一时间运行结束
官网举荐,task 数量,设置成 spark Application 总 cpu core 数量的 2~3 倍。比方 150 个 cpu core,根本设置 task 数量为 300~500. 与现实状况不同的,有些 task 会运行快一点,比方 50s 就完了,有些 task 可能会慢一点,要一分半才运行完,所以如果你的 task 数量,刚好设置的跟 cpu core 数量雷同,可能会导致资源的节约。因为比方 150 个 task 中 10 个先运行完了,残余 140 个还在运行,然而这个时候,就有 10 个 cpu core 闲暇进去了,导致节约。如果设置 2~3 倍,那么一个 task 运行完当前,另外一个 task 马上补上来,尽量让 cpu core 不要闲暇。同时尽量晋升 spark 运行效率和速度。晋升性能。
2.2.2 如何设置 task 数量来进步并行度
设置参数 spark.default.parallelism
默认是没有值的,如果设置了值为 10,它会在 shuffle 的过程才会起作用。比方 val rdd2 = rdd1.reduceByKey(_+_)
此时 rdd2 的分区数就是 10
能够通过在构建 SparkConf 对象的时候设置,例如:new SparkConf().set("spark.defalut.parallelism","500")
2.2.3 给 RDD 从新设置 partition 的数量
应用 rdd.repartition 来从新分区,该办法会生成一个新的 rdd,使其分区数变大。此时因为一个 partition 对应一个 task,那么对应的 task 个数越多,通过这种形式也能够进步并行度。
2.2.4 进步 sparksql 运行的 task 数量
http://spark.apache.org/docs/2.3.3/sql-programming-guide.html
通过设置参数 spark.sql.shuffle.partitions=500 默认为 200;能够适当增大,来进步并行度。比方设置为 spark.sql.shuffle.partitions=500
专门针对 sparkSQL 来设置的
3. RDD 的重用和长久化
3.1 理论开发遇到的状况阐明
如上图所示的计算逻辑:(1)当第一次应用 rdd2 做相应的算子操作失去 rdd3 的时候,就会从 rdd1 开始计算,先读取 HDFS 上的文件,而后对 rdd1 做对应的算子操作失去 rdd2, 再由 rdd2 计算之后失去 rdd3。同样为了计算失去 rdd4,后面的逻辑会被从新计算。(3)默认状况下屡次对一个 rdd 执行算子操作,去获取不同的 rdd,都会对这个 rdd 及之前的父 rdd 全副从新计算一次。这种状况在理论开发代码的时候会常常遇到,然而咱们肯定要防止一个 rdd 反复计算屡次,否则会导致性能急剧升高。总结:能够把屡次应用到的 rdd,也就是公共 rdd 进行长久化,防止后续须要,再次从新计算,晋升效率。
3.2 如何对 rdd 进行长久化
- 能够调用 rdd 的 cache 或者 persist 办法。
(1)cache 办法默认是把数据长久化到内存中,例如:rdd.cache,其本质还是调用了 persist 办法(2)persist 办法中有丰盛的缓存级别,这些缓存级别都定义在 StorageLevel 这个 object 中,能够结合实际的利用场景正当的设置缓存级别。例如:rdd.persist(StorageLevel.MEMORY_ONLY), 这是 cache 办法的实现。
3.3 rdd 长久化的时能够采纳序列化
(1)如果失常将数据长久化在内存中,那么可能会导致内存的占用过大,这样的话,兴许会导致 OOM 内存溢出。(2)当纯内存无奈撑持公共 RDD 数据齐全寄存的时候,就优先思考应用序列化的形式在纯内存中存储。将 RDD 的每个 partition 的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。(3)序列化的形式,惟一的毛病就是,在获取数据的时候,须要反序列化。然而能够缩小占用的空间和便于网络传输(4)如果序列化纯内存形式,还是导致 OOM,内存溢出;就只能思考磁盘的形式,内存 + 磁盘的一般形式(无序列化)。(5)为了数据的高可靠性,而且内存短缺,能够应用双正本机制,进行长久化
长久化的双正本机制,长久化后的一个正本,因为机器宕机了,正本丢了,就还是得从新计算一次;长久化的每个数据单元,存储一份正本,放在其余节点下面,从而进行容错;一个正本丢了,不必从新计算,还能够应用另外一份正本。这种形式,仅仅针对你的内存资源极度短缺。比方: StorageLevel.MEMORY_ONLY_2
4. 播送变量的应用
4.1 场景形容
在理论工作中可能会遇到这样的状况,因为要解决的数据量十分大,这个时候可能会在一个 stage 中呈现大量的 task,比方有 1000 个 task,这些 task 都须要一份雷同的数据来解决业务,这份数据的大小为 100M,该数据会拷贝 1000 份正本,通过网络传输到各个 task 中去,给 task 应用。这里会波及大量的网络传输开销,同时至多须要的内存为 1000*100M=100G,这个内存开销是十分大的。不必要的内存的耗费和占用,就导致了你在进行 RDD 长久化到内存,兴许就没法齐全在内存中放下;就只能写入磁盘,最初导致后续的操作在磁盘 IO 上耗费性能;这对于 spark 工作解决来说就是一场劫难。因为内存开销比拟大,task 在创建对象的时候,可能会呈现堆内存放不下所有对象,就会导致频繁的垃圾回收器的回收 GC。GC 的时候肯定是会导致工作线程进行,也就是导致 Spark 暂停工作那么一点工夫。频繁 GC 的话,对 Spark 作业的运行的速度会有相当可观的影响。
4.2 播送变量引入
Spark 中分布式执行的代码须要传递到各个 executor 的 task 上运行。对于一些只读、固定的数据, 每次都须要 Driver 播送到各个 Task 上,这样效率低下。播送变量容许将变量只播送给各个 executor。该 executor 上的各个 task 再从所在节点的 BlockManager(负责管理某个 executor 对应的内存和磁盘上的数据) 获取变量,而不是从 Driver 获取变量,从而晋升了效率。
播送变量,初始的时候,就在 Drvier 上有一份正本。通过在 Driver 把共享数据转换成播送变量。task 在运行的时候,想要应用播送变量中的数据,此时首先会在本人本地的 Executor 对应的 BlockManager 中,尝试获取变量正本;如果本地没有,那么就从 Driver 近程拉取播送变量正本,并保留在本地的 BlockManager 中;尔后这个 executor 上的 task,都会间接应用本地的 BlockManager 中的正本。那么这个时候所有该 executor 中的 task 都会应用这个播送变量的正本。也就是说一个 executor 只须要在第一个 task 启动时,取得一份播送变量数据,之后的 task 都从本节点的 BlockManager 中获取相干数据。executor 的 BlockManager 除了从 driver 上拉取,也可能从其余节点的 BlockManager 上拉取变量正本,网络间隔越近越好。
4.3 应用播送变量后的性能剖析
比方一个工作须要 50 个 executor,1000 个 task,共享数据为 100M。(1) 在不应用播送变量的状况下,1000 个 task,就须要该共享数据的 1000 个正本,也就是说有 1000 份数须要大量的网络传输和内存开销存储。消耗的内存大小 1000*100=100G.
(2) 应用了播送变量后,50 个 executor 就只须要 50 个正本数据,而且不肯定都是从 Driver 传输到每个节点,还可能是就近从最近的节点的 executor 的 blockmanager 上拉取播送变量正本,网络传输速度大大增加;内存开销 50*100M=5G
总结:不应用播送变量的内存开销为 100G,应用后的内存开销 5G,这里就相差了 20 倍左右的网络传输性能损耗和内存开销,应用播送变量后对于性能的晋升和影响,还是很可观的。播送变量的应用不肯定会对性能产生决定性的作用。比方运行 30 分钟的 spark 作业,可能做了播送变量当前,速度快了 2 分钟,或者 5 分钟。然而一点一滴的调优,千里之行; 始于足下。最初还是会有成果的。
4.4 播送变量应用注意事项
(1)能不能将一个 RDD 应用播送变量播送进来?不能,因为 RDD 是不存储数据的。能够将 RDD 的后果播送进来。(2)播送变量只能在 Driver 端定义,不能在 Executor 端定义。(3)在 Driver 端能够批改播送变量的值,在 Executor 端无奈批改播送变量的值。(4)如果 executor 端用到了 Driver 的变量,如果不应用播送变量在 Executor 有多少 task 就有多少 Driver 端的变量正本。(5)如果 Executor 端用到了 Driver 的变量,如果应用播送变量在每个 Executor 中只有一份 Driver 端的变量正本。
4.5 如何应用播送变量
- 例如
(1) 通过 sparkContext 的 broadcast 办法把数据转换成播送变量,类型为 Broadcast,val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
(2) 而后 executor 上的 BlockManager 就能够拉取该播送变量的正本获取具体的数据。获取播送变量中的值能够通过调用其 value 办法
val array: Array[Int] = broadcastArray.value
5. 尽量避免应用 shuffle 类算子
5.1 shuffle 形容
spark 中的 shuffle 波及到数据要进行大量的网络传输,上游阶段的 task 工作须要通过网络拉取上阶段 task 的输入数据,shuffle 过程,简略来说,就是将散布在集群中多个节点上的同一个 key,拉取到同一个节点上,进行聚合或 join 等操作。比方 reduceByKey、join 等算子,都会触发 shuffle 操作。如果有可能的话,要尽量避免应用 shuffle 类算子。因为 Spark 作业运行过程中,最耗费性能的中央就是 shuffle 过程。
5.2 哪些算子操作会产生 shuffle
spark 程序在开发的过程中应用 reduceByKey、join、distinct、repartition 等算子操作,这里都会产生 shuffle,因为 shuffle 这一块是十分消耗性能的,理论开发中尽量应用 map 类的非 shuffle 算子。这样的话,没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业,能够大大减少性能开销。
5.3 如何防止产生 shuffle
- 小案例
// 谬误的做法:// 传统的 join 操作会导致 shuffle 操作。// 因为两个 RDD 中,雷同的 key 都须要通过网络拉取到一个节点上,由一个 task 进行 join 操作。val rdd3 = rdd1.join(rdd2)
// 正确的做法:// Broadcast+map 的 join 操作,不会导致 shuffle 操作。// 应用 Broadcast 将一个数据量较小的 RDD 作为播送变量。val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在 rdd1.map 算子中,能够从 rdd2DataBroadcast 中,获取 rdd2 的所有数据。// 而后进行遍历,如果发现 rdd2 中某条数据的 key 与 rdd1 的以后数据的 key 是雷同的,那么就断定能够进行 join。// 此时就能够依据本人须要的形式,将 rdd1 以后数据与 rdd2 中能够连贯的数据,拼接在一起(String 或 Tuple)。val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 留神,以上操作,倡议仅仅在 rdd2 的数据量比拟少(比方几百 M,或者一两 G)的状况下应用。// 因为每个 Executor 的内存中,都会驻留一份 rdd2 的全量数据。
5.4 应用 map-side 预聚合的 shuffle 操作
- map-side 预聚合
如果因为业务须要,肯定要应用 shuffle 操作,无奈用 map 类的算子来代替,那么尽量应用能够 map-side 预聚合的算子。所谓的 map-side 预聚合,说的是在每个节点本地对雷同的 key 进行一次聚合操作,相似于 MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条雷同的 key,因为多条雷同的 key 都被聚合起来了。其余节点在拉取所有节点上的雷同 key 时,就会大大减少须要拉取的数据数量,从而也就缩小了磁盘 IO 以及网络传输开销。通常来说,在可能的状况下,倡议应用 reduceByKey 或者 aggregateByKey 算子来代替掉 groupByKey 算子。因为 reduceByKey 和 aggregateByKey 算子都会应用用户自定义的函数对每个节点本地的雷同 key 进行预聚合。而 groupByKey 算子是不会进行预聚合的,全量的数据会在集群的各个节点之间散发和传输,性能相对来说比拟差。比方如下两幅图,就是典型的例子,别离基于 reduceByKey 和 groupByKey 进行单词计数。其中第一张图是 groupByKey 的原理图,能够看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是 reduceByKey 的原理图,能够看到,每个节点本地的雷同 key 数据,都进行了预聚合,而后才传输到其余节点上进行全局聚合。
- ==groupByKey 进行单词计数原理 ==
- ==reduceByKey 单词计数原理 ==
6. 应用高性能的算子
6.1 应用 reduceByKey/aggregateByKey 代替 groupByKey
- reduceByKey/aggregateByKey 能够进行预聚合操作,缩小数据的传输量,晋升性能
- groupByKey 不会进行预聚合操作,进行数据的全量拉取,性能比拟低
6.2 应用 mapPartitions 代替一般 map
mapPartitions 类的算子,一次函数调用会解决一个 partition 所有的数据,而不是一次函数调用解决一条,性能相对来说会高一些。然而有的时候,应用 mapPartitions 会呈现 OOM(内存溢出)的问题。因为单次函数调用就要解决掉一个 partition 所有的数据,如果内存不够,垃圾回收时是无奈回收掉太多对象的,很可能呈现 OOM 异样。所以应用这类操作时要谨慎!
6.3 应用 foreachPartition 代替 foreach
原理相似于“应用 mapPartitions 代替 map”,也是一次函数调用解决一个 partition 的所有数据,而不是一次函数调用解决一条数据。在实践中发现,foreachPartitions 类的算子,对性能的晋升还是很有帮忙的。比方在 foreach 函数中,将 RDD 中所有数据写 MySQL,那么如果是一般的 foreach 算子,就会一条数据一条数据地写,每次函数调用可能就会创立一个数据库连贯,此时就势必会频繁地创立和销毁数据库连贯,性能是十分低下;然而如果用 foreachPartitions 算子一次性解决一个 partition 的数据,那么对于每个 partition,只有创立一个数据库连贯即可,而后执行批量插入操作,此时性能是比拟高的。实际中发现,对于 1 万条左右的数据量写 MySQL,性能能够晋升 30% 以上。
6.4 应用 filter 之后进行 coalesce 操作
通常对一个 RDD 执行 filter 算子过滤掉 RDD 中较多数据后(比方 30% 以上的数据),倡议应用 coalesce 算子,手动缩小 RDD 的 partition 数量,将 RDD 中的数据压缩到更少的 partition 中去。因为 filter 之后,RDD 的每个 partition 中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个 task 解决的 partition 中的数据量并不是很多,有一点资源节约,而且此时解决的 task 越多,可能速度反而越慢。因而用 coalesce 缩小 partition 数量,将 RDD 中的数据压缩到更少的 partition 之后,只有应用更少的 task 即可解决完所有的 partition。在某些场景下,对于性能的晋升会有肯定的帮忙。
6.5 应用 repartitionAndSortWithinPartitions 代替 repartition 与 sort 类操作
repartitionAndSortWithinPartitions 是 Spark 官网举荐的一个算子,官网倡议,如果须要在 repartition 重分区之后,还要进行排序,倡议间接应用 repartitionAndSortWithinPartitions 算子。因为该算子能够一边进行重分区的 shuffle 操作,一边进行排序。shuffle 与 sort 两个操作同时进行,比先 shuffle 再 sort 来说,性能可能是要高的。
7. 应用 Kryo 优化序列化性能
7.1 spark 序列化介绍
Spark 在进行工作计算的时候,会波及到数据跨过程的网络传输、数据的长久化,这个时候就须要对数据进行序列化。Spark 默认采纳 Java 的序列化器。默认 java 序列化的优缺点如下:
其益处:解决起来不便,不须要咱们手动做其余操作,只是在应用一个对象和变量的时候,须要实现 Serializble 接口。其毛病:默认的序列化机制的效率不高,序列化的速度比较慢;序列化当前的数据,占用的内存空间绝对还是比拟大。Spark 反对应用 Kryo 序列化机制。Kryo 序列化机制,比默认的 Java 序列化机制,速度要快,序列化后的数据要更小,大略是 Java 序列化机制的 1 /10。所以 Kryo 序列化优化当前,能够让网络传输的数据变少;在集群中消耗的内存资源大大减少。
7.2 Kryo 序列化启用后失效的中央
Kryo 序列化机制,一旦启用当前,会失效的几个中央:(1)算子函数中应用到的内部变量
算子中的内部变量可能来着与 driver 须要波及到网络传输,就须要用到序列化。最终能够优化网络传输的性能,优化集群中内存的占用和耗费(2)长久化 RDD 时进行序列化,StorageLevel.MEMORY_ONLY_SER
将 rdd 长久化时,对应的存储级别里,须要用到序列化。最终能够优化内存的占用和耗费;长久化 RDD 占用的内存越少,task 执行的时候,创立的对象,就不至于频繁的占满内存,频繁产生 GC。(3)产生 shuffle 的中央,也就是宽依赖
上游的 stage 中的 task,拉取上游 stage 中的 task 产生的后果数据,跨网络传输,须要用到序列化。最终能够优化网络传输的性能
7.3 如何开启 Kryo 序列化机制
// 创立 SparkConf 对象。val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为 KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
8. 应用 fastutil 优化数据格式
8.1 fastutil 介绍
fastutil 是扩大了 Java 规范汇合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了非凡类型的 map、set、list 和 queue;fastutil 可能提供更小的内存占用,更快的存取速度;咱们应用 fastutil 提供的汇合类,来代替本人平时应用的 JDK 的原生的 Map、List、Set.
8.2 fastutil 益处
fastutil 汇合类,能够减小内存的占用,并且在进行汇合的遍历、依据索引(或者 key)获取元素的值和设置元素的值的时候,提供更快的存取速度
8.3 Spark 中利用 fastutil 的场景和应用
8.3.1 算子函数应用了内部变量
(1)你能够应用 Broadcast 播送变量优化;(2)能够应用 Kryo 序列化类库,晋升序列化性能和效率;(3)如果内部变量是某种比拟大的汇合,那么能够思考应用 fastutil 改写内部变量;首先从源头上就缩小内存的占用 (fastutil),通过播送变量进一步缩小内存占用,再通过 Kryo 序列化类库进一步缩小内存占用。
8.3.2 算子函数里应用了比拟大的汇合 Map/List
在你的算子函数里,也就是 task 要执行的计算逻辑外面,如果有逻辑中,呈现,要创立比拟大的 Map、List 等汇合,可能会占用较大的内存空间,而且可能波及到耗费性能的遍历、存取等汇合操作;那么此时,能够思考将这些汇合类型应用 fastutil 类库重写,应用了 fastutil 汇合类当前,就能够在肯定水平上,缩小 task 创立进去的汇合类型的内存占用。防止 executor 内存频繁占满,频繁唤起 GC,导致性能降落。
8.3.3 fastutil 的应用
第一步:在 pom.xml 中援用 fastutil 的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
第二步:平时应用 List(Integer)的替换成 IntList 即可。List<Integer> 的 list 对应的到 fastutil 就是 IntList 类型
应用阐明:根本都是相似于 IntList 的格局,前缀就是汇合的元素类型;非凡的就是 Map,Int2IntMap,代表了 key-value 映射的元素类型。
9. 调节数据本地化期待时长
Spark 在 Driver 上对 Application 的每一个 stage 的 task 进行调配之前,都会计算出每个 task 要计算的是哪个分片数据,RDD 的某个 partition;Spark 的 task 调配算法,优先会心愿每个 task 正好调配到它要计算的数据所在的节点,这样的话就不必在网络间传输数据;然而通常来说,有时大失所望,可能 task 没有机会调配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以这种时候,通常来说,Spark 会期待一段时间,默认状况下是 3 秒(不是相对的,还有很多种状况,对不同的本地化级别,都会去期待),到最初切实是期待不了了,就会抉择一个比拟差的本地化级别,比如说将 task 调配到间隔要计算的数据所在节点比拟近的一个节点,而后进行计算。
9.1 本地化级别
(1)PROCESS_LOCAL:过程本地化
代码和数据在同一个过程中,也就是在同一个 executor 中;计算数据的 task 由 executor 执行,数据在 executor 的 BlockManager 中;性能最好(2)NODE_LOCAL:节点本地化
代码和数据在同一个节点中;比如说数据作为一个 HDFS block 块,就在节点上,而 task 在节点上某个 executor 中运行;或者是数据和 task 在一个节点上的不同 executor 中;数据须要在过程间进行传输;性能其次(3)RACK_LOCAL:机架本地化
数据和 task 在一个机架的两个节点上;数据须要通过网络在节点之间进行传输;性能比拟差(4)ANY:无限度
数据和 task 可能在集群中的任何中央,而且不在一个机架中;性能最差
9.2 数据本地化期待时长
spark.locality.wait,默认是 3s
首先采纳最佳的形式,期待 3s 后降级, 还是不行,持续降级..., 最初还是不行,只可能采纳最差的。
9.3 如何调节参数并且测试
批改 spark.locality.wait 参数,默认是 3s,能够减少
上面是每个数据本地化级别的等待时间,默认都是跟 spark.locality.wait 工夫雷同,默认都是 3s(可查看 spark 官网对应参数阐明,如下图所示)
spark.locality.wait.node
spark.locality.wait.process
spark.locality.wait.rack
在代码中设置:new SparkConf().set("spark.locality.wait","10")
而后把程序提交到 spark 集群中运行,留神察看日志,spark 作业的运行日志,举荐大家在测试的时候,先用 client 模式,在本地就间接能够看到比拟全的日志。日志外面会显示,starting task .... PROCESS LOCAL、NODE LOCAL.....
例如:Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes)
察看大部分 task 的数据本地化级别
如果大多都是 PROCESS_LOCAL,那就不必调节了。如果是发现,好多的级别都是 NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的期待时长。应该是要重复调节,每次调节完当前,再来运行,察看日志
看看大部分的 task 的本地化级别有没有晋升;看看整个 spark 作业的运行工夫有没有缩短。留神留神:在调节参数、运行工作的时候,别轻重倒置,本地化级别倒是晋升了,然而因为大量的期待时长,spark 作业的运行工夫反而减少了,那就还是不要调节了。
10. 基于 Spark 内存模型调优
10.1 spark 中 executor 内存划分
-
Executor 的内存次要分为三块
- 第一块是让 task 执行咱们本人编写的代码时应用;
- 第二块是让 task 通过 shuffle 过程拉取了上一个 stage 的 task 的输入后,进行聚合等操作时应用
- 第三块是让 RDD 缓存时应用
10.2 spark 的内存模型
在 spark1.6 版本以前 spark 的 executor 应用的动态内存模型,然而在 spark1.6 开始,多减少了一个对立内存模型。通过 spark.memory.useLegacyMode 这个参数去配置
默认这个值是 false,代表用的是新的动态内存模型;如果想用以前的动态内存模型,那么就要把这个值改为 true。
10.2.1 动态内存模型
实际上就是把咱们的一个 executor 分成了三局部,一部分是 Storage 内存区域,一部分是 execution 区域,还有一部分是其余区域。如果应用的动态内存模型,那么用这几个参数去管制:spark.storage.memoryFraction:默认 0.6
spark.shuffle.memoryFraction:默认 0.2
所以第三局部就是 0.2
如果咱们 cache 数据量比拟大,或者是咱们的播送变量比拟大,那咱们就把 spark.storage.memoryFraction 这个值调大一点。然而如果咱们代码外面没有播送变量,也没有 cache,shuffle 又比拟多,那咱们要把 spark.shuffle.memoryFraction 这值调大。
- 动态内存模型的毛病
咱们配置好了 Storage 内存区域和 execution 区域后,咱们的一个工作假如 execution 内存不够用了,然而它的 Storage 内存区域是闲暇的,两个之间不能相互借用,不够灵便,所以才进去咱们新的对立内存模型。
10.2.2 对立内存模型
动态内存模型先是预留了 300m 内存,避免内存溢出。动态内存模型把整体内存分成了两局部,由这个参数示意 spark.memory.fraction 这个指的默认值是 0.6 代表另外的一部分是 0.4,
而后 spark.memory.fraction 这部分又划分成为两个小局部。这两小局部共占整体内存的 0.6 . 这两局部其实就是:Storage 内存和 execution 内存。由 spark.memory.storageFraction 这个参数去调配,因为两个共占 0.6。如果 spark.memory.storageFraction 这个值配的是 0.5, 那阐明这 0.6 外面 storage 占了 0.5,也就是 executor 占了 0.3。
- 对立内存模型有什么特点呢?
Storage 内存和 execution 内存 能够互相借用。不必像动态内存模型那样死板,然而是有规定的
为什么受伤的都是 storage 呢?是因为 execution 外面的数据是马上就要用的,而 storage 里的数据不肯定马上就要用。
10.2.3 工作提交脚本参考
- 以下是一份 spark-submit 命令的示例,大家能够参考一下,并依据本人的理论状况进行调节
bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
10.2.4 集体教训
java.lang.OutOfMemoryError
ExecutorLostFailure
Executor exit code 为 143
executor lost
hearbeat time out
shuffle file lost
如果遇到以上问题,很有可能就是内存除了问题,能够先尝试减少内存。如果还是解决不了,那么请听下一次数据歪斜调优的课。
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注
正文完