Spark系列文章四Spark之RDD

37次阅读

共计 6545 个字符,预计需要花费 17 分钟才能阅读完成。

Spark 系列文章(四):Spark 之 RDD

作者:studytime
原文:https://www.studytime.xin

一、RDD 的概述

1.1、RDD 是什么?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

1.2、RDD 的主要属性?

RDD 是 Spark 中最基本的数据抽象,是一个逻辑概念,它可能并不对应次磁盘或内存中的物理数据,而仅仅是记录了 RDD 的由来, 父 RDD 是谁,以及怎样从父 RDD 计算而来。

spark 源码里面对 RDD 的描述:

Internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

可以知道,每个 RDD 有以下五部分构成:

  • 一组分片(Partition),即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。
  • 每个 Partition 的计算函数。Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
  • RDD 之间的依赖关系。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。
  • (可选的)对于 key-value 类型的 RDD, 则包含一个 Partitioner,即 RDD 的分片函数。当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于 key-value 的 RDD,才会有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决定了 parent RDD Shuffle 输出时的分片数量。
  • (可选的)计算每个 Parition 所倾向的节点位置。存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD 是一个应用层面的逻辑概念。一个 RDD 多个分片。RDD 就是一个元数据记录集,记录了 RDD 内存所有的关系数据。

二、Spark 编程接口

Spark 程序设计流程一般如下:

步骤一:实例化 sparkContent 对象。sparkContent 封装了程序运行的上下文环境,包括配置信息、数据库管理器、任务调度等。
步骤二:构造 RDD。可通过 sparkContent 提供的函数构造 RDD,常见的 RDD 构造方式分为:将 Scala 集合转换为 RDD 和将 Hadoop 文件转换为 RDD。
步骤三:在 RDD 基础上,通过 Spark 提供的 transformation 算子完成数据处理步骤。
步骤四:通过 action 算子将最终 RDD 作为结果直接返回或者保存到文件中。

Spark 提供了两大类编程接口,分别为 RDD 操作符以及共享变量。
其中 RDD 操作符包括 transformation 和 action 以及 control API 三类;共享变量包括广播变量和累加器两种。

三、创建 sparkContent 对象,封装了 Spark 执行环境信息

1、创建 conf , 封装了 spark 配置信息

SparkConf conf = new SparkConf().setAppName(appName); 
conf.set(“spark.master”,“local”); 
conf.set(“spark.yarn.queue”,“infrastructure”);

2、创建 SparkContext,封装了调度器等信息

JavaSparkContext jsc = new JavaSparkContext(conf);

四、构建 RDD

1、Java 集合构建

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); 
JavaRDD<Integer> rdd = jsc.parallelize(data, 3)

2、将文本文件转换为 RDD

