乐趣区

关于大数据:Spark性能调优RDD算子调优篇深度好文面试常问建议收藏

RDD 算子调优

不废话,间接进入正题!

1. RDD 复用

在对 RDD 进行算子时,要防止雷同的算子和计算逻辑之下对 RDD 进行反复的计算,如下图所示:

对上图中的 RDD 计算架构进行批改,失去如下图所示的优化后果:

2. 尽早 filter

获取到初始 RDD 后,应该思考 尽早地过滤掉不须要的数据,进而缩小对内存的占用,从而晋升 Spark 作业的运行效率。

本文首发于公众号:五分钟学大数据,欢送围观!回复【书籍】即可取得上百本大数据书籍

3. 读取大量小文件 - 用 wholeTextFiles

当咱们将一个文本文件读取为 RDD 时,输出的每一行都会成为 RDD 的一个元素。

也能够将多个残缺的文本文件一次性读取为一个 pairRDD,其中键是文件名,值是文件内容。

val input:RDD[String] = sc.textFile("dir/*.log") 

如果传递目录,则将目录下的所有文件读取作为 RDD。文件门路反对通配符。

然而这样对于大量的小文件读取效率并不高,应该应用 wholeTextFiles
返回值为 RDD[(String, String)],其中 Key 是文件的名称,Value 是文件的内容。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

wholeTextFiles 读取小文件:

val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartition 和 foreachPartition

  • mapPartitions

map(_….) 示意每一个元素

mapPartitions(_….) 示意每个分区的数据组成的迭代器

一般的 map 算子对 RDD 中的每一个元素进行操作,而 mapPartitions 算子对 RDD 中每一个分区进行操作。

如果是一般的 map 算子,假如一个 partition 有 1 万条数据,那么 map 算子中的 function 要执行 1 万次,也就是对每个元素进行操作。

如果是 mapPartition 算子,因为一个 task 解决一个 RDD 的 partition,那么一个 task 只会执行一次 function,function 一次接管所有的 partition 数据,效率比拟高。

比方,当要把 RDD 中的所有数据通过 JDBC 写入数据,如果应用 map 算子,那么须要对 RDD 中的每一个元素都创立一个数据库连贯,这样对资源的耗费很大,如果应用 mapPartitions 算子,那么针对一个分区的数据,只须要建设一个数据库连贯

mapPartitions 算子也存在一些毛病:对于一般的 map 操作,一次解决一条数据,如果在解决了 2000 条数据后内存不足,那么能够将曾经解决完的 2000 条数据从内存中垃圾回收掉;然而如果应用 mapPartitions 算子,但数据量十分大时,function 一次解决一个分区的数据,如果一旦内存不足,此时无奈回收内存,就可能会 OOM,即内存溢出。

因而,mapPartitions 算子实用于数据量不是特地大的时候,此时应用 mapPartitions 算子对性能的晋升成果还是不错的。(当数据量很大的时候,一旦应用 mapPartitions 算子,就会间接 OOM)

在我的项目中,应该首先估算一下 RDD 的数据量、每个 partition 的数据量,以及调配给每个 Executor 的内存资源,如果资源容许,能够思考应用 mapPartitions 算子代替 map。

  • foreachPartition

rrd.foreache(_….) 示意每一个元素

rrd.forPartitions(_….) 示意每个分区的数据组成的迭代器

在生产环境中,通常应用 foreachPartition 算子来实现数据库的写入,通过 foreachPartition 算子的个性,能够优化写数据库的性能。

如果应用 foreach 算子实现数据库的操作,因为 foreach 算子是遍历 RDD 的每条数据,因而,每条数据都会建设一个数据库连贯,这是对资源的极大节约,因而,对于写数据库操作,咱们该当应用 foreachPartition 算子

与 mapPartitions 算子十分类似,foreachPartition 是将 RDD 的每个分区作为遍历对象,一次解决一个分区的数据,也就是说,如果波及数据库的相干操作,一个分区的数据只须要创立一次数据库连贯,如下图所示:

