本文首发于泊浮目标简书:https://www.jianshu.com/u/204…
版本 | 日期 | 备注 |
---|---|---|
1.0 | 2021.12.20 | 文章首发 |
0. 前言
在最后接触到 Flink 时,是来自于业界里一些头部玩家的分享——大家会用其来解决海量数据。在这种场景下,如何防止 JVM GC 带来 StopTheWorld 带来的副作用
这样的问题始终盘绕在我心头。直到用了 Flink 当前,浏览了相干的源码(以 1.14.0 为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
1. JVM 内存治理的有余
除了上述提到的 StopTheWorld,JVM 的内存治理还会带来以下问题:
- 内存节约:一个 Java 对象在内存中存储时会分为三个局部:对象头、实例数据、对其填充局部。首先,32 位和 64 位的实现中,对象头别离要占用 32bit 和 64bit。而为了提供整体的应用效率,JVM 内存中的数据不是间断存储的,而是依照 8byte 的整数倍进行存储。哪怕你只有 1byte,会主动 padding7byte。
- 缓存未命中:大家都晓得 CPU 是有 L1、2、3 级缓存的,当 CPU 去读取内存中的数据时,会将内存中邻近的数据读到缓存中——这是程序局部性原理的一种实际伎俩。
最近被 CPU 拜访的数据,短期内 CPU 还要拜访(工夫);被 CPU 拜访的数据左近的数据,CPU 短期内还要拜访(空间)。
但咱们后面提到,Java 对象在堆上存储的时候并不是间断的,所以 CPU 去读取 JVM 上的对象时,缓存的邻近内存区域数据往往不是 CPU 下一步计算所须要的。这时 CPU 只能空转期待从内存里读取数据(两者的速度不是一个量级)。如果数据恰好被 swap 到硬盘里,那就是难上加难了。
2. Flink 的演进计划
在 v0.10 之前,Flink 应用了堆上内存的实现。简略来说就是通过 Unsafe 来分配内存,并用 byte 数组的形式将其援用起来,应用层本人保护类型信息来获取相应的数据。但这样依然会有问题:
- 在堆内内存过大的状况下,JVM 启动工夫会很长,而且 Full GC 会达到分钟级。
- IO 效率低:堆上内存写磁盘或网络至多须要 1 次内存复制。
因而在 v0.10 后,Flink 引入了堆外内存治理性能。见 Jira:Add an off-heap variant of the managed memory。除了解决堆内内存的问题,还会带来一些益处:
- 堆外内存能够做成过程之间共享。这象征 Flink 能够做此一些不便的故障复原。
当然,凡事都是有双面性的,毛病是:
- 调配短生命周期的对象,比起堆上内存,在堆外内存上调配开销更高。
- 堆外内存出错时排错更为简单。
这种实现在 Spark 中也能够找到,它叫做MemoryPool
,同时反对堆内和堆外的内存形式,具体见MemoryMode.scala
;Kafka 也有相似的思路——通过 Java NIO 的 ByteBuffer 来保留它的音讯。
3. 源码剖析
总的来说,Flink 在这一块的实现是比拟清晰的——和操作系统一样有内存段,也有内存页这样的数据结构。
3.1 内存段
次要实现为 MemorySegment
。在 v1.12 前MemorySegment
仅仅为一个接口,它的实现有两个 HybridMemorySegment
和HeapMemorySegment
。在之后的倒退中,大家发现 HeapMemorySegment
根本都没有人用了,而是都用 HybridMemorySegment
了,为了优化性能——防止运行时每次都去查函数表确认调用的函数,去掉了 HeapMemorySegment
,并将HybridMemorySegment
移到了 MemorySegment
中——这会见带来近 2.7 倍的调用速度优化。:Off-heap Memory in Apache Flink and the curious JIT compiler 以及 Jira:Don’t explicitly use HeapMemorySegment in raw format serde。
MemorySegment
次要负责援用内存段,并其中数据进行读写——它对根本类型反对的很好,而简单类型则须要内部来做序列化。具体的实现还是比较简单的,从 field 的申明中就能够大抵看出实现了。惟一须要讲一下的是LITTLE_ENDIAN
:不同的 CPU 架构会才不同的存储程序——PowerPC 会采纳 Big Endian 形式,低地址寄存最低无效字节;而 x86 会采纳 Little Endian 形式存储数据,低地址寄存最高无效字节。
说实话,读到这个代码的时候笔者还是略震惊的,因为写 Java 这么多年简直对底层的硬件是无感知的。没想到 Java 代码还要思考兼容 CPU 架构的逻辑。
这个时候就会有同学问了,那这个 MemorySegments 是如何在 Flink 中运作的呢?咱们能够看个测试用例:BinaryRowDataTest 里的 testPagesSer:
先是有 MemorySegments,通过对应的 BinaryRowWriter 写入数据到 RowData,再用 BinaryRowDataSerializer 写 RowData 到 RandomAccessOutputView:
@Test
public void testPagesSer() throws IOException {MemorySegment[] memorySegments = new MemorySegment[5];
ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
for (int i = 0; i < 5; i++) {memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);
memorySegmentList.add(memorySegments[i]);
}
{
// multi memorySegments
String str = "啦啦啦啦啦我是高兴的粉刷匠,啦啦啦啦啦我是高兴的粉刷匠," + "啦啦啦啦啦我是高兴的粉刷匠。";
BinaryRowData row = new BinaryRowData(1);
BinaryRowWriter writer = new BinaryRowWriter(row);
writer.writeString(0, fromString(str));
writer.complete();
RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
BinaryRowDataSerializer serializer = new BinaryRowDataSerializer(1);
serializer.serializeToPages(row, out);
BinaryRowData mapRow = serializer.createInstance();
mapRow =
serializer.mapFromPages(mapRow, new RandomAccessInputView(memorySegmentList, 64));
writer.reset();
writer.writeString(0, mapRow.getString(0));
writer.complete();
assertEquals(str, row.getString(0).toString());
BinaryRowData deserRow =
serializer.deserializeFromPages(new RandomAccessInputView(memorySegmentList, 64));
writer.reset();
writer.writeString(0, deserRow.getString(0));
writer.complete();
assertEquals(str, row.getString(0).toString());
}
// ignore some code
}
3.2 内存页
一个 MemorySegment 默认对应了 32KB 大小的内存块。在流解决中,很容易呈现超过 32KB 的数据,这时就须要跨 MemorySegment。那么对于编写相应逻辑的人就须要持有多个 MemorySegment,因而 Flink 提供了内存页的实现,它会持有多个 MemorySegment 实例,不便框架的开发人员来疾速的编写 Memory 相干的代码,而无需关注一个个的 MemorySegment。
其形象为 DataInputView 和 DataOutputView,别离对了数据读取和数据写入。
接下来,还是关联理论的代码看一下。咱们以咱们最常见的 KafkaProducer 应用为例:
|-- KafkaProducer#invoke // 在这里指定了 serializedValue
\-- KeyedSerializationSchema#serializeValue // 序列化 record 的 value
咱们挑一个实现看看,以 TypeInformationKeyValueSerializationSchema
为例:
|-- TypeInformationKeyValueSerializationSchema#deserialize //KeyedSerializationSchema 的实现类
|-- DataInputDeserializer#setBuffer // 这是 DataInputView 的实现,用外部的 byte 数组存储数据。这里很奇怪的是并没有应用 MemorySegement。|-- TypeSerializer#deserialize // 它的实现会针对不同的类型,从 DataInputView 里读出数据返回
其实这里的例子不太失当。因为 KeyedSerializationSchema 曾经被标记为了废除。社区更倡议咱们应用 KafkaSerializationSchema。第一个起因是因为 KeyedSerializationSchema 的形象并不适合 Kafka,当 Kafka 在 Record 新加字段时,是很难形象当这个接口里的——这个接口仅仅关注了 key、value 以及 topic。
以 KafkaSerializationSchema
开展的话,咱们能够看典型的实现——KafkaSerializationSchemaWrapper
,咱们关怀的中央很容找到:
@Override
public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {byte[] serialized = serializationSchema.serialize(element);
final Integer partition;
if (partitioner != null) {partition = partitioner.partition(element, null, serialized, topic, partitions);
} else {partition = null;}
final Long timestampToWrite;
if (writeTimestamp) {timestampToWrite = timestamp;} else {timestampToWrite = null;}
return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
}
这个 serializationSchema 的申明是一个名为 SerializationSchema
的接口。能够看到它有大量的实现,其中很多对应了 DataStream 还有 SQL API 中的 format。咱们以 TypeInformationSerializationSchema
为例持续跟踪:
@Public
public class TypeInformationSerializationSchema<T>
implements DeserializationSchema<T>, SerializationSchema<T> {
//ignore some filed
/** The serializer for the actual de-/serialization. */
private final TypeSerializer<T> serializer;
....
又看到咱们相熟的接口 TypeSerializer
了。就像下面说的,它的实现会针对不同的类型,从 DataInputView、DataOutputView 进行互动,提供序列化和反序列化的能力。在它的办法签名中也是能够看到的:
/**
* Serializes the given record to the given target output view.
*
* @param record The record to serialize.
* @param target The output view to write the serialized data to.
* @throws IOException Thrown, if the serialization encountered an I/O related error. Typically
* raised by the output view, which may have an underlying I/O channel to which it
* delegates.
*/
public abstract void serialize(T record, DataOutputView target) throws IOException;
/**
* De-serializes a record from the given source input view.
*
* @param source The input view from which to read the data.
* @return The deserialized element.
* @throws IOException Thrown, if the de-serialization encountered an I/O related error.
* Typically raised by the input view, which may have an underlying I/O channel from which
* it reads.
*/
public abstract T deserialize(DataInputView source) throws IOException;
/**
* De-serializes a record from the given source input view into the given reuse record instance
* if mutable.
*
* @param reuse The record instance into which to de-serialize the data.
* @param source The input view from which to read the data.
* @return The deserialized element.
* @throws IOException Thrown, if the de-serialization encountered an I/O related error.
* Typically raised by the input view, which may have an underlying I/O channel from which
* it reads.
*/
public abstract T deserialize(T reuse, DataInputView source) throws IOException;
/**
* Copies exactly one record from the source input view to the target output view. Whether this
* operation works on binary data or partially de-serializes the record to determine its length
* (such as for records of variable length) is up to the implementer. Binary copies are
* typically faster. A copy of a record containing two integer numbers (8 bytes total) is most
* efficiently implemented as {@code target.write(source, 8);}.
*
* @param source The input view from which to read the record.
* @param target The target output view to which to write the record.
* @throws IOException Thrown if any of the two views raises an exception.
*/
public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
那么 TypeSerializer#deserialize
到底是怎么被调用到的呢?这些细节并不是这篇文章须要关怀的。在这里咱们展现一下调用链,有趣味的读者能够沿着这个调用链看一下具体的代码:
|-- TypeSerializer#deserialize
|-- StreamElementSerializer#deserialize
|-- TypeInformationKeyValueSerializationSchema#deserialize
|-- KafkaDeserializationSchema#deserialize
|-- KafkaFetcher#partitionConsumerRecordsHandler // 到这里曾经很分明了,这里是由 FlinkKafkaConsumer new 进去的对象
3.3 缓冲池
还有一个比拟有意思的类是 LocalBufferPool
,封装了MemorySegment
。个别用于网络缓冲器(NetworkBuffer),NetworkBuffer
是网络替换数据的包装,当后果分区(ResultParition)开始写出数据的时候,须要向 LocalBufferPool 申请 Buffer 资源。
写入逻辑:
|-- Task#constructor // 结构工作
|-- NettyShuffleEnvironment#createResultPartitionWriters // 创立用于写入后果的后果分区
|-- ResultPartitionFactory#create
\-- ResultPartitionFactory#createBufferPoolFactory // 在这里创立了一个简略的 BufferPoolFactory
|-- PipelinedResultPartition#constructor
|-- BufferWritingResultPartition#constructor
|-- SortMergeResultPartition#constructor or BufferWritingResultPartition#constructor
|-- ResultPartition#constructor
\-- ResultPartition#steup // 注册缓冲池到这个后果分区中
另外,NetworkBuffer
实现了 Netty 的AbstractReferenceCountedByteBuf
。这意味着这里采纳了经典的援用计数算法,当 Buffer 不再被须要时,会被回收。
4. 其余
4.1 相干 Flink Jira
以下是我在写本文时参考过的 Jira 列表:
- Add an off-heap variant of the managed memory:https://issues.apache.org/jir…
- Separate type specific memory segments.:https://issues.apache.org/jir…
- Investigate potential out-of-memory problems due to managed unsafe memory allocation:https://issues.apache.org/jir…
- Adjust GC Cleaner for unsafe memory and Java 11:https://issues.apache.org/jir…
- FLIP-49 Unified Memory Configuration for TaskExecutors:https://issues.apache.org/jir…
- Don’t explicitly use HeapMemorySegment in raw format serde:https://issues.apache.org/jir…
- Refactor HybridMemorySegment:https://issues.apache.org/jir…
- use flink’s buffers in netty:https://issues.apache.org/jir…
- Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment:https://issues.apache.org/jir…