Big Data: A Complete Guide to Memory Management in the Flink Data Processing Engine


Abstract: Flink is a big data processing engine that runs on top of the JVM.

Flink is a big data processing engine that runs on top of the JVM, and the JVM has well-known problems: Java objects have low storage density, full GC is expensive, GC causes stop-the-world (STW) pauses, and OOM errors hurt stability. To address frequent serialization and deserialization, Flink uses both on-heap and off-heap memory so that, in some scenarios, it can operate on binary data directly and cut serialization/deserialization costs. Based on the characteristics of big data stream processing, Flink has also built its own serialization framework, and it designs cache-friendly data structures that exploit the CPU's L1/L2/L3 caches and the principle of locality. Flink's memory management starts from much the same point as Spark's Tungsten memory management.

Memory Model

Flink can use both on-heap and off-heap memory. The memory model is shown in the figure below.

[Figure: Flink TaskManager memory model]

Flink's memory is divided into on-heap and off-heap memory. By purpose, it breaks down into task memory, network memory, managed memory, and framework memory. Task, network, and managed memory count toward slot memory; framework memory is reserved for the TaskManager itself.

On-heap memory includes the memory used by user code, the HeapStateBackend, and framework execution.

Off-heap memory is memory that is not virtualized by the JVM and maps directly to operating-system memory addresses. It includes framework execution memory and JVM off-heap memory such as direct and native memory.

Direct memory can be used for network transfer buffers. Network memory falls under direct memory; with it, Flink can do zero-copy transfers, reducing the number of kernel-space-to-user-space copies and making I/O more efficient.

The JVM metaspace stores metadata for the classes the JVM has loaded; the more classes are loaded, the more space it needs. The overhead component covers other JVM costs such as native memory, the code cache, and thread stacks.

Managed memory is mainly used by the RocksDBStateBackend and by batch operators, and it also falls under native memory. For the RocksDBStateBackend, each state corresponds to a RocksDB column family with its own write buffer; RocksDB is built on an LSM-tree data structure, and its native memory footprint is roughly blockCache + writeBufferNum * writeBuffer + index. Off-heap memory is shared between processes, and since having the JVM virtualize large amounts of heap memory is slow, using off-heap memory avoids that step entirely. Off-heap memory has downsides, though: monitoring and debugging are comparatively complex, and for short-lived segments on-heap memory is cheaper. In some situations Flink operates on binary data directly, avoiding deserialization overhead. If the data to be processed exceeds the memory limit, part of it is spilled to disk.
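As a hedged illustration of how these components map to configuration, the sketch below sets a few of the taskmanager.memory.* options programmatically through Flink's Configuration API (assuming the Flink 1.10+ option keys; the sizes themselves are made-up examples, not recommendations):

import org.apache.flink.configuration.Configuration;

public class MemoryConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Total process memory of the TaskManager; the components below carve it up.
        conf.setString("taskmanager.memory.process.size", "4g");
        // Fraction of Flink memory given to managed memory (RocksDB / batch operators).
        conf.setString("taskmanager.memory.managed.fraction", "0.4");
        // Bounds for network memory (the buffer pools discussed later in this article).
        conf.setString("taskmanager.memory.network.min", "128m");
        conf.setString("taskmanager.memory.network.max", "512m");
        // JVM metaspace, as described above.
        conf.setString("taskmanager.memory.jvm-metaspace.size", "256m");
        System.out.println(conf);
    }
}

The same keys can of course be set in flink-conf.yaml; the programmatic form is used here only to keep the example self-contained.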

Memory Management

Similar to the page mechanism in an OS, Flink mimics the operating system and manages memory in pages. The data structures corresponding to a page in Flink are the data views and MemorySegment. A MemorySegment is the smallest unit of memory allocation in Flink, 32 KB by default, and it can live on or off the heap. Flink accesses both on-heap and off-heap memory through the MemorySegment abstraction; with the help of Flink's serialization machinery (covered in the next section), MemorySegment provides methods for reading and writing binary data, and Flink uses DataInputView and DataOutputView for sequential binary reads and writes over MemorySegments. Flink can manage on-heap memory through HeapMemorySegment, and both on-heap and off-heap memory through HybridMemorySegment. When a MemorySegment manages JVM heap memory, it defines a byte-array reference pointing to the memory, and HeapMemorySegment operates relative to that internal byte array.

public abstract class MemorySegment {

    /**
     * The heap byte array object relative to which we access the memory.
     * If the memory is on the heap, this is the reference to the accessed byte array;
     * if the memory is off-heap, it is null.
     *
     * <p>Is non-<tt>null</tt> if the memory is on the heap, and is <tt>null</tt>, if the memory is
     * off the heap. If we have this buffer, we must never void this reference, or the memory
     * segment will point to undefined addresses outside the heap and may in out-of-order execution
     * cases cause segmentation faults.
     */
    protected final byte[] heapMemory;

    /**
     * The address to the data, relative to the heap memory byte array. If the heap memory byte
     * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
     * (The relative address within the byte array, or an absolute off-heap address.)
     */
    protected long address;
}

HeapMemorySegment is used to allocate on-heap memory.

public final class HeapMemorySegment extends MemorySegment {

    /**
     * An extra reference to the heap memory, so we can let byte array checks fail by the built-in
     * checks automatically without extra checks.
     * (The byte-array reference pointing to this memory segment.)
     */
    private byte[] memory;

    public void free() {
        super.free();
        this.memory = null;
    }

    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }
}

HybridMemorySegment supports both on-heap and off-heap memory. Flink implements it with the JVM's Unsafe operations: if the object o is not null (the on-heap case) and the address that follows is a relative offset, the operation works directly against the relative position within that object (for example, an array). If o is null, the memory block being operated on is not JVM heap memory (the off-heap case) and the address that follows is the absolute address of some memory block, so the same calls operate on that block.

public final class HybridMemorySegment extends MemorySegment {

    @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            } else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                } catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        } else {
            throw new IllegalStateException("segment has been freed");
        }
    }
}

Flink creates MemorySegments through the MemorySegmentFactory. For data access that spans MemorySegments, Flink abstracts access views: DataInputView for reading and DataOutputView for writing.
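A minimal sketch of single-segment access (assuming the public MemorySegmentFactory.allocateUnpooledSegment factory and the put/get primitives of Flink's core memory API; treat the exact method names as an assumption):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentExample {
    public static void main(String[] args) {
        // Allocate one 32 KB on-heap segment, the default page size.
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
        // Binary reads and writes happen at explicit byte offsets.
        segment.putInt(0, 42);
        segment.putLong(4, 123L);
        System.out.println(segment.getInt(0));   // 42
        System.out.println(segment.getLong(4));  // 123
        segment.free();
    }
}

Access that spans multiple segments then goes through the views shown next.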

/**
 * This interface defines a view over some memory that can be used to sequentially read the contents of the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataInputView extends DataInput {

    // Implementations hold references to one or more MemorySegments; the group of
    // segments can be viewed as one memory page that Flink reads sequentially.

    /**
     * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.
     * It returns the number of read bytes or -1 if there is no more data left.
     *
     * @param b byte array to store the data to
     * @param off offset into byte array
     * @param len byte length to read
     * @return the number of actually read bytes or -1 if there is no more data left
     */
    int read(byte[] b, int off, int len) throws IOException;
}

DataOutputView is the view for writing data; the output view holds references to multiple MemorySegments, and Flink writes to the segments sequentially.

/**
 * This interface defines a view over some memory that can be used to sequentially write contents to the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataOutputView extends DataOutput {

    // Implementations hold references to the MemorySegments being written.

    /**
     * Copies {@code numBytes} bytes from the source to this view.
     *
     * @param source The source to copy the bytes from.
     * @param numBytes The number of bytes to copy.
     */
    void write(DataInputView source, int numBytes) throws IOException;
}
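As a hedged example of the view abstraction in action, the sketch below uses DataOutputSerializer and DataInputDeserializer, which I understand to be Flink's heap-backed implementations of DataOutputView and DataInputView (treat their availability in org.apache.flink.core.memory as an assumption):

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class ViewExample {
    public static void main(String[] args) throws Exception {
        // Write binary data sequentially through a DataOutputView.
        DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeInt(7);
        out.writeUTF("flink");
        // Read it back sequentially through a DataInputView.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        System.out.println(in.readInt());   // 7
        System.out.println(in.readUTF());   // flink
    }
}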

For the managed memory described in the previous section, Flink uses the MemoryManager. Managed memory uses only off-heap memory; it is mainly used for sorting, hashing, and caching in batch processing (according to community plans, stream processing will use it as well), and in stream processing it serves as part of the RocksDBStateBackend's memory. The MemoryManager manages MemorySegments through a memory pool.

/**
 * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends
 * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain
 * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.
 * Any allocated memory has to be released to be reused later.
 *
 * <p>The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}).
 * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately
 * releases the underlying memory.
 */
public class MemoryManager {

    /**
     * Allocates a set of memory segments from this memory manager.
     *
     * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
     *
     * @param owner The owner to associate with the memory segment, for the fallback release.
     * @param target The list into which to put the allocated memory pages.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
     *                                   of memory pages any more.
     */
    public void allocatePages(
            Object owner,
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
        // ...
    }

    private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
        segment.free();
        if (segments != null) {
            segments.remove(segment);
        }
    }
}

// Note: the free() method below is defined on MemorySegment, not on MemoryManager.

/**
 * Frees this memory segment.
 *
 * <p>After this operation has been called, no further operations are possible on the memory
 * segment and will fail. The actual memory (heap or off-heap) will only be released after this
 * memory segment object has become garbage collected.
 */
public void free() {
    // this ensures we can place no more data and trigger
    // the checks for the freed segment
    address = addressLimit + 1;
}
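A hedged usage sketch of the allocate/release cycle (assuming the MemoryManager.create(size, pageSize) factory and releaseAll(owner) exposed by recent Flink versions; in some versions creation goes through a constructor instead, and in production this object is created and owned by the TaskManager, not by user code):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryManager;

public class MemoryManagerExample {
    public static void main(String[] args) throws Exception {
        // 16 MB of managed memory, carved into 32 KB pages (assumed factory method).
        MemoryManager manager = MemoryManager.create(16 * 1024 * 1024, 32 * 1024);
        Object owner = new Object(); // stands in for the operator that owns the pages
        List<MemorySegment> pages = new ArrayList<>();
        manager.allocatePages(owner, pages, 4);
        pages.get(0).putInt(0, 1); // operate on the raw pages
        // Releasing by owner is the fallback release path mentioned in the javadoc above.
        manager.releaseAll(owner);
        manager.shutdown();
    }
}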

For the network memory mentioned in the previous section, Flink adds a buffer layer, NetworkBuffer, on top. A buffer is also backed by a MemorySegment, and Flink manages buffers through buffer pools: each TaskManager has one NetworkBufferPool shared by all tasks on that TaskManager, and each task's LocalBufferPool obtains the memory it needs from the NetworkBufferPool. All of this is off-heap memory requested by Flink.

When an upstream operator writes data to a ResultPartition, it requests a buffer and uses a BufferBuilder to write the data into a MemorySegment; when the downstream operator consumes data from the ResultSubpartition, it reads from the MemorySegment through a BufferConsumer. BufferBuilder and BufferConsumer are paired one-to-one. This flow is also tied to Flink's backpressure mechanism.

/**
 * A buffer pool used to manage a number of {@link Buffer} instances from the
 * {@link NetworkBufferPool}.
 *
 * <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock
 * free operation of the network stack by limiting the number of buffers per
 * local buffer pool. It also implements the default mechanism for buffer
 * recycling, which ensures that every buffer is ultimately returned to the
 * network buffer pool.
 *
 * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}). It
 * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to
 * match its new size.
 */
class LocalBufferPool implements BufferPool {

    @Nullable
    private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            returnExcessMemorySegments();

            if (availableMemorySegments.isEmpty()) {
                segment = requestMemorySegmentFromGlobal();
            }
            // segment may have been released by buffer pool owner
            if (segment == null) {
                segment = availableMemorySegments.poll();
            }
            if (segment == null) {
                availabilityHelper.resetUnavailable();
            }
            if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
                if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                    availabilityHelper.resetUnavailable();
                }
            }
        }
        return segment;
    }
}

/**
 * A result partition for data produced by a single task.
 *
 * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
 * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
 * or more {@link ResultSubpartition} instances, which further partition the data depending on the
 * number of consuming tasks and the data {@link DistributionPattern}.
 *
 * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
 * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}).
 *
 * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
 * Produce, Consume, Release. (The original javadoc also covers buffer management and state management.)
 */
public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {

    @Override
    public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        checkInProduceState();
        return bufferPool.requestBufferBuilderBlocking(targetChannel);
    }
}
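To make the pooling pattern concrete, here is a hypothetical, much-simplified sketch (not Flink's actual classes): a shared global pool hands out fixed-size segments, each task-local pool draws from it under a local limit, and recycled segments ultimately return to the global pool. Hitting the local limit is what stalls a producer and propagates backpressure.

import java.util.ArrayDeque;
import java.util.Queue;

class ToyGlobalPool {
    private final Queue<byte[]> free = new ArrayDeque<>();

    ToyGlobalPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            free.add(new byte[segmentSize]); // stands in for a MemorySegment
        }
    }

    synchronized byte[] request() {
        return free.poll(); // null if exhausted: the caller must wait
    }

    synchronized void recycle(byte[] segment) {
        free.add(segment);
    }
}

class ToyLocalPool {
    private final ToyGlobalPool global;
    private final int maxSegments;
    private int inUse;

    ToyLocalPool(ToyGlobalPool global, int maxSegments) {
        this.global = global;
        this.maxSegments = maxSegments;
    }

    byte[] request() {
        if (inUse >= maxSegments) {
            return null; // local limit reached: producer stalls (backpressure)
        }
        byte[] segment = global.request();
        if (segment != null) {
            inUse++;
        }
        return segment;
    }

    void recycle(byte[] segment) {
        inUse--;
        global.recycle(segment); // every segment ultimately returns to the global pool
    }
}

class ToyBufferPoolDemo {
    public static void main(String[] args) {
        ToyGlobalPool global = new ToyGlobalPool(8, 32 * 1024);
        ToyLocalPool local = new ToyLocalPool(global, 2);
        byte[] a = local.request();
        byte[] b = local.request();
        System.out.println(local.request() == null); // true: local limit hit
        local.recycle(a);
        System.out.println(local.request() != null); // true again after recycling
    }
}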

Custom Serialization Framework

For the basic data types it supports, Flink implements a customized serialization mechanism. Flink's dataset objects are relatively fixed in shape, so only one copy of the schema information needs to be kept, saving storage space. Serialization is the conversion between Java objects and binary data. Flink uses the createSerializer method of TypeInformation to create the serializer for each type, which then performs serialization and deserialization. The type information is extracted when the StreamTransformation is built: the TypeExtractor derives it from method signatures, class information, and so on, and stores it in the StreamConfig.

/**
 * Creates a serializer for the type. The serializer may use the ExecutionConfig
 * for parameterization.
 * (Creates the serializer for the corresponding type.)
 *
 * @param config The config used to parameterize the serializer.
 * @return A serializer for this type.
 */
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);

/**
 * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
 * functions.
 */
@Public
public class TypeExtractor {

    /**
     * Creates a {@link TypeInformation} from the given parameters.
     *
     * <p>If the given {@code instance} implements {@link ResultTypeQueryable}, its information
     * is used to determine the type information. Otherwise, the type information is derived
     * based on the given class information.
     *
     * @param instance instance to determine type information for
     * @param baseClass base class of {@code instance}
     * @param clazz class of {@code instance}
     * @param returnParamPos index of the return type in the type arguments of {@code clazz}
     * @param <OUT> output type
     * @return type information
     */
    @SuppressWarnings("unchecked")
    @PublicEvolving
    public static <OUT> TypeInformation<OUT> createTypeInfo(
            Object instance, Class<?> baseClass, Class<?> clazz, int returnParamPos) {
        if (instance instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<OUT>) instance).getProducedType();
        } else {
            return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
        }
    }
}
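To make the serializer path concrete, here is a hedged round-trip sketch that obtains TypeInformation via Types.TUPLE, builds a serializer with createSerializer, and reuses the views from the previous section (assuming these public APIs behave as described):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SerializerExample {
    public static void main(String[] args) throws Exception {
        TypeInformation<Tuple2<Integer, String>> typeInfo = Types.TUPLE(Types.INT, Types.STRING);
        TypeSerializer<Tuple2<Integer, String>> serializer =
                typeInfo.createSerializer(new ExecutionConfig());

        // Serialize into a DataOutputView ...
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(Tuple2.of(1, "flink"), out);

        // ... and deserialize from a DataInputView.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        Tuple2<Integer, String> copy = serializer.deserialize(in);
        System.out.println(copy); // (1,flink)
    }
}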

For nested data types, Flink serializes from the innermost fields outward; the inner serialization results form part of the outer result. When deserializing, the binary data is read sequentially from memory and turned back into Java objects according to the offsets. Flink's built-in serialization has very high storage density, since it only needs to store the values of the corresponding types.

On top of MemorySegment, Flink's table module uses the BinaryRow data structure, which reduces deserialization overhead further: when deserialization is needed, only the required fields have to be deserialized rather than the whole object.

You can also register subtypes and custom serializers. Types that Flink cannot serialize itself are handed to Kryo; if Kryo cannot handle them either, Avro is forced as the serializer. Kryo's serialization performance is lower than Flink's built-in mechanism, so during development you can call env.getConfig().disableGenericTypes() to disable Kryo and try to stick to the data types covered by Flink's own serializers.
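A hedged sketch of these knobs on the ExecutionConfig (MyPojo and MyKryoSerializer are hypothetical placeholder classes, shown only in comments):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast on any type that would silently fall back to Kryo.
        env.getConfig().disableGenericTypes();
        // Or, if Kryo is genuinely needed, register types up front so Kryo writes
        // compact tags instead of full class names (MyPojo is hypothetical):
        // env.getConfig().registerKryoType(MyPojo.class);
        // env.getConfig().registerTypeWithKryoSerializer(MyPojo.class, MyKryoSerializer.class);
    }
}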

Cache-Friendly Data Structures

The CPU's L1, L2, and L3 caches are read much faster than main memory; cache access is many times faster than main-memory access. Another important program property is the principle of locality: programs tend to reuse the data and instructions they used most recently. There are two kinds of locality: temporal locality means recently accessed items are likely to be accessed again soon, and spatial locality means items at nearby addresses tend to be accessed close together in time.

Designing cache-friendly data structures around these two properties can markedly improve cache hit rates and locality. Flink applies this mainly to sorting. Normally a pointer references a <key, value> record, and sorting has to follow the pointer to fetch the actual data before comparing, which involves random memory access and poor cache locality. Flink instead stores a serialized, fixed-length key together with the pointer, so the keys are laid out contiguously in memory; this avoids random memory access and improves the CPU cache hit rate. When sorting two records, the keys are compared first: if they differ, the result is known immediately, and only the pointers need to be swapped, not the actual data. Only if the keys are equal are the data the pointers actually reference compared.
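A hypothetical illustration of the fixed-length key + pointer layout (a simplified sketch, not Flink's actual NormalizedKeySorter): keys and pointers sit contiguously in one array, comparisons touch only that array, and records are fetched only to break ties.

import java.util.Arrays;

public class KeyPointerSortSketch {
    public static void main(String[] args) {
        String[] records = {"banana", "apple", "apricot"};

        // One entry per record: a fixed-length numeric key prefix plus the record's
        // index (the "pointer"). Both live contiguously, so sorting scans memory
        // sequentially instead of chasing pointers.
        long[][] keyAndPointer = new long[records.length][2];
        for (int i = 0; i < records.length; i++) {
            keyAndPointer[i][0] = prefixKey(records[i]);
            keyAndPointer[i][1] = i;
        }

        Arrays.sort(keyAndPointer, (a, b) -> {
            int byKey = Long.compare(a[0], b[0]);
            if (byKey != 0) {
                return byKey; // decided from the contiguous key, no record access
            }
            // Tie on the key prefix: only now dereference the "pointers".
            return records[(int) a[1]].compareTo(records[(int) b[1]]);
        });

        for (long[] entry : keyAndPointer) {
            System.out.println(records[(int) entry[1]]); // apple, apricot, banana
        }
    }

    // First 8 characters packed into a long, preserving sort order for ASCII input.
    private static long prefixKey(String s) {
        long key = 0;
        for (int i = 0; i < 8; i++) {
            key = (key << 8) | (i < s.length() ? (s.charAt(i) & 0xFF) : 0);
        }
        return key;
    }
}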

Postscript

The Flink community is moving toward unified stream and batch processing; future work will focus more on the unified engine and its integration with the storage layer. For Flink as a managed service, see Huawei Cloud's EI DLI-Flink serverless offering.


