Spark调优之Shuffle调优

本节开始先解说Shuffle外围概念;而后针对HashShuffleSortShuffle进行调优;接下来对map端reduce端调优;再针对Spark中的数据歪斜问题进行分析及调优;最初是Spark运行过程中的故障排除

本文首发于公众号【五分钟学大数据】,本公号专一于大数据技术,分享高质量大数据原创技术文章。

一、Shuffle的外围概念

1. ShuffleMapStage与ResultStage

在划分stage时,最初一个stage称为FinalStage,它实质上是一个ResultStage对象,后面的所有stage被称为ShuffleMapStage

ShuffleMapStage的完结随同着shuffle文件的写磁盘。

ResultStage基本上对应代码中的action算子,行将一个函数利用在RDD的各个partition的数据集上,意味着一个job的运行完结。

2. Shuffle中的工作个数

咱们晓得,Spark Shuffle分为map阶段和reduce阶段,或者称之为ShuffleRead阶段和ShuffleWrite阶段,那么对于一次Shuffle,map过程和reduce过程都会由若干个task来执行,那么map task和reduce task的数量是如何确定的呢?

假如Spark工作从HDFS中读取数据,那么初始RDD分区个数由该文件的split个数决定,也就是一个split对应生成的RDD的一个partition,咱们假如初始partition个数为N。

初始RDD通过一系列算子计算后(假如没有执行repartition和coalesce算子进行重分区,则分区个数不变,仍为N,如果通过重分区算子,那么分区个数变为M),咱们假如分区个数不变,当执行到Shuffle操作时,map端的task个数和partition个数统一,即map task为N个

reduce端的stage默认取spark.default.parallelism这个配置项的值作为分区数,如果没有配置,则以map端的最初一个RDD的分区数作为其分区数(也就是N),那么分区数就决定了reduce端的task的个数。

3. reduce端数据的读取

依据stage的划分咱们晓得,map端task和reduce端task不在雷同的stage中,map task位于ShuffleMapStagereduce task位于ResultStage,map task会先执行,那么后执行的reduce task如何晓得从哪里去拉取map task落盘后的数据呢?

reduce端的数据拉取过程如下

  1. map task 执行结束后会将计算状态以及磁盘小文件地位等信息封装到MapStatus对象中,而后由本过程中的MapOutPutTrackerWorker对象将mapStatus对象发送给Driver过程的MapOutPutTrackerMaster对象;
  2. 在reduce task开始执行之前会先让本过程中的MapOutputTrackerWorker向Driver过程中的MapoutPutTrakcerMaster动员申请,申请磁盘小文件地位信息;
  3. 当所有的Map task执行结束后,Driver过程中的MapOutPutTrackerMaster就把握了所有的磁盘小文件的地位信息。此时MapOutPutTrackerMaster会通知MapOutPutTrackerWorker磁盘小文件的地位信息;
  4. 实现之前的操作之后,由BlockTransforService去Executor0所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的20%内存中)。

二、HashShuffle解析

以下的探讨都假如每个Executor有1个cpu core。

1. 未经优化的HashShuffleManager

shuffle write阶段,次要就是在一个stage完结计算之后,为了下一个stage能够执行shuffle类的算子(比方reduceByKey),而将每个task解决的数据按key进行“划分”。所谓“划分”,就是对雷同的key执行hash算法,从而将雷同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于上游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去

下一个stage的task有多少个,以后stage的每个task就要创立多少份磁盘文件。比方下一个stage总共有100个task,那么以后stage的每个task都要创立100份磁盘文件。如果以后stage有50个task,总共有10个Executor,每个Executor执行5个task,那么每个Executor上总共就要创立500个磁盘文件,所有Executor上会创立5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的

shuffle read阶段,通常就是一个stage刚开始时要做的事件。此时该stage的每一个task就须要将上一个stage的计算结果中的所有雷同key,从各个节点上通过网络都拉取到本人所在的节点上,而后进行key的聚合或连贯等操作。因为shuffle write的过程中,map task给上游stage的每个reduce task都创立了一个磁盘文件,因而shuffle read的过程中,每个reduce task只有从上游stage的所有map task所在节点上,拉取属于本人的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个本人的buffer缓冲,每次都只能拉取与buffer缓冲雷同大小的数据,而后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最初将所有数据到拉取完,并失去最终的后果。

