Spark 作为一个基于内存的分布式计算引擎,其内存治理模块在整个零碎中扮演着十分重要的角色。了解 Spark 内存治理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优。本文旨在梳理出 Spark 内存治理的脉络,抛砖引玉,引出读者对这个话题的深入探讨。本文中论述的原理基于 Spark 2.1 版本,浏览本文须要读者有肯定的 Spark 和 Java 根底,理解 RDD、Shuffle、JVM 等相干概念。
在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 过程,前者为主控过程,负责创立 Spark 上下文,提交 Spark 作业(Job),并将作业转化为计算工作(Task),在各个 Executor 过程间协调工作的调度,后者负责在工作节点上执行具体的计算工作,并将后果返回给 Driver,同时为须要长久化的 RDD 提供存储性能[1]。因为 Driver 的内存治理相对来说较为简单,本文次要对 Executor 的内存治理进行剖析,下文中的 Spark 内存均特指 Executor 的内存。
1 . Spark Shuffle 进化史
在 MapReduce 框架中,shuffle 是连贯 Map 和 Reduce 之间的桥梁,Map 的输入要用到 Reduce 中必须通过 shuffle 这个环节,shuffle 的性能高下间接影响了整个程序的性能和吞吐量。Spark 作为 MapReduce 框架的一种实现,天然也实现了 shuffle 的逻辑。
Shuffle 是 MapReduce 框架中的一个特定的 phase,介于 Map phase 和 Reduce phase 之间,当 Map 的输入后果要被 Reduce 应用时,输入后果须要按 key 哈希,并且散发到每一个 Reducer 下来,这个过程就是 shuffle。因为 shuffle 波及到了磁盘的读写和网络的传输,因而 shuffle 性能的高下间接影响到了整个程序的运行效率。
上面这幅图清晰地形容了 MapReduce 算法的整个流程,其中 shuffle phase 是介于 Map phase 和 Reduce phase 之间。
概念上 shuffle 就是一个沟通数据连贯的桥梁,那么实际上 shuffle(partition)这一部分是如何实现的的呢,上面咱们就以 Spark 为例讲一下 shuffle 在 Spark 中的实现。
先以图为例简略形容一下 Spark 中 shuffle 的整一个流程:
- 首先每一个 Mapper 会依据 Reducer 的数量创立出相应的 bucket,bucket 的数量是 MM×RR,其中 MM 是 Map 的个数,RR 是 Reduce 的个数。
- 其次 Mapper 产生的后果会依据设置的 partition 算法填充到每个 bucket 中去。这里的 partition 算法是能够自定义的,当然默认的算法是依据 key 哈希到不同的 bucket 中去。
- 当 Reducer 启动时,它会依据本人 task 的 id 和所依赖的 Mapper 的 id 从远端或是本地的 block manager 中获得相应的 bucket 作为 Reducer 的输出进行解决。
这里的 bucket 是一个抽象概念,在实现中每个 bucket 能够对应一个文件,能够对应文件的一部分或是其余等。
Apache Spark 的 Shuffle 过程与 Apache Hadoop 的 Shuffle 过程有着诸多相似,一些概念可间接套用,例如,Shuffle 过程中,提供数据的一端,被称作 Map 端,Map 端每个生成数据的工作称为 Mapper,对应的,接收数据的一端,被称作 Reduce 端,Reduce 端每个拉取数据的工作称为 Reducer,Shuffle 过程实质上都是将 Map 端取得的数据应用分区器进行划分,并将数据发送给对应的 Reducer 的过程。
2. 堆内和堆外内存布局
作为一个 JVM 过程,Executor 的内存治理建设在 JVM 的内存治理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为具体的调配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之能够间接在工作节点的零碎内存中开拓空间,进一步优化了内存的应用。
图 1 . 堆内和堆外内存示意图
1.1 堆内内存
堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发工作共享 JVM 堆内内存,这些工作在缓存 RDD 数据和播送(Broadcast)数据时占用的内存被布局为存储(Storage)内存,而这些工作在执行 Shuffle 时占用的内存被布局为执行(Execution)内存,残余的局部不做非凡布局,那些 Spark 外部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用残余的空间。不同的管理模式下,这三局部占用的空间大小各不相同(上面第 2 大节会进行介绍)。
Spark 对堆内内存的治理是一种逻辑上的 ” 布局式 ” 的治理,因为对象实例占用内存的申请和开释都由 JVM 实现,Spark 只能在申请后和开释前记录这些内存,咱们来看其具体流程:
申请内存:
Spark 在代码中 new 一个对象实例
JVM 从堆内内存调配空间,创建对象并返回对象援用
Spark 保留该对象的援用,记录该对象占用的内存
开释内存:
Spark 记录该对象开释的内存,删除该对象的援用
期待 JVM 的垃圾回收机制开释该对象占用的堆内内存
咱们晓得,JVM 的对象能够以序列化的形式存储,序列化的过程是将对象转换为二进制字节流,实质上能够了解为将非间断空间的链式存储转化为间断空间或块存储,在拜访时则须要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的形式能够节俭存储空间,但减少了存储和读取时候的计算开销。
对于 Spark 中序列化的对象,因为是字节流的模式,其占用的内存大小可间接计算,而对于非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,即并不是每次新增的数据项都会计算一次占用的内存大小,这种办法升高了工夫开销然而有可能误差较大,导致某一时刻的理论内存有可能远远超出预期[2]。此外,在被 Spark 标记为开释的对象实例,很有可能在实际上并没有被 JVM 回收,导致理论可用的内存小于 Spark 记录的可用内存。所以 Spark 并不能精确记录理论可用的堆内内存,从而也就无奈完全避免内存溢出(OOM, Out of Memory)的异样。
尽管不能精准管制堆内内存的申请和开释,但 Spark 通过对存储内存和执行内存各自独立的布局治理,能够决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在肯定水平上能够晋升内存的利用率,缩小异样的呈现。
1.2 堆外内存
为了进一步优化内存的应用以及进步 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之能够间接在工作节点的零碎内存中开拓空间,存储通过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在治理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现[3]),Spark 能够间接操作系统堆外内存,缩小了不必要的内存开销,以及频繁的 GC 扫描和回收,晋升了解决性能。堆外内存能够被准确地申请和开释,而且序列化的数据占用的空间能够被准确计算,所以相比堆内内存来说升高了治理的难度,也升高了误差。
在默认状况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分形式雷同,所有运行中的并发工作共享存储内存和执行内存。
1.3 内存治理接口
Spark 为存储内存和执行内存的治理提供了对立的接口——MemoryManager,同一个 Executor 内的工作都调用这个接口的办法来申请或开释内存:
清单 1 . 内存治理接口的次要办法
// 申请存储内存
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申请开展内存
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申请执行内存
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
// 开释存储内存
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
// 开释执行内存
def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit
// 开释开展内存
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit
咱们看到,在调用这些办法时都须要指定其内存模式(MemoryMode),这个参数决定了是在堆内还是堆外实现这次操作。
MemoryManager 的具体实现上,Spark 1.6 之后默认为对立治理(Unified Memory Manager)形式,1.6 之前采纳的动态治理(Static Memory Manager)形式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用。两种形式的区别在于对空间调配的形式,上面的第 2 大节会别离对这两种形式进行介绍。
2 . 内存空间调配
2.1 动态内存治理
在 Spark 最后采纳的动态内存管理机制下,存储内存、执行内存和其余内存的大小在 Spark 利用程序运行期间均为固定的,但用户能够应用程序启动前进行配置,堆内内存的调配如图 2 所示:
图 2 . 动态内存治理图示——堆内
能够看到,可用的堆内内存的大小须要依照上面的形式计算:
清单 2 . 可用堆内内存空间
- 可用的存储内存 = systemMaxMemory spark.storage.memoryFraction spark.storage.safetyFraction
- 可用的执行内存 = systemMaxMemory spark.shuffle.memoryFraction spark.shuffle.safetyFraction
其中 systemMaxMemory 取决于以后 JVM 堆内内存的大小,最初可用的执行内存或者存储内存要在此基础上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 1-safetyFraction 这么一块保险区域,升高因理论内存超出以后预设范畴而导致 OOM 的危险(上文提到,对于非序列化对象的内存采样估算会产生误差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的布局,在具体应用时 Spark 并没有区别对待,和 ” 其它内存 ” 一样交给了 JVM 去治理。
堆外的空间调配较为简单,只有存储内存和执行内存,如图 3 所示。可用的执行内存和存储内存占用的空间大小间接由参数 spark.memory.storageFraction 决定,因为堆外内存占用的空间能够被准确计算,所以无需再设定保险区域。
图 3 . 动态内存治理图示——堆外
动态内存管理机制实现起来较为简单,但如果用户不相熟 Spark 的存储机制,或没有依据具体的数据规模和计算工作或做相应的配置,很容易造成 ” 一半淡水,一半火焰 ” 的场面,即存储内存和执行内存中的一方残余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。因为新的内存管理机制的呈现,这种形式目前曾经很少有开发者应用,出于兼容旧版本的应用程序的目标,Spark 依然保留了它的实现。
2.2 对立内存治理
Spark 1.6 之后引入的对立内存管理机制,与动态内存治理的区别在于存储内存和执行内存共享同一块空间,能够动静占用对方的闲暇区域,如图 4 和图 5 所示
图 4 . 对立内存治理图示——堆内
图 5 . 对立内存治理图示——堆外
其中最重要的优化在于动静占用机制,其规定如下:
- 设定根本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了单方各自领有的空间的范畴
- 单方的空间都有余时,则存储到硬盘;若己方空间有余而对方空余时,可借用对方的空间;(存储空间有余是指不足以放下一个残缺的 Block)
- 执行内存的空间被对方占用后,可让对方将占用的局部转存到硬盘,而后 ” 偿还 ” 借用的空间
- 存储内存的空间被对方占用后,无奈让对方 ” 偿还 ”,因为须要思考 Shuffle 过程中的很多因素,实现起来较为简单[4]
图 6 . 动静占用机制图示
凭借对立内存管理机制,Spark 在肯定水平上进步了堆内和堆外内存资源的利用率,升高了开发者保护 Spark 内存的难度,但并不意味着开发者能够居安思危。譬如,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,升高工作执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的 [5]。所以要想充分发挥 Spark 的性能,须要开发者进一步理解存储内存和执行内存各自的治理形式和实现原理。
3 . 存储内存治理
3.1 RDD 的长久化机制
弹性分布式数据集(RDD)作为 Spark 最基本的数据抽象,是只读的分区记录(Partition)的汇合,只能基于在稳固物理存储中的数据集上创立,或者在其余已有的 RDD 上执行转换(Transformation)操作产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖关系,形成了血统(Lineage)。凭借血统,Spark 保障了每一个 RDD 都能够被从新复原。但 RDD 的所有转换都是惰性的,即只有当一个返回后果给 Driver 的口头(Action)产生时,Spark 才会创立工作读取 RDD,而后真正触发转换的执行。
Task 在启动之初读取一个分区时,会先判断这个分区是否曾经被长久化,如果没有则须要查看 Checkpoint 或依照血统从新计算。所以如果一个 RDD 上要执行屡次口头,能够在第一次口头中应用 persist 或 cache 办法,在内存或磁盘中长久化或缓存这个 RDD,从而在前面的口头时晋升计算速度。事实上,cache 办法是应用默认的 MEMORY_ONLY 的存储级别将 RDD 长久化到内存,故缓存是一种非凡的长久化。堆内和堆外存储内存的设计,便能够对缓存 RDD 时应用的内存做对立的布局和治理(存储内存的其余利用场景,如缓存 broadcast 数据,临时不在本文的探讨范畴之内)。
RDD 的长久化由 Spark 的 Storage 模块 [7] 负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或近程存取数据的性能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage 模块形成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Storage 模块在逻辑上以 Block 为根本存储单位,RDD 的每个 Partition 通过解决后惟一对应一个 Block(BlockId 的格局为 rdd_RDD-ID_PARTITION-ID)。Master 负责整个 Spark 应用程序的 Block 的元数据信息的治理和保护,而 Slave 须要将 Block 的更新等状态上报到 Master,同时接管 Master 的命令,例如新增或删除一个 RDD。
图 7 . Storage 模块示意图
在对 RDD 长久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不同的 存储级别,而存储级别是以下 5 个变量的组合:
清单 3 . 存储级别
class StorageLevel private(
private var _useDisk: Boolean, // 磁盘
private var _useMemory: Boolean, // 这里其实是指堆内内存
private var _useOffHeap: Boolean, // 堆外内存
private var _deserialized: Boolean, // 是否为非序列化
private var _replication: Int = 1 // 正本个数
)
通过对数据结构的剖析,能够看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储形式:
存储地位:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前抉择堆外内存时不能同时存储到其余地位。
存储模式:Block 缓存到存储内存后,是否为非序列化的模式。如 MEMORY_ONLY 是非序列化形式存储,OFF_HEAP 是序列化形式存储。
正本数量:大于 1 时须要近程冗余备份到其余节点。如 DISK_ONLY_2 须要近程备份 1 个正本。
3.2 RDD 缓存的过程
RDD 在缓存到存储内存之前,Partition 中的数据个别以迭代器(Iterator)的数据结构来拜访,这是 Scala 语言中一种遍历数据汇合的办法。通过 Iterator 能够获取分区中每一条序列化或者非序列化的数据项(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 局部的空间,同一 Partition 的不同 Record 的空间并不间断。
RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块间断的空间。将 Partition 由不间断的存储空间转换为间断存储空间的过程,Spark 称之为 ” 开展 ”(Unroll)。Block 有序列化和非序列化两种存储格局,具体以哪种形式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例,序列化的 Block 则以 SerializedMemoryEntry 的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 构造(LinkedHashMap)来治理堆内和堆外存储内存中所有的 Block 对象的实例[6],对这个 LinkedHashMap 新增和删除间接记录了内存的申请和开释。
因为不能保障存储空间能够一次包容 Iterator 中的所有数据,以后的计算工作在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来长期占位,空间有余则 Unroll 失败,空间足够时能够持续进行。对于序列化的 Partition,其所需的 Unroll 空间能够间接累加计算,一次申请。而非序列化的 Partition 则要在遍历 Record 的过程中顺次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间有余时能够中断,开释已占用的 Unroll 空间。如果最终 Unroll 胜利,以后 Partition 所占用的 Unroll 空间被转换为失常的缓存 RDD 的存储空间,如下图 8 所示。
图 8. Spark Unroll 示意图
在图 3 和图 5 中能够看到,在动态内存治理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,对立内存治理时则没有对 Unroll 空间进行特地辨别,当存储空间有余时会依据动静占用机制进行解决。
3.3 淘汰和落盘
因为同一个 Executor 的所有的计算工作共享无限的存储内存空间,当有新的 Block 须要缓存然而残余空间有余且无奈动静占用时,就要对 LinkedHashMap 中的旧 Block 进行淘汰(Eviction),而被淘汰的 Block 如果其存储级别中同时蕴含存储到磁盘的要求,则要对其进行落盘(Drop),否则间接删除该 Block。
存储内存的淘汰规定为:
- 被淘汰的旧 Block 要与新 Block 的 MemoryMode 雷同,即同属于堆外或堆内内存
- 新旧 Block 不能属于同一个 RDD,防止循环淘汰
- 旧 Block 所属 RDD 不能处于被读状态,防止引发一致性问题
- 遍历 LinkedHashMap 中 Block,依照最近起码应用(LRU)的程序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的个性。
落盘的流程则比较简单,如果其存储级别合乎_useDisk 为 true 的条件,再依据其_deserialized 判断是否是非序列化的模式,若是则对其进行序列化,最初将数据存储到磁盘,在 Storage 模块中更新其信息。
4 . 执行内存治理
4.1 多任务间内存调配
Executor 内运行的工作同样共享执行内存,Spark 用一个 HashMap 构造保留了工作到内存消耗的映射。每个工作可占用的执行内存大小的范畴为 1/2N ~ 1/N,其中 N 为以后 Executor 内正在运行的工作的个数。每个工作在启动之时,要向 MemoryManager 申请申请起码为 1/2N 的执行内存,如果不能被满足要求则该工作被阻塞,直到有其余工作开释了足够的执行内存,该工作才能够被唤醒。
4.2 Shuffle 的内存占用
执行内存次要用来存储工作在执行 Shuffle 时占用的内存,Shuffle 是依照肯定规定对 RDD 数据从新分区的过程,咱们来看 Shuffle 的 Write 和 Read 两阶段对执行内存的应用:
Shuffle Write
若在 map 端抉择一般的排序形式,会采纳 ExternalSorter 进行外排,在内存中存储数据时次要占用堆内执行空间。
若在 map 端抉择 Tungsten 的排序形式,则采纳 ShuffleExternalSorter 间接对以序列化模式存储的数据排序,在内存中存储数据时能够占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
Shuffle Read
在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 解决,在内存中存储数据时占用堆内执行空间。
如果须要进行最终后果排序,则要将再次将数据交给 ExternalSorter 解决,占用堆内执行空间。
在 ExternalSorter 和 Aggregator 中,Spark 会应用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保留到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到肯定水平,无奈再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最初会被归并(Merge)。
Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 应用的打算 [9],解决了一些 JVM 在性能上的限度和弊病。Spark 会依据 Shuffle 的状况来主动抉择是否采纳 Tungsten 排序。Tungsten 采纳的页式内存管理机制建设在 MemoryManager 之上,即 Tungsten 对执行内存的应用进行了一步的形象,这样在 Shuffle 过程中无需关怀数据具体存储在堆内还是堆外。每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量对立标识一个内存页在零碎内存中的地址。堆内的 MemoryBlock 是以 long 型数组的模式调配的内存,其 obj 的值为是这个数组的对象援用,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者配合应用能够定位这个数组在堆内的相对地址;堆外的 MemoryBlock 是间接申请到的内存块,其 obj 为 null,offset 是这个内存块在零碎内存中的 64 位相对地址。Spark 用 MemoryBlock 奇妙地将堆内和堆外内存页对立形象封装,并用页表(pageTable) 治理每个 Task 申请到的内存页。
Tungsten 页式治理下的所有内存用 64 位的逻辑地址示意,由页号和页内偏移量组成:
页号:占 13 位,惟一标识一个内存页,Spark 在申请内存页之前要先申请闲暇页号。
页内偏移量:占 51 位,是在应用内存页存储数据时,数据在页内的偏移地址。
有了对立的寻址形式,Spark 能够用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的过程只须要对指针进行排序,并且无需反序列化,整个过程十分高效,对于内存拜访效率和 CPU 应用效率带来了显著的晋升[10]。
Spark 的存储内存和执行内存有着截然不同的治理形式:对于存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由须要缓存的 RDD 的 Partition 转化而成;而对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序中甚至形象成为页式内存治理,开拓了全新的 JVM 内存管理机制。