版本 日期 备注
1.0 2021.12.20 文章首发

0. 前言

在最后接触到 Flink 时,是来自于业界里一些头部玩家的分享——大家会用其来解决海量数据。在这种场景下,如何防止 JVM GC 带来 StopTheWorld 带来的副作用 这样的问题始终盘绕在我心头。直到用了 Flink 当前,浏览了相干的源码(以 1.14.0 为基准),终于有了一些答案。在这篇文章里也是会分享给大家。

1. JVM 内存治理的有余

除了上述提到的 StopTheWorld,JVM 的内存治理还会带来以下问题:

  • 内存节约:一个 Java 对象在内存中存储时会分为三个局部:对象头、实例数据、对其填充局部。首先,32 位和 64 位的实现中,对象头别离要占用 32bit 和 64bit。而为了提供整体的应用效率,JVM 内存中的数据不是间断存储的,而是依照 8byte 的整数倍进行存储。哪怕你只有 1byte,会主动 padding7byte。
  • 缓存未命中:大家都晓得 CPU 是有 L1、2、3 级缓存的,当 CPU 去读取内存中的数据时,会将内存中邻近的数据读到缓存中——这是程序局部性原理的一种实际伎俩。最近被 CPU 拜访的数据,短期内 CPU 还要拜访(工夫);被 CPU 拜访的数据左近的数据,CPU 短期内还要拜访(空间)。但咱们后面提到,Java 对象在堆上存储的时候并不是间断的,所以 CPU 去读取 JVM 上的对象时,缓存的邻近内存区域数据往往不是 CPU 下一步计算所须要的。这时 CPU 只能空转期待从内存里读取数据(两者的速度不是一个量级)。如果数据恰好被 swap 到硬盘里,那就是难上加难了。

2. Flink 的演进计划

在 v0.10 之前,Flink 应用了堆上内存的实现。简略来说就是通过 Unsafe 来分配内存,并用 byte 数组的形式将其援用起来,应用层本人保护类型信息来获取相应的数据。但这样依然会有问题:

  • 在堆内内存过大的状况下,JVM 启动工夫会很长,而且 Full GC 会达到分钟级。
  • IO 效率低:堆上内存写磁盘或网络至多须要 1 次内存复制。

因而在 v0.10 后,Flink 引入了堆外内存治理性能。见 Jira:Add an off-heap variant of the managed memory。除了解决堆内内存的问题,还会带来一些益处:

  • 堆外内存能够做成过程之间共享。这象征 Flink 能够做此一些不便的故障复原。


  • 调配短生命周期的对象,比起堆上内存,在堆外内存上调配开销更高。
  • 堆外内存出错时排错更为简单。

这种实现在 Spark 中也能够找到,它叫做MemoryPool,同时反对堆内和堆外的内存形式,具体见MemoryMode.scala;Kafka 也有相似的思路——通过 Java NIO 的 ByteBuffer 来保留它的音讯。

3. 源码剖析

总的来说,Flink 在这一块的实现是比拟清晰的——和操作系统一样有内存段,也有内存页这样的数据结构。

3.1 内存段

次要实现为 MemorySegment。在 v1.12 前MemorySegment
仅仅为一个接口,它的实现有两个 HybridMemorySegmentHeapMemorySegment。在之后的倒退中,大家发现 HeapMemorySegment 根本都没有人用了,而是都用 HybridMemorySegment 了,为了优化性能——防止运行时每次都去查函数表确认调用的函数,去掉了 HeapMemorySegment,并将HybridMemorySegment 移到了 MemorySegment 中——这会见带来近 2.7 倍的调用速度优化。:Off-heap Memory in Apache Flink and the curious JIT compiler 以及 Jira:Don’t explicitly use HeapMemorySegment in raw format serde。

MemorySegment次要负责援用内存段,并其中数据进行读写——它对根本类型反对的很好,而简单类型则须要内部来做序列化。具体的实现还是比较简单的,从 field 的申明中就能够大抵看出实现了。惟一须要讲一下的是LITTLE_ENDIAN:不同的 CPU 架构会才不同的存储程序——PowerPC 会采纳 Big Endian 形式,低地址寄存最低无效字节;而 x86 会采纳 Little Endian 形式存储数据,低地址寄存最高无效字节。