未优化的HashShuffleManager工作原理如下图所示:

2. 优化后的HashShuffleManager

为了优化HashShuffleManager咱们能够设置一个参数:spark.shuffle.consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制,通常来说,如果咱们应用HashShuffleManager,那么都倡议开启这个选项

开启consolidate机制之后,在shuffle write过程中,task就不是为上游stage的每个task创立一个磁盘文件了,此时会呈现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与上游stage的task数量是雷同的。一个Executor上有多少个cpu core,就能够并行执行多少个task。而第一批并行执行的每个task都会创立一个shuffleFileGroup,并将数据写入对应的磁盘文件内

当Executor的cpu core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包含其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因而,consolidate机制容许不同的task复用同一批磁盘文件,这样就能够无效将多个task的磁盘文件进行肯定水平上的合并,从而大幅度缩小磁盘文件的数量,进而晋升shuffle write的性能

假如第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor(Executor CPU个数为1),每个Executor执行5个task。那么本来应用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。然而此时通过优化之后,每个Executor创立的磁盘文件的数量的计算公式为:cpu core的数量 * 下一个stage的task数量,也就是说,每个Executor此时只会创立100个磁盘文件,所有Executor只会创立1000个磁盘文件。

优化后的HashShuffleManager工作原理如下图所示:

三、 SortShuffle解析

SortShuffleManager的运行机制次要分成两种,一种是一般运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

1. 一般运行机制

在该模式下,数据会先写入一个内存数据结构中,此时依据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存如果是join这种一般的shuffle算子,那么会选用Array数据结构,间接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。

在溢写到磁盘文件之前,会先依据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的模式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输入流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样能够缩小磁盘IO次数,晋升性能

一个task将所有数据写入内存数据结构的过程中,会产生屡次磁盘溢写操作,也就会产生多个临时文件。最初会将之前所有的长期磁盘文件都进行合并,这就是merge过程,此时会将之前所有长期磁盘文件中的数据读取进去,而后顺次写入最终的磁盘文件之中。此外,因为一个task就只对应一个磁盘文件,也就意味着该task为上游stage的task筹备的数据都在这一个文件中,因而还会独自写一份索引文件,其中标识了上游各个task的数据在文件中的start offset与end offset。

SortShuffleManager因为有一个磁盘文件merge的过程,因而大大减少了文件数量。比方第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。因为每个task最终只有一个磁盘文件,因而此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

一般运行机制的SortShuffleManager工作原理如下图所示:

2. bypass运行机制

bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
  • 不是聚合类的shuffle算子。

此时,每个task会为每个上游task都创立一个长期磁盘文件,并将数据按key进行hash而后依据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最初,同样会将所有长期磁盘文件都合并成一个磁盘文件,并创立一个独自的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是截然不同的,因为都要创立数量惊人的磁盘文件,只是在最初会做一个磁盘文件的合并而已。因而大量的最终磁盘文件,也让该机制绝对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与一般SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大益处在于,shuffle write过程中,不须要进行数据的排序操作,也就节俭掉了这部分的性能开销。

bypass运行机制的SortShuffleManager工作原理如下图所示:

四、map和reduce端缓冲区大小

在Spark工作运行过程中,如果shuffle的map端解决的数据量比拟大,然而map端缓冲的大小是固定的,可能会呈现map端缓冲数据频繁spill溢写到磁盘文件中的状况,使得性能十分低下,通过调节map端缓冲的大小,能够防止频繁的磁盘IO操作,进而晋升Spark工作的整体性能

map端缓冲的默认配置是32KB,如果每个task解决640KB的数据,那么会产生640/32 = 20次溢写,如果每个task解决64000KB的数据,即会产生64000/32=2000次溢写,这对于性能的影响是十分重大的。

map端缓冲的配置办法:

val conf = new SparkConf()  .set("spark.shuffle.file.buffer", "64")

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次可能缓冲的数据量,也就是每次可能拉取的数据量,如果内存资源较为短缺,适当减少拉取数据缓冲区的大小,能够缩小拉取数据的次数,也就能够缩小网络传输的次数,进而晋升性能

reduce端数据拉取缓冲区的大小能够通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB。该参数的设置办法如下:

reduce端数据拉取缓冲区配置:

