Since Spark 1.6, a unified memory management mechanism has been in place. The memory management module covers two regions: on-heap memory and off-heap memory.
1. On-heap Memory
- Execution Memory: mainly holds temporary data produced during computations such as Shuffle, Join, Sort, and Aggregation
- Storage Memory: mainly holds Spark cache data, e.g. cached RDDs and unroll data
- User Memory: mainly holds data needed by RDD transformations, e.g. RDD lineage (dependency) information
- Reserved Memory: memory reserved by the system, used to hold Spark's internal objects
```scala
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

val reservedMemory = conf.getLong("spark.testing.reservedMemory",
  if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
```
systemMemory:
```scala
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
```
Runtime.getRuntime.maxMemory is the JVM's runtime heap size. In a plain Java program it is set via -Xmx/-Xms; in Spark it is set via spark.executor.memory or --executor-memory.
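As a minimal illustration, the executor heap can also be set through the configuration object; the application name is hypothetical and whether the setting takes effect depends on it being applied before the context is created:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: setting the executor heap in code
// (equivalent to passing --executor-memory 1g to spark-submit).
val conf = new SparkConf()
  .setAppName("memory-demo")            // hypothetical application name
  .set("spark.executor.memory", "1g")   // becomes the executor JVM's -Xmx
```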
usableMemory: the memory usable by Spark
```scala
val usableMemory = systemMemory - reservedMemory
```
Note:
```scala
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
```
Both systemMemory and the configured executor memory must be at least 1.5 × reservedMemory (450 MB by default); otherwise Spark refuses to start.
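Putting the snippets above together, the size of the unified (execution + storage) region can be reproduced with a small sketch. This is not Spark's actual class; memoryFraction stands for spark.memory.fraction (default 0.6):

```scala
// A minimal sketch (not Spark's actual code) of how the unified region size is derived.
object UnifiedRegionSketch {
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300L * 1024 * 1024

  def maxUnifiedMemory(systemMemory: Long, memoryFraction: Double = 0.6): Long = {
    val reservedMemory = RESERVED_SYSTEM_MEMORY_BYTES
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    require(systemMemory >= minSystemMemory,
      s"System memory $systemMemory must be at least $minSystemMemory bytes")
    val usableMemory = systemMemory - reservedMemory
    (usableMemory * memoryFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    // With spark.executor.memory=1g the JVM reports roughly 910.5 MB of max heap (GC dependent).
    val systemMemory = (910.5 * 1024 * 1024).toLong
    val unified = maxUnifiedMemory(systemMemory)
    println(f"unified region ≈ ${unified / 1024.0 / 1024.0}%.1f MB") // ≈ 366.3 MB
  }
}
```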
2. Off-heap Memory
Starting with Spark 1.6, off-heap memory was introduced. It uses Java's Unsafe API to allocate memory outside the JVM heap; this memory is not managed by the JVM, so frequent GC can be avoided, but the allocation and release logic has to be implemented by Spark itself.
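For reference, off-heap memory is opt-in and is enabled with the same two settings that appear in section 5 below; a minimal sketch:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: enabling off-heap memory for an application.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "1g")   // allocated via Unsafe, outside the JVM heap
```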
3. Dynamic Adjustment of On-heap Memory
Initialization: when the application is submitted, execution and storage each take 0.5 of the unified region (the split is configured via spark.memory.storageFraction):
```scala
onHeapStorageRegionSize =
  (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
```
This means:
- While the application is running, if both regions run out of space, data is spilled to disk; blocks are evicted from memory to disk following an LRU policy. If one side lacks space while the other has room to spare, it may borrow the other's space ("lacking space" here means there is not enough room to hold a complete Block).
- When Execution memory has been borrowed by Storage, Execution can force the occupied portion to be spilled to disk so that the borrowed space is "returned".
- When Storage memory has been borrowed by Execution, the current implementation cannot force Execution to "return" it, because many factors in the Shuffle process would have to be taken into account and the implementation would be complex; moreover, files produced during Shuffle will definitely be used later, while data cached in memory may not be.
Note that the borrowing described above requires both sides to use the same memory type, i.e. both on-heap or both off-heap; on-heap memory cannot borrow off-heap space when it runs short.
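The asymmetry between the two borrowing directions can be summarized in a simplified sketch. This is not Spark's actual code; evictBlocks stands in for the real block-eviction machinery and returns the number of bytes it actually evicted:

```scala
// Illustrative sketch of the borrowing rules between the two pools (not Spark's real classes).
class BorrowSketch(unifiedTotal: Long, storageRegionSize: Long) {
  var storagePoolSize: Long = storageRegionSize
  var executionPoolSize: Long = unifiedTotal - storageRegionSize
  var storageUsed: Long = 0L
  var executionUsed: Long = 0L

  /** Storage may only grow into execution's *free* space; it can never force execution to spill. */
  def growStorage(needed: Long): Long = {
    val borrowed = math.min(needed, executionPoolSize - executionUsed)
    executionPoolSize -= borrowed
    storagePoolSize += borrowed
    borrowed
  }

  /** Execution may take storage's free space and, beyond that, evict cached blocks,
    * but only down to the storage region floor (storageRegionSize). */
  def growExecution(needed: Long, evictBlocks: Long => Long): Long = {
    val storageFree = storagePoolSize - storageUsed
    val reclaimable = math.max(storageFree, storagePoolSize - storageRegionSize)
    val target = math.min(needed, reclaimable)
    val evicted = if (target > storageFree) evictBlocks(target - storageFree) else 0L
    storageUsed -= evicted
    val reclaimed = math.min(target, storageFree + evicted)
    storagePoolSize -= reclaimed
    executionPoolSize += reclaimed
    reclaimed
  }
}
```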
4. Task Memory Allocation
```scala
/**
 * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
 * obtained, or 0 if none can be allocated.
 *
 * This call may block until there is enough free memory in some situations, to make sure each
 * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
 * active tasks) before it is forced to spill. This can happen if the number of tasks increase
 * but an older task had a lot of memory already.
 *
 * @param numBytes number of bytes to acquire
 * @param taskAttemptId the task attempt acquiring memory
 * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
 *                      one parameter (Long) that represents the desired amount of memory by
 *                      which this pool should be expanded.
 * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
 *                           at this given moment. This is not a field because the max pool
 *                           size is variable in certain cases. For instance, in unified
 *                           memory management, the execution pool can be expanded by evicting
 *                           cached blocks, thereby shrinking the storage pool.
 *
 * @return the number of bytes granted to the task.
 */
private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

  // TODO: clean up this clunky method signature

  // Add this task to the taskMemory map just so we can keep an accurate count of the number
  // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
  if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    // This will later cause waiting tasks to wake up and check numTasks again
    lock.notifyAll()
  }

  // Keep looping until we're either sure that we don't want to grant this request (because this
  // task would have more than 1 / numActiveTasks of the memory) or we have enough free
  // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
  // TODO: simplify this to limit each task to its own slot
  while (true) {
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

    // In every iteration of this loop, we should first try to reclaim any borrowed execution
    // space from storage. This is necessary because of the potential race condition where new
    // storage blocks may steal the free execution memory that this task was waiting for.
    maybeGrowPool(numBytes - memoryFree)

    // Maximum size the pool would have after potentially growing the pool.
    // This is used to compute the upper bound of how much memory each task can occupy. This
    // must take into account potential free memory as well as the amount this pool currently
    // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
    // we did not take into account space that could have been freed by evicting cached blocks.
    val maxPoolSize = computeMaxPoolSize()
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)

    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
    val toGrant = math.min(maxToGrant, memoryFree)

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    // if we can't give it this much now, wait for other tasks to free up memory
    // (this happens if older tasks allocated lots of memory before N grew)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L  // Never reached
}
```
- Each task's allocation is tracked in memoryForTask, a mutable.HashMap; if the current task attempt id is not present yet, its entry is initialized to 0L.
- If a task cannot acquire enough memory, the call blocks until enough memory is free, so that each task can reach at least minMemoryPerTask = poolSize / (2 * numActiveTasks).
- If only one task is active, it may use the entire execution memory pool.
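To make the 1/2N and 1/N bounds concrete, here is a small illustrative calculation for a hypothetical 1000 MB execution pool:

```scala
// Illustrative per-task bounds, assuming a hypothetical execution pool of 1000 MB
// that is not grown or shrunk while the tasks run.
val poolSize = 1000L  // MB, example value only
for (numActiveTasks <- Seq(1, 2, 4)) {
  val maxMemoryPerTask = poolSize / numActiveTasks        // upper bound: 1/N of the pool
  val minMemoryPerTask = poolSize / (2 * numActiveTasks)  // guaranteed share before blocking: 1/2N
  println(s"N=$numActiveTasks: between $minMemoryPerTask MB and $maxMemoryPerTask MB per task")
}
```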
5. Interpreting the Numbers in the Spark UI
The heap portion of the JVM memory pool is divided into Eden, Survivor and Tenured spaces. There are two Survivor regions, but only one of them is usable at any given time, so the sizes can be described as:
ExecutorMemory = Eden + 2 * Survivor + Tenured
Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured
The exact value of Runtime.getRuntime.maxMemory therefore depends on the GC configuration. With spark.executor.memory set to 1g, as shown in the figure:
384.1MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) × 1024/1000 × 1024/1000 (the UI reports sizes in 1000-based MB, so the 1024-based value is scaled up by about 1.048576)
366.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6)
Adding 1 GB of off-heap memory:
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 1G
1390.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB
1.5G ≈ 1457.8MB = ((Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB) × 1024/1000 × 1024/1000 (same 1000-based UI conversion)
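As a quick check, the displayed values can be reproduced directly from the formulas above; the 910.5 MB heap figure is the measured value from this 1 GB executor example:

```scala
// Reproducing the Spark UI numbers above (values taken from the 1 GB executor example).
val maxHeapMB      = 910.5   // Runtime.getRuntime.maxMemory, expressed in 1024-based MB
val reservedMB     = 300.0
val memoryFraction = 0.6     // spark.memory.fraction
val offHeapMB      = 1024.0  // spark.memory.offHeap.size = 1G

val onHeapUnified = (maxHeapMB - reservedMB) * memoryFraction               // ≈ 366.3 (1024-based MB)
val uiOnHeap      = onHeapUnified * 1024 * 1024 / 1000 / 1000               // ≈ 384.1 (UI, 1000-based MB)
val uiTotal       = (onHeapUnified + offHeapMB) * 1024 * 1024 / 1000 / 1000 // ≈ 1457.8 (UI, with off-heap)
println(f"$onHeapUnified%.1f MB  $uiOnHeap%.1f MB  $uiTotal%.1f MB")
```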