jsc.textFile(“/data”, 1) 
jsc.textFile(“/data/file.txt”, 1) 
jsc.textFile(“/data/*.txt”, 1) 
jsc.textFile(“hdfs://bigdata:9000/data/”, 1) 
jsc.sequenceFile(“/data”, 1) 
jsc.wholeTextFiles(“/data”, 1)

五、RDD transformation

transformation API 是惰性的,调用这些 API 比不会触发实际的分布式数据计算,而仅仅是将相关信息记录下来,直到 action API 才会开始数据计算。

Spark 提供了大量的 transformation API,下面列举了一些常用的 API:

API 功能
map(func) 将 RDD 中的元素,通过 func 函数逐一映射成另外一个值,形成一个新的 RDD
filter(func) 将 RDD 中使用 func 函数返回 true 的元素过滤出来,形成一个新的 RDD
flatMap(func 类似于 map,但每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]
sample(withReplacement, fraction, seed) 数据采样函数。根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子
union(otherDataset) 求两个 RDD (目标 RDD 与指定 RDD)的并集,并以 RDD 形式返回
intersection(otherDataset) 求两个 RDD (目标 RDD 与指定 RDD)的交集,并以 RDD 形式返回
distinct([numTasks])) 对目标 RDD 进行去重后返回一个新的 RDD
groupByKey([numTasks]) 针对 key/value 类型的 RDD,将 key 相同的 value 聚集在一起。默认任务并发度与父 RDD 相同,可显示设置 [numTasks]大小
reduceByKey(func, [numTasks]) 针对 key/value 类型的 RDD,将 key 相同的 value 聚集在一起,将对每组 value,按照函数 func 规约,产生新的 RDD
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 与 reduceByKey 类似,但目标 key/value 的类型与最终产生的 RDD 可能不同
sortByKey([ascending], [numTasks]) 针对 key/value 类型的 RDD,按照 key 进行排序,若 ascending 为 true,则为升序,反之为降序
join(otherDataset, [numTasks]) 针对 key/value 类型的 RDD,对 (K,V) 类型的 RDD 和 (K,W) 类型的 RDD 上调用,按照 key 进行等值连接,返回一个相同 key 对应的所有元素在一起的 (K,(V,W)) 的 RDD 相当于内连接(求交集)
cogroup(otherDataset, [numTasks]) 分组函数,对 (K,V) 类型的 RDD 和 (K,W) 类型的 RD 按照 key 进行分组,产生新的 (K,(Iterable<V>,Iterable<W>)) 类型的 RDD
cartesian(otherDataset) 求两个 RDD 的笛卡尔积
coalesce(numPartitions) 重新分区, 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率
repartition(numPartitions) 重新分区,将目标 RDD 的 partition 数量重新调整为 numPartitions,少变多
glom() 将 RDD 中每个 partition 中元素转换为数组, 并生 成新的 rdd2
mapValues() 针对于 (K,V) 形式的类型只对 V 进行操作
cache RDD 缓存,可以避免重复计算从而减少时间,cache 内部调用了 persist 算子,cache 默认就一个缓存级别 MEMORY-ONLY
persist persist 可以选择缓存级别

六、RDD action

transformation 算子具有惰性执行的特性,他仅仅是记录一些原信息,知道遇到 action 算子才会触发相关 transformation 算子的执行,

Spark 提供了大量的 action API,下面列举了一些常用的 API:

API 功能
reduce(func) 将 RDD 中元素前两个传给输入函数,产生一个新的 return 值,新产生的 return 值与 RDD 中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止
collect() 将 RDD 以数组的形式返回给 Driver, 通过将计算后的较小结果集返回
count() 计算 RDD 中的元素个数
first() 返回 RDD 中第一个元素
take(n) 以数组的形式返回 RDD 前 n 个元素
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子
saveAsTextFile(path) 将 RDD 存储到文本文件中,并一次调用每个元素的 toString 方法将之转换成字符串保存成一行
saveAsSequenceFile(path) 针对 key/value 类型的 RDD, 保存成 SequenceFile 格式文件
countByKey() 针对 key/value 类型的 RDD, 统计每个 key 出现的次数,并以 hashmap 形式返回
foreach(func) 将 RDD 中的元素一次交给 func 处理
aggregate 先对分区进行操作,再总体操作
aggregateByKey
lookup(key: K) 针对 key/value 类型的 RDD, 指定 key 值,返回 RDD 中该 K 对应的所有 V 值。
foreachPartition 类似于 foreach,但独立地在 RDD 的每一个分片上运行,其中可嵌入 foreach 算子

七、RDD 的宽依赖和窄依赖

由于 RDD 是粗粒度的操作数据集,每个 Transformation 操作都会生成一个新的 RDD,所以 RDD 之间就会形成类似流水线的前后依赖关系;RDD 和它依赖的父 RDD(s)的关系有两种不同的类型,即窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。

宽依赖和窄依赖深度剖析图:

窄依赖:指的是子 RDD 只依赖于父 RDD 中一个固定数量的分区。
宽依赖:指的是子 RDD 的每一个分区都依赖于父 RDD 的所有分区。

RDD Stage:

在 Spark 中,Spark 会将每一个 Job 分为多个不同的 Stage, 而 Stage 之间的依赖关系则形成了有向无环图,Spark 会根据 RDD 之间的依赖关系将 DAG 图(有向无环图)划分为不同的阶段,对于窄依赖,由于 Partition 依赖关系的确定性,Partition 的转换处理就可以在同一个线程里完成,窄依赖就被 Spark 划分到同一个 stage 中,而对于宽依赖,只能等父 RDD shuffle 处理完成后,下一个 stage 才能开始接下来的计算。

八、RDD 持久化

persist 和 cache 算子

RDD 持久化是 Spark 非常重要的特性之一。用户可显式将一个 RDD 持久化到内存或磁盘中,以便重用该 RDD。RDD 持久化是一个分布式的过程,其内部的每个 Partition 各自缓存到所在的计算节点上。RDD 持久化存储能大大加快数据计算效率,尤其适合迭代式计算和交互式计算。

Spark 提供了 persist 和 cache 两个持久化函数,其中 cache 将 RDD 持久化到内存中,而 persist 则支持多种存储级别。

persist RDD 存储级别:

持久化级别 含义
MEMORY_ONLY 以非序列化的 Java 对象的方式持久化在 JVM 内存中。如果内存无法完全存储 RDD 所有的 partition,那么那些没有持久化的 partition 就会在下一次需要使用它的时候,重新被计算。
MEMORY_AND_DISK 同上,但是当某些 partition 无法存储在内存中时,会持久化到磁盘中。下次需要使用这些 partition 时,需要从磁盘上读取。
MEMORY_ONLY_SER 同 MEMORY_ONLY,但是会使用 Java 序列化方式,将 Java 对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大 CPU 开销
MEMORY_AND_DSK_SER 同 MEMORY_AND_DSK。但是使用序列化方式持久化 Java 对象。
DISK_ONLY 使用非序列化 Java 对象的方式持久化,完全存储到磁盘上。
MEMORY_ONLY_2
MEMORY_AND_DISK_2 如果是尾部加了 2 的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。

如何选择 RDD 持久化策略:

Spark 提供的多种持久化级别,主要是为了在 CPU 和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:

  • 优先使用 MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗 CPU 进行反序列化操作。
  • 如果 MEMORY_ONLY 策略,无法存储的下所有数据的话,那么使用 MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗 CPU 进行反序列化。
  • 如果需要进行快速的失败恢复,那么就选择带后缀为_2 的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
  • 能不使用 DISK 相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。

checkpoint

除了 cache 和 persist 之外,Spark 还提供了另外一种持久化:checkpoint, 它能将 RDD 写入文件系统,提供类似于数据库快照的功能。

于 cache 和 persist, 区别:

  • Spark 自动管理(创建和回收)cache 和 persist 持久化的数据,而 checkpoint 持久化的数据需要有由户自己管理
  • checkpoint 会清楚 RDD 的血统,避免血统过长导致序列化开销增大,而 cache 和 persist 不会清楚 RDD 的血统

代码实例:

sc.checkpoint("hdfs://spark/rdd"); // 设置存放目录
val data = sc.testFile("hdfs://bigdata:9000/input");
val rdd = data.map(..).reduceByKey(...)

rdd.checkpoint
rdd.count()

正文完
 0