val conf = new SparkConf()  .set("spark.reducer.maxSizeInFlight", "96")

五、reduce端重试次数和等待时间距离

Spark Shuffle过程中,reduce task拉取属于本人的数据时,如果因为网络异样等起因导致失败会主动进行重试。对于那些蕴含了特地耗时的shuffle操作的作业,倡议减少重试最大次数(比方60次),以防止因为JVM的full gc或者网络不稳固等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度晋升稳定性

reduce端拉取数据重试次数能够通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了能够重试的最大次数。如果在指定次数之内拉取还是没有胜利,就可能会导致作业执行失败,默认为3,该参数的设置办法如下:

reduce端拉取数据重试次数配置:

val conf = new SparkConf()  .set("spark.shuffle.io.maxRetries", "6")

Spark Shuffle过程中,reduce task拉取属于本人的数据时,如果因为网络异样等起因导致失败会主动进行重试,在一次失败后,会期待肯定的工夫距离再进行重试,能够通过加大距离时长(比方60s),以减少shuffle操作的稳定性

reduce端拉取数据期待距离能够通过spark.shuffle.io.retryWait参数进行设置,默认值为5s,该参数的设置办法如下:

reduce端拉取数据期待距离配置:

val conf = new SparkConf()  .set("spark.shuffle.io.retryWait", "60s")

六、bypass机制开启阈值

对于SortShuffleManager,如果shuffle reduce task的数量小于某一阈值则shuffle write过程中不会进行排序操作,而是间接依照未经优化的HashShuffleManager的形式去写数据,然而最初会将每个task产生的所有长期磁盘文件都合并成一个文件,并会创立独自的索引文件。

当你应用SortShuffleManager时,如果确实不须要排序操作,那么倡议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,缩小了排序的性能开销,然而这种形式下,仍然会产生大量的磁盘文件,因而shuffle write性能有待进步

SortShuffleManager排序操作阈值的设置能够通过spark.shuffle.sort.bypassMergeThreshold这一参数进行设置,默认值为200,该参数的设置办法如下:

reduce端拉取数据期待距离配置:

val conf = new SparkConf()  .set("spark.shuffle.sort.bypassMergeThreshold", "400")

数据歪斜

就是数据分到各个区的数量不太平均,能够自定义分区器,想怎么分就怎么分。

Spark中的数据歪斜问题次要指shuffle过程中呈现的数据歪斜问题,是因为不同的key对应的数据量不同导致的不同task所解决的数据量不同的问题

例如,reduced端一共要解决100万条数据,第一个和第二个task别离被调配到了1万条数据,计算5分钟内实现,第三个task调配到了98万数据,此时第三个task可能须要10个小时实现,这使得整个Spark作业须要10个小时能力运行实现,这就是数据歪斜所带来的结果。

留神,要辨别开数据歪斜数据适量这两种状况,数据歪斜是指多数task被调配了绝大多数的数据,因而多数task运行迟缓;数据适量是指所有task被调配的数据量都很大,相差不多,所有task都运行迟缓。

数据歪斜的体现:

  1. Spark作业的大部分task都执行迅速,只有无限的几个task执行的十分慢,此时可能呈现了数据歪斜,作业能够运行,然而运行得十分慢;
  2. Spark作业的大部分task都执行迅速,然而有的task在运行过程中会忽然报出OOM,重复执行几次都在某一个task报出OOM谬误,此时可能呈现了数据歪斜,作业无奈失常运行。

定位数据歪斜问题:

  1. 查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,依据代码逻辑判断此处是否会呈现数据歪斜;
  2. 查看Spark作业的log文件,log文件对于谬误的记录会准确到代码的某一行,能够依据异样定位到的代码地位来明确谬误产生在第几个stage,对应的shuffle算子是哪一个;

1. 预聚合原始数据

1. 防止shuffle过程

绝大多数状况下,Spark作业的数据起源都是Hive表,这些Hive表根本都是通过ETL之后的昨天的数据。
为了防止数据歪斜,咱们能够思考防止shuffle过程,如果防止了shuffle过程,那么从根本上就打消了产生数据歪斜问题的可能。

