This article was first published on 泊浮目's Jianshu: https://www.jianshu.com/u/204...
Version  Date        Notes
1.0      2021.12.20  First published

0. Foreword

My first exposure to Flink came from talks by some of the industry's leading players — they were using it to process massive amounts of data. In that kind of scenario, one question kept lingering in my mind: how do you avoid the side effects of the JVM GC's StopTheWorld pauses? It wasn't until I used Flink myself and browsed the relevant source code (based on 1.14.0) that I finally found some answers, which I'd like to share in this article.

1. Shortcomings of JVM Memory Management

Besides the StopTheWorld pauses mentioned above, JVM memory management brings the following problems:

  • Memory waste: a Java object is stored in memory in three parts: the object header, the instance data, and the alignment padding. First, in the 32-bit and 64-bit implementations the object header occupies 32 bits and 64 bits respectively. Moreover, to improve overall access efficiency, data in JVM memory is not stored at arbitrary addresses but aligned to multiples of 8 bytes — even if you only need 1 byte, 7 bytes of padding are added automatically.
  • Cache misses: as everyone knows, the CPU has L1/L2/L3 caches. When the CPU reads data from memory, it pulls the neighboring data into the cache as well — a practical application of the principle of locality: data the CPU accessed recently is likely to be accessed again soon (temporal locality), and data near recently accessed data is likely to be accessed soon too (spatial locality). But as mentioned earlier, Java objects are not stored contiguously on the heap, so when the CPU reads an object from the JVM, the neighboring memory pulled into the cache is often not what the CPU needs for its next computation. The CPU then stalls, waiting for data from memory (the two speeds differ by orders of magnitude). If the data happens to have been swapped out to disk, it gets even worse.
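The effect of spatial locality can be observed with a small standalone Java sketch (not Flink code, names are my own): the two methods below compute the same sum over a 2D array, but the row-major traversal visits memory sequentially and reuses each cache line, while the column-major traversal jumps between rows and typically runs several times slower on large arrays.

```java
public class LocalityDemo {
    // Row-major: a[i][j] and a[i][j+1] are adjacent in memory,
    // so one cache line serves many consecutive accesses.
    static long rowMajorSum(int[][] a) {
        long sum = 0;
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < a[i].length; j++) {
                sum += a[i][j];
            }
        }
        return sum;
    }

    // Column-major: each access lands in a different row array,
    // so cache lines tend to be evicted before they are reused.
    static long colMajorSum(int[][] a) {
        long sum = 0;
        for (int j = 0; j < a[0].length; j++) {
            for (int i = 0; i < a.length; i++) {
                sum += a[i][j];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        int n = 4096;
        int[][] a = new int[n][n];
        for (int i = 0; i < n; i++)
            for (int j = 0; j < n; j++)
                a[i][j] = i ^ j;

        long t0 = System.nanoTime();
        long s1 = rowMajorSum(a);
        long t1 = System.nanoTime();
        long s2 = colMajorSum(a);
        long t2 = System.nanoTime();

        System.out.println("row-major: " + (t1 - t0) / 1_000_000 + " ms, sum=" + s1);
        System.out.println("col-major: " + (t2 - t1) / 1_000_000 + " ms, sum=" + s2);
    }
}
```

The exact speed gap depends on the CPU's cache sizes, but the asymmetry itself is reliably visible.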

2. Flink's Evolution

Before v0.10, Flink used an on-heap memory implementation. Simply put, memory was allocated through Unsafe and referenced as byte arrays, with the application layer maintaining its own type information to retrieve the data. But this still had problems:

  • When the on-heap memory is very large, JVM startup takes a long time, and Full GC pauses can reach minute level.
  • Low I/O efficiency: writing from on-heap memory to disk or the network requires at least one extra memory copy.

Therefore, from v0.10 on, Flink introduced off-heap memory management. See the Jira: Add an off-heap variant of the managed memory. Besides solving the on-heap problems, it also brings some benefits:

  • Off-heap memory can be shared between processes. This means Flink can use it for some convenient failure-recovery schemes.

Of course, everything has two sides. The drawbacks are:

  • Allocating short-lived objects is more expensive off-heap than on-heap.
  • Troubleshooting is more complicated when off-heap memory goes wrong.

This kind of implementation can also be found in Spark, where it is called MemoryPool and supports both on-heap and off-heap modes; see MemoryMode.scala for details. Kafka follows a similar idea — it keeps its messages in Java NIO ByteBuffers.
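The basic mechanism behind all of these is plain Java NIO. As a minimal illustration (my own sketch, not code from Flink, Spark, or Kafka), a direct ByteBuffer reserves memory outside the JVM heap, so the GC only tracks the small wrapper object rather than the data itself:

```java
import java.nio.ByteBuffer;

public class OffHeapDemo {
    public static void main(String[] args) {
        // allocateDirect reserves memory outside the JVM heap; only the
        // ByteBuffer wrapper object lives on the heap and is GC-managed.
        ByteBuffer offHeap = ByteBuffer.allocateDirect(1024 * 1024);
        System.out.println("direct buffer: " + offHeap.isDirect());

        // Access goes through typed accessors instead of object fields;
        // the application layer must track what lives at which offset.
        offHeap.putLong(0, 42L);
        offHeap.putInt(8, 7);
        System.out.println(offHeap.getLong(0)); // 42
        System.out.println(offHeap.getInt(8));  // 7
    }
}
```

Note the trade-off mentioned above: there is no type information in the buffer itself — reading offset 8 as a long instead of an int would silently return garbage.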

3. Source Code Analysis

Overall, Flink's implementation here is quite clear — just like an operating system, it has data structures for both memory segments and memory pages.

3.1 Memory Segments

The main implementation is MemorySegment. Before v1.12, MemorySegment was merely an interface with two implementations: HybridMemorySegment and HeapMemorySegment. As development went on, it turned out that hardly anyone used HeapMemorySegment anymore — everyone used HybridMemorySegment. So, to optimize performance — avoiding a function-table lookup on every call to determine which implementation to invoke — HeapMemorySegment was removed and HybridMemorySegment was merged into MemorySegment, which brought nearly a 2.7x improvement in call speed. See: Off-heap Memory in Apache Flink and the curious JIT compiler, and the Jira: Don't explicitly use HeapMemorySegment in raw format serde.

MemorySegment is mainly responsible for referencing a memory segment and reading/writing the data in it — it supports primitive types very well, while complex types need to be serialized externally. The implementation itself is fairly simple; you can roughly infer it from the field declarations. The only thing worth explaining is LITTLE_ENDIAN: different CPU architectures use different byte orders — PowerPC uses Big Endian, storing the most significant byte at the lowest address, while x86 uses Little Endian, storing the least significant byte at the lowest address.

Honestly, I was a little surprised when I read this code: after so many years of writing Java, I had been almost completely unaware of the underlying hardware. I didn't expect Java code to need logic for CPU architecture compatibility.
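The two byte orders can actually be observed from plain Java via java.nio.ByteOrder — a quick sketch (standalone demo, not Flink code) interpreting the same four bytes under both orders:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndianDemo {
    public static void main(String[] args) {
        // The same 4 bytes, interpreted under both byte orders.
        byte[] bytes = {0x00, 0x00, 0x00, 0x01};

        int big = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();
        int little = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();

        System.out.println("big-endian:    " + big);     // 1
        System.out.println("little-endian: " + little);  // 16777216
        // The byte order this machine's CPU uses natively:
        System.out.println("native order:  " + ByteOrder.nativeOrder());
    }
}
```

ByteOrder.nativeOrder() is how a Java program can detect the host CPU's byte order at runtime, which is the same kind of check MemorySegment's LITTLE_ENDIAN constant is based on.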

At this point some readers may ask: how do these MemorySegments actually work inside Flink? Let's look at a test case: testPagesSer in BinaryRowDataTest. First some MemorySegments are created; data is written into a RowData through the corresponding BinaryRowWriter, and then the RowData is written to a RandomAccessOutputView with BinaryRowDataSerializer:

```java
@Test
public void testPagesSer() throws IOException {
    MemorySegment[] memorySegments = new MemorySegment[5];
    ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);
        memorySegmentList.add(memorySegments[i]);
    }
    {
        // multi memorySegments
        String str = "啦啦啦啦啦我是高兴的粉刷匠,啦啦啦啦啦我是高兴的粉刷匠," + "啦啦啦啦啦我是高兴的粉刷匠。";
        BinaryRowData row = new BinaryRowData(1);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeString(0, fromString(str));
        writer.complete();

        RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
        BinaryRowDataSerializer serializer = new BinaryRowDataSerializer(1);
        serializer.serializeToPages(row, out);

        BinaryRowData mapRow = serializer.createInstance();
        mapRow =
                serializer.mapFromPages(
                        mapRow, new RandomAccessInputView(memorySegmentList, 64));
        writer.reset();
        writer.writeString(0, mapRow.getString(0));
        writer.complete();
        assertEquals(str, row.getString(0).toString());

        BinaryRowData deserRow =
                serializer.deserializeFromPages(
                        new RandomAccessInputView(memorySegmentList, 64));
        writer.reset();
        writer.writeString(0, deserRow.getString(0));
        writer.complete();
        assertEquals(str, row.getString(0).toString());
    }
    // ignore some code
}
```

3.2 Memory Pages

A MemorySegment corresponds to a 32KB block of memory by default. In stream processing, data easily exceeds 32KB, which then requires spanning multiple MemorySegments. Whoever writes the corresponding logic would have to hold several MemorySegments at once, so Flink provides a memory-page abstraction that holds multiple MemorySegment instances, allowing framework developers to write memory-related code quickly without dealing with individual MemorySegments.
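The idea of a paged view that hides segment boundaries can be sketched as follows — a toy model with made-up names, not Flink's actual classes: the caller just writes bytes, and the view allocates a fresh fixed-size segment whenever the current one fills up.

```java
import java.util.ArrayList;
import java.util.List;

// A toy "paged output view": the caller writes a byte stream, and the
// view transparently crosses fixed-size segment boundaries.
// (Hypothetical sketch; Flink's real DataOutputView works on MemorySegments.)
public class PagedOutputView {
    private final int segmentSize;
    private final List<byte[]> segments = new ArrayList<>();
    private byte[] current;
    private int position;

    public PagedOutputView(int segmentSize) {
        this.segmentSize = segmentSize;
        advance();
    }

    // Allocate a new segment and make it the current write target.
    private void advance() {
        current = new byte[segmentSize];
        segments.add(current);
        position = 0;
    }

    public void write(byte[] data) {
        for (byte b : data) {
            if (position == segmentSize) {
                advance(); // cross the segment boundary transparently
            }
            current[position++] = b;
        }
    }

    public int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        PagedOutputView view = new PagedOutputView(32);
        view.write(new byte[100]); // 100 bytes > 32, spans several segments
        System.out.println("segments used: " + view.segmentCount()); // 4
    }
}
```

Writing 100 bytes into 32-byte segments needs ceil(100/32) = 4 segments, but the caller never sees that arithmetic — which is exactly the convenience the page abstraction provides.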

The abstractions are DataInputView and DataOutputView, corresponding to data reading and data writing respectively.

Next, let's tie this to the actual code, taking the most common KafkaProducer usage as an example:

|-- KafkaProducer#invoke // serializedValue is specified here
  \-- KeyedSerializationSchema#serializeValue // serializes the record's value

Let's pick an implementation and take a look, using TypeInformationKeyValueSerializationSchema as the example:

|-- TypeInformationKeyValueSerializationSchema#deserialize // an implementation of KeyedSerializationSchema
|-- DataInputDeserializer#setBuffer // an implementation of DataInputView that stores data in an internal byte array; oddly, it does not use MemorySegment here
|-- TypeSerializer#deserialize // its implementations read data from the DataInputView and return it, depending on the type
Actually, this example is not entirely appropriate, because KeyedSerializationSchema has already been marked as deprecated. The community recommends using KafkaSerializationSchema instead. The first reason is that the KeyedSerializationSchema abstraction is a poor fit for Kafka: when Kafka adds new fields to Record, they are hard to express through this interface, which only concerns itself with the key, the value, and the topic.

Expanding on KafkaSerializationSchema, we can look at a typical implementation — KafkaSerializationSchemaWrapper. The part we care about is easy to find:

```java
@Override
public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
    byte[] serialized = serializationSchema.serialize(element);
    final Integer partition;
    if (partitioner != null) {
        partition = partitioner.partition(element, null, serialized, topic, partitions);
    } else {
        partition = null;
    }

    final Long timestampToWrite;
    if (writeTimestamp) {
        timestampToWrite = timestamp;
    } else {
        timestampToWrite = null;
    }

    return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
}
```

The serializationSchema here is declared as an interface named SerializationSchema. It has a large number of implementations, many of which correspond to formats in the DataStream and SQL APIs. Let's keep tracing, with TypeInformationSerializationSchema as the example:

```java
@Public
public class TypeInformationSerializationSchema<T>
        implements DeserializationSchema<T>, SerializationSchema<T> {

    // ignore some fields

    /** The serializer for the actual de-/serialization. */
    private final TypeSerializer<T> serializer;
    ....
```

Here we meet our familiar interface TypeSerializer again. As mentioned above, its implementations interact with DataInputView and DataOutputView for different types, providing the serialization and deserialization capabilities. This is also visible in its method signatures:

```java
/**
 * Serializes the given record to the given target output view.
 *
 * @param record The record to serialize.
 * @param target The output view to write the serialized data to.
 * @throws IOException Thrown, if the serialization encountered an I/O related error. Typically
 *     raised by the output view, which may have an underlying I/O channel to which it
 *     delegates.
 */
public abstract void serialize(T record, DataOutputView target) throws IOException;

/**
 * De-serializes a record from the given source input view.
 *
 * @param source The input view from which to read the data.
 * @return The deserialized element.
 * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
 *     Typically raised by the input view, which may have an underlying I/O channel from which
 *     it reads.
 */
public abstract T deserialize(DataInputView source) throws IOException;

/**
 * De-serializes a record from the given source input view into the given reuse record instance
 * if mutable.
 *
 * @param reuse The record instance into which to de-serialize the data.
 * @param source The input view from which to read the data.
 * @return The deserialized element.
 * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
 *     Typically raised by the input view, which may have an underlying I/O channel from which
 *     it reads.
 */
public abstract T deserialize(T reuse, DataInputView source) throws IOException;

/**
 * Copies exactly one record from the source input view to the target output view. Whether this
 * operation works on binary data or partially de-serializes the record to determine its length
 * (such as for records of variable length) is up to the implementer. Binary copies are
 * typically faster. A copy of a record containing two integer numbers (8 bytes total) is most
 * efficiently implemented as {@code target.write(source, 8);}.
 *
 * @param source The input view from which to read the record.
 * @param target The target output view to which to write the record.
 * @throws IOException Thrown if any of the two views raises an exception.
 */
public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
```

So how does TypeSerializer#deserialize actually get called? Those details are not what this article needs to care about. Here we just show the call chain; interested readers can follow it to the concrete code:

|-- TypeSerializer#deserialize
|-- StreamElementSerializer#deserialize
|-- TypeInformationKeyValueSerializationSchema#deserialize
|-- KafkaDeserializationSchema#deserialize
|-- KafkaFetcher#partitionConsumerRecordsHandler // by here it is clear: this object is created by FlinkKafkaConsumer

3.3 Buffer Pool

Another interesting class is LocalBufferPool, which encapsulates MemorySegment. It is generally used for network buffers (NetworkBuffer) — the wrapper around data exchanged over the network. When a result partition (ResultPartition) starts writing out data, it needs to request Buffer resources from the LocalBufferPool.

The write path:

|-- Task#constructor // construct the task
|-- NettyShuffleEnvironment#createResultPartitionWriters // create the result partitions used for writing results
|-- ResultPartitionFactory#create
  \-- ResultPartitionFactory#createBufferPoolFactory // a simple BufferPoolFactory is created here
|-- PipelinedResultPartition#constructor
|-- BufferWritingResultPartition#constructor
|-- SortMergeResultPartition#constructor or BufferWritingResultPartition#constructor
|-- ResultPartition#constructor
  \-- ResultPartition#setup // register the buffer pool with this result partition

In addition, NetworkBuffer implements Netty's AbstractReferenceCountedByteBuf. This means the classic reference-counting algorithm is used here: when a Buffer is no longer needed, it is recycled.
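The reference-counting idea can be sketched in a few lines — a minimal hypothetical model, not Flink or Netty code: every holder of the buffer calls retain(), every holder calls release() when done, and only the last release actually frees the memory.

```java
import java.util.concurrent.atomic.AtomicInteger;

// A minimal sketch of the reference-counting scheme behind Netty's
// AbstractReferenceCountedByteBuf (hypothetical names, not real Flink/Netty code).
public class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1); // creator holds 1 ref
    private volatile boolean deallocated = false;

    // Each new holder of the buffer increments the count.
    public RefCountedBuffer retain() {
        if (refCount.getAndIncrement() <= 0) {
            throw new IllegalStateException("buffer already released");
        }
        return this;
    }

    // Returns true only when the last reference is released and the
    // underlying memory can be handed back to the pool.
    public boolean release() {
        if (refCount.decrementAndGet() == 0) {
            deallocated = true; // here Flink would recycle the MemorySegment
            return true;
        }
        return false;
    }

    public int refCount() {
        return refCount.get();
    }

    public boolean isDeallocated() {
        return deallocated;
    }

    public static void main(String[] args) {
        RefCountedBuffer buf = new RefCountedBuffer();
        buf.retain();                  // a second holder, refCount == 2
        System.out.println(buf.release()); // false: one holder remains
        System.out.println(buf.release()); // true: memory is recycled
    }
}
```

Compared with relying on GC, this frees the buffer at a deterministic point, which is exactly what a pooled network buffer needs.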

4. Miscellaneous

4.1 Related Flink Jiras

Here is the list of Jiras I referred to while writing this article:

  • Add an off-heap variant of the managed memory:https://issues.apache.org/jir...
  • Separate type specific memory segments.:https://issues.apache.org/jir...
  • Investigate potential out-of-memory problems due to managed unsafe memory allocation:https://issues.apache.org/jir...
  • Adjust GC Cleaner for unsafe memory and Java 11:https://issues.apache.org/jir...
  • FLIP-49 Unified Memory Configuration for TaskExecutors:https://issues.apache.org/jir...
  • Don't explicitly use HeapMemorySegment in raw format serde:https://issues.apache.org/jir...
  • Refactor HybridMemorySegment:https://issues.apache.org/jir...
  • use flink's buffers in netty:https://issues.apache.org/jir...
  • Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment:https://issues.apache.org/jir...