序
本文主要研究一下 flink 的 MemoryBackendCheckpointStorage
CheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java
/**
* CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
* An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
* created by this class.
*/
public interface CheckpointStorage {
boolean supportsHighlyAvailableStorage();
boolean hasDefaultSavepointLocation();
CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;
CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;
CheckpointStorageLocation initializeLocationForSavepoint(
long checkpointId,
@Nullable String externalLocationPointer) throws IOException;
CheckpointStreamFactory resolveCheckpointStorageLocation(
long checkpointId,
CheckpointStorageLocationReference reference) throws IOException;
CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
CheckpointStorage 接口主要定义了持久化 checkpoint data 及 metadata streams 的基本方法;supportsHighlyAvailableStorage 方法返回该 backend 是否支持 highly available storage;hasDefaultSavepointLocation 方法是否有默认的 savepoint location;resolveCheckpoint 方法用于解析 checkpoint location 返回 CompletedCheckpointStorageLocation;initializeLocationForCheckpoint 方法根据 checkpointId 来初始化 storage location;initializeLocationForSavepoint 方法用于根据 checkpointId 来初始化 savepoint 的 storage location;resolveCheckpointStorageLocation 方法解析 CheckpointStorageLocationReference 返回 CheckpointStreamFactory;createTaskOwnedStateStream 方法用于打开一个 stream 来持久化 checkpoint state
AbstractFsCheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
/**
* An implementation of durable checkpoint storage to file systems.
*/
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
// ————————————————————————
// Constants
// ————————————————————————
/** The prefix of the directory containing the data exclusive to a checkpoint. */
public static final String CHECKPOINT_DIR_PREFIX = “chk-“;
/** The name of the directory for shared checkpoint state. */
public static final String CHECKPOINT_SHARED_STATE_DIR = “shared”;
/** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = “taskowned”;
/** The name of the metadata files in checkpoints / savepoints. */
public static final String METADATA_FILE_NAME = “_metadata”;
/** The magic number that is put in front of any reference. */
private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] {0x05, 0x5F, 0x3F, 0x18};
// ————————————————————————
// Fields and properties
// ————————————————————————
/** The jobId, written into the generated savepoint directories. */
private final JobID jobId;
/** The default location for savepoints. Null, if none is configured. */
@Nullable
private final Path defaultSavepointDirectory;
@Override
public boolean hasDefaultSavepointLocation() {
return defaultSavepointDirectory != null;
}
@Override
public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
return resolveCheckpointPointer(checkpointPointer);
}
/**
* Creates a file system based storage location for a savepoint.
*
* <p>This methods implements the logic that decides which location to use (given optional
* parameters for a configured location and a location passed for this specific savepoint)
* and how to name and initialize the savepoint directory.
*
* @param externalLocationPointer The target location pointer for the savepoint.
* Must be a valid URI. Null, if not supplied.
* @param checkpointId The checkpoint ID of the savepoint.
*
* @return The checkpoint storage location for the savepoint.
*
* @throws IOException Thrown if the target directory could not be created.
*/
@Override
public CheckpointStorageLocation initializeLocationForSavepoint(
@SuppressWarnings(“unused”) long checkpointId,
@Nullable String externalLocationPointer) throws IOException {
// determine where to write the savepoint to
final Path savepointBasePath;
if (externalLocationPointer != null) {
savepointBasePath = new Path(externalLocationPointer);
}
else if (defaultSavepointDirectory != null) {
savepointBasePath = defaultSavepointDirectory;
}
else {
throw new IllegalArgumentException(“No savepoint location given and no default location configured.”);
}
// generate the savepoint directory
final FileSystem fs = savepointBasePath.getFileSystem();
final String prefix = “savepoint-” + jobId.toString().substring(0, 6) + ‘-‘;
Exception latestException = null;
for (int attempt = 0; attempt < 10; attempt++) {
final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));
try {
if (fs.mkdirs(path)) {
// we make the path qualified, to make it independent of default schemes and authorities
final Path qp = path.makeQualified(fs);
return createSavepointLocation(fs, qp);
}
} catch (Exception e) {
latestException = e;
}
}
throw new IOException(“Failed to create savepoint directory at ” + savepointBasePath, latestException);
}
protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;
//……
}
AbstractFsCheckpointStorage 主要是实现了 CheckpointStorage 接口的 hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint 方法
resolveCheckpoint 方法主要做两件事情,一个是解析 checkpoint/savepoint path,一个是解析 checkpoint/savepoint 的 metadata path,获取他们的 FileStatus,然后创建 FsCompletedCheckpointStorageLocation
initializeLocationForSavepoint 方法主要是给 savepoint 创建一个 CheckpointStorageLocation,它可以根据 externalLocationPointer 来创建,该值为 null 的话则使用 defaultSavepointDirectory,该方法里头调用了 createSavepointLocation 抽象方法,由子类去实现
MemoryBackendCheckpointStorage
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
/**
* An implementation of a checkpoint storage for the {@link MemoryStateBackend}.
* Depending on whether this is created with a checkpoint location, the setup supports
* durable checkpoints (durable metadata) or not.
*/
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {
/** The target directory for checkpoints (here metadata files only). Null, if not configured. */
@Nullable
private final Path checkpointsDirectory;
/** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
@Nullable
private final FileSystem fileSystem;
/** The maximum size of state stored in a state handle. */
private final int maxStateSize;
/**
* Creates a new MemoryBackendCheckpointStorage.
*
* @param jobId The ID of the job writing the checkpoints.
* @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
* in which case this storage does not support durable persistence.
* @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
* @param maxStateSize The maximum size of each individual piece of state.
*
* @throws IOException Thrown if a checkpoint base directory is given configured and the
* checkpoint directory cannot be created within that directory.
*/
public MemoryBackendCheckpointStorage(
JobID jobId,
@Nullable Path checkpointsBaseDirectory,
@Nullable Path defaultSavepointLocation,
int maxStateSize) throws IOException {
super(jobId, defaultSavepointLocation);
checkArgument(maxStateSize > 0);
this.maxStateSize = maxStateSize;
if (checkpointsBaseDirectory == null) {
checkpointsDirectory = null;
fileSystem = null;
}
else {
this.fileSystem = checkpointsBaseDirectory.getFileSystem();
this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);
fileSystem.mkdirs(checkpointsDirectory);
}
}
// ————————————————————————
// Properties
// ————————————————————————
/**
* Gets the size (in bytes) that a individual chunk of state may have at most.
*/
public int getMaxStateSize() {
return maxStateSize;
}
// ————————————————————————
// Checkpoint Storage
// ————————————————————————
@Override
public boolean supportsHighlyAvailableStorage() {
return checkpointsDirectory != null;
}
@Override
public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
checkArgument(checkpointId >= 0);
if (checkpointsDirectory != null) {
// configured for durable metadata
// prepare all the paths needed for the checkpoints
checkState(fileSystem != null);
final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
// create the checkpoint exclusive directory
fileSystem.mkdirs(checkpointDir);
return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
}
else {
// no durable metadata – typical in IDE or test setup case
return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
}
}
@Override
public CheckpointStreamFactory resolveCheckpointStorageLocation(
long checkpointId,
CheckpointStorageLocationReference reference) throws IOException {
// no matter where the checkpoint goes, we always return the storage location that stores
// state inline with the state handles.
return new MemCheckpointStreamFactory(maxStateSize);
}
@Override
public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
return new MemoryCheckpointOutputStream(maxStateSize);
}
@Override
protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
}
// ————————————————————————
// Utilities
// ————————————————————————
@Override
public String toString() {
return “MemoryBackendCheckpointStorage {” +
“checkpointsDirectory=” + checkpointsDirectory +
“, fileSystem=” + fileSystem +
“, maxStateSize=” + maxStateSize +
‘}’;
}
}
MemoryBackendCheckpointStorage 继承了 AbstractFsCheckpointStorage,实现了它定义的 createSavepointLocation 方法,这里返回的是 PersistentMetadataCheckpointStorageLocation
MemoryBackendCheckpointStorage 还实现了 CheckpointStorage 接口定义的 AbstractFsCheckpointStorage 未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
supportsHighlyAvailableStorage 是根据是否有配置 checkpointsDirectory 来判断;initializeLocationForCheckpoint 这个根据 checkpointsDirectory 是否有设置来创建,为 null 的话,创建的是 NonPersistentMetadataCheckpointStorageLocation,不为 null 创建的是 PersistentMetadataCheckpointStorageLocation;resolveCheckpointStorageLocation 这里创建的是 MemCheckpointStreamFactory;而 createTaskOwnedStateStream 创建的是 MemoryCheckpointOutputStream
小结
CheckpointStorage 接口主要定义了持久化 checkpoint data 及 metadata streams 的基本方法;AbstractFsCheckpointStorage 主要是实现了 CheckpointStorage 接口的 hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint 方法,同时定义了一个抽象方法 createSavepointLocation
MemoryBackendCheckpointStorage 继承了 AbstractFsCheckpointStorage,实现了它定义的 createSavepointLocation 方法,同时还实现了 CheckpointStorage 接口定义的 AbstractFsCheckpointStorage 未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
这里可以看到 MemoryBackendCheckpointStorage 虽然是 memory 的,但是如果有配置 checkpointsDirectory(highly available storage),checkpoint location 使用的是 PersistentMetadataCheckpointStorageLocation,否则使用 NonPersistentMetadataCheckpointStorageLocation;而 savepoint location 使用的是 PersistentMetadataCheckpointStorageLocation(checkpiont 可以选择是否使用文件存储,而 metadata 只能使用文件存储)
doc
The MemoryStateBackend