如果Spark作业的数据来源于Hive表,那么能够先在Hive表中对数据进行聚合,例如依照key进行分组,将同一key对应的所有value用一种非凡的格局拼接到一个字符串里去,这样,一个key就只有一条数据了;之后,对一个key的所有value进行解决时,只须要进行map操作即可,无需再进行任何的shuffle操作。通过上述形式就防止了执行shuffle操作,也就不可能会产生任何的数据歪斜问题。

对于Hive表中数据的操作,不肯定是拼接成一个字符串,也能够是间接对key的每一条数据进行累计计算。
要辨别开,解决的数据量大和数据歪斜的区别

2. 增大key粒度(减小数据歪斜可能性,增大每个task的数据量)

如果没有方法对每个key聚合进去一条数据,在特定场景下,能够思考扩充key的聚合粒度。

例如,目前有10万条用户数据,以后key的粒度是(省,城市,区,日期),当初咱们思考扩充粒度,将key的粒度扩充为(省,城市,日期),这样的话,key的数量会缩小,key之间的数据量差别也有可能会缩小,由此能够加重数据歪斜的景象和问题。(此办法只针对特定类型的数据无效,当利用场景不合适时,会减轻数据歪斜)

2. 预处理导致歪斜的key

1. 过滤

如果在Spark作业中容许抛弃某些数据,那么能够思考将可能导致数据歪斜的key进行过滤,滤除可能导致数据歪斜的key对应的数据,这样,在Spark作业中就不会产生数据歪斜了。

2. 应用随机key

当应用了相似于groupByKey、reduceByKey这样的算子时,能够思考应用随机key实现双重聚合,如下图所示:

首先,通过map算子给每个数据的key增加随机数前缀,对key进行打散,将原先一样的key变成不一样的key,而后进行第一次聚合,这样就能够让本来被一个task解决的数据扩散到多个task下来做部分聚合;随后,去除掉每个key的前缀,再次进行聚合。

此办法对于由groupByKey、reduceByKey这类算子造成的数据歪斜有比拟好的成果,仅仅实用于聚合类的shuffle操作,适用范围绝对较窄。如果是join类的shuffle操作,还得用其余的解决方案

此办法也是前几种计划没有比拟好的成果时要尝试的解决方案。

3. sample采样对歪斜key独自进行join

在Spark中,如果某个RDD只有一个key,那么在shuffle过程中会默认将此key对应的数据打散,由不同的reduce端task进行解决

所以当由单个key导致数据歪斜时,可有将产生数据歪斜的key独自提取进去,组成一个RDD,而后用这个原本会导致歪斜的key组成的RDD和其余RDD独自join,此时,依据Spark的运行机制,此RDD中的数据会在shuffle阶段被扩散到多个task中去进行join操作。

歪斜key独自join的流程如下图所示:

实用场景剖析:

对于RDD中的数据,能够将其转换为一个两头表,或者是间接应用countByKey()的形式,看一下这个RDD中各个key对应的数据量,此时如果你发现整个RDD就一个key的数据量特地多,那么就能够思考应用这种办法。

当数据量十分大时,能够思考应用sample采样获取10%的数据,而后剖析这10%的数据中哪个key可能会导致数据歪斜,而后将这个key对应的数据独自提取进去。

不实用场景剖析:

如果一个RDD中导致数据歪斜的key很多,那么此计划不实用。

3. 进步reduce并行度

当计划一和计划二对于数据歪斜的解决没有很好的成果时,能够思考进步shuffle过程中的reduce端并行度,reduce端并行度的进步就减少了reduce端task的数量,那么每个task调配到的数据量就会相应缩小,由此缓解数据歪斜问题。

1. reduce端并行度的设置

在大部分的shuffle算子中,都能够传入一个并行度的设置参数,比方reduceByKey(500),这个参数会决定shuffle过程中reduce端的并行度,在进行shuffle操作的时候,就会对应着创立指定数量的reduce task。对于Spark SQL中的shuffle类语句,比方group by、join等,须要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

减少shuffle read task的数量,能够让本来调配给一个task的多个key调配给多个task,从而让每个task解决比原来更少的数据。

举例来说,如果本来有5个key,每个key对应10条数据,这5个key都是调配给一个task的,那么这个task就要解决50条数据。而减少了shuffle read task当前,每个task就调配到一个key,即每个task就解决10条数据,那么天然每个task的执行工夫都会变短了。

2. reduce端并行度设置存在的缺点

