该笔记来自 美团 spark性能优化指南

开发准则

Broadcast与map联合应用 代替原始 join

小表 broadcast 到每个excutor, 各个excutor本地间接调用小表,防止shuffle

// 传统的join操作会导致shuffle操作。 // 因为两个RDD中,雷同的key都须要通过网络拉取到一个节点上,由一个task进行join操作。 val rdd3 = rdd1.join(rdd2) // Broadcast+map的join操作,不会导致shuffle操作。 // 应用Broadcast将一个数据量较小的RDD作为播送变量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) // 在rdd1.map算子中,能够从rdd2DataBroadcast中,获取rdd2的所有数据。 // 而后进行遍历,如果发现rdd2中某条数据的key与rdd1的以后数据的key是雷同的,那么就断定能够进行join。// 此时就能够依据本人须要的形式,将rdd1以后数据与rdd2中能够连贯的数据,拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) // 留神,以上操作,倡议仅仅在rdd2的数据量比拟少(比方几百M,或者一两G)的状况下应用。// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

尽量应用map-side预聚合的算子

所谓的map-side预聚合,说的是在每个节点本地对雷同的key进行一次聚合操作,相似于MapReduce中的本地combiner。
如应用reduceByKey或aggregateByKey 代替groupByKey

应用foreachPartitions代替foreach

一次函数调用解决一个partition的所有数据,而不是一次函数调用解决一条数据。如在foreach函数中,将RDD中所有数据写MySQL,那么如果是一般的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创立一个数据库连贯,此时就势必会频繁地创立和销毁数据库连贯,性能是十分低下;然而如果用foreachPartitions算子一次性解决一个partition的数据,那么对于每个partition,只有创立一个数据库连贯即可,而后执行批量插入操作,此时性能是比拟高的。实际中发现,对于1万条左右的数据量写MySQL,性能能够晋升30%以上

filter后应用coalesce 缩小分区

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比方30%以上的数据),倡议应用coalesce算子,手动缩小RDD的partition数量,将RDD中的数据压缩到更少的partition中去

应用repartitionAndSortWithinPartitions代替repartition与sort类操作

repartitionAndSortWithinPartitions是Spark官网举荐的一个算子,官网倡议,如果须要在repartition重分区之后,还要进行排序,倡议间接应用repartitionAndSortWithinPartitions算子。因为该算子能够一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

应用Kryo优化序列化性能

