Abstract: Flink is a big data processing engine that runs on top of the JVM.

Flink is a big data processing engine that runs on top of the JVM. The JVM is not ideal for this workload: Java objects have low storage density, full GC is expensive, GC introduces stop-the-world pauses, and OOM errors hurt stability. To reduce the cost of frequent serialization and deserialization, Flink uses both on-heap and off-heap memory and can, in some scenarios, operate directly on binary data. Building on the characteristics of streaming big data processing, Flink also implements its own serialization framework, and it designs cache-friendly data structures that exploit the CPU's L1/L2/L3 caches and the principle of locality. The starting point of Flink's memory management is very similar to that of Spark's Tungsten.

Memory Model

Flink can use both on-heap and off-heap memory; the memory model is shown in the figure:

Flink divides its memory into on-heap and off-heap memory. By purpose it can be divided into task memory, network memory, managed memory, and framework memory. Task, network, and managed memory are counted toward slot memory; framework memory is reserved for the TaskManager itself.

On-heap memory includes the memory used by user code, the HeapStateBackend, and framework execution.

Off-heap memory is memory that is not virtualized by the JVM and maps directly to operating system memory addresses. It includes framework execution memory, JVM off-heap memory, direct memory, native memory, and so on.

Direct memory can be used for network transfer buffers. Network memory falls under direct memory; Flink uses it for zero-copy transfers, reducing the number of copies between kernel space and user space and enabling more efficient I/O.
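
To illustrate the idea outside of Flink, the sketch below uses plain JDK NIO: a direct ByteBuffer lives outside the JVM heap, and FileChannel.transferTo lets the kernel move bytes straight from a file to a socket without a user-space copy. The file path and socket address are placeholders.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySketch {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Paths.get("data.bin"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {

            // Zero copy: the kernel moves the file contents to the socket directly
            // (sendfile on Linux); the bytes never enter a Java byte[] at all.
            file.transferTo(0, file.size(), socket);

            // Direct buffer variant: the buffer is off-heap, so the OS can read into
            // and write from it without an extra copy through a temporary native buffer.
            ByteBuffer direct = ByteBuffer.allocateDirect(32 * 1024);
            file.position(0);
            while (file.read(direct) != -1) {
                direct.flip();
                while (direct.hasRemaining()) {
                    socket.write(direct);
                }
                direct.clear();
            }
        }
    }
}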

The JVM metaspace holds metadata for the classes the JVM loads; the more classes are loaded, the more space it needs. The overhead portion covers other JVM costs such as native memory, the code cache, and thread stacks.

Managed memory is mainly used by the RocksDBStateBackend and by batch operators, and it also falls under native memory. With the RocksDBStateBackend, each state corresponds to a RocksDB column family with its own write buffer; RocksDB is built on an LSM tree, and the native memory it occupies is roughly blockCache + writeBufferNum * writeBufferSize + index. Off-heap memory is shared between processes, and since having the JVM virtualize a large heap takes a long time, using off-heap memory avoids that step. Off-heap memory also has drawbacks: monitoring and debugging are relatively complex, and for short-lived segments on-heap memory is cheaper. In some cases Flink operates directly on binary data to avoid deserialization overhead, and if the data to be processed exceeds the memory limit, part of it is spilled to disk.
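
To make that estimate concrete, here is a small back-of-the-envelope sketch; all of the configuration values are hypothetical, not Flink or RocksDB defaults.

public class RocksDbMemoryEstimate {
    public static void main(String[] args) {
        // Hypothetical settings, chosen only to illustrate the formula above.
        long blockCacheBytes     = 256L * 1024 * 1024; // shared block cache
        long writeBufferBytes    = 64L * 1024 * 1024;  // size of one write buffer (memtable)
        int  writeBufferCount    = 3;                  // write buffers per column family
        int  columnFamilies      = 4;                  // one column family per state
        long indexAndFilterBytes = 32L * 1024 * 1024;  // indexes and bloom filters

        long estimate = blockCacheBytes
                + (long) columnFamilies * writeBufferCount * writeBufferBytes
                + indexAndFilterBytes;

        System.out.printf("Estimated RocksDB native memory: %d MB%n", estimate >> 20);
    }
}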

Memory Management

Similar to the page mechanism in an operating system, Flink manages memory in pages. The data structures corresponding to a page are the DataView and the MemorySegment. A MemorySegment is the smallest unit of memory allocation in Flink, 32 KB by default, and it can live on the heap or off the heap; Flink accesses both on-heap and off-heap memory through this abstraction. Together with Flink's serialization mechanism (covered in the next section), the MemorySegment provides methods for reading and writing binary data, and Flink reads and writes that binary content through DataInputView and DataOutputView. Flink manages on-heap memory with HeapMemorySegment and both on-heap and off-heap memory with HybridMemorySegment. When a MemorySegment manages JVM heap memory, it holds a reference to a byte array, and HeapMemorySegment operates relative to that internal byte array.

public abstract class MemorySegment {

    /**
     * The heap byte array object relative to which we access the memory.
     *
     * <p>Is non-<tt>null</tt> if the memory is on the heap, and is <tt>null</tt>, if the memory is
     * off the heap. If we have this buffer, we must never void this reference, or the memory
     * segment will point to undefined addresses outside the heap and may in out-of-order execution
     * cases cause segmentation faults.
     */
    protected final byte[] heapMemory;

    /**
     * The address to the data, relative to the heap memory byte array. If the heap memory byte
     * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
     */
    protected long address;
}

HeapMemorySegment is used to allocate on-heap memory.

public final class HeapMemorySegment extends MemorySegment {

    /**
     * An extra reference to the heap memory, so we can let byte array checks fail by the built-in
     * checks automatically without extra checks.
     */
    private byte[] memory;

    @Override
    public void free() {
        super.free();
        this.memory = null;
    }

    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }
}

HybridMemorySegment supports both on-heap and off-heap memory through the JVM's Unsafe operations. If the base object o is non-null (the on-heap case) and the following address is a relative offset, the methods operate directly on that position within the object (for example a byte array). If o is null (the off-heap case), the memory block being operated on is not JVM heap memory and the address is the absolute address of some memory block, so calling the methods operates directly on that block.

public final class HybridMemorySegment extends MemorySegment {

    @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            } else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                } catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        } else {
            throw new IllegalStateException("segment has been freed");
        }
    }
}
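
The following standalone sketch (not Flink code) shows the base-object/offset addressing that HybridMemorySegment builds on; it uses sun.misc.Unsafe directly, which requires reflective access and may emit warnings on newer JDKs.

import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeAddressingSketch {
    public static void main(String[] args) throws Exception {
        // Obtain the Unsafe instance reflectively (Unsafe.getUnsafe() is not
        // callable from application code).
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        // On-heap case: the base object is the byte array, the offset is relative to it.
        byte[] heap = new byte[16];
        long base = Unsafe.ARRAY_BYTE_BASE_OFFSET;
        unsafe.putInt(heap, base, 42);
        System.out.println("on-heap:  " + unsafe.getInt(heap, base));

        // Off-heap case: the base object is null, the offset is an absolute native address.
        long address = unsafe.allocateMemory(16);
        try {
            unsafe.putInt(null, address, 42);
            System.out.println("off-heap: " + unsafe.getInt(null, address));
        } finally {
            unsafe.freeMemory(address);
        }
    }
}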

Flink creates MemorySegments through the MemorySegmentFactory; the MemorySegment is the smallest unit of memory allocation in Flink. For data access that spans MemorySegments, Flink abstracts an access view: DataInputView for reading data and DataOutputView for writing data.
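
A small usage sketch, assuming the factory methods exposed by org.apache.flink.core.memory.MemorySegmentFactory in the Flink versions this article targets (allocateUnpooledSegment and wrap):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class MemorySegmentSketch {
    public static void main(String[] args) {
        // Allocate a 32 KB unpooled on-heap segment, Flink's default page size.
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);

        // Write and read primitives at explicit byte offsets, the way Flink's
        // sort buffers and serializers operate on binary data.
        segment.putLong(0, 123456789L);
        segment.putInt(8, 42);
        System.out.println(segment.getLong(0) + " / " + segment.getInt(8));

        // Wrap an existing byte array as a segment without copying it.
        MemorySegment wrapped = MemorySegmentFactory.wrap(new byte[128]);
        wrapped.putInt(0, 7);
        System.out.println(wrapped.getInt(0));
    }
}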

/**
 * This interface defines a view over some memory that can be used to sequentially read the
 * contents of the memory.
 *
 * <p>The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment};
 * that group of segments can be seen as one memory page, and Flink reads its data sequentially.
 */
@Public
public interface DataInputView extends DataInput {

    /**
     * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset
     * {@code off}. It returns the number of read bytes or -1 if there is no more data left.
     *
     * @param b byte array to store the data to
     * @param off offset into byte array
     * @param len byte length to read
     * @return the number of actually read bytes, or -1 if there is no more data left
     */
    int read(byte[] b, int off, int len) throws IOException;
}

DataOutputView is the view for writing data; it holds references to multiple MemorySegments, and Flink can write into the segments sequentially.

/**
 * This interface defines a view over some memory that can be used to sequentially write contents
 * to the memory.
 *
 * <p>The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataOutputView extends DataOutput {

    /**
     * Copies {@code numBytes} bytes from the source to this view.
     *
     * @param source The source to copy the bytes from.
     * @param numBytes The number of bytes to copy.
     */
    void write(DataInputView source, int numBytes) throws IOException;
}
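
In user code the views are usually seen through their concrete implementations; the sketch below uses DataOutputSerializer and DataInputDeserializer from org.apache.flink.core.memory, which implement DataOutputView and DataInputView over a byte buffer.

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class DataViewSketch {
    public static void main(String[] args) throws Exception {
        // Write a few values through the DataOutputView interface.
        DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeInt(42);
        out.writeUTF("flink");

        // Read them back through the DataInputView interface.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        System.out.println(in.readInt());   // 42
        System.out.println(in.readUTF());   // flink
    }
}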

For the managed memory described in the previous section, Flink uses the MemoryManager. Managed memory uses off-heap memory only and is mainly used for sorting, hashing, and caching in batch processing (according to the community, stream processing will also use it in the future); in streaming it serves as part of the RocksDBStateBackend's memory. The MemoryManager manages MemorySegments through a memory pool.

/**
 * The memory manager governs the memory that Flink uses for sorting, hashing, caching or
 * off-heap state backends (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s
 * of equal size or in reserved chunks of certain size. Operators allocate the memory either by
 * requesting a number of memory segments or by reserving chunks. Any allocated memory has to be
 * released to be reused later.
 *
 * <p>The memory segments are represented as off-heap unsafe memory regions (both via
 * {@link HybridMemorySegment}). Releasing a memory segment will make it re-claimable by the
 * garbage collector, but does not necessarily immediately release the underlying memory.
 */
public class MemoryManager {

    /**
     * Allocates a set of memory segments from this memory manager.
     *
     * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
     *
     * @param owner The owner to associate with the memory segment, for the fallback release.
     * @param target The list into which to put the allocated memory pages.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested
     *                                   amount of memory pages any more.
     */
    public void allocatePages(
            Object owner,
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
        // ...
    }

    private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
        segment.free();
        if (segments != null) {
            segments.remove(segment);
        }
    }
}

// The free() method called above is defined on MemorySegment: it only invalidates the segment;
// the actual memory (heap or off-heap) is released once the segment object is garbage collected.
/**
 * Frees this memory segment.
 *
 * <p>After this operation has been called, no further operations are possible on the memory
 * segment and will fail. The actual memory (heap or off-heap) will only be released after this
 * memory segment object has become garbage collected.
 */
public void free() {
    // this ensures we can place no more data and trigger
    // the checks for the freed segment
    address = addressLimit + 1;
}
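
A hypothetical usage sketch based on the allocatePages signature above: an operator asks the MemoryManager (handed in by the runtime) for pages and returns them when done. The releaseAll call is an assumption about the release API and may differ between Flink versions.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;

public class ManagedMemorySketch {
    // Allocates a few managed pages for an operator and returns them afterwards.
    // Operators never construct the MemoryManager themselves; it is provided by the runtime.
    static void useManagedPages(MemoryManager memoryManager, Object owner) throws MemoryAllocationException {
        List<MemorySegment> pages = new ArrayList<>();
        memoryManager.allocatePages(owner, pages, 8);   // request 8 pages (8 * 32 KB by default)
        try {
            MemorySegment first = pages.get(0);
            first.putLong(0, 1L);                       // operate on raw binary data
        } finally {
            memoryManager.releaseAll(owner);            // assumed release API: hand every page back
        }
    }
}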

For the network memory mentioned in the previous section, Flink adds a layer of buffering with NetworkBuffers. A buffer is also backed by a MemorySegment, and Flink manages buffers through buffer pools: each TaskManager has one NetworkBufferPool shared by all tasks on that TaskManager, and each task's LocalBufferPool obtains the memory it needs from the NetworkBufferPool. All of this is off-heap memory allocated by Flink.

When the upstream operator writes data to a ResultPartition, it requests buffers and writes the data into MemorySegments through a BufferBuilder; when the downstream operator consumes data from a ResultSubpartition, it reads the data from the MemorySegments through a BufferConsumer. BufferBuilder and BufferConsumer correspond one to one. This flow is also tied to Flink's backpressure mechanism, as shown in the figure.

/**
 * A buffer pool used to manage a number of {@link Buffer} instances from the
 * {@link NetworkBufferPool}.
 *
 * <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock free operation
 * of the network stack by limiting the number of buffers per local buffer pool. It also
 * implements the default mechanism for buffer recycling, which ensures that every buffer is
 * ultimately returned to the network buffer pool.
 *
 * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}).
 * It will then lazily return the required number of buffers to the {@link NetworkBufferPool}
 * to match its new size.
 */
class LocalBufferPool implements BufferPool {

    @Nullable
    private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            returnExcessMemorySegments();

            if (availableMemorySegments.isEmpty()) {
                segment = requestMemorySegmentFromGlobal();
            }
            // segment may have been released by buffer pool owner
            if (segment == null) {
                segment = availableMemorySegments.poll();
            }
            if (segment == null) {
                availabilityHelper.resetUnavailable();
            }

            if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
                if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                    availabilityHelper.resetUnavailable();
                }
            }
        }
        return segment;
    }
}

/**
 * A result partition for data produced by a single task.
 *
 * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
 * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
 * or more {@link ResultSubpartition} instances, which further partition the data depending on the
 * number of consuming tasks and the data {@link DistributionPattern}.
 *
 * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
 * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}).
 *
 * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
 * Produce, Consume, Release. Buffer management. State management.
 */
public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {

    @Override
    public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        checkInProduceState();
        return bufferPool.requestBufferBuilderBlocking(targetChannel);
    }
}

Custom Serialization Framework

For the basic data types it supports, Flink implements its own serialization mechanism. Flink dataset objects have relatively fixed layouts, so only one copy of the schema information needs to be kept, which saves storage space. Serialization is the conversion between Java objects and binary data: Flink uses TypeInformation's createSerializer method to create a serializer for each type and to perform serialization and deserialization. Type information is extracted by the TypeExtractor from method signatures and class information when the StreamTransformation is built, and is stored in the StreamConfig.

/**
 * Creates a serializer for the type. The serializer may use the ExecutionConfig for
 * parameterization.
 *
 * @param config The config used to parameterize the serializer.
 * @return A serializer for this type.
 */
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);

/**
 * A utility for reflection analysis on classes, to determine the return type of implementations
 * of transformation functions.
 */
@Public
public class TypeExtractor {

    /**
     * Creates a {@link TypeInformation} from the given parameters.
     *
     * <p>If the given {@code instance} implements {@link ResultTypeQueryable}, its information is
     * used to determine the type information. Otherwise, the type information is derived based on
     * the given class information.
     *
     * @param instance instance to determine type information for
     * @param baseClass base class of {@code instance}
     * @param clazz class of {@code instance}
     * @param returnParamPos index of the return type in the type arguments of {@code clazz}
     * @param <OUT> output type
     * @return type information
     */
    @SuppressWarnings("unchecked")
    @PublicEvolving
    public static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz, int returnParamPos) {
        if (instance instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<OUT>) instance).getProducedType();
        } else {
            return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
        }
    }
}

For nested data types, Flink serializes from the innermost field outward; the result of serializing the inner fields becomes part of the outer serialization result. When deserializing, the binary data is read sequentially from memory and turned back into Java objects according to the offsets. Flink's built-in serialization is very compact, since only the values of the corresponding types need to be stored.
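
A small sketch of what this looks like from user code, combining createSerializer with the DataOutputSerializer/DataInputDeserializer views shown earlier; the Tuple2 type and values are only illustrative.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SerializerSketch {
    public static void main(String[] args) throws Exception {
        // Derive type information for a nested type and create its serializer.
        TypeInformation<Tuple2<String, Integer>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
        TypeSerializer<Tuple2<String, Integer>> serializer =
                typeInfo.createSerializer(new ExecutionConfig());

        // Serialize: the inner String and Integer fields are written one after another.
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(Tuple2.of("flink", 7), out);

        // Deserialize by reading the binary data back sequentially.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        Tuple2<String, Integer> copy = serializer.deserialize(in);
        System.out.println(copy); // (flink,7)
    }
}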

Flink's Table module uses the BinaryRow data structure on top of MemorySegments, which further reduces deserialization overhead: when deserialization is needed, only the required fields have to be deserialized rather than the whole object.

You can also register subtypes and custom serializers. Types that Flink cannot serialize are handed to Kryo, and if Kryo cannot handle them either, Avro is used as a forced fallback. Kryo's serialization performance is lower than Flink's built-in mechanism, so during development you can call env.getConfig().disableGenericTypes() to disable Kryo and try to stick to the data types covered by Flink's own serializers.
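
For example (a sketch; MyEvent and MyLegacyType are placeholders for application classes):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fail fast whenever a type would fall back to Kryo's generic serialization,
        // so such types are caught during development instead of slowing down the job.
        env.getConfig().disableGenericTypes();

        // Register (sub)types up front so Flink or Kryo does not have to discover them lazily.
        env.getConfig().registerPojoType(MyEvent.class);
        env.getConfig().registerKryoType(MyLegacyType.class);
    }

    // Placeholder types standing in for application classes.
    public static class MyEvent {}
    public static class MyLegacyType {}
}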

Cache-Friendly Data Structures

Reading from the CPU's L1, L2, and L3 caches is much faster than reading from main memory; cache access is many times faster than main memory access. Another important program property is the principle of locality: programs tend to reuse the data and instructions they used most recently. There are two kinds of locality: temporal locality means recently accessed content is likely to be accessed again soon, and spatial locality means items at nearby addresses are likely to be accessed within a short time.

Designing cache-friendly data structures around these two properties can effectively improve cache hit rates and locality; Flink applies this mainly to sorting. Normally a pointer references a <key, value> record, and sorting has to follow the pointer to fetch the actual data before comparing, which involves random memory access and poor cache locality. Instead, Flink stores a serialized fixed-length key together with the pointer, so the keys are laid out contiguously in memory; this avoids random memory access and improves the CPU cache hit rate. When two records are compared, the keys are compared first: if they differ, the result is returned directly and only the pointers need to be swapped, not the actual data; if the keys are equal, the data the pointers actually point to is compared.
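
A minimal, self-contained sketch of the idea (this is not Flink's actual sorter): fixed-length key prefixes and record offsets are packed back to back in one array, comparisons touch that contiguous array first, and a swap only moves the small key-plus-pointer entry, never the record itself.

public class NormalizedKeySortSketch {
    // Each sort entry is a fixed-length prefix of the serialized key plus the record's
    // offset, stored back to back in one contiguous int array:
    // [keyPrefix0, offset0, keyPrefix1, offset1, ...]
    static void sort(int[] entries, String[] records) {
        int n = entries.length / 2;
        // Simple insertion sort over the packed entries; only the 2-int entry is moved,
        // never the (potentially large) record it points to.
        for (int i = 1; i < n; i++) {
            int key = entries[2 * i], off = entries[2 * i + 1];
            int j = i - 1;
            while (j >= 0 && compare(entries, j, key, off, records) > 0) {
                entries[2 * (j + 1)] = entries[2 * j];
                entries[2 * (j + 1) + 1] = entries[2 * j + 1];
                j--;
            }
            entries[2 * (j + 1)] = key;
            entries[2 * (j + 1) + 1] = off;
        }
    }

    // Compare on the contiguous key prefix first; only fall back to the actual records
    // (a random memory access) when the prefixes are equal.
    static int compare(int[] entries, int j, int key, int off, String[] records) {
        int cmp = Integer.compare(entries[2 * j], key);
        return cmp != 0 ? cmp : records[entries[2 * j + 1]].compareTo(records[off]);
    }

    public static void main(String[] args) {
        String[] records = {"banana", "apple", "cherry"};
        // Key prefix: first character of each record; offset: index into records.
        int[] entries = {'b', 0, 'a', 1, 'c', 2};
        sort(entries, records);
        for (int i = 0; i < entries.length; i += 2) {
            System.out.println(records[entries[i + 1]]); // apple, banana, cherry
        }
    }
}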

Afterword

The Flink community is moving toward unified stream and batch processing; future work will focus more on the unified engine and on its integration with the storage layer. For running Flink as a service, see Huawei Cloud's EI DLI-Flink serverless offering.

