RDD算子调优

不废话,间接进入正题!

1. RDD复用

在对RDD进行算子时,要防止雷同的算子和计算逻辑之下对RDD进行反复的计算,如下图所示:

对上图中的RDD计算架构进行批改,失去如下图所示的优化后果:

2. 尽早filter

获取到初始RDD后,应该思考尽早地过滤掉不须要的数据,进而缩小对内存的占用,从而晋升Spark作业的运行效率。

本文首发于公众号:五分钟学大数据,欢送围观!回复【书籍】即可取得上百本大数据书籍

3. 读取大量小文件-用wholeTextFiles

当咱们将一个文本文件读取为 RDD 时,输出的每一行都会成为RDD的一个元素。

也能够将多个残缺的文本文件一次性读取为一个pairRDD,其中键是文件名,值是文件内容。

val input:RDD[String] = sc.textFile("dir/*.log") 

如果传递目录,则将目录下的所有文件读取作为RDD。文件门路反对通配符。

然而这样对于大量的小文件读取效率并不高,应该应用 wholeTextFiles
返回值为RDD[(String, String)],其中Key是文件的名称,Value是文件的内容。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

wholeTextFiles读取小文件:

val filesRDD: RDD[(String, String)] =sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartition和foreachPartition

  • mapPartitions

map(_....) 示意每一个元素

mapPartitions(_....) 示意每个分区的数据组成的迭代器

一般的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。

如果是一般的map算子,假如一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。

如果是mapPartition算子,因为一个task解决一个RDD的partition,那么一个task只会执行一次function,function一次接管所有的partition数据,效率比拟高。

比方,当要把RDD中的所有数据通过JDBC写入数据,如果应用map算子,那么须要对RDD中的每一个元素都创立一个数据库连贯,这样对资源的耗费很大,如果应用mapPartitions算子,那么针对一个分区的数据,只须要建设一个数据库连贯

mapPartitions算子也存在一些毛病:对于一般的map操作,一次解决一条数据,如果在解决了2000条数据后内存不足,那么能够将曾经解决完的2000条数据从内存中垃圾回收掉;然而如果应用mapPartitions算子,但数据量十分大时,function一次解决一个分区的数据,如果一旦内存不足,此时无奈回收内存,就可能会OOM,即内存溢出。

因而,mapPartitions算子实用于数据量不是特地大的时候,此时应用mapPartitions算子对性能的晋升成果还是不错的。(当数据量很大的时候,一旦应用mapPartitions算子,就会间接OOM)

在我的项目中,应该首先估算一下RDD的数据量、每个partition的数据量,以及调配给每个Executor的内存资源,如果资源容许,能够思考应用mapPartitions算子代替map。

  • foreachPartition

rrd.foreache(_....) 示意每一个元素

rrd.forPartitions(_....) 示意每个分区的数据组成的迭代器

在生产环境中,通常应用foreachPartition算子来实现数据库的写入,通过foreachPartition算子的个性,能够优化写数据库的性能。

如果应用foreach算子实现数据库的操作,因为foreach算子是遍历RDD的每条数据,因而,每条数据都会建设一个数据库连贯,这是对资源的极大节约,因而,对于写数据库操作,咱们该当应用foreachPartition算子

与mapPartitions算子十分类似,foreachPartition是将RDD的每个分区作为遍历对象,一次解决一个分区的数据,也就是说,如果波及数据库的相干操作,一个分区的数据只须要创立一次数据库连贯,如下图所示:

应用了foreachPartition 算子后,能够取得以下的性能晋升:

  1. 对于咱们写的function函数,一次解决一整个分区的数据;
  2. 对于一个分区内的数据,创立惟一的数据库连贯;
  3. 只须要向数据库发送一次SQL语句和多组参数;

在生产环境中,全副都会应用foreachPartition算子实现数据库操作。foreachPartition算子存在一个问题,与mapPartitions算子相似,如果一个分区的数据量特地大,可能会造成OOM,即内存溢出

5. filter+coalesce/repartition(缩小分区)

在Spark工作中咱们常常会应用filter算子实现RDD中数据的过滤,在工作初始阶段,从各个分区中加载到的数据量是相近的,然而一旦进过filter过滤后,每个分区的数据量有可能会存在较大差别,如下图所示:

依据上图咱们能够发现两个问题:

  1. 每个partition的数据质变小了,如果还依照之前与partition相等的task个数去解决以后数据,有点节约task的计算资源;
  2. 每个partition的数据量不一样,会导致前面的每个task解决每个partition数据的时候,每个task要解决的数据量不同,这很有可能导致数据歪斜问题。

