乐趣区

Spark-Core-解析RDD

引言

Spark Core 是 Spark 的核心部分,是 Spark SQL,Spark Streaming,Spark MLlib 等等其他模块的基础, Spark Core 提供了开发分布式应用的脚手架,使得其他模块或应用的开发者不必关心复杂的分布式计算如何实现,只需使用 Spark Core 提供的分布式数据结构 RDD 及丰富的算子 API,以类似开发单机应用的方式来进行开发。

图中最下面那个就是 Spark Core 啦,日常使用的 RDD 相关的 API 就属于 Spark Core,而 Dataset、DataFrame 则属于 Spark SQL。

RDD 概览

RDD 是 Spark Core 的用户级 API,了解 RDD 是了解 Spark Core 的第一步,本文基于 Spark 2.x,主要对 RDD 的特点和组成进行分析。

定义

RDD (Resilient Distributed Dataset,弹性分布式数据集):

  • Resilient:不可变的、容错的
  • Distributed:数据分散在不同节点(机器,进程)
  • Dataset:一个由多个分区组成的数据集

特征

In-Memory:RDD 会优先使用内存
Immutable(Read-Only):一旦创建不可修改
Lazy evaluated:惰性执行
Cacheable:可缓存,可复用
Parallel:可并行处理
Typed:强类型,单一类型数据
Partitioned:分区的
Location-Stickiness:可指定分区优先使用的节点

是 Spark 中最核心的数据抽象,数据处理和计算基本都是基于 RDD。

组成

一个 RDD 通常由 5 个要素组成:

  • 一组分区(partition)
  • 一个计算函数
  • 一组依赖(直接依赖的父 RDD)
  • 一个分区器 (可选)
  • 一组优先计算位置(e.g. 将 Task 分配至靠近 HDFS 块的节点进行计算) (可选)

与传统数据结构对比,只关心访问,不关心存储。通过迭代器访问数据,只要数据能被不重复地访问即可。
后面会详细分析各要素。

算子

算子,即对 RDD 进行变换的操作,按照是否触发 Job 提交可以分为两大类:

  • transformation:不会立即执行的一类变换,不会触发 Job 执行,会生成并返回新的 RDD,同时记录下依赖关系。如:map,filter,union,join,reduceByKey。
  • action: 会立即提交 Job 的一类变换,不会返回新的 RDD,而是直接返回计算结果。如:count,reduce,foreach。

下面对 RDD 的组成要素进行分析

Partition & Partitioner

为什么要把数据分区?
把数据分成若干 partition 是为了将数据分散到不同节点不同线程,从而能进行分布式的多线程的并行计算。

按什么规则分区?
RDD 从数据源生成的时候,数据通常是随机分配到不同的 partition 或者保持数据源的分区,如 sc.parallelize(…),sc.textFile(…)。

这对于某些 RDD 操作来说是没有问题的,比如 filter(),map(),flatMap(),rdd.union(otherRDD),rdd.intersection(otherRDD),
rdd.subtract(otherRDD)。

但是对于 reduceByKey(),foldByKey(),combineByKey(),groupByKey(),sortByKey(),cogroup(), join() ,leftOuterJoin(), rightOuterJoin()这些操作,随机分配分区就非常不友好,会带来很多额外的网络传输。影响一个分布式计算系统性能的最大敌人就是网络传输,所以必须尽量最小化网络传输。

为了减少网络传输,怎么分区才合理?
对于 reduceByKey 操作应该把相同 key 的数据放到同一分区;
对于 sortByKey 操作应该把同一范围的数据放到同一分区。

可见不同的操作适合不同的数据分区规则,Spark 将划分规则抽象为Partitioner(分区器),分区器的核心作用是决定数据应归属的分区,本质就是计算数据对应的分区 ID。

在 Spark Core 中内置了 2 个 Partitioner 来支持常用的分区规则(Spark MLlib,Spark SQL 中有其他的)。

  • HashPartitioner 哈希分区器
  • RangePartitioner 范围分区器

HashPartitioner

哈希分区器是默认的分区器,也是使用最广泛的一个,作用是将数据按照 key 的 hash 值进行分区。

分区 ID 计算公式非常简单:key 的 hash 值 % 分区个数,如果 key 为 null,则返回 0.

也就是将 key 的 hash 值(Java 中每个对象都有 hash code, 对象相等则 hash code 相同),除以分区个数,取余数为分区 ID,这样能够保证相同 Key 的数据被分到同一个分区,但是每个分区的数据量可能会相差很大,出现数据倾斜。

RangePartitioner

