This article was first published on 泊浮目's Jianshu: https://www.jianshu.com/u/204...
Version | Date | Notes |
---|---|---|
1.0 | 2021.12.20 | First published |
0. Preface
My first exposure to Flink came through talks by some of the industry's leading players, who use it to process massive amounts of data. In such scenarios, one question kept circling in my mind: how do they avoid the side effects of the JVM GC's Stop-The-World pauses? It was only after using Flink myself and reading through the relevant source code (based on 1.14.0) that I finally had some answers, which I share with you in this article.
1. Shortcomings of JVM memory management
Besides the Stop-The-World pauses mentioned above, JVM memory management brings the following problems:
- Memory waste: a Java object stored in memory consists of three parts: the object header, the instance data, and alignment padding. In the 32-bit and 64-bit implementations, the object header occupies 32 bits and 64 bits respectively. And to improve overall access efficiency, data in JVM memory is not packed tightly, but stored in multiples of 8 bytes: even if you only need 1 byte, 7 bytes of padding are added automatically.
- Cache misses: as we all know, CPUs have L1, L2, and L3 caches. When the CPU reads data from memory, it also loads the neighboring data into the cache, a practical application of the principle of locality: data the CPU accessed recently is likely to be accessed again soon (temporal locality), and data near recently accessed data is likely to be accessed soon (spatial locality).
  But as mentioned above, Java objects are not stored contiguously on the heap, so when the CPU reads an object from the JVM, the neighboring memory that was cached along with it is often not what the CPU needs for its next computation. The CPU then stalls, waiting for data to arrive from main memory (the two differ in speed by orders of magnitude). And if the data happens to have been swapped out to disk, things get even worse.
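The 8-byte alignment rule above boils down to a one-line computation. The class below is a hypothetical illustration, not JVM or Flink source code: it rounds a raw size up to the JVM's 8-byte object alignment boundary.

```java
// Hypothetical illustration of the JVM's 8-byte object alignment rule
// described above; this is not JVM or Flink code.
public class AlignmentDemo {

    /** Rounds rawSize up to the next multiple of 8, like the JVM's object alignment. */
    public static long paddedSize(long rawSize) {
        return (rawSize + 7) & ~7L;
    }

    public static void main(String[] args) {
        // A single payload byte still occupies 8 bytes after padding:
        System.out.println(paddedSize(1));   // 8
        // 12 bytes of header + data are padded up to 16:
        System.out.println(paddedSize(12));  // 16
    }
}
```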
2. Flink's evolving approach
Before v0.10, Flink used an on-heap memory implementation. In short, memory was allocated via Unsafe and referenced through byte arrays, with the application layer maintaining its own type information to access the data. But problems remained:
- When on-heap memory is very large, JVM startup takes a long time, and Full GC pauses can reach minutes.
- Low I/O efficiency: writing from on-heap memory to disk or the network requires at least one extra memory copy.
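The "byte array plus application-managed type information" idea can be sketched roughly as follows. This is a simplified illustration using java.nio.ByteBuffer, with invented names; the actual pre-v0.10 Flink implementation differs:

```java
import java.nio.ByteBuffer;

// Simplified sketch of "byte array + application-managed types":
// the buffer stores only raw bytes, and the caller must remember
// which type lives at which offset. Not actual Flink code.
public class TypedByteStore {
    private final ByteBuffer buffer;

    public TypedByteStore(int capacity) {
        this.buffer = ByteBuffer.wrap(new byte[capacity]); // on-heap backing array
    }

    public void putInt(int offset, int value)   { buffer.putInt(offset, value); }
    public int getInt(int offset)               { return buffer.getInt(offset); }
    public void putLong(int offset, long value) { buffer.putLong(offset, value); }
    public long getLong(int offset)             { return buffer.getLong(offset); }

    public static void main(String[] args) {
        TypedByteStore store = new TypedByteStore(64);
        store.putInt(0, 42);    // caller knows: an int lives at offset 0
        store.putLong(4, 123L); // caller knows: a long lives at offset 4
        System.out.println(store.getInt(0) + " " + store.getLong(4));
    }
}
```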
So after v0.10, Flink introduced off-heap memory management. See the Jira: Add an off-heap variant of the managed memory. Besides solving the on-heap problems, this also brings some benefits:
- Off-heap memory can be shared across processes, which means Flink can implement some convenient fault-recovery schemes.
Of course, everything has two sides, and the drawbacks are:
- Allocating short-lived objects off-heap is more expensive than allocating them on-heap.
- Troubleshooting is more complicated when off-heap memory misbehaves.
A similar implementation can be found in Spark, where it is called MemoryPool and supports both on-heap and off-heap memory modes; see MemoryMode.scala for details. Kafka follows a similar idea, keeping its messages in ByteBuffers via Java NIO.
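The on-heap versus off-heap distinction can be tried directly with Java NIO. This small sketch just contrasts a heap buffer with a direct (off-heap) one; it is illustrative only and not taken from any of the projects above:

```java
import java.nio.ByteBuffer;

// Contrast between on-heap and off-heap (direct) buffers in Java NIO.
public class DirectBufferDemo {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(64);          // backed by a byte[] on the JVM heap
        ByteBuffer offHeap = ByteBuffer.allocateDirect(64); // native memory outside the GC'd heap

        offHeap.putInt(0, 2021);
        System.out.println(heap.isDirect());    // false
        System.out.println(offHeap.isDirect()); // true
        System.out.println(offHeap.getInt(0));  // 2021
    }
}
```

Direct buffers are not moved by the garbage collector, which is also why they are a good fit for I/O: the OS can read from and write to them without an extra copy through the heap.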
3. Source code analysis
Overall, Flink's implementation here is quite clear: just like an operating system, it has data structures for both memory segments and memory pages.
3.1 Memory segments
The main implementation is MemorySegment. Before v1.12, MemorySegment was merely an interface with two implementations: HybridMemorySegment and HeapMemorySegment. As development went on, it turned out that hardly anyone used HeapMemorySegment anymore; everyone used HybridMemorySegment. So, to optimize performance by avoiding a virtual function table lookup on every call at runtime, HeapMemorySegment was removed and HybridMemorySegment was merged into MemorySegment, which brought nearly a 2.7x improvement in call speed. See: Off-heap Memory in Apache Flink and the curious JIT compiler, and the Jira: Don't explicitly use HeapMemorySegment in raw format serde.
MemorySegment is mainly responsible for referencing a memory segment and reading and writing the data inside it. It supports primitive types well, while complex types need to be serialized externally. The implementation itself is fairly straightforward; you can roughly see it from the field declarations. The only thing worth discussing is LITTLE_ENDIAN: different CPU architectures use different byte orders. PowerPC uses big-endian, storing the most significant byte at the lowest address, while x86 uses little-endian, storing the least significant byte at the lowest address.
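The byte-order difference can be observed directly with Java NIO. This is an illustrative sketch, separate from Flink's own LITTLE_ENDIAN handling:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Writing the same int under big-endian vs. little-endian byte order.
public class EndianDemo {
    public static void main(String[] args) {
        ByteBuffer big = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
        big.putInt(0x01020304);
        // Most significant byte at the lowest address: 01 02 03 04
        System.out.printf("big:    %02x %02x %02x %02x%n",
                big.get(0), big.get(1), big.get(2), big.get(3));

        ByteBuffer little = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
        little.putInt(0x01020304);
        // Least significant byte at the lowest address: 04 03 02 01
        System.out.printf("little: %02x %02x %02x %02x%n",
                little.get(0), little.get(1), little.get(2), little.get(3));

        // The byte order of the hardware this JVM runs on:
        System.out.println("native order: " + ByteOrder.nativeOrder());
    }
}
```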
Frankly, I was a little surprised when I read this code: after so many years of writing Java, I had been almost completely insulated from the underlying hardware. I never expected Java code to contain logic for accommodating CPU architectures.
At this point you may ask: how do these MemorySegments actually operate inside Flink? Let's look at a test case, testPagesSer in BinaryRowDataTest. First there are MemorySegments; data is written into a RowData through the corresponding BinaryRowWriter, and then the RowData is written to a RandomAccessOutputView with a BinaryRowDataSerializer:
```java
@Test
public void testPagesSer() throws IOException {
    MemorySegment[] memorySegments = new MemorySegment[5];
    ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);
        memorySegmentList.add(memorySegments[i]);
    }

    {
        // multi memorySegments
        String str =
                "啦啦啦啦啦我是高兴的粉刷匠,啦啦啦啦啦我是高兴的粉刷匠,"
                        + "啦啦啦啦啦我是高兴的粉刷匠。";
        BinaryRowData row = new BinaryRowData(1);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeString(0, fromString(str));
        writer.complete();

        RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
        BinaryRowDataSerializer serializer = new BinaryRowDataSerializer(1);
        serializer.serializeToPages(row, out);

        BinaryRowData mapRow = serializer.createInstance();
        mapRow = serializer.mapFromPages(
                mapRow, new RandomAccessInputView(memorySegmentList, 64));
        writer.reset();
        writer.writeString(0, mapRow.getString(0));
        writer.complete();
        assertEquals(str, row.getString(0).toString());

        BinaryRowData deserRow = serializer.deserializeFromPages(
                new RandomAccessInputView(memorySegmentList, 64));
        writer.reset();
        writer.writeString(0, deserRow.getString(0));
        writer.complete();
        assertEquals(str, row.getString(0).toString());
    }
    // ignore some code
}
```
3.2 Memory pages
By default, one MemorySegment corresponds to a 32KB block of memory. In stream processing, data exceeding 32KB appears easily, and such data has to span multiple MemorySegments. Whoever writes that logic would then have to hold on to multiple MemorySegments themselves, so Flink provides a memory-page abstraction that holds multiple MemorySegment instances, letting framework developers quickly write memory-related code without dealing with individual MemorySegments.
It is abstracted as DataInputView and DataOutputView, for reading and writing data respectively.
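To give a feel for what such a view does, here is a heavily simplified, hypothetical sketch of writing bytes across multiple fixed-size segments. The class name and structure are invented; the real DataOutputView implementations over MemorySegments are far more complete:

```java
// Hypothetical mini "output view" that spills writes across several
// fixed-size byte[] segments, mimicking the idea (not the API) of
// Flink's DataOutputView over MemorySegments.
public class MiniPagedOutputView {
    private final byte[][] segments;
    private final int segmentSize;
    private int currentSegment = 0;
    private int positionInSegment = 0;

    public MiniPagedOutputView(int numSegments, int segmentSize) {
        this.segments = new byte[numSegments][segmentSize];
        this.segmentSize = segmentSize;
    }

    public void writeByte(byte b) {
        if (positionInSegment == segmentSize) { // current page is full:
            currentSegment++;                   // advance to the next segment
            positionInSegment = 0;
        }
        segments[currentSegment][positionInSegment++] = b;
    }

    public byte[] segment(int i) { return segments[i]; }

    public static void main(String[] args) {
        MiniPagedOutputView out = new MiniPagedOutputView(2, 4);
        for (byte b = 0; b < 6; b++) {
            out.writeByte(b); // bytes 0-3 land in segment 0, bytes 4-5 in segment 1
        }
        System.out.println(out.segment(1)[0]); // 4
    }
}
```

The caller just writes a stream of bytes; the view decides when to move on to the next segment, which is exactly the bookkeeping the page abstraction takes off the framework developer's hands.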
Next, let's tie this to actual code, taking the most common KafkaProducer usage as an example:
```
|-- KafkaProducer#invoke  // serializedValue is specified here
\-- KeyedSerializationSchema#serializeValue  // serializes the record's value
```
Let's pick one implementation and look at it, using TypeInformationKeyValueSerializationSchema as an example:
```
|-- TypeInformationKeyValueSerializationSchema#deserialize  // an implementation of KeyedSerializationSchema
|-- DataInputDeserializer#setBuffer  // an implementation of DataInputView that stores data in an internal byte array; oddly, no MemorySegment is used here
|-- TypeSerializer#deserialize  // its implementations read data of the appropriate type out of the DataInputView
```
Admittedly this example is not ideal, because KeyedSerializationSchema has already been marked as deprecated. The community recommends using KafkaSerializationSchema instead. The first reason is that the KeyedSerializationSchema abstraction is a poor fit for Kafka: when Kafka adds new fields to its records, they are hard to express in this interface, which only covers the key, the value, and the topic.
Starting from KafkaSerializationSchema, we can look at a typical implementation, KafkaSerializationSchemaWrapper, where the part we care about is easy to find:
```java
@Override
public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
    byte[] serialized = serializationSchema.serialize(element);
    final Integer partition;
    if (partitioner != null) {
        partition = partitioner.partition(element, null, serialized, topic, partitions);
    } else {
        partition = null;
    }

    final Long timestampToWrite;
    if (writeTimestamp) {
        timestampToWrite = timestamp;
    } else {
        timestampToWrite = null;
    }

    return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
}
```
This serializationSchema is declared as an interface named SerializationSchema. It has a large number of implementations, many of which correspond to formats in the DataStream and SQL APIs. Let's keep following TypeInformationSerializationSchema as an example:
```java
@Public
public class TypeInformationSerializationSchema<T>
        implements DeserializationSchema<T>, SerializationSchema<T> {

    // ignore some fields

    /** The serializer for the actual de-/serialization. */
    private final TypeSerializer<T> serializer;
    ...
```
Here we meet our familiar interface TypeSerializer again. As mentioned above, its implementations interact with DataInputView and DataOutputView for the various types, providing serialization and deserialization capabilities. This is also visible in its method signatures:
```java
/**
 * Serializes the given record to the given target output view.
 *
 * @param record The record to serialize.
 * @param target The output view to write the serialized data to.
 * @throws IOException Thrown, if the serialization encountered an I/O related error. Typically
 *     raised by the output view, which may have an underlying I/O channel to which it
 *     delegates.
 */
public abstract void serialize(T record, DataOutputView target) throws IOException;

/**
 * De-serializes a record from the given source input view.
 *
 * @param source The input view from which to read the data.
 * @return The deserialized element.
 * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
 *     Typically raised by the input view, which may have an underlying I/O channel from which
 *     it reads.
 */
public abstract T deserialize(DataInputView source) throws IOException;

/**
 * De-serializes a record from the given source input view into the given reuse record instance
 * if mutable.
 *
 * @param reuse The record instance into which to de-serialize the data.
 * @param source The input view from which to read the data.
 * @return The deserialized element.
 * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
 *     Typically raised by the input view, which may have an underlying I/O channel from which
 *     it reads.
 */
public abstract T deserialize(T reuse, DataInputView source) throws IOException;

/**
 * Copies exactly one record from the source input view to the target output view. Whether this
 * operation works on binary data or partially de-serializes the record to determine its length
 * (such as for records of variable length) is up to the implementer. Binary copies are
 * typically faster. A copy of a record containing two integer numbers (8 bytes total) is most
 * efficiently implemented as {@code target.write(source, 8);}.
 *
 * @param source The input view from which to read the record.
 * @param target The target output view to which to write the record.
 * @throws IOException Thrown if any of the two views raises an exception.
 */
public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
```
So how does TypeSerializer#deserialize actually get called? Those details are not what this article needs to be concerned with; we just show the call chain here, and interested readers can follow it through the concrete code:
```
|-- TypeSerializer#deserialize
|-- StreamElementSerializer#deserialize
|-- TypeInformationKeyValueSerializationSchema#deserialize
|-- KafkaDeserializationSchema#deserialize
\-- KafkaFetcher#partitionConsumerRecordsHandler  // by now it is quite clear: this object is created by FlinkKafkaConsumer
```
3.3 Buffer pools
Another rather interesting class is LocalBufferPool, which wraps MemorySegment. It is generally used for network buffers (NetworkBuffer), the wrappers around data exchanged over the network. When a result partition (ResultPartition) starts writing data out, it requests Buffer resources from the LocalBufferPool.
The write path:
```
|-- Task#constructor  // construct the task
|-- NettyShuffleEnvironment#createResultPartitionWriters  // create the result partitions used for writing results
|-- ResultPartitionFactory#create
    \-- ResultPartitionFactory#createBufferPoolFactory  // a simple BufferPoolFactory is created here
|-- PipelinedResultPartition#constructor
|-- BufferWritingResultPartition#constructor
|-- SortMergeResultPartition#constructor or BufferWritingResultPartition#constructor
|-- ResultPartition#constructor
    \-- ResultPartition#setup  // register the buffer pool with this result partition
```
In addition, NetworkBuffer implements Netty's AbstractReferenceCountedByteBuf. This means the classic reference-counting algorithm is applied here: when a Buffer is no longer needed, it is released.
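The reference-counting scheme behind AbstractReferenceCountedByteBuf can be sketched minimally. This is an invented class illustrating the idea, not Netty's implementation:

```java
// Minimal reference-counting sketch, illustrating the scheme that
// Netty's AbstractReferenceCountedByteBuf uses; this is not Netty code.
public class RefCountedBuffer {
    private int refCount = 1;     // the creator holds the first reference
    private boolean freed = false;

    public synchronized void retain() {
        if (freed) throw new IllegalStateException("buffer already freed");
        refCount++;
    }

    /** Returns true when the last reference was dropped and the buffer was freed. */
    public synchronized boolean release() {
        if (freed) throw new IllegalStateException("buffer already freed");
        if (--refCount == 0) {
            freed = true;         // a real buffer would return its memory to the pool here
            return true;
        }
        return false;
    }

    public synchronized int refCount() { return refCount; }

    public static void main(String[] args) {
        RefCountedBuffer buf = new RefCountedBuffer();
        buf.retain();                      // a second user takes a reference
        System.out.println(buf.release()); // false: one reference still alive
        System.out.println(buf.release()); // true: last reference gone, buffer freed
    }
}
```

Every consumer that keeps the buffer calls retain(), and every consumer that is done calls release(); the memory goes back to the pool exactly when the count reaches zero, with no GC involved.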
4. Miscellaneous
4.1 Related Flink Jiras
Here is the list of Jiras I consulted while writing this article:
- Add an off-heap variant of the managed memory:https://issues.apache.org/jir...
- Separate type specific memory segments:https://issues.apache.org/jir...
- Investigate potential out-of-memory problems due to managed unsafe memory allocation:https://issues.apache.org/jir...
- Adjust GC Cleaner for unsafe memory and Java 11:https://issues.apache.org/jir...
- FLIP-49 Unified Memory Configuration for TaskExecutors:https://issues.apache.org/jir...
- Don't explicitly use HeapMemorySegment in raw format serde:https://issues.apache.org/jir...
- Refactor HybridMemorySegment:https://issues.apache.org/jir...
- use flink's buffers in netty:https://issues.apache.org/jir...
- Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment:https://issues.apache.org/jir...