如上图所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在雷同的解决逻辑下,第二个分区对应的task解决的数据量与第三个分区对应的task解决的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据歪斜问题

针对上述的两个问题,咱们别离进行剖析:

  1. 针对第一个问题,既然分区的数据质变小了,咱们心愿能够对分区数据进行重新分配,比方将原来4个分区的数据转化到2个分区中,这样只须要用前面的两个task进行解决即可,防止了资源的节约。
  2. 针对第二个问题,解决办法和第一个问题的解决办法十分类似,对分区数据重新分配,让每个partition中的数据量差不多,这就防止了数据歪斜问题。

那么具体应该如何实现下面的解决思路?咱们须要coalesce算子。

repartition与coalesce都能够用来进行重分区,其中repartition只是coalesce接口中shuffle为true的繁难实现,coalesce默认状况下不进行shuffle,然而能够通过参数进行设置。

假如咱们心愿将本来的分区个数A通过从新分区变为B,那么有以下几种状况:

  1. A > B(少数分区合并为多数分区)

    • A与B相差值不大

      此时应用coalesce即可,无需shuffle过程。

    • A与B相差值很大

      此时能够应用coalesce并且不启用shuffle过程,然而会导致合并过程性能低下,所以举荐设置coalesce的第二个参数为true,即启动shuffle过程。

  2. A < B(多数分区合成为少数分区)

此时应用repartition即可,如果应用coalesce须要将shuffle设置为true,否则coalesce有效。

咱们能够在filter操作之后,应用coalesce算子针对每个partition的数据量各不相同的状况,压缩partition的数量,而且让每个partition的数据量尽量平均紧凑,以便于前面的task进行计算操作,在某种程度上可能在肯定水平上晋升性能

留神:local模式是过程内模仿集群运行,曾经对并行度和分区数量有了肯定的外部优化,因而不必去设置并行度和分区数量。

6. 并行度设置

Spark作业中的并行度指各个stage的task的数量

如果并行度设置不合理而导致并行度过低,会导致资源的极大节约,例如,20个Executor,每个Executor调配3个CPU core,而Spark作业有40个task,这样每个Executor调配到的task个数是2个,这就使得每个Executor有一个CPU core闲暇,导致资源的节约。

现实的并行度设置,应该是让并行度与资源相匹配,简略来说就是在资源容许的前提下,并行度要设置的尽可能大,达到能够充分利用集群资源。正当的设置并行度,能够晋升整个Spark作业的性能和运行速度。

Spark官网举荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。之所以没有举荐task数量与CPU core总数相等,是因为task的执行工夫不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行实现后,会呈现CPU core闲暇的状况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行结束后,CPU core会立即执行下一个task,升高了资源的节约,同时晋升了Spark作业运行的效率。

Spark作业并行度的设置如下:

val conf = new SparkConf().set("spark.default.parallelism", "500")

准则:**让 cpu 的 Core(cpu 外围数) 充分利用起来,
如有100个 Core,那么并行度能够设置为200~300**。

7. repartition/coalesce调节并行度

咱们晓得 Spark 中有并行度的调节策略,然而,并行度的设置对于Spark SQL是不失效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage失效

Spark SQL的并行度不容许用户本人指定,Spark SQL本人会默认依据hive表对应的HDFS文件的split个数主动设置Spark SQL所在的那个stage的并行度,用户本人通 spark.default.parallelism 参数指定的并行度,只会在没Spark SQL的stage中失效。

因为Spark SQL所在stage的并行度无奈手动设置,如果数据量较大,并且此stage中后续的transformation操作有着简单的业务逻辑,而Spark SQL主动设置的task数量很少,这就意味着每个task要解决为数不少的数据量,而后还要执行非常复杂的解决逻辑,这就可能体现为第一个有Spark SQL的stage速度很慢,而后续的没有Spark SQL的stage运行速度十分快。

为了解决Spark SQL无奈设置并行度和task数量的问题,咱们能够应用repartition算子。

repartition 算子应用前后比照图如下:

Spark SQL这一步的并行度和task数量必定是没有方法去扭转了,然而,对于Spark SQL查问进去的RDD,立刻应用repartition算子,去从新进行分区,这样能够从新分区为多个partition,从repartition之后的RDD操作,因为不再波及Spark SQL,因而stage的并行度就会等于你手动设置的值,这样就防止了Spark SQL所在的stage只能用大量的task去解决大量数据并执行简单的算法逻辑。应用repartition算子的前后比照如上图所示