RangePartitioner 的作用是根据 key,将数据按范围大致平均的分到各个分区,只支持能排序的 key。

要知道一个 key 属于哪个分区,需要知道每个分区的边界值。
确定边界值需要对数据进行排序,因为数据量通常较大,通过样本替代总体来估计每个分区的边界值。

采样流程:

    1. 使用水塘抽样对总体进行采样;
    1. 针对数据量远超平均值的分区,进行传统抽样(伯努利抽样)。

使用场景:sortByKey

如何使用

对于一个没有明确指定 Partitioner 的情况下,
reduceByKey(),foldByKey(),combineByKey(),groupByKey()等操作会默认使用 HashPartitioner。
sortByKey 操作会采用 RangePartitioner。

reduceByKey 也有一个可以自定义分区器的版本:reduceByKey(partitioner: Partitioner, func: (V, V) => V)

Function

传入给 transformation 的函数

transformation 会生成新的 RDD,传给 RDD transformation 的函数最终会以成员变量的形式存储在新生成的 RDD 中。

以 map 函数为例。

val r11 = r00.map(n => (n, n))

map 函数接受的参数类型为 f: T => U,因为 Scala 支持函数式编程,函数可以像值一样存储在变量中,也可以作为参数传递。
f 参数的类型T => U 代表一种函数类型,这个函数的输入参数的类型必须为 T,输出类型为 U,这里 T 和 U 都是泛型,T 代表 RDD 中数据的类型,对于 RDD[String]来说,T 就是 String。

最终 f 参数,会转换成有关迭代器的一个函数,存储到 RDD 的 f 成员变量中。

最终存储的类型为:
f: (TaskContext, Int, Iterator[T]) => Iterator[U]
对于 map 来说是这样一个函数
(context, pid, iter) => iter.map(f)
也就是说我们传入到 RDD.map 的 f 函数,最终传给了 Iterator.map 函数。

传入给 action 的函数

action 不会生成新的 RDD,而是将函数传递给 Job。

Dependency

当 RDD1 经过 transformation 生成了 RDD2,就称作 RDD2 依赖 RDD1,RDD1 是 RDD2 的父 RDD,他们是父子关系。

先看一个例子

val r00 = sc.parallelize(0 to 9)
val r01 = sc.parallelize(0 to 90 by 10)
val r10 = r00 cartesian r01
val r11 = r00.map(n => (n, n))
val r12 = r00 zip r01
val r13 = r01.keyBy(_ / 20)
val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)

我们看下 RDD 之间的依赖关系图

RDD 的依赖关系网又叫 RDD 的血统(lineage),可以看做是 RDD 的逻辑执行计划。

Dependency 存储

父 RDD 与子 RDD 之间的依赖关系记录在 子 RDD的属性中(deps: Seq[Dependency[_]]),数据类型为 Dependency(可以有多个),Dependency 中保存了父 RDD 的引用,这样通过 Dependency 就能找到父 RDD。

Dependency 分类

Dependency 不仅描述了 RDD 之间的依赖关系,还进一步描述了不同 RDD 的 partition 之间的依赖关系。

依据 partition 之间依赖关系的不同 Dependency 分为两大类:

  • NarrowDependency 窄依赖,1 个父分区只对应 1 个子分区,这时父 RDD 不需要改变分区方式。如:map、filter、union,co-paritioned join
  • ShuffleDependency Shuffle 依赖(宽依赖),1 个父分区对应多个子分区,这种情况父 RDD 必须重新分区,才能符合子 RDD 的需求。如:groupByKey、reduceByKey、sortByKey,(not co-paritioned)join

NarrowDependency

NarrowDependency 是一个抽象类,一共有 3 中实现类,也就是说有 3 种 NarrowDependency。

  • OneToOneDependency:一对一依赖,比如 map,
  • RangeDependency:范围依赖,如 union
  • PruneDependency:裁剪依赖,过滤掉部分分区,如 PartitionPruningRDD

ShuffleDependency

出现 shuffle 依赖表示父 RDD 与子 RDD 的分区方式发生了变化。

RDD 分类

RDD 的具体实现类有几十种(大概 60+),介绍下最常见的几种。

