共计 9157 个字符,预计需要花费 23 分钟才能阅读完成。
本文转载自:https://ververica.cn/develope…
作者:伍翀(云邪)
现在,大数据畛域的开源框架(Hadoop,Spark,Storm)都应用的 JVM,当然也包含 Flink。基于 JVM 的数据分析引擎都须要面对将大量数据存到内存中,这就不得不面对 JVM 存在的几个问题:
- Java 对象存储密度低。一个只蕴含 boolean 属性的对象占用了 16 个字节内存:对象头占了 8 个,boolean 属性占了 1 个,对齐填充占了 7 个。而实际上只须要一个 bit(1/ 8 字节)就够了。
- Full GC 会极大地影响性能,尤其是为了解决更大数据而开了很大内存空间的 JVM 来说,GC 会达到秒级甚至分钟级。
- OOM 问题影响稳定性。OutOfMemoryError 是分布式计算框架常常会遇到的问题,当 JVM 中所有对象大小超过调配给 JVM 的内存大小时,就会产生 OutOfMemoryError 谬误,导致 JVM 解体,分布式框架的健壮性和性能都会受到影响。
所以目前,越来越多的大数据我的项目开始本人治理 JVM 内存了,像 Spark、Flink、HBase,为的就是取得像 C 一样的性能以及防止 OOM 的产生。本文将会探讨 Flink 是如何解决下面的问题的,次要内容包含内存治理、定制的序列化工具、缓存敌对的数据结构和算法、堆外内存、JIT 编译优化等。
踊跃的内存治理
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预调配的内存块上,这个内存块叫做 **MemorySegment**
,它代表了一段固定长度的内存(默认大小为 32KB),也是 Flink 中最小的内存调配单元,并且提供了十分高效的读写办法。你能够把 MemorySegment 设想成是为 Flink 定制的 **java.nio.ByteBuffer**
。它的底层能够是一个一般的 Java 字节数组(**byte[]**
),也能够是一个申请在堆外的 **ByteBuffer**
。每条记录都会以序列化的模式存储在一个或多个 **MemorySegment**
中。
Flink 中的 Worker 名叫 TaskManager,是用来运行用户代码的 JVM 过程。TaskManager 的堆内存次要被分成了三个局部:
-
Network Buffers: 肯定数量的 32KB 大小的 buffer,次要用于数据的网络传输。在 TaskManager 启动的时候就会调配。默认数量是 2048 个,能够通过
**taskmanager.network.numberOfBuffers**
来配置。(浏览 这篇文章 理解更多 Network Buffer 的治理) -
Memory Manager Pool: 这是一个由
MemoryManager
治理的,由泛滥MemorySegment
组成的超大汇合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,应用完后开释回内存池。默认状况下,池子占了堆内存的 70% 的大小。 - Remaining (Free) Heap: 这部分的内存是留给用户代码以及 TaskManager 的数据结构应用的。因为这些数据结构个别都很小,所以基本上这些内存都是给用户代码应用的。从 GC 的角度来看,能够把这里看成的新生代,也就是说这里次要都是由用户代码生成的短期对象。
留神:Memory Manager Pool 次要在 Batch 模式下应用。在 Steaming 模式下,该池子不会预分配内存,也不会向该池子申请内存块。也就是说该局部的内存都是能够给用户代码应用的。不过社区是打算在 Streaming 模式下也能将该池子利用起来。
Flink 采纳相似 DBMS 的 sort 和 join 算法,间接操作二进制数据,从而使序列化 / 反序列化带来的开销达到最小。所以 Flink 的外部实现更像 C/C++ 而非 Java。如果须要解决的数据超出了内存限度,则会将局部数据存储到硬盘上。如果要操作多块 MemorySegment 就像操作一块大的间断内存一样,Flink 会应用逻辑视图(**AbstractPagedInputView**
)来不便操作。下图形容了 Flink 如何存储序列化后的数据到内存块中,以及在须要的时候如何将数据存储到磁盘上。
从下面咱们可能得出 Flink 踊跃的内存治理以及间接操作二进制数据有以下几点益处:
-
缩小 GC 压力。不言而喻,因为所有常驻型数据都以二进制的模式存在 Flink 的
MemoryManager
中,这些MemorySegment
始终呆在老年代而不会被 GC 回收。其余的数据对象基本上是由用户代码生成的短生命周期对象,这部分对象能够被 Minor GC 疾速回收。只有用户不去创立大量相似缓存的常驻型对象,那么老年代的大小是不会变的,Major GC 也就永远不会产生。从而无效地升高了垃圾回收的压力。另外,这里的内存块还能够是堆外内存,这能够使得 JVM 内存更小,从而减速垃圾回收。 -
防止了 OOM。所有的运行时数据结构和算法只能通过内存池申请内存,保障了其应用的内存大小是固定的,不会因为运行时数据结构和算法而产生 OOM。在内存吃紧的状况下,算法(sort/join 等)会高效地将一大批内存块写到磁盘,之后再读回来。因而,
**OutOfMemoryErrors**
能够无效地被防止。 - 节俭内存空间。Java 对象在存储上有很多额定的耗费(如上一节所谈)。如果只存储理论数据的二进制内容,就能够防止这部分耗费。
- 高效的二进制操作 & 缓存敌对的计算。二进制数据以定义好的格局存储,能够高效地比拟与操作。另外,该二进制模式能够把相干的值,以及 hash 值,键值和指针等相邻地放进内存中。这使得数据结构能够对高速缓存更敌对,能够从 L1/L2/L3 缓存取得性能的晋升(下文会具体解释)。
为 Flink 量身定制的序列化框架
目前 Java 生态圈提供了泛滥的序列化框架:Java serialization, Kryo, Apache Avro 等等。然而 Flink 实现了本人的序列化框架。因为在 Flink 中解决的数据流通常是同一类型,因为数据集对象的类型固定,对于数据集能够只保留一份对象 Schema 信息,节俭大量的存储空间。同时,对于固定大小的类型,也可通过固定的偏移地位存取。当咱们须要拜访某个对象成员变量的时候,通过定制的序列化工具,并不需要反序列化整个 Java 对象,而是能够间接通过偏移量,只是反序列化特定的对象成员变量。如果对象的成员变量较多时,可能大大减少 Java 对象的创立开销,以及内存数据的拷贝大小。
Flink 反对任意的 Java 或是 Scala 类型。Flink 在数据类型上有很大的提高,不须要实现一个特定的接口(像 Hadoop 中的 **org.apache.hadoop.io.Writable**
),Flink 可能自动识别数据类型。Flink 通过 Java Reflection 框架剖析基于 Java 的 Flink 程序 UDF (User Define Function) 的返回类型的类型信息,通过 Scala Compiler 剖析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由 **TypeInformation**
类示意,TypeInformation 反对以下几种类型:
-
BasicTypeInfo
: 任意 Java 根本类型(装箱的)或 String 类型。 -
BasicArrayTypeInfo
: 任意 Java 根本类型数组(装箱的)或 String 数组。 -
**WritableTypeInfo**
: 任意 Hadoop Writable 接口的实现类。 -
TupleTypeInfo
: 任意的 Flink Tuple 类型(反对 Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的 Java Tuple 实现。 -
CaseClassTypeInfo
: 任意的 Scala CaseClass(包含 Scala tuples)。 -
PojoTypeInfo
: 任意的 POJO (Java or Scala),例如,Java 对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 办法。 -
GenericTypeInfo
: 任意无奈匹配之前几种类型的类。
前六种数据类型基本上能够满足绝大部分的 Flink 程序,针对前六种类型数据集,Flink 皆能够主动生成对应的 TypeSerializer,能十分高效地对数据集进行序列化和反序列化。对于最初一种数据类型,Flink 会应用 Kryo 进行序列化和反序列化。每个 TypeInformation 中,都蕴含了 serializer,类型会主动通过 serializer 进行序列化,而后用 Java Unsafe 接口写入 MemorySegments。对于能够用作 key 的数据类型,Flink 还同时主动生成 TypeComparator,用来辅助间接对序列化后的二进制数据进行 compare、hash 等操作。对于 Tuple、CaseClass、POJO 等组合类型,其 TypeSerializer 和 TypeComparator 也是组合的,序列化和比拟时会委托给对应的 serializers 和 comparators。如下图展现 一个内嵌型的 Tuple3<Integer,Double,Person> 对象的序列化过程。
能够看出这种序列化形式存储密度是相当紧凑的。其中 int 占 4 字节,double 占 8 字节,POJO 多个一个字节的 header,PojoSerializer 只负责将 header 序列化进去,并委托每个字段对应的 serializer 对字段进行序列化。
Flink 的类型零碎能够很轻松地扩大出自定义的 TypeInformation、Serializer 以及 Comparator,来晋升数据类型在序列化和比拟时的性能。
Flink 如何间接操作二进制数据
Flink 提供了如 group、sort、join 等操作,这些操作都须要拜访海量数据。这里,咱们以 sort 为例,这是一个在 Flink 中应用十分频繁的操作。
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,咱们把这批 MemorySegment 称作 sort buffer,用来寄存排序的数据。
咱们会把 sort buffer 分成两块区域。一个区域是用来寄存所有对象残缺的二进制数据。另一个区域用来寄存指向残缺二进制数据的指针以及定长的序列化后的 key(key+pointer)。如果须要序列化的 key 是个变长类型,如 String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有 key)会被加到第二个区域。
将理论的数据和指针加定长 key 离开寄存有两个目标。第一,替换定长块(key+pointer)更高效,不必替换实在的数据也不必挪动其余 key 和 pointer。第二,这样做是缓存敌对的,因为 key 都是间断存储在内存中的,能够大大减少 cache miss(前面会具体解释)。
排序的要害是比大小和替换。Flink 中,会先用 key 比大小,这样就能够间接用二进制的 key 比拟而不须要反序列化出整个对象。因为 key 是定长的,所以如果 key 雷同(或者没有提供二进制 key),那就必须将实在的二进制数据反序列化进去,而后再做比拟。之后,只须要替换 key+pointer 就能够达到排序的成果,实在的数据不必挪动。
最初,拜访排序后的数据,能够沿着排好序的 key+pointer 区域程序拜访,通过 pointer 找到对应的实在数据,并写到内存或内部(更多细节能够看这篇文章 Joins in Flink)。
缓存敌对的数据结构和算法
随着磁盘 IO 和网络 IO 越来越快,CPU 逐步成为了大数据畛域的瓶颈。从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。通过性能剖析能够发现,CPU 工夫中的很大一部分都是节约在期待数据从主内存过去上。如果这些数据能够从 L1/L2/L3 缓存过去,那么这些等待时间能够极大地升高,并且所有的算法会因而而受害。
在下面探讨中咱们谈到的,Flink 通过定制的序列化框架将算法中须要操作的数据(如 sort 中的 key)间断存储,而残缺数据存储在其余中央。因为对于残缺的数据来说,key+pointer 更容易装进缓存,这大大提高了缓存命中率,从而进步了根底算法的效率。这对于下层利用是齐全通明的,能够充沛享受缓存敌对带来的性能晋升。
走向堆外内存
Flink 基于堆内存的内存管理机制曾经能够解决很多 JVM 现存问题了,为什么还要引入堆外内存?
- 启动超大内存(上百 GB)的 JVM 须要很长时间,GC 停留时间也会很长(分钟级)。应用堆外内存的话,能够极大地减小堆内存(只须要调配 Remaining Heap 那一块),使得 TaskManager 扩大到上百 GB 内存不是问题。
- 高效的 IO 操作。堆外内存在写磁盘或网络传输时是 zero-copy,而堆内存的话,至多须要 copy 一次。
- 堆外内存是过程间共享的。也就是说,即便 JVM 过程解体也不会失落数据。这能够用来做故障复原(Flink 临时没有利用起这个,不过将来很可能会去做)。
然而弱小的货色总是会有其负面的一面,不然为何大家不都用堆外内存呢。
- 堆内存的应用、监控、调试都要简略很多。堆外内存意味着更简单更麻烦。
- Flink 有时须要调配短生命周期的
MemorySegment
,这个申请在堆上会更便宜。 - 有些操作在堆内存上会快一点点。
Flink 用通过 **ByteBuffer.allocateDirect(numBytes)**
来申请堆外内存,用 **sun.misc.Unsafe**
来操作堆外内存。
基于 Flink 优良的设计,实现堆外内存是很不便的。Flink 将原来的 **MemorySegment**
变成了抽象类,并生成了两个子类。**HeapMemorySegment**
和 **HybridMemorySegment**
。从字面意思上也很容易了解,前者是用来调配堆内存的,后者是用来调配堆外内存 和堆内存 的。是的,你没有看错,后者既能够调配堆外内存又能够调配堆内存。为什么要这样设计呢?
首先假如 **HybridMemorySegment**
只提供调配堆外内存。在上述堆外内存的有余中的第二点谈到,Flink 有时须要调配短生命周期的 buffer,这些 buffer 用 **HeapMemorySegment**
会更高效。那么当应用堆外内存时,为了也满足堆内存的需要,咱们须要同时加载两个子类。这就波及到了 JIT 编译优化的问题。因为以前 **MemorySegment**
是一个独自的 final 类,没有子类。JIT 编译时,所有要调用的办法都是确定的,所有的办法调用都能够被去虚化(de-virtualized)和内联(inlined),这能够极大地提高性能(MemroySegment 的应用相当频繁)。然而如果同时加载两个子类,那么 JIT 编译器就只能在真正运行到的时候才晓得是哪个子类,这样就无奈提前做优化。理论测试的性能差距在 2.7 被左右。
Flink 应用了两种计划:
计划 1:只能有一种 MemorySegment 实现被加载
代码中所有的短生命周期和长生命周期的 MemorySegment 都实例化其中一个子类,另一个子类基本没有实例化过(应用工厂模式来管制)。那么运行一段时间后,JIT 会意识到所有调用的办法都是确定的,而后会做优化。
计划 2:提供一种实现能同时解决堆内存和堆外内存
这就是 **HybridMemorySegment**
了,能同时解决堆与堆外内存,这样就不须要子类了。这里 Flink 优雅地实现了一份代码能同时操作堆和堆外内存。这次要归功于 **sun.misc.Unsafe**
提供的一系列办法,如 getLong 办法:
sun.misc.Unsafe.getLong(Object reference, long offset)
- 如果 reference 不为空,则会取该对象的地址,加上前面的 offset,从绝对地址处取出 8 字节并失去 long。这对应了堆内存的场景。
- 如果 reference 为空,则 offset 就是要操作的相对地址,从该地址处取出数据。这对应了堆外内存的场景。
这里咱们看下 **MemorySegment**
及其子类的实现。
public abstract class MemorySegment {
// 堆内存援用
protected final byte[] heapMemory;
// 堆外内存地址
protected long address;
// 堆内存的初始化
MemorySegment(byte[] buffer, Object owner) {
// 一些先验查看
...
this.heapMemory = buffer;
this.address = BYTE_ARRAY_BASE_OFFSET;
...
}
// 堆外内存的初始化
MemorySegment(long offHeapAddress, int size, Object owner) {
// 一些先验查看
...
this.heapMemory = null;
this.address = offHeapAddress;
...
}
public final long getLong(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit - 8) {
// 这是咱们关注的中央,应用 Unsafe 来操作 on-heap & off-heap
return UNSAFE.getLong(heapMemory, pos);
}
else if (address > addressLimit) {throw new IllegalStateException("segment has been freed");
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();}
}
...
}
public final class HeapMemorySegment extends MemorySegment {
// 指向 heapMemory 的额定援用,用来如数组越界的查看
private byte[] memory;
// 只能初始化堆内存
HeapMemorySegment(byte[] memory, Object owner) {super(Objects.requireNonNull(memory), owner);
this.memory = memory;
}
...
}
public final class HybridMemorySegment extends MemorySegment {
private final ByteBuffer offHeapBuffer;
// 堆外内存初始化
HybridMemorySegment(ByteBuffer buffer, Object owner) {super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}
// 堆内存初始化
HybridMemorySegment(byte[] buffer, Object owner) {super(buffer, owner);
this.offHeapBuffer = null;
}
...
}
能够发现,HybridMemorySegment 中的很多办法其实都下沉到了父类去实现。包含堆内堆外内存的初始化。**MemorySegment**
中的 **getXXX**
/**putXXX**
办法都是调用了 unsafe 办法,能够说 **MemorySegment**
曾经具备了些 Hybrid 的意思了。**HeapMemorySegment**
只调用了父类的 **MemorySegment(byte[] buffer, Object owner)**
办法,也就只能申请堆内存。另外,浏览代码你会发现,许多办法(大量的 getXXX/putXXX)都被标记成了 final,两个子类也是 final 类型,为的也是优化 JIT 编译器,会揭示 JIT 这个办法是能够被去虚化和内联的。
对于堆外内存,应用 **HybridMemorySegment**
能同时用来代表堆和堆外内存。这样只须要一个类就能代表长生命周期的堆外内存和短生命周期的堆内存。既然 **HybridMemorySegment**
曾经这么全能,为什么还要计划 1 呢?因为咱们须要工厂模式来保障只有一个子类被加载(为了更高的性能),而且 HeapMemorySegment 比 heap 模式的 HybridMemorySegment 要快。
下方是一些性能测试数据,更具体的数据请参考 这篇文章。
Segment | Time |
---|---|
HeapMemorySegment, exclusive | 1,441 msecs |
HeapMemorySegment, mixed | 3,841 msecs |
HybridMemorySegment, heap, exclusive | 1,626 msecs |
HybridMemorySegment, off-heap, exclusive | 1,628 msecs |
HybridMemorySegment, heap, mixed | 3,848 msecs |
HybridMemorySegment, off-heap, mixed | 3,847 msecs |
总结
本文次要总结了 Flink 面对 JVM 存在的问题,而在内存治理的路线上越走越深。从本人治理内存,到序列化框架,再到堆外内存。其实纵观大数据生态圈,其实会发现各个开源我的项目都有同样的趋势。比方最近炒的很炽热的 Spark Tungsten 我的项目,与 Flink 在内存治理上的思维是及其类似的。
参考资料
- Off-heap Memory in Apache Flink and the curious JIT compiler
- Juggling with Bits and Bytes
- Peeking into Apache Flink’s Engine Room
- Flink: Memory Management
- Big Data Performance Engineering
- sun.misc.misc.Unsafe usage for C style memory management
- sun.misc.misc.Unsafe usage for C style memory management – How to do it.
- Memory usage of Java objects: general guide
- 脱离 JVM?Hadoop 生态圈的挣扎与演变