共计 5712 个字符,预计需要花费 15 分钟才能阅读完成。
序
本文主要研究一下 flink 的 MemoryPool
MemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
abstract static class MemoryPool {
abstract int getNumberOfAvailableMemorySegments();
abstract MemorySegment allocateNewSegment(Object owner);
abstract MemorySegment requestSegmentFromPool(Object owner);
abstract void returnSegmentToPool(MemorySegment segment);
abstract void clear();
}
MemoryPool 定义了 getNumberOfAvailableMemorySegments、allocateNewSegment、requestSegmentFromPool、returnSegmentToPool、clear 这几个抽象方法;它有 HybridHeapMemoryPool、HybridOffHeapMemoryPool 这两个子类
HybridHeapMemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
static final class HybridHeapMemoryPool extends MemoryPool {
/** The collection of available memory segments. */
private final ArrayDeque<byte[]> availableMemory;
private final int segmentSize;
HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<>(numInitialSegments);
this.segmentSize = segmentSize;
for (int i = 0; i < numInitialSegments; i++) {
this.availableMemory.add(new byte[segmentSize]);
}
}
@Override
MemorySegment allocateNewSegment(Object owner) {
return MemorySegmentFactory.allocateUnpooledSegment(segmentSize, owner);
}
@Override
MemorySegment requestSegmentFromPool(Object owner) {
byte[] buf = availableMemory.remove();
return MemorySegmentFactory.wrapPooledHeapMemory(buf, owner);
}
@Override
void returnSegmentToPool(MemorySegment segment) {
if (segment.getClass() == HybridMemorySegment.class) {
HybridMemorySegment heapSegment = (HybridMemorySegment) segment;
availableMemory.add(heapSegment.getArray());
heapSegment.free();
}
else {
throw new IllegalArgumentException(“Memory segment is not a ” + HybridMemorySegment.class.getSimpleName());
}
}
@Override
protected int getNumberOfAvailableMemorySegments() {
return availableMemory.size();
}
@Override
void clear() {
availableMemory.clear();
}
}
HybridHeapMemoryPool 继承了 MemoryPool,它使用的是 jvm 的 heap 内存;构造器接收 numInitialSegments、segmentSize 两个参数用于初始化 availableMemory 这个 ArrayDeque,该 queue 的元素类型为 byte[]
allocateNewSegment 方法调用的是 MemorySegmentFactory.allocateUnpooledSegment,用于分配 unpooled memory;requestSegmentFromPool 方法调用的是 availableMemory.remove(),然后调用 MemorySegmentFactory.wrapPooledHeapMemory 包装为 MemorySegment,这个方法没有判断 ArrayDeque 的大小就直接 remove,需要注意
returnSegmentToPool 方法只对 HybridMemorySegment 类型进行处理,首先将它的 byte[] 归还到 availableMemory,之后调用 heapSegment.free() 释放;getNumberOfAvailableMemorySegments 方法返回的是 availableMemory.size();clear 方法调用的是 availableMemory.clear()
HybridOffHeapMemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
static final class HybridOffHeapMemoryPool extends MemoryPool {
/** The collection of available memory segments. */
private final ArrayDeque<ByteBuffer> availableMemory;
private final int segmentSize;
HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<>(numInitialSegments);
this.segmentSize = segmentSize;
for (int i = 0; i < numInitialSegments; i++) {
this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
}
}
@Override
MemorySegment allocateNewSegment(Object owner) {
return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
}
@Override
MemorySegment requestSegmentFromPool(Object owner) {
ByteBuffer buf = availableMemory.remove();
return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
}
@Override
void returnSegmentToPool(MemorySegment segment) {
if (segment.getClass() == HybridMemorySegment.class) {
HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
ByteBuffer buf = hybridSegment.getOffHeapBuffer();
availableMemory.add(buf);
hybridSegment.free();
}
else {
throw new IllegalArgumentException(“Memory segment is not a ” + HybridMemorySegment.class.getSimpleName());
}
}
@Override
protected int getNumberOfAvailableMemorySegments() {
return availableMemory.size();
}
@Override
void clear() {
availableMemory.clear();
}
}
HybridOffHeapMemoryPool 继承了 MemoryPool,它使用的是 OffHeap;构造器接收 numInitialSegments、segmentSize 两个参数用于初始化 availableMemory 这个 ArrayDeque,该 queue 的元素类型为 ByteBuffer
allocateNewSegment 方法调用的是 MemorySegmentFactory.allocateUnpooledOffHeapMemory,用于分配 unpooled off-heap memory;requestSegmentFromPool 方法调用的是 availableMemory.remove(),然后调用 MemorySegmentFactory.wrapPooledOffHeapMemory 包装为 MemorySegment,这个方法没有判断 ArrayDeque 的大小就直接 remove,需要注意
returnSegmentToPool 方法只对 HybridMemorySegment 类型进行处理,首先将它的 ByteBuffer 归还到 availableMemory,之后调用 heapSegment.free() 释放;getNumberOfAvailableMemorySegments 方法返回的是 availableMemory.size();clear 方法调用的是 availableMemory.clear()
小结
MemoryPool 定义了 getNumberOfAvailableMemorySegments、allocateNewSegment、requestSegmentFromPool、returnSegmentToPool、clear 这几个抽象方法;它有 HybridHeapMemoryPool、HybridOffHeapMemoryPool 这两个子类
HybridHeapMemoryPool 继承了 MemoryPool,它使用的是 jvm 的 heap 内存;构造器接收 numInitialSegments、segmentSize 两个参数用于初始化 availableMemory 这个 ArrayDeque,该 queue 的元素类型为 byte[];allocateNewSegment 方法调用的是 MemorySegmentFactory.allocateUnpooledSegment,用于分配 unpooled memory;requestSegmentFromPool 方法调用的是 availableMemory.remove(),然后调用 MemorySegmentFactory.wrapPooledHeapMemory 包装为 MemorySegment,这个方法没有判断 ArrayDeque 的大小就直接 remove,需要注意;returnSegmentToPool 方法只对 HybridMemorySegment 类型进行处理,首先将它的 byte[] 归还到 availableMemory,之后调用 heapSegment.free() 释放;getNumberOfAvailableMemorySegments 方法返回的是 availableMemory.size();clear 方法调用的是 availableMemory.clear()
HybridOffHeapMemoryPool 继承了 MemoryPool,它使用的是 OffHeap;构造器接收 numInitialSegments、segmentSize 两个参数用于初始化 availableMemory 这个 ArrayDeque,该 queue 的元素类型为 ByteBuffer;allocateNewSegment 方法调用的是 MemorySegmentFactory.allocateUnpooledOffHeapMemory,用于分配 unpooled off-heap memory;requestSegmentFromPool 方法调用的是 availableMemory.remove(),然后调用 MemorySegmentFactory.wrapPooledOffHeapMemory 包装为 MemorySegment,这个方法没有判断 ArrayDeque 的大小就直接 remove,需要注意;returnSegmentToPool 方法只对 HybridMemorySegment 类型进行处理,首先将它的 ByteBuffer 归还到 availableMemory,之后调用 heapSegment.free() 释放;getNumberOfAvailableMemorySegments 方法返回的是 availableMemory.size();clear 方法调用的是 availableMemory.clear()
doc
MemoryManager