共计 3062 个字符,预计需要花费 8 分钟才能阅读完成。
1.RDD 缓存机制 cache, persist
Spark 速度十分快的一个起因是 RDD 反对缓存。胜利缓存后,如果之后的操作应用到了该数据集,则间接从缓存中获取。尽管缓存也有失落的危险,然而因为 RDD 之间的依赖关系,如果某个分区的缓存数据失落,只须要从新计算该分区即可。
波及到的算子:persist、cache、unpersist;都是 Transformation
缓存是将计算结果写入不同的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前反对内存、堆
外内存、磁盘);
通过缓存,Spark 防止了 RDD 上的反复计算,可能极大地晋升计算速度;
RDD 长久化或缓存,是 Spark 最重要的特色之一。能够说,缓存是 Spark 构建迭代式算法和疾速交互式查问的要害因
素;
Spark 速度十分快的起因之一,就是在内存中长久化(或缓存)一个数据集。当长久化一个 RDD 后,每一个节点都将
把计算的分片后果保留在内存中,并在对此数据集(或者衍生出的数据集)进行的其余动作(Action)中重用。这使
得后续的动作变得更加迅速;应用 persist()办法对一个 RDD 标记为长久化。之所以说“标记为长久化”,是因为呈现 persist()语句的中央,并不会马
上计算生成 RDD 并把它长久化,而是要等到遇到第一个口头操作触发真正计算当前,才会把计算结果进行长久化;通过 persist()或 cache()办法能够标记一个要被长久化的 RDD,长久化被触发,RDD 将会被保留在计算节点的内存中
并重用;
什么时候缓存数据,须要对空间和速度进行衡量。个别状况下,如果多个动作须要用到某个 RDD,而它的计算代价
又很高,那么就应该把这个 RDD 缓存起来;
缓存有可能失落,或者存储于内存的数据因为内存不足而被删除。RDD 的缓存的容错机制保障了即便缓存失落也能保
证计算的正确执行。通过基于 RDD 的一系列的转换,失落的数据会被重算。RDD 的各个 Partition 是绝对独立的,因而
只须要计算失落的局部即可,并不需要重算全副 Partition。
启动堆外内存须要配置两个参数:
- spark.memory.offHeap.enabled:是否开启堆外内存,默认值为 false,须要设置为 true;
- spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,须要设置为正值。
1.1 缓存级别
Spark 速度十分快的一个起因是 RDD 反对缓存。胜利缓存后,如果之后的操作应用到了该数据集,则间接从缓存中获取。尽管缓存也有失落的危险,然而因为 RDD 之间的依赖关系,如果某个分区的缓存数据失落,只须要从新计算该分区即可。
Spark 反对多种缓存级别:
Storage Level(存储级别) | Meaning(含意) |
---|---|
MEMORY_ONLY |
默认的缓存级别,将 RDD 以反序列化的 Java 对象的模式存储在 JVM 中。如果内存空间不够,则局部分区数据将不再缓存。 |
MEMORY_AND_DISK |
将 RDD 以反序列化的 Java 对象的模式存储 JVM 中。如果内存空间不够,将未缓存的分区数据存储到磁盘,在须要应用这些分区时从磁盘读取。 |
MEMORY_ONLY_SER |
将 RDD 以序列化的 Java 对象的模式进行存储(每个分区为一个 byte 数组)。这种形式比反序列化对象节俭存储空间,但在读取时会减少 CPU 的计算累赘。仅反对 Java 和 Scala。 |
MEMORY_AND_DISK_SER |
相似于 MEMORY_ONLY_SER ,然而溢出的分区数据会存储到磁盘,而不是在用到它们时从新计算。仅反对 Java 和 Scala。 |
DISK_ONLY |
只在磁盘上缓存 RDD |
MEMORY_ONLY_2 , MEMORY_AND_DISK_2 |
与下面的对应级别性能雷同,然而会为每个分区在集群中的两个节点上建设正本。 |
OFF_HEAP |
与 MEMORY_ONLY_SER 相似,但将数据存储在堆外内存中。这须要启用堆外内存。 |
启动堆外内存须要配置两个参数:
- spark.memory.offHeap.enabled:是否开启堆外内存,默认值为 false,须要设置为 true;
- spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,须要设置为正值。
1.2 应用缓存
缓存数据的办法有两个:persist
和 cache
。cache
外部调用的也是 persist
,它是 persist
的特殊化模式,等价于 persist(StorageLevel.MEMORY_ONLY)
。示例如下:
// 所有存储级别均定义在 StorageLevel 对象中 | |
fileRDD.persist(StorageLevel.MEMORY_AND_DISK) | |
fileRDD.cache() |
被缓存的 RDD 在 DAG 图中有一个绿色的圆点。
1.3 移除缓存
Spark 会主动监督每个节点上的缓存应用状况,并依照最近起码应用(LRU)的规定删除旧数据分区。当然,你也能够应用 RDD.unpersist()
办法进行手动删除。
2.RDD 容错机制 Checkpoint
2.1 波及到的算子:checkpoint;也是 Transformation
Spark 中对于数据的保留除了长久化操作之外,还提供了检查点的机制;检查点实质是通过将 RDD 写入高牢靠的磁盘,次要目标是为了容错。检查点通过将数据写入到 HDFS 文件系统实现了
RDD 的检查点性能。Lineage 过长会造成容错老本过高,这样就不如在两头阶段做检查点容错,如果之后有节点呈现问题而失落分区,从
做检查点的 RDD 开始重做 Lineage,就会缩小开销。
2.2 cache 和 checkpoint 区别
cache 和 checkpoint 是有显著区别的,缓存把 RDD 计算出来而后放在内存中,然而 RDD 的依赖链不能丢掉,当某个点某个 executor 宕了,下面 cache 的 RDD 就会丢掉,须要通过依赖链重放计算。不同的是,checkpoint 是把
RDD 保留在 HDFS 中,是多正本牢靠存储,此时依赖链能够丢掉,所以斩断了依赖链。
2.3 checkpoint 适宜场景
以下场景适宜应用检查点机制:
1) DAG 中的 Lineage 过长,如果重算,则开销太大
2) 在宽依赖上做 Checkpoint 取得的收益更大
与 cache 相似 checkpoint 也是 lazy 的。
val rdd1 = sc.parallelize(1 to 100000) | |
// 设置检查点目录 | |
sc.setCheckpointDir("/tmp/checkpoint") | |
val rdd2 = rdd1.map(_*2) | |
rdd2.checkpoint | |
// checkpoint 是 lazy 操作 | |
rdd2.isCheckpointed | |
// checkpoint 之前的 rdd 依赖关系 | |
rdd2.dependencies(0).rdd | |
rdd2.dependencies(0).rdd.collect | |
// 执行一次 action,触发 checkpoint 的执行 | |
rdd2.count | |
rdd2.isCheckpointed | |
// 再次查看 RDD 的依赖关系。能够看到 checkpoint 后,RDD 的 lineage 被截断,变成从 checkpointRDD 开始 | |
rdd2.dependencies(0).rdd | |
rdd2.dependencies(0).rdd.collect | |
// 查看 RDD 所依赖的 checkpoint 文件 | |
rdd2.getCheckpointFile |
备注:checkpoint 的文件作业执行结束后不会被删除
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注