说实话,读到这个代码的时候笔者还是略震惊的,因为写 Java 这么多年简直对底层的硬件是无感知的。没想到 Java 代码还要思考兼容 CPU 架构的逻辑。

这个时候就会有同学问了,那这个 MemorySegments 是如何在 Flink 中运作的呢?咱们能够看个测试用例:BinaryRowDataTest 里的 testPagesSer:
先是有 MemorySegments,通过对应的 BinaryRowWriter 写入数据到 RowData,再用 BinaryRowDataSerializer 写 RowData 到 RandomAccessOutputView:

    public void testPagesSer() throws IOException {MemorySegment[] memorySegments = new MemorySegment[5];
        ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
        for (int i = 0; i < 5; i++) {memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);

            // multi memorySegments
            String str = "啦啦啦啦啦我是高兴的粉刷匠,啦啦啦啦啦我是高兴的粉刷匠," + "啦啦啦啦啦我是高兴的粉刷匠。";
            BinaryRowData row = new BinaryRowData(1);
            BinaryRowWriter writer = new BinaryRowWriter(row);
            writer.writeString(0, fromString(str));

            RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
            BinaryRowDataSerializer serializer = new BinaryRowDataSerializer(1);
            serializer.serializeToPages(row, out);

            BinaryRowData mapRow = serializer.createInstance();
            mapRow =
                    serializer.mapFromPages(mapRow, new RandomAccessInputView(memorySegmentList, 64));
            writer.writeString(0, mapRow.getString(0));
            assertEquals(str, row.getString(0).toString());

            BinaryRowData deserRow =
                    serializer.deserializeFromPages(new RandomAccessInputView(memorySegmentList, 64));
            writer.writeString(0, deserRow.getString(0));
            assertEquals(str, row.getString(0).toString());
     // ignore some code

3.2 内存页

一个 MemorySegment 默认对应了 32KB 大小的内存块。在流解决中,很容易呈现超过 32KB 的数据,这时就须要跨 MemorySegment。那么对于编写相应逻辑的人就须要持有多个 MemorySegment,因而 Flink 提供了内存页的实现,它会持有多个 MemorySegment 实例,不便框架的开发人员来疾速的编写 Memory 相干的代码,而无需关注一个个的 MemorySegment。

其形象为 DataInputView 和 DataOutputView,别离对了数据读取和数据写入。

接下来,还是关联理论的代码看一下。咱们以咱们最常见的 KafkaProducer 应用为例:

|-- KafkaProducer#invoke // 在这里指定了 serializedValue
  \-- KeyedSerializationSchema#serializeValue // 序列化 record 的 value

咱们挑一个实现看看,以 TypeInformationKeyValueSerializationSchema 为例:

|-- TypeInformationKeyValueSerializationSchema#deserialize //KeyedSerializationSchema 的实现类
|-- DataInputDeserializer#setBuffer // 这是 DataInputView 的实现,用外部的 byte 数组存储数据。这里很奇怪的是并没有应用 MemorySegement。|-- TypeSerializer#deserialize  // 它的实现会针对不同的类型,从 DataInputView 里读出数据返回

其实这里的例子不太失当。因为 KeyedSerializationSchema 曾经被标记为了废除。社区更倡议咱们应用 KafkaSerializationSchema。第一个起因是因为 KeyedSerializationSchema 的形象并不适合 Kafka,当 Kafka 在 Record 新加字段时,是很难形象当这个接口里的——这个接口仅仅关注了 key、value 以及 topic。

KafkaSerializationSchema 开展的话,咱们能够看典型的实现——KafkaSerializationSchemaWrapper,咱们关怀的中央很容找到:

    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {byte[] serialized = serializationSchema.serialize(element);
        final Integer partition;
        if (partitioner != null) {partition = partitioner.partition(element, null, serialized, topic, partitions);
        } else {partition = null;}

        final Long timestampToWrite;
        if (writeTimestamp) {timestampToWrite = timestamp;} else {timestampToWrite = null;}

        return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);

这个 serializationSchema 的申明是一个名为 SerializationSchema 的接口。能够看到它有大量的实现,其中很多对应了 DataStream 还有 SQL API 中的 format。咱们以 TypeInformationSerializationSchema 为例持续跟踪:

public class TypeInformationSerializationSchema<T>
        implements DeserializationSchema<T>, SerializationSchema<T> {

    //ignore some filed

    /** The serializer for the actual de-/serialization. */
    private final TypeSerializer<T> serializer;

又看到咱们相熟的接口 TypeSerializer 了。就像下面说的,它的实现会针对不同的类型,从 DataInputView、DataOutputView 进行互动,提供序列化和反序列化的能力。在它的办法签名中也是能够看到的:

     * Serializes the given record to the given target output view.
     * @param record The record to serialize.
     * @param target The output view to write the serialized data to.
     * @throws IOException Thrown, if the serialization encountered an I/O related error. Typically
     *     raised by the output view, which may have an underlying I/O channel to which it
     *     delegates.
    public abstract void serialize(T record, DataOutputView target) throws IOException;

     * De-serializes a record from the given source input view.
     * @param source The input view from which to read the data.
     * @return The deserialized element.
     * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
     *     Typically raised by the input view, which may have an underlying I/O channel from which
     *     it reads.
    public abstract T deserialize(DataInputView source) throws IOException;

     * De-serializes a record from the given source input view into the given reuse record instance
     * if mutable.
     * @param reuse The record instance into which to de-serialize the data.
     * @param source The input view from which to read the data.
     * @return The deserialized element.
     * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
     *     Typically raised by the input view, which may have an underlying I/O channel from which
     *     it reads.
    public abstract T deserialize(T reuse, DataInputView source) throws IOException;

     * Copies exactly one record from the source input view to the target output view. Whether this
     * operation works on binary data or partially de-serializes the record to determine its length
     * (such as for records of variable length) is up to the implementer. Binary copies are
     * typically faster. A copy of a record containing two integer numbers (8 bytes total) is most
     * efficiently implemented as {@code target.write(source, 8);}.
     * @param source The input view from which to read the record.
     * @param target The target output view to which to write the record.
     * @throws IOException Thrown if any of the two views raises an exception.
    public abstract void copy(DataInputView source, DataOutputView target) throws IOException;

那么 TypeSerializer#deserialize 到底是怎么被调用到的呢?这些细节并不是这篇文章须要关怀的。在这里咱们展现一下调用链,有趣味的读者能够沿着这个调用链看一下具体的代码:

|-- TypeSerializer#deserialize
|-- StreamElementSerializer#deserialize
|-- TypeInformationKeyValueSerializationSchema#deserialize
|-- KafkaDeserializationSchema#deserialize
|-- KafkaFetcher#partitionConsumerRecordsHandler // 到这里曾经很分明了,这里是由 FlinkKafkaConsumer new 进去的对象

3.3 缓冲池

还有一个比拟有意思的类是 LocalBufferPool,封装了MemorySegment。个别用于网络缓冲器(NetworkBuffer),NetworkBuffer 是网络替换数据的包装,当后果分区(ResultParition)开始写出数据的时候,须要向 LocalBufferPool 申请 Buffer 资源。


|-- Task#constructor // 结构工作
|-- NettyShuffleEnvironment#createResultPartitionWriters // 创立用于写入后果的后果分区
|-- ResultPartitionFactory#create
  \-- ResultPartitionFactory#createBufferPoolFactory // 在这里创立了一个简略的 BufferPoolFactory
|-- PipelinedResultPartition#constructor
|-- BufferWritingResultPartition#constructor
|-- SortMergeResultPartition#constructor or BufferWritingResultPartition#constructor
|-- ResultPartition#constructor
  \-- ResultPartition#steup // 注册缓冲池到这个后果分区中

另外,NetworkBuffer实现了 Netty 的AbstractReferenceCountedByteBuf。这意味着这里采纳了经典的援用计数算法,当 Buffer 不再被须要时,会被回收。

4. 其余

