Spark调优
spark调优常见伎俩,在生产中经常会遇到各种各样的问题,有事先起因,有事中起因,也有不标准起因,spark调优总结下来能够从上面几个点来调优。
1. 调配更多的资源
调配更多的资源: 它是性能优化调优的王道,就是减少和调配更多的资源,这对于性能和速度上的晋升是不言而喻的, 基本上,在肯定范畴之内,减少资源与性能的晋升,是成正比的;写完了一个简单的spark作业之后,进行性能调优的时候,首先第一步,就是要来调节最优的资源配置; 在这个根底之上,如果说你的spark作业,可能调配的资源达到了你的能力范畴的顶端之后,无奈再调配更多的资源了,公司资源无限;那么才是思考去做前面的这些性能调优的点。相干问题:(1)调配哪些资源?(2)在哪里能够设置这些资源?(3)分析为什么调配这些资源之后,性能能够失去晋升?
1.1 调配哪些资源
executor-memory、executor-cores、driver-memory
1.2 在哪里能够设置这些资源
在理论的生产环境中,提交spark工作时,应用spark-submit shell脚本,在外面调整对应的参数。 提交工作的脚本: spark-submit \ --master spark://node1:7077 \ --class com.hoult.WordCount \ --num-executors 3 \ 配置executor的数量 --driver-memory 1g \ 配置driver的内存(影响不大) --executor-memory 1g \ 配置每一个executor的内存大小 --executor-cores 3 \ 配置每一个executor的cpu个数 /export/servers/wordcount.jar
1.2 参数调节到多大,算是最大
- ==Standalone模式==
先计算出公司spark集群上的所有资源 每台节点的内存大小和cpu核数, 比方:一共有20台worker节点,每台节点8g内存,10个cpu。 理论工作在给定资源的时候,能够给20个executor、每个executor的内存8g、每个executor的应用的cpu个数10。
- ==Yarn模式==
先计算出yarn集群的所有大小,比方一共500g内存,100个cpu; 这个时候能够调配的最大资源,比方给定50个executor、每个executor的内存大小10g,每个executor应用的cpu个数为2。
- 应用准则
在资源比拟短缺的状况下,尽可能的应用更多的计算资源,尽量去调节到最大的大小
1.3 为什么调大资源当前性能能够晋升
--executor-memory--total-executor-cores
2. 进步并行度
2.1 Spark的并行度指的是什么
spark作业中,各个stage的task的数量,也就代表了spark作业在各个阶段stage的并行度! 当调配完所能调配的最大资源了,而后对应资源去调节程序的并行度,如果并行度没有与资源相匹配,那么导致你调配上来的资源都节约掉了。同时并行运行,还能够让每个task要解决的数量变少(很简略的原理。正当设置并行度,能够充分利用集群资源,缩小每个task解决数据量,而减少性能放慢运行速度。)
2.2 如何进步并行度
2.2.1 能够设置task的数量
至多设置成与spark Application 的总cpu core 数量雷同。最现实状况,150个core,调配150task,一起运行,差不多同一时间运行结束官网举荐,task数量,设置成spark Application 总cpu core数量的2~3倍 。 比方150个cpu core ,根本设置task数量为300~500. 与现实状况不同的,有些task会运行快一点,比方50s就完了,有些task 可能会慢一点,要一分半才运行完,所以如果你的task数量,刚好设置的跟cpu core 数量雷同,可能会导致资源的节约。 因为比方150个task中10个先运行完了,残余140个还在运行,然而这个时候,就有10个cpu core闲暇进去了,导致节约。如果设置2~3倍,那么一个task运行完当前,另外一个task马上补上来,尽量让cpu core不要闲暇。同时尽量晋升spark运行效率和速度。晋升性能。
2.2.2 如何设置task数量来进步并行度
设置参数spark.default.parallelism 默认是没有值的,如果设置了值为10,它会在shuffle的过程才会起作用。 比方 val rdd2 = rdd1.reduceByKey(_+_) 此时rdd2的分区数就是10 能够通过在构建SparkConf对象的时候设置,例如: new SparkConf().set("spark.defalut.parallelism","500")
2.2.3 给RDD从新设置partition的数量
应用rdd.repartition 来从新分区,该办法会生成一个新的rdd,使其分区数变大。此时因为一个partition对应一个task,那么对应的task个数越多,通过这种形式也能够进步并行度。
2.2.4 进步sparksql运行的task数量
http://spark.apache.org/docs/2.3.3/sql-programming-guide.html
通过设置参数 spark.sql.shuffle.partitions=500 默认为200;能够适当增大,来进步并行度。 比方设置为 spark.sql.shuffle.partitions=500
专门针对sparkSQL来设置的
3. RDD的重用和长久化
3.1 理论开发遇到的状况阐明
如上图所示的计算逻辑:(1)当第一次应用rdd2做相应的算子操作失去rdd3的时候,就会从rdd1开始计算,先读取HDFS上的文件,而后对rdd1做对应的算子操作失去rdd2,再由rdd2计算之后失去rdd3。同样为了计算失去rdd4,后面的逻辑会被从新计算。(3)默认状况下屡次对一个rdd执行算子操作,去获取不同的rdd,都会对这个rdd及之前的父rdd全副从新计算一次。这种状况在理论开发代码的时候会常常遇到,然而咱们肯定要防止一个rdd反复计算屡次,否则会导致性能急剧升高。总结:能够把屡次应用到的rdd,也就是公共rdd进行长久化,防止后续须要,再次从新计算,晋升效率。
3.2 如何对rdd进行长久化
- 能够调用rdd的cache或者persist办法。
(1)cache办法默认是把数据长久化到内存中 ,例如:rdd.cache ,其本质还是调用了persist办法(2)persist办法中有丰盛的缓存级别,这些缓存级别都定义在StorageLevel这个object中,能够结合实际的利用场景正当的设置缓存级别。例如: rdd.persist(StorageLevel.MEMORY_ONLY),这是cache办法的实现。
3.3 rdd长久化的时能够采纳序列化
(1)如果失常将数据长久化在内存中,那么可能会导致内存的占用过大,这样的话,兴许会导致OOM内存溢出。(2)当纯内存无奈撑持公共RDD数据齐全寄存的时候,就优先思考应用序列化的形式在纯内存中存储。将RDD的每个partition的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。(3)序列化的形式,惟一的毛病就是,在获取数据的时候,须要反序列化。然而能够缩小占用的空间和便于网络传输(4)如果序列化纯内存形式,还是导致OOM,内存溢出;就只能思考磁盘的形式,内存+磁盘的一般形式(无序列化)。(5)为了数据的高可靠性,而且内存短缺,能够应用双正本机制,进行长久化 长久化的双正本机制,长久化后的一个正本,因为机器宕机了,正本丢了,就还是得从新计算一次; 长久化的每个数据单元,存储一份正本,放在其余节点下面,从而进行容错; 一个正本丢了,不必从新计算,还能够应用另外一份正本。这种形式,仅仅针对你的内存资源极度短缺。 比方: StorageLevel.MEMORY_ONLY_2
4. 播送变量的应用
4.1 场景形容
在理论工作中可能会遇到这样的状况,因为要解决的数据量十分大,这个时候可能会在一个stage中呈现大量的task,比方有1000个task,这些task都须要一份雷同的数据来解决业务,这份数据的大小为100M,该数据会拷贝1000份正本,通过网络传输到各个task中去,给task应用。这里会波及大量的网络传输开销,同时至多须要的内存为1000*100M=100G,这个内存开销是十分大的。不必要的内存的耗费和占用,就导致了你在进行RDD长久化到内存,兴许就没法齐全在内存中放下;就只能写入磁盘,最初导致后续的操作在磁盘IO上耗费性能;这对于spark工作解决来说就是一场劫难。 因为内存开销比拟大,task在创建对象的时候,可能会呈现堆内存放不下所有对象,就会导致频繁的垃圾回收器的回收GC。GC的时候肯定是会导致工作线程进行,也就是导致Spark暂停工作那么一点工夫。频繁GC的话,对Spark作业的运行的速度会有相当可观的影响。
4.2 播送变量引入
Spark中分布式执行的代码须要传递到各个executor的task上运行。对于一些只读、固定的数据,每次都须要Driver播送到各个Task上,这样效率低下。播送变量容许将变量只播送给各个executor。该executor上的各个task再从所在节点的BlockManager(负责管理某个executor对应的内存和磁盘上的数据)获取变量,而不是从Driver获取变量,从而晋升了效率。
播送变量,初始的时候,就在Drvier上有一份正本。通过在Driver把共享数据转换成播送变量。 task在运行的时候,想要应用播送变量中的数据,此时首先会在本人本地的Executor对应的BlockManager中,尝试获取变量正本;如果本地没有,那么就从Driver近程拉取播送变量正本,并保留在本地的BlockManager中; 尔后这个executor上的task,都会间接应用本地的BlockManager中的正本。那么这个时候所有该executor中的task都会应用这个播送变量的正本。也就是说一个executor只须要在第一个task启动时,取得一份播送变量数据,之后的task都从本节点的BlockManager中获取相干数据。 executor的BlockManager除了从driver上拉取,也可能从其余节点的BlockManager上拉取变量正本,网络间隔越近越好。
4.3 应用播送变量后的性能剖析
比方一个工作须要50个executor,1000个task,共享数据为100M。(1)在不应用播送变量的状况下,1000个task,就须要该共享数据的1000个正本,也就是说有1000份数须要大量的网络传输和内存开销存储。消耗的内存大小1000*100=100G.(2)应用了播送变量后,50个executor就只须要50个正本数据,而且不肯定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的blockmanager上拉取播送变量正本,网络传输速度大大增加;内存开销 50*100M=5G总结: 不应用播送变量的内存开销为100G,应用后的内存开销5G,这里就相差了20倍左右的网络传输性能损耗和内存开销,应用播送变量后对于性能的晋升和影响,还是很可观的。 播送变量的应用不肯定会对性能产生决定性的作用。比方运行30分钟的spark作业,可能做了播送变量当前,速度快了2分钟,或者5分钟。然而一点一滴的调优,千里之行;始于足下。最初还是会有成果的。
4.4 播送变量应用注意事项
(1)能不能将一个RDD应用播送变量播送进来? 不能,因为RDD是不存储数据的。能够将RDD的后果播送进来。(2)播送变量只能在Driver端定义,不能在Executor端定义。(3)在Driver端能够批改播送变量的值,在Executor端无奈批改播送变量的值。(4)如果executor端用到了Driver的变量,如果不应用播送变量在Executor有多少task就有多少Driver端的变量正本。(5)如果Executor端用到了Driver的变量,如果应用播送变量在每个Executor中只有一份Driver端的变量正本。
4.5 如何应用播送变量
- 例如
(1) 通过sparkContext的broadcast办法把数据转换成播送变量,类型为Broadcast, val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6)) (2) 而后executor上的BlockManager就能够拉取该播送变量的正本获取具体的数据。 获取播送变量中的值能够通过调用其value办法 val array: Array[Int] = broadcastArray.value
5. 尽量避免应用shuffle类算子
5.1 shuffle形容
spark中的shuffle波及到数据要进行大量的网络传输,上游阶段的task工作须要通过网络拉取上阶段task的输入数据,shuffle过程,简略来说,就是将散布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比方reduceByKey、join等算子,都会触发shuffle操作。 如果有可能的话,要尽量避免应用shuffle类算子。 因为Spark作业运行过程中,最耗费性能的中央就是shuffle过程。
5.2 哪些算子操作会产生shuffle
spark程序在开发的过程中应用reduceByKey、join、distinct、repartition等算子操作,这里都会产生shuffle,因为shuffle这一块是十分消耗性能的,理论开发中尽量应用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,能够大大减少性能开销。
5.3 如何防止产生shuffle
- 小案例
//谬误的做法:// 传统的join操作会导致shuffle操作。// 因为两个RDD中,雷同的key都须要通过网络拉取到一个节点上,由一个task进行join操作。val rdd3 = rdd1.join(rdd2) //正确的做法:// Broadcast+map的join操作,不会导致shuffle操作。// 应用Broadcast将一个数据量较小的RDD作为播送变量。val rdd2Data = rdd2.collect()val rdd2DataBroadcast = sc.broadcast(rdd2Data)// 在rdd1.map算子中,能够从rdd2DataBroadcast中,获取rdd2的所有数据。// 而后进行遍历,如果发现rdd2中某条数据的key与rdd1的以后数据的key是雷同的,那么就断定能够进行join。// 此时就能够依据本人须要的形式,将rdd1以后数据与rdd2中能够连贯的数据,拼接在一起(String或Tuple)。val rdd3 = rdd1.map(rdd2DataBroadcast...)// 留神,以上操作,倡议仅仅在rdd2的数据量比拟少(比方几百M,或者一两G)的状况下应用。// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
5.4 应用map-side预聚合的shuffle操作
- map-side预聚合
如果因为业务须要,肯定要应用shuffle操作,无奈用map类的算子来代替,那么尽量应用能够map-side预聚合的算子。 所谓的map-side预聚合,说的是在每个节点本地对雷同的key进行一次聚合操作,相似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条雷同的key,因为多条雷同的key都被聚合起来了。其余节点在拉取所有节点上的雷同key时,就会大大减少须要拉取的数据数量,从而也就缩小了磁盘IO以及网络传输开销。 通常来说,在可能的状况下,倡议应用reduceByKey或者aggregateByKey算子来代替掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会应用用户自定义的函数对每个节点本地的雷同key进行预聚合。 而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间散发和传输,性能相对来说比拟差。 比方如下两幅图,就是典型的例子,别离基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,能够看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,能够看到,每个节点本地的雷同key数据,都进行了预聚合,而后才传输到其余节点上进行全局聚合。
- ==groupByKey进行单词计数原理==
- ==reduceByKey单词计数原理==
6. 应用高性能的算子
6.1 应用reduceByKey/aggregateByKey代替groupByKey
- reduceByKey/aggregateByKey 能够进行预聚合操作,缩小数据的传输量,晋升性能
- groupByKey 不会进行预聚合操作,进行数据的全量拉取,性能比拟低
6.2 应用mapPartitions代替一般map
mapPartitions类的算子,一次函数调用会解决一个partition所有的数据,而不是一次函数调用解决一条,性能相对来说会高一些。 然而有的时候,应用mapPartitions会呈现OOM(内存溢出)的问题。因为单次函数调用就要解决掉一个partition所有的数据,如果内存不够,垃圾回收时是无奈回收掉太多对象的,很可能呈现OOM异样。所以应用这类操作时要谨慎!
6.3 应用foreachPartition代替foreach
原理相似于“应用mapPartitions代替map”,也是一次函数调用解决一个partition的所有数据,而不是一次函数调用解决一条数据。 在实践中发现,foreachPartitions类的算子,对性能的晋升还是很有帮忙的。比方在foreach函数中,将RDD中所有数据写MySQL,那么如果是一般的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创立一个数据库连贯,此时就势必会频繁地创立和销毁数据库连贯,性能是十分低下; 然而如果用foreachPartitions算子一次性解决一个partition的数据,那么对于每个partition,只有创立一个数据库连贯即可,而后执行批量插入操作,此时性能是比拟高的。实际中发现,对于1万条左右的数据量写MySQL,性能能够晋升30%以上。
6.4 应用filter之后进行coalesce操作
通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比方30%以上的数据),倡议应用coalesce算子,手动缩小RDD的partition数量,将RDD中的数据压缩到更少的partition中去。 因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task解决的partition中的数据量并不是很多,有一点资源节约,而且此时解决的task越多,可能速度反而越慢。 因而用coalesce缩小partition数量,将RDD中的数据压缩到更少的partition之后,只有应用更少的task即可解决完所有的partition。在某些场景下,对于性能的晋升会有肯定的帮忙。
6.5 应用repartitionAndSortWithinPartitions代替repartition与sort类操作
repartitionAndSortWithinPartitions是Spark官网举荐的一个算子,官网倡议,如果须要在repartition重分区之后,还要进行排序,倡议间接应用repartitionAndSortWithinPartitions算子。 因为该算子能够一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
7. 应用Kryo优化序列化性能
7.1 spark序列化介绍
Spark在进行工作计算的时候,会波及到数据跨过程的网络传输、数据的长久化,这个时候就须要对数据进行序列化。Spark默认采纳Java的序列化器。默认java序列化的优缺点如下:其益处: 解决起来不便,不须要咱们手动做其余操作,只是在应用一个对象和变量的时候,须要实现Serializble接口。其毛病: 默认的序列化机制的效率不高,序列化的速度比较慢;序列化当前的数据,占用的内存空间绝对还是比拟大。Spark反对应用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大略是Java序列化机制的1/10。所以Kryo序列化优化当前,能够让网络传输的数据变少;在集群中消耗的内存资源大大减少。
7.2 Kryo序列化启用后失效的中央
Kryo序列化机制,一旦启用当前,会失效的几个中央:(1)算子函数中应用到的内部变量 算子中的内部变量可能来着与driver须要波及到网络传输,就须要用到序列化。 最终能够优化网络传输的性能,优化集群中内存的占用和耗费 (2)长久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER 将rdd长久化时,对应的存储级别里,须要用到序列化。 最终能够优化内存的占用和耗费;长久化RDD占用的内存越少,task执行的时候,创立的对象,就不至于频繁的占满内存,频繁产生GC。 (3) 产生shuffle的中央,也就是宽依赖 上游的stage中的task,拉取上游stage中的task产生的后果数据,跨网络传输,须要用到序列化。最终能够优化网络传输的性能
7.3 如何开启Kryo序列化机制
// 创立SparkConf对象。val conf = new SparkConf().setMaster(...).setAppName(...)// 设置序列化器为KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册要序列化的自定义类型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
8. 应用fastutil优化数据格式
8.1 fastutil介绍
fastutil是扩大了Java规范汇合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了非凡类型的map、set、list和queue;fastutil可能提供更小的内存占用,更快的存取速度;咱们应用fastutil提供的汇合类,来代替本人平时应用的JDK的原生的Map、List、Set.
8.2 fastutil益处
fastutil汇合类,能够减小内存的占用,并且在进行汇合的遍历、依据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度
8.3 Spark中利用fastutil的场景和应用
8.3.1 算子函数应用了内部变量
(1)你能够应用Broadcast播送变量优化;(2)能够应用Kryo序列化类库,晋升序列化性能和效率;(3)如果内部变量是某种比拟大的汇合,那么能够思考应用fastutil改写内部变量;首先从源头上就缩小内存的占用(fastutil),通过播送变量进一步缩小内存占用,再通过Kryo序列化类库进一步缩小内存占用。
8.3.2 算子函数里应用了比拟大的汇合Map/List
在你的算子函数里,也就是task要执行的计算逻辑外面,如果有逻辑中,呈现,要创立比拟大的Map、List等汇合,可能会占用较大的内存空间,而且可能波及到耗费性能的遍历、存取等汇合操作; 那么此时,能够思考将这些汇合类型应用fastutil类库重写,应用了fastutil汇合类当前,就能够在肯定水平上,缩小task创立进去的汇合类型的内存占用。 防止executor内存频繁占满,频繁唤起GC,导致性能降落。
8.3.3 fastutil的应用
第一步:在pom.xml中援用fastutil的包 <dependency> <groupId>fastutil</groupId> <artifactId>fastutil</artifactId> <version>5.0.9</version> </dependency> 第二步:平时应用List (Integer)的替换成IntList即可。 List<Integer>的list对应的到fastutil就是IntList类型 应用阐明:根本都是相似于IntList的格局,前缀就是汇合的元素类型; 非凡的就是Map,Int2IntMap,代表了key-value映射的元素类型。
9. 调节数据本地化期待时长
Spark在Driver上对Application的每一个stage的task进行调配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;Spark的task调配算法,优先会心愿每个task正好调配到它要计算的数据所在的节点,这样的话就不必在网络间传输数据; 然而通常来说,有时大失所望,可能task没有机会调配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以这种时候,通常来说,Spark会期待一段时间,默认状况下是3秒(不是相对的,还有很多种状况,对不同的本地化级别,都会去期待),到最初切实是期待不了了,就会抉择一个比拟差的本地化级别,比如说将task调配到间隔要计算的数据所在节点比拟近的一个节点,而后进行计算。
9.1 本地化级别
(1)PROCESS_LOCAL:过程本地化 代码和数据在同一个过程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好(2)NODE_LOCAL:节点本地化 代码和数据在同一个节点中;比如说数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是数据和task在一个节点上的不同executor中;数据须要在过程间进行传输;性能其次(3)RACK_LOCAL:机架本地化 数据和task在一个机架的两个节点上;数据须要通过网络在节点之间进行传输; 性能比拟差(4) ANY:无限度 数据和task可能在集群中的任何中央,而且不在一个机架中;性能最差
9.2 数据本地化期待时长
spark.locality.wait,默认是3s首先采纳最佳的形式,期待3s后降级,还是不行,持续降级...,最初还是不行,只可能采纳最差的。
9.3 如何调节参数并且测试
批改spark.locality.wait参数,默认是3s,能够减少上面是每个数据本地化级别的等待时间,默认都是跟spark.locality.wait工夫雷同,默认都是3s(可查看spark官网对应参数阐明,如下图所示)spark.locality.wait.nodespark.locality.wait.processspark.locality.wait.rack
在代码中设置:new SparkConf().set("spark.locality.wait","10")而后把程序提交到spark集群中运行,留神察看日志,spark作业的运行日志,举荐大家在测试的时候,先用client模式,在本地就间接能够看到比拟全的日志。 日志外面会显示,starting task .... PROCESS LOCAL、NODE LOCAL.....例如:Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes)察看大部分task的数据本地化级别 如果大多都是PROCESS_LOCAL,那就不必调节了。如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的期待时长。应该是要重复调节,每次调节完当前,再来运行,察看日志 看看大部分的task的本地化级别有没有晋升;看看整个spark作业的运行工夫有没有缩短。留神留神:在调节参数、运行工作的时候,别轻重倒置,本地化级别倒是晋升了, 然而因为大量的期待时长,spark作业的运行工夫反而减少了,那就还是不要调节了。
10. 基于Spark内存模型调优
10.1 spark中executor内存划分
Executor的内存次要分为三块
- 第一块是让task执行咱们本人编写的代码时应用;
- 第二块是让task通过shuffle过程拉取了上一个stage的task的输入后,进行聚合等操作时应用
- 第三块是让RDD缓存时应用
10.2 spark的内存模型
在spark1.6版本以前 spark的executor应用的动态内存模型,然而在spark1.6开始,多减少了一个对立内存模型。 通过spark.memory.useLegacyMode 这个参数去配置 默认这个值是false,代表用的是新的动态内存模型; 如果想用以前的动态内存模型,那么就要把这个值改为true。
10.2.1 动态内存模型
实际上就是把咱们的一个executor分成了三局部, 一部分是Storage内存区域, 一部分是execution区域, 还有一部分是其余区域。如果应用的动态内存模型,那么用这几个参数去管制: spark.storage.memoryFraction:默认0.6spark.shuffle.memoryFraction:默认0.2 所以第三局部就是0.2如果咱们cache数据量比拟大,或者是咱们的播送变量比拟大, 那咱们就把spark.storage.memoryFraction这个值调大一点。 然而如果咱们代码外面没有播送变量,也没有cache,shuffle又比拟多,那咱们要把spark.shuffle.memoryFraction 这值调大。
- 动态内存模型的毛病
咱们配置好了Storage内存区域和execution区域后,咱们的一个工作假如execution内存不够用了,然而它的Storage内存区域是闲暇的,两个之间不能相互借用,不够灵便,所以才进去咱们新的对立内存模型。
10.2.2 对立内存模型
动态内存模型先是预留了300m内存,避免内存溢出。动态内存模型把整体内存分成了两局部,由这个参数示意spark.memory.fraction 这个指的默认值是0.6 代表另外的一部分是0.4,而后spark.memory.fraction 这部分又划分成为两个小局部。这两小局部共占整体内存的0.6 .这两局部其实就是:Storage内存和execution内存。由spark.memory.storageFraction 这个参数去调配,因为两个共占0.6。如果spark.memory.storageFraction这个值配的是0.5,那阐明这0.6外面 storage占了0.5,也就是executor占了0.3 。
- 对立内存模型有什么特点呢?
Storage内存和execution内存 能够互相借用。不必像动态内存模型那样死板,然而是有规定的
为什么受伤的都是storage呢?是因为execution外面的数据是马上就要用的,而storage里的数据不肯定马上就要用。
10.2.3 工作提交脚本参考
- 以下是一份spark-submit命令的示例,大家能够参考一下,并依据本人的理论状况进行调节
bin/spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3 \
10.2.4 集体教训
java.lang.OutOfMemoryErrorExecutorLostFailureExecutor exit code 为143executor losthearbeat time outshuffle file lost如果遇到以上问题,很有可能就是内存除了问题,能够先尝试减少内存。如果还是解决不了,那么请听下一次数据歪斜调优的课。
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注