scala> r20.toDebugString
res34: String =
(28) UnionRDD[38] at union at <pastie>:31 []
 |   UnionRDD[37] at union at <pastie>:31 []
 |   UnionRDD[36] at union at <pastie>:31 []
 |   CartesianRDD[32] at cartesian at <pastie>:27 []
 |   ParallelCollectionRDD[30] at parallelize at <pastie>:25 []
 |   ParallelCollectionRDD[31] at parallelize at <pastie>:26 []
 |   MapPartitionsRDD[33] at map at <pastie>:28 []
 |   ParallelCollectionRDD[30] at parallelize at <pastie>:25 []
 |   ZippedPartitionsRDD2[34] at zip at <pastie>:29 []
 |   ParallelCollectionRDD[30] at parallelize at <pastie>:25 []
 |   ParallelCollectionRDD[31] at parallelize at <pastie>:26 []
 |   MapPartitionsRDD[35] at keyBy at <pastie>:30 []
 |   ParallelCollectionRDD[31] at parallelize at <pastie>:26 []

不同的 RDD 代表着不同的‘计算模式’:
MapPartitionsRDD,对 Iterator 的每个值应用相同的函数;

ShuffledRDD,对 Iterator 执行 combineByKey 的模式,可以指定
createCombiner: V => C,mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, compute 函数返回 ShuffleReader 生成的迭代器。

MapPartitionsRDD

MapPartitionsRDD 对于父 RDD 的依赖类型只能是 OneToOneDependency,代表将函数应用到每一个分区的计算。

相关 transformation:map, flatMap, filter, mapPartitions 等等

scala> sc.parallelize(0 to 10000).map(x=>(x%9,1)).dependencies
res35: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@7c6843f)

ShuffledRDD

对于父 RDD 的依赖类型只能是 ShuffleDependency,代表需要改变分区方式进行 shuffle 的计算。

会创建 ShuffledRDD 的 transformation:
RDD:coalesce
PairRDDFunctions:reduceByKey, combineByKeyWithClassTag,partitionBy (分区方式不同时) 等
OrderedRDDFunctions:sortByKey,repartitionAndSortWithinPartitions

RDD Checkpoint

Checkpoint 检查点,是一种截断 RDD 依赖链,并把 RDD 数据持久化到存储系统 (通常是 HDFS 或本地) 的过程。
主要作用是截断 RDD 依赖关系,防止 stack overflow(与 DAG 递归调用有关)。
存储的数据包括 RDD 计算后的数据和 partitioner。

Checkpoint 分为两种:

  • reliable:调用函数为 RDD.checkpoint(),数据保存到可靠存储 HDFS,RDD 的 parent 替换为 ReliableCheckpointRDD;
  • local:调用函数为 RDD.localCheckpoint(),数据保存到 spark cache 中(不是本地),RDD 的 parent 替换为 LocalCheckpointRDD。当 executor 挂掉,数据会丢失。

注意:与 streaming 中的 checkpointing 不同,streaming 中的 checkpointing 会同时保存元数据和 RDD 数据,可以用于 Application 容错。

如何使用

scala> :paste
// Entering paste mode (ctrl-D to finish)

val a=sc.parallelize(0 to 9)
val b=a.map(_*10)
val c=b.filter(_>10)

// Exiting paste mode, now interpreting.

a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:26

scala> c.toDebugString
res0: String =
(4) MapPartitionsRDD[2] at filter at <console>:26 []
 |  MapPartitionsRDD[1] at map at <console>:25 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
 
 
scala> sc.setCheckpointDir("/tmp/spark-checkpoint")

scala> b.checkpoint

scala> b.count
res4: Long = 10

scala> c.toDebugString
res5: String =
(4) MapPartitionsRDD[2] at filter at <console>:26 []
 |  MapPartitionsRDD[1] at map at <console>:25 []
 |  ReliableCheckpointRDD[3] at count at <console>:26 []
 
scala> b.toDebugString
res6: String =
(4) MapPartitionsRDD[1] at map at <console>:25 []
 |  ReliableCheckpointRDD[3] at count at <console>:26 [] 
 
//local 
scala> c.localCheckpoint
scala> c.count
res9: Long = 8

scala> c.toDebugString
res10: String =
(4) MapPartitionsRDD[2] at filter at <console>:26 [Disk Memory Deserialized 1x Replicated]
 |       CachedPartitions: 4; MemorySize: 104.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  LocalCheckpointRDD[4] at count at <console>:26 [Disk Memory Deserialized 1x Replicated]

查看 HDFS 上存储的 checkpoint 文件

hdfs dfs -ls /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1
Found 4 items
-rw-r--r--   2 ld-liuyuan_su hdfs         91 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00000
-rw-r--r--   2 ld-liuyuan_su hdfs        101 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00001
-rw-r--r--   2 ld-liuyuan_su hdfs         91 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00002
-rw-r--r--   2 ld-liuyuan_su hdfs        101 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00003

RDD Cache