进步reduce端并行度并没有从根本上扭转数据歪斜的实质和问题(计划一和计划二从根本上防止了数据歪斜的产生),只是尽可能地去缓解和加重shuffle reduce task的数据压力,以及数据歪斜的问题,实用于有较多key对应的数据量都比拟大的状况。

该计划通常无奈彻底解决数据歪斜,因为如果呈现一些极其状况,比方某个key对应的数据量有100万,那么无论你的task数量减少到多少,这个对应着100万数据的key必定还是会调配到一个task中去解决,因而注定还是会产生数据歪斜的。所以这种计划只能说是在发现数据歪斜时尝试应用的一种伎俩,尝试去用最简略的办法缓解数据歪斜而已,或者是和其余计划联合起来应用。

在现实状况下,reduce端并行度晋升后,会在肯定水平上加重数据歪斜的问题,甚至根本打消数据歪斜;然而,在一些状况下,只会让原来因为数据歪斜而运行迟缓的task运行速度稍有晋升,或者防止了某些task的OOM问题,然而,依然运行迟缓,此时,要及时放弃计划三,开始尝试前面的计划。

4. 应用map join

失常状况下,join操作都会执行shuffle过程,并且执行的是reduce join,也就是先将所有雷同的key和对应的value汇聚到一个reduce task中,而后再进行join。一般join的过程如下图所示:

一般的join是会走shuffle过程的,而一旦shuffle,就相当于会将雷同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。然而如果一个RDD是比拟小的,则能够采纳播送小RDD全量数据+map算子来实现与join同样的成果,也就是map join,此时就不会产生shuffle操作,也就不会产生数据歪斜。

留神:RDD是并不能间接进行播送的,只能将RDD外部的数据通过collect拉取到Driver内存而后再进行播送

1. 外围思路:

不应用join算子进行连贯操作,而应用broadcast变量与map类算子实现join操作,进而齐全躲避掉shuffle类的操作,彻底防止数据歪斜的产生和呈现。将较小RDD中的数据间接通过collect算子拉取到Driver端的内存中来,而后对其创立一个broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从broadcast变量中获取较小RDD的全量数据,与以后RDD的每一条数据依照连贯key进行比对,如果连贯key雷同的话,那么就将两个RDD的数据用你须要的形式连接起来。

根据上述思路,基本不会产生shuffle操作,从根本上杜绝了join操作可能导致的数据歪斜问题。

当join操作有数据歪斜问题并且其中一个RDD的数据量较小时,能够优先思考这种形式,成果十分好

map join的过程如下图所示:

2. 不实用场景剖析:

因为Spark的播送变量是在每个Executor中保留一个正本,如果两个RDD数据量都比拟大,那么如果将一个数据量比拟大的RDD做成播送变量,那么很有可能会造成内存溢出。

故障排除

1. 防止OOM-out of memory

在Shuffle过程,reduce端task并不是等到map端task将其数据全副写入磁盘后再去拉取,而是map端写一点数据,reduce端task就会拉取一小部分数据,而后立刻进行前面的聚合、算子函数的应用等操作

reduce端task可能拉取多少数据,由reduce拉取数据的缓冲区buffer来决定,因为拉取过去的数据都是先放在buffer中,而后再进行后续的解决,buffer的默认大小为48MB

reduce端task会一边拉取一边计算,不肯定每次都会拉满48MB的数据,可能大多数时候拉取一部分数据就解决掉了。

尽管说增大reduce端缓冲区大小能够缩小拉取次数,晋升Shuffle性能,然而有时map端的数据量十分大,写出的速度十分快,此时reduce端的所有task在拉取的时候,有可能全副达到本人缓冲的最大极限值,即48MB,此时,再加上reduce端执行的聚合函数的代码,可能会创立大量的对象,这可能会导致内存溢出,即OOM

如果一旦呈现reduce端内存溢出的问题,咱们能够思考减小reduce端拉取数据缓冲区的大小,例如缩小为12MB

在理论生产环境中是呈现过这种问题的,这是典型的以性能换执行的原理。reduce端拉取数据的缓冲区减小,不容易导致OOM,然而相应的,reudce端的拉取次数减少,造成更多的网络传输开销,造成性能的降落。

留神,要保障工作可能运行,再思考性能的优化。

2. 防止GC导致的shuffle文件拉取失败