应用了 foreachPartition 算子后,能够取得以下的性能晋升:

  1. 对于咱们写的 function 函数,一次解决一整个分区的数据;
  2. 对于一个分区内的数据,创立惟一的数据库连贯;
  3. 只须要向数据库发送一次 SQL 语句和多组参数;

在生产环境中,全副都会应用 foreachPartition 算子实现数据库操作。foreachPartition 算子存在一个问题,与 mapPartitions 算子相似,如果一个分区的数据量特地大,可能会造成 OOM,即内存溢出

5. filter+coalesce/repartition(缩小分区)

在 Spark 工作中咱们常常会应用 filter 算子实现 RDD 中数据的过滤,在工作初始阶段,从各个分区中加载到的数据量是相近的,然而一旦进过 filter 过滤后,每个分区的数据量有可能会存在较大差别,如下图所示:

依据上图咱们能够发现两个问题:

  1. 每个 partition 的数据质变小了,如果还依照之前与 partition 相等的 task 个数去解决以后数据,有点节约 task 的计算资源;
  2. 每个 partition 的数据量不一样,会导致前面的每个 task 解决每个 partition 数据的时候,每个 task 要解决的数据量不同,这很有可能导致数据歪斜问题。

如上图所示,第二个分区的数据过滤后只剩 100 条,而第三个分区的数据过滤后剩下 800 条,在雷同的解决逻辑下,第二个分区对应的 task 解决的数据量与第三个分区对应的 task 解决的数据量差距达到了 8 倍,这也会导致运行速度可能存在数倍的差距,这也就是 数据歪斜问题

针对上述的两个问题,咱们别离进行剖析:

  1. 针对第一个问题,既然分区的数据质变小了,咱们心愿能够对分区数据进行重新分配,比方将原来 4 个分区的数据转化到 2 个分区中,这样只须要用前面的两个 task 进行解决即可,防止了资源的节约。
  2. 针对第二个问题,解决办法和第一个问题的解决办法十分类似,对分区数据重新分配,让每个 partition 中的数据量差不多,这就防止了数据歪斜问题。

那么具体应该如何实现下面的解决思路?咱们须要 coalesce 算子。

repartition 与 coalesce 都能够用来进行重分区,其中 repartition 只是 coalesce 接口中 shuffle 为 true 的繁难实现,coalesce 默认状况下不进行 shuffle,然而能够通过参数进行设置。

假如咱们心愿将本来的分区个数 A 通过从新分区变为 B,那么有以下几种状况:

  1. A > B(少数分区合并为多数分区)

    • A 与 B 相差值不大

      此时应用 coalesce 即可,无需 shuffle 过程。

    • A 与 B 相差值很大

      此时能够应用 coalesce 并且不启用 shuffle 过程,然而会导致合并过程性能低下,所以举荐设置 coalesce 的第二个参数为 true,即启动 shuffle 过程。

  2. A < B(多数分区合成为少数分区)

此时应用 repartition 即可,如果应用 coalesce 须要将 shuffle 设置为 true,否则 coalesce 有效。

咱们能够在 filter 操作之后,应用 coalesce 算子针对每个 partition 的数据量各不相同的状况,压缩 partition 的数量,而且让每个 partition 的数据量尽量平均紧凑,以便于前面的 task 进行计算操作,在某种程度上可能在肯定水平上晋升性能

留神:local 模式是过程内模仿集群运行,曾经对并行度和分区数量有了肯定的外部优化,因而不必去设置并行度和分区数量。

6. 并行度设置

Spark 作业中的并行度指各个 stage 的 task 的数量

如果并行度设置不合理而导致并行度过低,会导致资源的极大节约,例如,20 个 Executor,每个 Executor 调配 3 个 CPU core,而 Spark 作业有 40 个 task,这样每个 Executor 调配到的 task 个数是 2 个,这就使得每个 Executor 有一个 CPU core 闲暇,导致资源的节约。