Spark中,次要有三个中央波及到了序列化:

  • 在算子函数中应用到内部变量时,该变量会被序列化后进行网络传输(见“准则七:播送大变量”中的解说)。
  • 将自定义的类型作为RDD的泛型类型时(比方JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因而这种状况下,也要求自定义的类必须实现Serializable接口。
  • 应用可序列化的长久化策略时(比方MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

Spark同时反对应用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官网介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Kryo要求最好要注册所有须要进行序列化的自定义类型,因而对于开发者来说,这种形式比拟麻烦。

// 创立SparkConf对象。 val conf = new SparkConf().setMaster(...).setAppName(...)// 设置序列化器为KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册要序列化的自定义类型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

数据结构

尽量应用字符串代替对象;
应用原始类型(比方Int、Long)代替字符串;
应用数组代替汇合类型;
这样尽可能地缩小内存占用,从而升高GC频率,晋升性能。

调优配置

num-executors

  • 参数阐明:该参数用于设置Spark作业总共要用多少个Executor过程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能依照你的设置来在集群的各个工作节点上,启动相应数量的Executor过程。这个参数十分之重要,如果不设置的话,默认只会给你启动大量的Executor过程,此时你的Spark作业的运行速度是十分慢的。
  • 参数调优倡议:每个Spark作业的运行个别设置50~100个左右的Executor过程比拟适合,设置太少或太多的Executor过程都不好。设置的太少,无奈充分利用集群资源;设置的太多的话,大部分队列可能无奈给予充沛的资源。

executor-memory

  • 参数阐明:该参数用于设置每个Executor过程的内存。Executor内存的大小,很多时候间接决定了Spark作业的性能,而且跟常见的JVM OOM异样,也有间接的关联。
  • 参数调优倡议:每个Executor过程的内存设置4G~8G较为适合。然而这只是一个参考值,具体的设置还是得依据不同部门的资源队列来定。能够看看本人团队的资源队列的最大内存限度是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,防止你本人的Spark作业占用了队列所有的资源,导致别的同学的作业无奈运行。

executor-cores

  • 参数阐明:该参数用于设置每个Executor过程的CPU core数量。这个参数决定了每个Executor过程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因而每个Executor过程的CPU core数量越多,越可能疾速地执行完调配给本人的所有task线程。
  • 参数调优倡议:Executor的CPU core数量设置为2~4个较为适合。同样得依据不同部门的资源队列来定,能够看看本人的资源队列的最大CPU core限度是多少,再根据设置的Executor数量,来决定每个Executor过程能够调配到几个CPU core。同样倡议,如果是跟别人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比拟适合,也是防止影响其他同学的作业运行。

driver-memory

  • 参数阐明:该参数用于设置Driver过程的内存。
  • 参数调优倡议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。惟一须要留神的一点是,如果须要应用collect算子将RDD的数据全副拉取到Driver上进行解决,那么必须确保Driver的内存足够大,否则会呈现OOM内存溢出的问题。

spark.default.parallelism

  • 参数阐明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会间接影响你的Spark作业性能。
  • 参数调优倡议:Spark作业的默认task数量为500~1000个较为适合。很多同学常犯的一个谬误就是不去设置这个参数,那么此时就会导致Spark本人依据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比方就几十个task),如果task数量偏少的话,就会导致你后面设置好的Executor的参数都半途而废。试想一下,无论你的Executor过程有多少个,内存和CPU有多大,然而task只有1个或者10个,那么90%的Executor过程可能基本就没有task执行,也就是白白浪费了资源!因而Spark官网倡议的设置准则是,设置该参数为num-executors * executor-cores的2~3倍较为适合,比方Executor的总CPU core数量为300个,那么设置1000个task是能够的,此时能够充沛地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数阐明:该参数用于设置RDD长久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,能够用来保留长久化的RDD数据。依据你抉择的不同的长久化策略,如果内存不够时,可能数据就不会长久化,或者数据会写入磁盘。
  • 参数调优倡议:如果Spark作业中,有较多的RDD长久化操作,该参数的值能够适当进步一些,保障长久化的数据可能包容在内存中。防止内存不够缓存所有的数据,导致数据只能写入磁盘中,升高了性能。然而如果Spark作业中的shuffle类操作比拟多,而长久化操作比拟少,那么这个参数的值适当升高一些比拟适合。此外,如果发现作业因为频繁的gc导致运行迟缓(通过spark web ui能够察看到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样倡议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数阐明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输入后,进行聚合操作时可能应用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现应用的内存超出了这个20%的限度,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地升高性能。
  • 参数调优倡议:如果Spark作业中的RDD长久化操作较少,shuffle操作较多时,倡议升高长久化操作的内存占比,进步shuffle操作的内存占比比例,防止shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,升高了性能。此外,如果发现作业因为频繁的gc导致运行迟缓,意味着task执行用户代码的内存不够用,那么同样倡议调低这个参数的值。

spark.sql.shuffle.partitions

能够缓解数据歪斜

该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小.
减少shuffle read task的数量,能够让本来调配给一个task的多个key调配给多个task,从而让每个task解决比原来更少的数据。举例来说,如果本来有5个key,每个key对应10条数据,这5个key都是调配给一个task的,那么这个task就要解决50条数据。而减少了shuffle read task当前,每个task就调配到一个key,即每个task就解决10条数据,那么天然每个task的执行工夫都会变短了.

配置参考

./bin/spark-submit \--master yarn-cluster \--num-executors 100 \--executor-memory 6G \--executor-cores 4 \--driver-memory 1G \--conf spark.default.parallelism=1000 \--conf spark.storage.memoryFraction=0.5 \--conf spark.shuffle.memoryFraction=0.3 \

数据歪斜

数据歪斜产生时的景象

  1. 绝大多数task执行得都十分快,但个别task执行极慢。比方,总共有1000个task,997个task都在1分钟之内执行完了,然而残余两三个task却要一两个小时。这种状况很常见。
  2. 本来可能失常执行的Spark作业,某天忽然报出OOM(内存溢出)异样,察看异样栈,是咱们写的业务代码造成的。这种状况比拟少见。