A Look at Flink's MemCheckpointStreamFactory

This article takes a look at Flink's MemCheckpointStreamFactory.
CheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java
/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }
}

CheckpointStreamFactory is a factory for checkpoint output streams, which are used to persist checkpoint data. It defines a single method, createCheckpointStateOutputStream, which returns a CheckpointStateOutputStream. CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods: closeAndGetHandle, which closes the stream and returns a (possibly null) StreamStateHandle for the written data, and close.
CheckpointStreamFactory has two implementations whose names end in "Factory": MemCheckpointStreamFactory (which has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation) and FsCheckpointStreamFactory (which has one subclass, FsCheckpointStorageLocation).
The CheckpointStorageLocation interface extends CheckpointStreamFactory and has three implementations: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation, and FsCheckpointStorageLocation.
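The stream contract can be illustrated with a minimal sketch: a caller obtains a stream from the factory, writes serialized state, and then calls closeAndGetHandle exactly once; if anything fails before that, it calls close() so the data is discarded. The types below (SketchStateOutputStream, SketchStreamFactory, CountingStreamFactory, and a String used as the "handle") are hypothetical simplifications for illustration, not Flink's API.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical simplification of CheckpointStateOutputStream: an OutputStream
// that is finished by turning its contents into a "handle".
abstract class SketchStateOutputStream extends OutputStream {
    // Returns a handle for the written data, or null if nothing was written.
    abstract String closeAndGetHandle() throws IOException;
}

// Hypothetical simplification of CheckpointStreamFactory.
interface SketchStreamFactory {
    SketchStateOutputStream createCheckpointStateOutputStream() throws IOException;
}

// Hypothetical factory whose "handle" only records how many bytes were written.
class CountingStreamFactory implements SketchStreamFactory {
    @Override
    public SketchStateOutputStream createCheckpointStateOutputStream() {
        return new SketchStateOutputStream() {
            private int count;

            @Override
            public void write(int b) {
                count++;
            }

            @Override
            String closeAndGetHandle() {
                return count == 0 ? null : "handle(" + count + " bytes)";
            }
        };
    }
}

class ContractDemo {
    // Writes the payload and returns the handle; if anything fails before the
    // handle is obtained, the stream is closed so its data is discarded.
    static String writeState(SketchStreamFactory factory, byte[] payload) throws IOException {
        SketchStateOutputStream out = factory.createCheckpointStateOutputStream();
        boolean finished = false;
        try {
            out.write(payload);
            String handle = out.closeAndGetHandle();
            finished = true;
            return handle;
        } finally {
            if (!finished) {
                out.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(writeState(new CountingStreamFactory(), payload)); // prints handle(5 bytes)
    }
}
```

The try/finally shape reflects the two mutually exclusive ways such a stream ends: it is either turned into a handle or discarded via close().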

FSDataOutputStream
flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java
@Public
public abstract class FSDataOutputStream extends OutputStream {

    public abstract long getPos() throws IOException;

    public abstract void flush() throws IOException;

    public abstract void sync() throws IOException;

    public abstract void close() throws IOException;
}
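FSDataOutputStream extends Java's OutputStream. It adds the abstract methods getPos and sync, and redeclares flush and close (which OutputStream implements as no-ops) as abstract, so every subclass must implement all four.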
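To make the extra methods concrete, here is a minimal sketch of an in-memory stream in the spirit of FSDataOutputStream: getPos reports how many bytes have been written so far, and sync is a no-op because there is no durable medium to sync to (the same choice MemoryCheckpointOutputStream makes further below). The class names are hypothetical, and java.io.ByteArrayOutputStream stands in for Flink's ByteArrayOutputStreamWithPos.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical mirror of the methods FSDataOutputStream adds on top of OutputStream.
abstract class PositionedOutputStream extends OutputStream {
    // Current position, i.e. the number of bytes written so far.
    public abstract long getPos() throws IOException;

    // Persists buffered data to the underlying medium.
    public abstract void sync() throws IOException;
}

// In-memory implementation backed by a ByteArrayOutputStream.
class InMemoryPositionedOutputStream extends PositionedOutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public void write(int b) {
        buffer.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        buffer.write(b, off, len);
    }

    @Override
    public long getPos() {
        return buffer.size();
    }

    @Override
    public void sync() {
        // Nothing to sync: the data only lives in memory.
    }

    byte[] toByteArray() {
        return buffer.toByteArray();
    }
}

class PositionDemo {
    public static void main(String[] args) throws IOException {
        InMemoryPositionedOutputStream out = new InMemoryPositionedOutputStream();
        out.write(new byte[] {1, 2, 3});
        out.write(4);
        out.sync();
        System.out.println(out.getPos()); // prints 4
    }
}
```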
CheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java
/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    CheckpointStorageLocationReference getLocationReference();
}
CheckpointStorageLocation extends the CheckpointStreamFactory interface. Instances are typically created and initialized by CheckpointStorage and provide data persistence, metadata persistence, and lifecycle/cleanup methods. createMetadataOutputStream creates the CheckpointMetadataOutputStream that the checkpoint metadata is written to; disposeOnFailure disposes of the checkpoint location when the checkpoint fails; and getLocationReference returns a CheckpointStorageLocationReference.
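Because CheckpointStorageLocation extends CheckpointStreamFactory, a single object both hands out state streams and owns the metadata stream and the failure cleanup for one checkpoint. The sketch below models that lifecycle with hypothetical, simplified interfaces (StateStreamFactory, StorageLocation) and a hypothetical RecordingLocation. It compresses into one method steps that Flink performs in different places, so treat it as an illustration of the lifecycle only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical simplified mirror of CheckpointStreamFactory.
interface StateStreamFactory {
    OutputStream createStateStream() throws IOException;
}

// Hypothetical simplified mirror of CheckpointStorageLocation: it is a stream
// factory and additionally owns the metadata stream and failure cleanup.
interface StorageLocation extends StateStreamFactory {
    OutputStream createMetadataStream() throws IOException;

    void disposeOnFailure() throws IOException;
}

class LocationLifecycle {
    // Writes one piece of state plus the metadata; on an I/O failure the
    // whole location is disposed before the exception is rethrown.
    static void runCheckpoint(StorageLocation location, byte[] state, byte[] metadata) throws IOException {
        try {
            try (OutputStream s = location.createStateStream()) {
                s.write(state);
            }
            try (OutputStream m = location.createMetadataStream()) {
                m.write(metadata);
            }
        } catch (IOException e) {
            location.disposeOnFailure();
            throw e;
        }
    }
}

// Hypothetical in-memory location that records what happened.
class RecordingLocation implements StorageLocation {
    final ByteArrayOutputStream state = new ByteArrayOutputStream();
    final ByteArrayOutputStream metadata = new ByteArrayOutputStream();
    boolean disposed;
    boolean failMetadata; // set to true to simulate a failing metadata write

    @Override
    public OutputStream createStateStream() {
        return state;
    }

    @Override
    public OutputStream createMetadataStream() throws IOException {
        if (failMetadata) {
            throw new IOException("simulated metadata failure");
        }
        return metadata;
    }

    @Override
    public void disposeOnFailure() {
        disposed = true;
    }
}
```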
MemCheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
/**
 * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
 */
public class MemCheckpointStreamFactory implements CheckpointStreamFactory {

    /** The maximal size that the snapshotted memory state may have */
    private final int maxStateSize;

    /**
     * Creates a new in-memory stream factory that accepts states whose serialized forms are
     * up to the given number of bytes.
     *
     * @param maxStateSize The maximal size of the serialized state
     */
    public MemCheckpointStreamFactory(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    @Override
    public CheckpointStateOutputStream createCheckpointStateOutputStream(
            CheckpointedStateScope scope) throws IOException
    {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    public String toString() {
        return "In-Memory Stream Factory";
    }

    static void checkSize(int size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException(
                    "Size of the state is larger than the maximum permitted memory-backed state. Size="
                            + size + " , maxSize=" + maxSize
                            + " . Consider using a different state backend, like the File System State backend.");
        }
    }

    /**
     * A {@code CheckpointStateOutputStream} that writes into a byte array.
     */
    public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private final int maxSize;

        private AtomicBoolean closed;

        boolean isEmpty = true;

        public MemoryCheckpointOutputStream(int maxSize) {
            this.maxSize = maxSize;
            this.closed = new AtomicBoolean(false);
        }

        @Override
        public void write(int b) throws IOException {
            os.write(b);
            isEmpty = false;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
            isEmpty = false;
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public void sync() throws IOException {}

        // --------------------------------------------------------------------

        @Override
        public void close() {
            if (closed.compareAndSet(false, true)) {
                closeInternal();
            }
        }

        @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (isEmpty) {
                return null;
            }
            return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes());
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        public boolean isClosed() {
            return closed.get();
        }

        /**
         * Closes the stream and returns the byte array containing the stream's data.
         * @return The byte array containing the stream's data.
         * @throws IOException Thrown if the size of the data exceeds the maximal
         */
        public byte[] closeAndGetBytes() throws IOException {
            if (closed.compareAndSet(false, true)) {
                checkSize(os.size(), maxSize);
                byte[] bytes = os.toByteArray();
                closeInternal();
                return bytes;
            } else {
                throw new IOException("stream has already been closed");
            }
        }

        private void closeInternal() {
            os.reset();
        }
    }
}

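MemCheckpointStreamFactory implements the CheckpointStreamFactory interface; its createCheckpointStateOutputStream method ignores the scope argument and returns a MemoryCheckpointOutputStream bounded by maxStateSize.
MemoryCheckpointOutputStream extends CheckpointStateOutputStream and buffers all writes in a ByteArrayOutputStreamWithPos. Individual writes are not size-checked; the size is checked only when closeAndGetHandle is called (via closeAndGetBytes), and an IOException is thrown if it exceeds maxSize. If nothing was written, closeAndGetHandle returns null instead of a handle, while calling close() simply discards the buffered data.
MemCheckpointStreamFactory has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation, both of which also implement the CheckpointStorageLocation interface.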
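The following simplified re-creation of MemoryCheckpointOutputStream's close logic shows the consequences described above: oversized data is accepted while writing and rejected only at close time, an empty stream yields null, and a stream can be finalized only once. BoundedMemoryStream and closeAndGetBytesOrNull are hypothetical names, a byte[] stands in for the ByteStreamStateHandle, and java.io.ByteArrayOutputStream replaces Flink's ByteArrayOutputStreamWithPos.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified re-creation of MemoryCheckpointOutputStream.
class BoundedMemoryStream extends OutputStream {
    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final int maxSize;
    private boolean isEmpty = true;

    BoundedMemoryStream(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public void write(int b) {
        // No size check here: every write is buffered in memory.
        os.write(b);
        isEmpty = false;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
        isEmpty = false;
    }

    // Mirrors closeAndGetHandle + closeAndGetBytes: null for an empty stream,
    // otherwise the bytes, provided they fit into maxSize.
    byte[] closeAndGetBytesOrNull() throws IOException {
        if (isEmpty) {
            return null;
        }
        if (!closed.compareAndSet(false, true)) {
            throw new IOException("stream has already been closed");
        }
        // The limit is enforced only once, at close time.
        if (os.size() > maxSize) {
            throw new IOException("Size=" + os.size() + " , maxSize=" + maxSize);
        }
        byte[] bytes = os.toByteArray();
        os.reset();
        return bytes;
    }

    @Override
    public void close() {
        // Discards the buffered data; later finalization attempts fail.
        if (closed.compareAndSet(false, true)) {
            os.reset();
        }
    }
}
```

Because the limit is checked only at the end, an oversized state is still fully buffered in memory before the IOException is raised.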

NonPersistentMetadataCheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
/**
 * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
 * for metadata has been configured.
 */
public class NonPersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    /** The external pointer returned for checkpoints that are not externally addressable. */
    public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) {
        super(maxStateSize);
    }

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new MetadataOutputStream();
    }

    @Override
    public void disposeOnFailure() {}

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }

    // ------------------------------------------------------------------------
    //  CompletedCheckpointStorageLocation
    // ------------------------------------------------------------------------

    /**
     * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the
     * metadata in an internal byte array.
     */
    private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation {

        private static final long serialVersionUID = 1L;

        private final ByteStreamStateHandle metaDataHandle;

        NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) {
            this.metaDataHandle = metaDataHandle;
        }

        @Override
        public String getExternalPointer() {
            return EXTERNAL_POINTER;
        }

        @Override
        public StreamStateHandle getMetadataHandle() {
            return metaDataHandle;
        }

        @Override
        public void disposeStorageLocation() {}
    }

    // ------------------------------------------------------------------------
    //  CheckpointMetadataOutputStream
    // ------------------------------------------------------------------------

    private static class MetadataOutputStream extends CheckpointMetadataOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private boolean closed;

        @Override
        public void write(int b) throws IOException {
            os.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        @Override
        public void sync() throws IOException {}

        @Override
        public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
            synchronized (this) {
                if (!closed) {
                    closed = true;

                    byte[] bytes = os.toByteArray();
                    ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes);
                    return new NonPersistentCompletedCheckpointStorageLocation(handle);
                } else {
                    throw new IOException("Already closed");
                }
            }
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                os.reset();
            }
        }
    }
}

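MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation when no checkpointsDirectory is configured. Its createMetadataOutputStream method creates a MetadataOutputStream, and its disposeOnFailure method is a no-op because nothing is written outside of memory.
MetadataOutputStream extends CheckpointMetadataOutputStream and also buffers into a ByteArrayOutputStreamWithPos. Its closeAndFinalizeCheckpoint method wraps the buffered bytes in a ByteStreamStateHandle and returns a NonPersistentCompletedCheckpointStorageLocation; calling it on an already closed stream throws an IOException.
NonPersistentCompletedCheckpointStorageLocation implements the CompletedCheckpointStorageLocation interface. Its getMetadataHandle method returns that ByteStreamStateHandle, and getExternalPointer returns the placeholder EXTERNAL_POINTER ("<checkpoint-not-externally-addressable>"), since such a checkpoint has no external address.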
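A minimal, hypothetical model of the non-persistent metadata path: the metadata bytes are buffered, and finalizing yields a completed location that holds only those bytes plus the fixed external pointer. InMemoryMetadataStream and InMemoryCompletedLocation are illustrative stand-ins for the private MetadataOutputStream and NonPersistentCompletedCheckpointStorageLocation classes, with a byte[] replacing ByteStreamStateHandle.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;

// Hypothetical stand-in for NonPersistentCompletedCheckpointStorageLocation.
final class InMemoryCompletedLocation {
    static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    final String handleName; // random name, like the UUID-named ByteStreamStateHandle
    final byte[] metadata;   // the metadata lives only in this array

    InMemoryCompletedLocation(String handleName, byte[] metadata) {
        this.handleName = handleName;
        this.metadata = metadata;
    }

    String getExternalPointer() {
        return EXTERNAL_POINTER;
    }
}

// Hypothetical stand-in for the private MetadataOutputStream class.
class InMemoryMetadataStream extends OutputStream {
    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private boolean closed;

    @Override
    public void write(int b) {
        os.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
    }

    // Finalizes at most once; a second call fails like "Already closed" in Flink.
    synchronized InMemoryCompletedLocation closeAndFinalizeCheckpoint() throws IOException {
        if (closed) {
            throw new IOException("Already closed");
        }
        closed = true;
        return new InMemoryCompletedLocation(UUID.randomUUID().toString(), os.toByteArray());
    }

    // Discards the metadata without producing a completed location.
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            os.reset();
        }
    }
}
```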

PersistentMetadataCheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
/**
 * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
 * persists the metadata in a file system.
 */
public class PersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    private final FileSystem fileSystem;

    private final Path checkpointDirectory;

    private final Path metadataFilePath;

    /**
     * Creates a checkpoint storage persists metadata to a file system and stores state
     * in line in state handles with the metadata.
     *
     * @param fileSystem The file system to which the metadata will be written.
     * @param checkpointDir The directory where the checkpoint metadata will be written.
     */
    public PersistentMetadataCheckpointStorageLocation(
            FileSystem fileSystem,
            Path checkpointDir,
            int maxStateSize) {

        super(maxStateSize);

        this.fileSystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDir);
        this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
    }

    // ------------------------------------------------------------------------

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
    }

    @Override
    public void disposeOnFailure() throws IOException {
        // on a failure, no chunk in the checkpoint directory needs to be saved, so
        // we can drop it as a whole
        fileSystem.delete(checkpointDirectory, true);
    }

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }
}
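MemoryBackendCheckpointStorage creates a PersistentMetadataCheckpointStorageLocation when a checkpointsDirectory is configured. The state itself is still written to in-memory streams (the class extends MemCheckpointStreamFactory and, per its Javadoc, stores state inline in state handles with the metadata); only the metadata goes to the file system. Its createMetadataOutputStream method creates an FsCheckpointMetadataOutputStream, whose constructor takes three arguments: fileSystem, metadataFilePath, and exclusiveCheckpointDir. fileSystem is used to create an FSDataOutputStream for metadataFilePath, while exclusiveCheckpointDir is used when closeAndFinalizeCheckpoint returns an FsCompletedCheckpointStorageLocation. On failure, disposeOnFailure deletes the whole checkpoint directory recursively.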
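For comparison, here is a java.nio-based sketch of what the persistent variant does with the metadata: write it to a file inside the checkpoint directory and, on failure, delete that directory recursively, mirroring fileSystem.delete(checkpointDirectory, true). FileMetadataLocation is hypothetical; the file name "_metadata" is an assumption standing in for AbstractFsCheckpointStorage.METADATA_FILE_NAME, and java.nio.file replaces Flink's FileSystem abstraction and FsCheckpointMetadataOutputStream.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of PersistentMetadataCheckpointStorageLocation's metadata
// handling; state streams (kept in memory) are not modeled here.
class FileMetadataLocation {
    // Assumed file name, standing in for AbstractFsCheckpointStorage.METADATA_FILE_NAME.
    static final String METADATA_FILE_NAME = "_metadata";

    private final Path checkpointDirectory;
    private final Path metadataFilePath;

    FileMetadataLocation(Path checkpointDirectory) {
        this.checkpointDirectory = checkpointDirectory;
        this.metadataFilePath = checkpointDirectory.resolve(METADATA_FILE_NAME);
    }

    // Opens a stream that writes the metadata file inside the checkpoint directory.
    // This sketch creates the directory itself to stay self-contained.
    OutputStream createMetadataOutputStream() throws IOException {
        Files.createDirectories(checkpointDirectory);
        return Files.newOutputStream(metadataFilePath);
    }

    Path getMetadataFilePath() {
        return metadataFilePath;
    }

    // Recursively deletes the checkpoint directory, like a recursive FileSystem.delete.
    void disposeOnFailure() throws IOException {
        if (!Files.exists(checkpointDirectory)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(checkpointDirectory)) {
            // Reverse natural order visits children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

Deleting the directory as a whole matches the comment in the Flink source: after a failure, nothing in the checkpoint directory needs to be kept.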
Summary

MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation when no checkpointsDirectory is configured, and a PersistentMetadataCheckpointStorageLocation when one is.
NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation both extend MemCheckpointStreamFactory and implement the CheckpointStorageLocation interface; their createMetadataOutputStream methods return a MetadataOutputStream and an FsCheckpointMetadataOutputStream, respectively. In both cases the state itself is held in memory; only the destination of the metadata differs.
MemCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream. CheckpointStorageLocation extends CheckpointStreamFactory; it is typically created and initialized by CheckpointStorage and provides data persistence, metadata persistence, and lifecycle/cleanup methods.
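The selection rule in the first point of the summary comes down to a single branch on whether a checkpoints directory is configured. In the sketch below, LocationSelector and Choice are hypothetical. It returns the class names as strings instead of constructing real locations, and the per-checkpoint "chk-<id>" subdirectory is an assumption about how the persistent location's directory is named.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch of the choice MemoryBackendCheckpointStorage makes per checkpoint.
class LocationSelector {
    // Stand-in for the two CheckpointStorageLocation classes.
    static final class Choice {
        final String kind;
        final Path metadataDir; // null when the metadata is not persisted

        Choice(String kind, Path metadataDir) {
            this.kind = kind;
            this.metadataDir = metadataDir;
        }
    }

    // No checkpoints directory: metadata stays in memory.
    // Otherwise: metadata goes to a per-checkpoint subdirectory (assumed "chk-<id>").
    static Choice initializeLocationForCheckpoint(Path checkpointsDirectory, long checkpointId) {
        if (checkpointsDirectory == null) {
            return new Choice("NonPersistentMetadataCheckpointStorageLocation", null);
        }
        return new Choice("PersistentMetadataCheckpointStorageLocation",
                checkpointsDirectory.resolve("chk-" + checkpointId));
    }

    public static void main(String[] args) {
        System.out.println(initializeLocationForCheckpoint(null, 7).kind);
        System.out.println(initializeLocationForCheckpoint(Paths.get("/tmp/checkpoints"), 7).metadataDir);
    }
}
```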

doc
The MemoryStateBackend