Cache 机制是 Spark 提供的一种将数据缓存到内存 (或磁盘) 的机制,
主要用途是使得中间计算结果可以被重用。

常见的使用场景有如下几种,底层都是调用 RDD 的 cache,这里只讲 RDD 的 cache。

rdd.cache()
dataset.cache()
spark.sql("cache table test.test")
...

Spark 的 Cache 不仅能将数据缓存到内存,也能使用磁盘,甚至同时使用内存和磁盘,这种缓存的不同存储方式,称作‘StorageLevel(存储级别)’。

可以这样使用:rdd.persist(StorageLevel.MEMORY_ONLY)

Spark 目前支持的存储级别如下:

NONE (default)
DISK_ONL
DISK_ONLY_2
MEMORY_ONLY (cache 操作使用的级别)
MEMORY_ONLY_2
MEMORY_ONLY_SER
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP

2代表存储份数为 2,也就是有个备份存储。
SER代表存储序列化后的数据。

DISK_ONLY 后面没跟 SER,但其实只能是存储序列化后的数据。

要 cache RDD, 常用到两个函数, cache()persist(),cache 方法本质上是persist(StorageLevel.MEMORY_ONLY),也就是说 persist 可以指定 StorageLevel,而 cache 不行。

Checkpoint vs Cache

  • Cache 用于缓存,采用临时保存,Executor 挂掉会导致数据丢失,但是数据可以重新计算。
  • Checkpoint 用于截断依赖链,reliable 方式下 Executor 挂掉不会丢失数据,数据一旦丢失不可恢复。

使用的时候要想清楚目的,就不会用错啦。

RDD Broadcast

一种将数据在不同节点间共享的机制,可以将指定的 只读 数据广播分发到每个 Executor,每个 Executor 有一份完整的备份。

是一种高效的数据共享机制,被广播的数据可以被不同的 stage 和 task 共享,而不需要给每个 task 拷贝一份。

Broadcast 机制有个非常重要的作用,Spark 就是通过它将 task 分发给各个 Executor。

下面举个使用的例子

rddA

k low
1 a
2 b
3 c

rddB

k up
1 A
2 B
3 C

rddAB

k low up
1 a A
2 b B
3 c C
scala> val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
rddA: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
rddB: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rddAB=rddA.join(rddB)
rddAB: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[9] at join at <console>:27

scala> rddAB.collect
res11: Array[(Int, (String, String))] = Array((1,(a,A)), (2,(b,B)), (3,(c,C)))

scala> rddAB.toDebugString
res12: String =
(4) MapPartitionsRDD[9] at join at <console>:27 []
 |  MapPartitionsRDD[8] at join at <console>:27 []
 |  CoGroupedRDD[7] at join at <console>:27 []
 +-(4) ParallelCollectionRDD[5] at parallelize at <console>:24 []
 +-(4) ParallelCollectionRDD[6] at parallelize at <console>:24 []

scala> val rddBMap=sc.broadcast(rddB.collectAsMap)
rddBMap: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,String]] = Broadcast(9)

scala> val rddABMapJoin= rddA.map{case(k,v) => (k,(v,rddBMap.value.get(k).get))}
rddABMapJoin: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[10] at map at <console>:27

scala> rddABMapJoin.collect
res13: Array[(Int, (String, String))] = Array((1,(a,A)), (2,(b,B)), (3,(c,C)))

scala> rddABMapJoin.toDebugString
res14: String =
(4) MapPartitionsRDD[10] at map at <console>:27 []
 |  ParallelCollectionRDD[5] at parallelize at <console>:24 []

通过 broadcast 机制,将原本的两个 stage 计算减少为 1 个 stage。
这里模拟实现了 map-side join。

Broadcast VS Cache

Cache 也会把数据分发到各个节点,但是一个节点上通常只有部分分区的数据,而 Broadcast 会保证每个节点都有完整的数据。
Broadcast 会消耗更多的内存,但是带来了更好的性能。

RDD Accumulators

Broadcast 机制有个短板,它的变量是只读的,于是 Spark 提供了 Accumulators(累加器)来弥补。

Accumulator 的值可以增减,但是不能直接修改为指定值。

scala> val acc=sc.longAccumulator
acc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 200, name: None, value: 0)

scala> rddA.map(_=>acc.add(-1)).count
res15: Long = 3

scala> acc.value
res17: Long = -3

# 参考

Spark RDDs Simplified

Understanding Spark Partitioning

Checkpointing

Spark 内核设计的艺术

转载请注明原文地址:https://liam-blog.ml/2019/10/23/spark-core-rdd/

本文由博客一文多发平台 OpenWrite 发布!

退出移动版