序本文主要研究一下flink的MemCheckpointStreamFactoryCheckpointStreamFactoryflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java/** * A factory for checkpoint output streams, which are used to persist data for checkpoints. * * <p>Stream factories can be created from the {@link CheckpointStorage} through * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}. /public interface CheckpointStreamFactory { CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException; abstract class CheckpointStateOutputStream extends FSDataOutputStream { @Nullable public abstract StreamStateHandle closeAndGetHandle() throws IOException; @Override public abstract void close() throws IOException; }}CheckpointStreamFactory为checkpoint output streams(用于持久化checkpoint的数据)的工厂,它定义了createCheckpointStateOutputStream方法,这里返回的是CheckpointStateOutputStream;CheckpointStateOutputStream继承了FSDataOutputStream,它定义了closeAndGetHandle及close两个抽象方法CheckpointStreamFactory有两个以factory命名的实现类,分别是MemCheckpointStreamFactory(它有两个子类分别为NonPersistentMetadataCheckpointStorageLocation、PersistentMetadataCheckpointStorageLocation)、FsCheckpointStreamFactory(它有一个子类为FsCheckpointStorageLocation)CheckpointStorageLocation接口继承了CheckpointStreamFactory接口,它有三个实现类,分别是NonPersistentMetadataCheckpointStorageLocation、PersistentMetadataCheckpointStorageLocation、FsCheckpointStorageLocationFSDataOutputStreamflink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java@Publicpublic abstract class FSDataOutputStream extends OutputStream { public abstract long getPos() throws IOException; public abstract void flush() throws IOException; public abstract void sync() throws IOException; public abstract void close() throws IOException;}FSDataOutputStream继承了java的OutputStream,它多定义了getPos、flush、sync、close几个抽象方法CheckpointStorageLocationflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java/* * A storage location for one particular checkpoint, offering data persistent, metadata persistence, * and lifecycle/cleanup methods. * * <p>CheckpointStorageLocations are typically created and initialized via * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}. /public interface CheckpointStorageLocation extends CheckpointStreamFactory { CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException; void disposeOnFailure() throws IOException; CheckpointStorageLocationReference getLocationReference();}CheckpointStorageLocation继承了CheckpointStreamFactory接口,它通常是由CheckpointStorage来创建及初始化,提供数据持久化、metadata存储及lifecycle/cleanup相关方法;这里定义了createMetadataOutputStream方法用来创建CheckpointMetadataOutputStream;disposeOnFailure方法用于在checkpoint失败的时候dispose checkpoint location;getLocationReference用于返回CheckpointStorageLocationReferenceMemCheckpointStreamFactoryflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java/* * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. /public class MemCheckpointStreamFactory implements CheckpointStreamFactory { /* The maximal size that the snapshotted memory state may have / private final int maxStateSize; /* * Creates a new in-memory stream factory that accepts states whose serialized forms are * up to the given number of bytes. * * @param maxStateSize The maximal size of the serialized state / public MemCheckpointStreamFactory(int maxStateSize) { this.maxStateSize = maxStateSize; } @Override public CheckpointStateOutputStream createCheckpointStateOutputStream( CheckpointedStateScope scope) throws IOException { return new MemoryCheckpointOutputStream(maxStateSize); } @Override public String toString() { return “In-Memory Stream Factory”; } static void checkSize(int size, int maxSize) throws IOException { if (size > maxSize) { throw new IOException( “Size of the state is larger than the maximum permitted memory-backed state. Size=” + size + " , maxSize=" + maxSize + " . Consider using a different state backend, like the File System State backend."); } } /* * A {@code CheckpointStateOutputStream} that writes into a byte array. / public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos(); private final int maxSize; private AtomicBoolean closed; boolean isEmpty = true; public MemoryCheckpointOutputStream(int maxSize) { this.maxSize = maxSize; this.closed = new AtomicBoolean(false); } @Override public void write(int b) throws IOException { os.write(b); isEmpty = false; } @Override public void write(byte[] b, int off, int len) throws IOException { os.write(b, off, len); isEmpty = false; } @Override public void flush() throws IOException { os.flush(); } @Override public void sync() throws IOException { } // ——————————————————————– @Override public void close() { if (closed.compareAndSet(false, true)) { closeInternal(); } } @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { if (isEmpty) { return null; } return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes()); } @Override public long getPos() throws IOException { return os.getPosition(); } public boolean isClosed() { return closed.get(); } /* * Closes the stream and returns the byte array containing the stream’s data. * @return The byte array containing the stream’s data. * @throws IOException Thrown if the size of the data exceeds the maximal / public byte[] closeAndGetBytes() throws IOException { if (closed.compareAndSet(false, true)) { checkSize(os.size(), maxSize); byte[] bytes = os.toByteArray(); closeInternal(); return bytes; } else { throw new IOException(“stream has already been closed”); } } private void closeInternal() { os.reset(); } }}MemCheckpointStreamFactory实现了CheckpointStreamFactory接口,这里createCheckpointStateOutputStream方法返回MemoryCheckpointOutputStreamMemoryCheckpointOutputStream继承了CheckpointStateOutputStream,里头使用了ByteArrayOutputStreamWithPos,它在closeAndGetHandle的时候会校验大小是否超过maxSize的限制,超出则抛出IOException异常MemCheckpointStreamFactory有两个子类分别为NonPersistentMetadataCheckpointStorageLocation、PersistentMetadataCheckpointStorageLocation,它们都实现了CheckpointStorageLocation接口NonPersistentMetadataCheckpointStorageLocationflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java/* * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence * for metadata has been configured. /public class NonPersistentMetadataCheckpointStorageLocation extends MemCheckpointStreamFactory implements CheckpointStorageLocation { /* The external pointer returned for checkpoints that are not externally addressable. / public static final String EXTERNAL_POINTER = “<checkpoint-not-externally-addressable>”; public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) { super(maxStateSize); } @Override public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { return new MetadataOutputStream(); } @Override public void disposeOnFailure() {} @Override public CheckpointStorageLocationReference getLocationReference() { return CheckpointStorageLocationReference.getDefault(); } // ———————————————————————— // CompletedCheckpointStorageLocation // ———————————————————————— /* * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the * metadata in an internal byte array. / private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation { private static final long serialVersionUID = 1L; private final ByteStreamStateHandle metaDataHandle; NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) { this.metaDataHandle = metaDataHandle; } @Override public String getExternalPointer() { return EXTERNAL_POINTER; } @Override public StreamStateHandle getMetadataHandle() { return metaDataHandle; } @Override public void disposeStorageLocation() {} } // ———————————————————————— // CheckpointMetadataOutputStream // ———————————————————————— private static class MetadataOutputStream extends CheckpointMetadataOutputStream { private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos(); private boolean closed; @Override public void write(int b) throws IOException { os.write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { os.write(b, off, len); } @Override public void flush() throws IOException { os.flush(); } @Override public long getPos() throws IOException { return os.getPosition(); } @Override public void sync() throws IOException { } @Override public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException { synchronized (this) { if (!closed) { closed = true; byte[] bytes = os.toByteArray(); ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes); return new NonPersistentCompletedCheckpointStorageLocation(handle); } else { throw new IOException(“Already closed”); } } } @Override public void close() { if (!closed) { closed = true; os.reset(); } } }}MemoryBackendCheckpointStorage在没有配置checkpointsDirectory的时候创建的是NonPersistentMetadataCheckpointStorageLocation;其createMetadataOutputStream方法创建的是MetadataOutputStreamMetadataOutputStream继承了CheckpointMetadataOutputStream,里头使用的是ByteArrayOutputStreamWithPos,而closeAndFinalizeCheckpoint返回的是NonPersistentCompletedCheckpointStorageLocationNonPersistentCompletedCheckpointStorageLocation实现了CompletedCheckpointStorageLocation接口,其getMetadataHandle方法返回的是ByteStreamStateHandlePersistentMetadataCheckpointStorageLocationflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java/* * A checkpoint storage location for the {@link MemoryStateBackend} when it durably * persists the metadata in a file system. /public class PersistentMetadataCheckpointStorageLocation extends MemCheckpointStreamFactory implements CheckpointStorageLocation { private final FileSystem fileSystem; private final Path checkpointDirectory; private final Path metadataFilePath; /* * Creates a checkpoint storage persists metadata to a file system and stores state * in line in state handles with the metadata. * * @param fileSystem The file system to which the metadata will be written. * @param checkpointDir The directory where the checkpoint metadata will be written. */ public PersistentMetadataCheckpointStorageLocation( FileSystem fileSystem, Path checkpointDir, int maxStateSize) { super(maxStateSize); this.fileSystem = checkNotNull(fileSystem); this.checkpointDirectory = checkNotNull(checkpointDir); this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME); } // ———————————————————————— @Override public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory); } @Override public void disposeOnFailure() throws IOException { // on a failure, no chunk in the checkpoint directory needs to be saved, so // we can drop it as a whole fileSystem.delete(checkpointDirectory, true); } @Override public CheckpointStorageLocationReference getLocationReference() { return CheckpointStorageLocationReference.getDefault(); }}MemoryBackendCheckpointStorage在配置了checkpointsDirectory的时候创建的是PersistentMetadataCheckpointStorageLocation;其createMetadataOutputStream方法创建的是FsCheckpointMetadataOutputStream;FsCheckpointMetadataOutputStream的构造器接收三个参数,分别是fileSystem、metadataFilePath、exclusiveCheckpointDir;其中fileSystem用于根据metadataFilePath来创建FSDataOutputStream,而exclusiveCheckpointDir则在返回FsCompletedCheckpointStorageLocation的时候用到小结MemoryBackendCheckpointStorage在没有配置checkpointsDirectory的时候创建的是NonPersistentMetadataCheckpointStorageLocation;在配置了checkpointsDirectory的时候创建的是PersistentMetadataCheckpointStorageLocationNonPersistentMetadataCheckpointStorageLocation及PersistentMetadataCheckpointStorageLocation都继承了MemCheckpointStreamFactory类,同时实现了CheckpointStorageLocation接口(其createMetadataOutputStream方法返回的CheckpointMetadataOutputStream类型分别为MetadataOutputStream、FsCheckpointMetadataOutputStream)MemCheckpointStreamFactory实现了CheckpointStreamFactory接口,它的createCheckpointStateOutputStream方法返回MemoryCheckpointOutputStream;CheckpointStorageLocation继承了CheckpointStreamFactory接口,它通常是由CheckpointStorage来创建及初始化,提供数据持久化、metadata存储及lifecycle/cleanup相关方法docThe MemoryStateBackend