在Spark作业中,有时会呈现shuffle file not found的谬误,这是十分常见的一个报错,有时呈现这种谬误当前,抉择从新执行一遍,就不再报出这种谬误

呈现上述问题可能的起因是Shuffle操作中,前面stage的task想要去上一个stage的task所在的Executor拉取数据,后果对方正在执行GC,执行GC会导致Executor内所有的工作现场全副进行,比方BlockManager、基于netty的网络通信等,这就会导致前面的task拉取数据拉取了半天都没有拉取到,就会报出shuffle file not found的谬误,而第二次再次执行就不会再呈现这种谬误。

能够通过调整reduce端拉取数据重试次数和reduce端拉取数据工夫距离这两个参数来对Shuffle性能进行调整,增大参数值,使得reduce端拉取数据的重试次数减少,并且每次失败后期待的工夫距离加长。

JVM GC导致的shuffle文件拉取失败调整数据重试次数和reduce端拉取数据工夫距离:

val conf = new SparkConf()  .set("spark.shuffle.io.maxRetries", "6")  .set("spark.shuffle.io.retryWait", "60s")

3. YARN-CLIENT模式导致的网卡流量激增问题

在YARN-client模式下,Driver启动在本地机器上,而Driver负责所有的任务调度,须要与YARN集群上的多个Executor进行频繁的通信。

假如有100个Executor,1000个task,那么每个Executor调配到10个task,之后,Driver要频繁地跟Executor上运行的1000个task进行通信,通信数据十分多,并且通信品类特地高。这就导致有可能在Spark工作运行过程中,因为频繁大量的网络通讯,本地机器的网卡流量会激增。

留神,YARN-client模式只会在测试环境中应用,而之所以应用YARN-client模式,是因为能够看到具体全面的log信息,通过查看log,能够锁定程序中存在的问题,防止在生产环境下产生故障。

在生产环境下,应用的肯定是YARN-cluster模式。在YARN-cluster模式下,就不会造成本地机器网卡流量激增问题,如果YARN-cluster模式下存在网络通信的问题,须要运维团队进行解决。

4. YARN-CLUSTER模式的JVM栈内存溢出无奈执行问题

当Spark作业中蕴含SparkSQL的内容时,可能会碰到YARN-client模式下能够运行,然而YARN-cluster模式下无奈提交运行(报出OOM谬误)的状况。

YARN-client模式下,Driver是运行在本地机器上的,Spark应用的JVM的PermGen的配置,是本地机器上的spark-class文件,JVM永恒代的大小是128MB,这个是没有问题的,然而在YARN-cluster模式下,Driver运行在YARN集群的某个节点上,应用的是没有通过配置的默认设置,PermGen永恒代大小为82MB

SparkSQL的外部要进行很简单的SQL的语义解析、语法树转换等等,非常复杂,如果sql语句自身就非常复杂,那么很有可能会导致性能的损耗和内存的占用,特地是对PermGen的占用会比拟大。

所以,此时如果PermGen占用好过了82MB,然而又小于128MB,就会呈现YARN-client模式下能够运行,YARN-cluster模式下无奈运行的状况

解决上述问题的办法是减少PermGen(永恒代)的容量,须要在spark-submit脚本中对相干参数进行设置,设置办法如下:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

通过上述办法就设置了Driver永恒代的大小,默认为128MB,最大256MB,这样就能够防止下面所说的问题。

5. 防止SparkSQL JVM栈内存溢出

当SparkSQL的sql语句有成千盈百的or关键字时,就可能会呈现Driver端的JVM栈内存溢出。

JVM栈内存溢出基本上就是因为调用的办法层级过多,产生了大量的,十分深的,超出了JVM栈深度限度的递归。(咱们猜想SparkSQL有大量or语句的时候,在解析SQL时,例如转换为语法树或者进行执行打算的生成的时候,对于or的解决是递归,or十分多时,会产生大量的递归)

此时,倡议将一条sql语句拆分为多条sql语句来执行,每条sql语句尽量保障100个以内的子句。依据理论的生产环境试验,一条sql语句的or关键字管制在100个以内,通常不会导致JVM栈内存溢出。


更多大数据好文,欢送关注公众号【五分钟学大数据

--end--

文章举荐
Spark性能调优-RDD算子调优篇