现实的并行度设置,应该是让并行度与资源相匹配,简略来说就是在资源容许的前提下,并行度要设置的尽可能大,达到能够充分利用集群资源。正当的设置并行度,能够晋升整个 Spark 作业的性能和运行速度。

Spark 官网举荐,task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。之所以没有举荐 task 数量与 CPU core 总数相等,是因为 task 的执行工夫不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行实现后,会呈现 CPU core 闲暇的状况。如果 task 数量设置为 CPU core 总数的 2~3 倍,那么一个 task 执行结束后,CPU core 会立即执行下一个 task,升高了资源的节约,同时晋升了 Spark 作业运行的效率。

Spark 作业并行度的设置如下:

val conf = new SparkConf().set("spark.default.parallelism", "500")

准则:** 让 cpu 的 Core(cpu 外围数)充分利用起来,
如有 100 个 Core, 那么并行度能够设置为 200~300**。

7. repartition/coalesce 调节并行度

咱们晓得 Spark 中有并行度的调节策略,然而,并行度的设置对于 Spark SQL 是不失效的,用户设置的并行度只对于 Spark SQL 以外的所有 Spark 的 stage 失效

Spark SQL 的并行度不容许用户本人指定,Spark SQL 本人会默认依据 hive 表对应的 HDFS 文件的 split 个数主动设置 Spark SQL 所在的那个 stage 的并行度,用户本人通 spark.default.parallelism 参数指定的并行度,只会在没 Spark SQL 的 stage 中失效。

因为 Spark SQL 所在 stage 的并行度无奈手动设置,如果数据量较大,并且此 stage 中后续的 transformation 操作有着简单的业务逻辑,而 Spark SQL 主动设置的 task 数量很少,这就意味着每个 task 要解决为数不少的数据量,而后还要执行非常复杂的解决逻辑,这就可能体现为第一个有 Spark SQL 的 stage 速度很慢,而后续的没有 Spark SQL 的 stage 运行速度十分快。

为了解决 Spark SQL 无奈设置并行度和 task 数量的问题,咱们能够应用 repartition 算子。

repartition 算子应用前后比照图如下:

Spark SQL 这一步的并行度和 task 数量必定是没有方法去扭转了,然而,对于 Spark SQL 查问进去的 RDD,立刻应用 repartition 算子,去从新进行分区,这样能够从新分区为多个 partition,从 repartition 之后的 RDD 操作,因为不再波及 Spark SQL,因而 stage 的并行度就会等于你手动设置的值,这样就防止了 Spark SQL 所在的 stage 只能用大量的 task 去解决大量数据并执行简单的算法逻辑。应用 repartition 算子的前后比照如上图所示

8. reduceByKey 本地预聚合

reduceByKey 相较于一般的 shuffle 操作一个显著的特点就是会进行 map 端的本地聚合,map 端会先对本地的数据进行 combine 操作,而后将数据写入给下个 stage 的每个 task 创立的文件中,也就是在 map 端,对每一个 key 对应的 value,执行 reduceByKey 算子函数。

reduceByKey 算子的执行过程如下图所示:

应用 reduceByKey 对性能的晋升如下:

  1. 本地聚合后,在 map 端的数据质变少,缩小了磁盘 IO,也缩小了对磁盘空间的占用;
  2. 本地聚合后,下一个 stage 拉取的数据质变少,缩小了网络传输的数据量;
  3. 本地聚合后,在 reduce 端进行数据缓存的内存占用缩小;
  4. 本地聚合后,在 reduce 端进行聚合的数据量缩小。

基于 reduceByKey 的本地聚合特色,咱们应该思考应用 reduceByKey 代替其余的 shuffle 算子,例如 groupByKey。

groupByKey 与 reduceByKey 的运行原理如下图 1 和图 2 所示:

依据上图可知,groupByKey 不会进行 map 端的聚合,而是将所有 map 端的数据 shuffle 到 reduce 端,而后在 reduce 端进行数据的聚合操作。因为 reduceByKey 有 map 端聚合的个性,使得网络传输的数据量减小,因而效率要显著高于 groupByKey。

9. 应用长久化 +checkpoint

Spark 长久化在大部分状况下是没有问题的,然而有时数据可能会失落,如果数据一旦失落,就须要对失落的数据从新进行计算,计算完后再缓存和应用,为了防止数据的失落,能够抉择对这个 RDD 进行 checkpoint,也就是 将数据长久化一份到容错的文件系统上(比方 HDFS)

一个 RDD 缓存并 checkpoint 后,如果一旦发现缓存失落,就会优先查看 checkpoint 数据存不存在,如果有,就会应用 checkpoint 数据,而不必从新计算。也即是说,checkpoint 能够视为 cache 的保障机制,如果 cache 失败,就应用 checkpoint 的数据。

应用 checkpoint 的 长处在于进步了 Spark 作业的可靠性,一旦缓存呈现问题,不须要从新计算数据,毛病在于,checkpoint 时须要将数据写入 HDFS 等文件系统,对性能的耗费较大

长久化设置如下:

sc.setCheckpointDir(‘HDFS’)
rdd.cache/persist(memory_and_disk)
rdd.checkpoint

10. 应用播送变量

默认状况下,task 中的算子中如果应用了内部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大耗费。一方面,如果后续对 RDD 进行长久化,可能就无奈将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会重大耗费性能;另一方面,task 在创建对象的时候,兴许会发现堆内存无奈寄存新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程进行,进而导致 Spark 暂停工作一段时间,重大影响 Spark 性能。

假如当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在 500 个 task 中产生 500 个正本,消耗集群 10G 的内存,如果应用了播送变量,那么每个 Executor 保留一个正本,一共耗费 400M 内存,内存耗费缩小了 5 倍。

播送变量在每个 Executor 保留一个正本,此 Executor 的所有 task 共用此播送变量,这让变量产生的正本数量大大减少。

在初始阶段,播送变量只在 Driver 中有一份正本。task 在运行的时候,想要应用播送变量中的数据,此时首先会在本人本地的 Executor 对应的 BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其余节点的 BlockManager 上近程拉取变量的复本,并由本地的 BlockManager 进行治理;之后此 Executor 的所有 task 都会间接从本地的 BlockManager 中获取变量。

对于多个 Task 可能会共用的数据能够播送到每个 Executor 上:

val 播送变量名 = sc.broadcast(会被各个 Task 用到的变量, 即须要播送的变量)

播送变量名.value// 获取播送变量

11. 应用 Kryo 序列化

默认状况下,Spark 应用 Java 的序列化机制。Java 的序列化机制使用方便,不须要额定的配置,在算子中应用的变量实现 Serializable 接口即可,然而,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间仍然较大。

Spark 官网声称 Kryo 序列化机制比 Java 序列化机制 性能进步 10 倍左右 ,Spark 之所以没有默认应用 Kryo 作为序列化类库,是因为 它不反对所有对象的序列化 ,同时 Kryo 须要用户在应用前注册须要序列化的类型,不够不便, 但从 Spark 2.0.0 版本开始,简略类型、简略类型数组、字符串类型的 Shuffling RDDs 曾经默认应用 Kryo 序列化形式了

Kryo 序列化注册形式的代码如下:

public class MyKryoRegistrator implements KryoRegistrator{
  @Override
  public void registerClasses(Kryo kryo){kryo.register(StartupReportLogs.class);
  }
}

配置 Kryo 序列化形式的代码如下:

// 创立 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
// 应用 Kryo 序列化库
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
// 在 Kryo 序列化库中注册自定义的类汇合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 

本文首发于公众号:五分钟学大数据,回复【666】即可取得全套大数据笔面试教程

退出移动版