8. reduceByKey本地预聚合

reduceByKey相较于一般的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行combine操作,而后将数据写入给下个stage的每个task创立的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。

reduceByKey算子的执行过程如下图所示:

应用reduceByKey对性能的晋升如下:

  1. 本地聚合后,在map端的数据质变少,缩小了磁盘IO,也缩小了对磁盘空间的占用;
  2. 本地聚合后,下一个stage拉取的数据质变少,缩小了网络传输的数据量;
  3. 本地聚合后,在reduce端进行数据缓存的内存占用缩小;
  4. 本地聚合后,在reduce端进行聚合的数据量缩小。

基于reduceByKey的本地聚合特色,咱们应该思考应用reduceByKey代替其余的shuffle算子,例如groupByKey。

groupByKey与reduceByKey的运行原理如下图1和图2所示:

依据上图可知,groupByKey不会进行map端的聚合,而是将所有map端的数据shuffle到reduce端,而后在reduce端进行数据的聚合操作。因为reduceByKey有map端聚合的个性,使得网络传输的数据量减小,因而效率要显著高于groupByKey。

9. 应用长久化+checkpoint

Spark长久化在大部分状况下是没有问题的,然而有时数据可能会失落,如果数据一旦失落,就须要对失落的数据从新进行计算,计算完后再缓存和应用,为了防止数据的失落,能够抉择对这个RDD进行checkpoint,也就是将数据长久化一份到容错的文件系统上(比方HDFS)

一个RDD缓存并checkpoint后,如果一旦发现缓存失落,就会优先查看checkpoint数据存不存在,如果有,就会应用checkpoint数据,而不必从新计算。也即是说,checkpoint能够视为cache的保障机制,如果cache失败,就应用checkpoint的数据。

应用checkpoint的长处在于进步了Spark作业的可靠性,一旦缓存呈现问题,不须要从新计算数据,毛病在于,checkpoint时须要将数据写入HDFS等文件系统,对性能的耗费较大

长久化设置如下:

sc.setCheckpointDir(‘HDFS’)rdd.cache/persist(memory_and_disk)rdd.checkpoint

10. 应用播送变量

默认状况下,task中的算子中如果应用了内部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大耗费。一方面,如果后续对RDD进行长久化,可能就无奈将RDD数据存入内存,只能写入磁盘,磁盘IO将会重大耗费性能;另一方面,task在创建对象的时候,兴许会发现堆内存无奈寄存新创建的对象,这就会导致频繁的GC,GC会导致工作线程进行,进而导致Spark暂停工作一段时间,重大影响Spark性能。

假如当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task中产生500个正本,消耗集群10G的内存,如果应用了播送变量, 那么每个Executor保留一个正本,一共耗费400M内存,内存耗费缩小了5倍。

播送变量在每个Executor保留一个正本,此Executor的所有task共用此播送变量,这让变量产生的正本数量大大减少。

在初始阶段,播送变量只在Driver中有一份正本。task在运行的时候,想要应用播送变量中的数据,此时首先会在本人本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其余节点的BlockManager上近程拉取变量的复本,并由本地的BlockManager进行治理;之后此Executor的所有task都会间接从本地的BlockManager中获取变量。

对于多个Task可能会共用的数据能够播送到每个Executor上:

val 播送变量名= sc.broadcast(会被各个Task用到的变量,即须要播送的变量)播送变量名.value//获取播送变量

11. 应用Kryo序列化

默认状况下,Spark应用Java的序列化机制。Java的序列化机制使用方便,不须要额定的配置,在算子中应用的变量实现Serializable接口即可,然而,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间仍然较大。

Spark官网声称Kryo序列化机制比Java序列化机制性能进步10倍左右,Spark之所以没有默认应用Kryo作为序列化类库,是因为它不反对所有对象的序列化,同时Kryo须要用户在应用前注册须要序列化的类型,不够不便,但从Spark 2.0.0版本开始,简略类型、简略类型数组、字符串类型的Shuffling RDDs 曾经默认应用Kryo序列化形式了

Kryo序列化注册形式的代码如下:

public class MyKryoRegistrator implements KryoRegistrator{  @Override  public void registerClasses(Kryo kryo){    kryo.register(StartupReportLogs.class);  }}

配置Kryo序列化形式的代码如下:

//创立SparkConf对象val conf = new SparkConf().setMaster(…).setAppName(…)//应用Kryo序列化库conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  //在Kryo序列化库中注册自定义的类汇合conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 
本文首发于公众号:五分钟学大数据,回复【666】即可取得全套大数据笔面试教程