序

本文主要研究一下flink的MemoryBackendCheckpointStorage。

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {

    boolean supportsHighlyAvailableStorage();

    boolean hasDefaultSavepointLocation();

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

    CheckpointStorageLocation initializeLocationForSavepoint(
            long checkpointId,
            @Nullable String externalLocationPointer) throws IOException;

    CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException;

    CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}

CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法:

- supportsHighlyAvailableStorage方法返回该backend是否支持highly available storage;
- hasDefaultSavepointLocation方法返回是否配置了默认的savepoint location;
- resolveCheckpoint方法用于解析checkpoint的external pointer,返回CompletedCheckpointStorageLocation;
- initializeLocationForCheckpoint方法根据checkpointId来初始化checkpoint的storage location;
- initializeLocationForSavepoint方法根据checkpointId(以及可选的externalLocationPointer)来初始化savepoint的storage location;
- resolveCheckpointStorageLocation方法解析CheckpointStorageLocationReference,返回CheckpointStreamFactory;
- createTaskOwnedStateStream方法用于打开一个stream,来持久化由TaskManager(而非master)所拥有的checkpoint state。

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /** The prefix of the directory containing the data exclusive to a checkpoint. */
    public static final String CHECKPOINT_DIR_PREFIX = "chk-";

    /** The name of the directory for shared checkpoint state. */
    public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

    /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
    public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

    /** The name of the metadata files in checkpoints / savepoints. */
    public static final String METADATA_FILE_NAME = "_metadata";

    /** The magic number that is put in front of any reference. */
    private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

    // ------------------------------------------------------------------------
    //  Fields and properties
    // ------------------------------------------------------------------------

    /** The jobId, written into the generated savepoint directories. */
    private final JobID jobId;

    /** The default location for savepoints. Null, if none is configured. */
    @Nullable
    private final Path defaultSavepointDirectory;

    @Override
    public boolean hasDefaultSavepointLocation() {
        return defaultSavepointDirectory != null;
    }

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
        return resolveCheckpointPointer(checkpointPointer);
    }

    /**
     * Creates a file system based storage location for a savepoint.
     *
     * <p>This methods implements the logic that decides which location to use (given optional
     * parameters for a configured location and a location passed for this specific savepoint)
     * and how to name and initialize the savepoint directory.
     *
     * @param externalLocationPointer The target location pointer for the savepoint.
     *                                Must be a valid URI. Null, if not supplied.
     * @param checkpointId The checkpoint ID of the savepoint.
     *
     * @return The checkpoint storage location for the savepoint.
     *
     * @throws IOException Thrown if the target directory could not be created.
     */
    @Override
    public CheckpointStorageLocation initializeLocationForSavepoint(
            @SuppressWarnings("unused") long checkpointId,
            @Nullable String externalLocationPointer) throws IOException {

        // determine where to write the savepoint to

        final Path savepointBasePath;
        if (externalLocationPointer != null) {
            savepointBasePath = new Path(externalLocationPointer);
        }
        else if (defaultSavepointDirectory != null) {
            savepointBasePath = defaultSavepointDirectory;
        }
        else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }

        // generate the savepoint directory

        final FileSystem fs = savepointBasePath.getFileSystem();
        final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

            try {
                if (fs.mkdirs(path)) {
                    // we make the path qualified, to make it independent of default schemes and authorities
                    final Path qp = path.makeQualified(fs);

                    return createSavepointLocation(fs, qp);
                }
            } catch (Exception e) {
                latestException = e;
            }
        }

        throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
    }

    protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

    //……
}

AbstractFsCheckpointStorage主要实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法,同时定义了抽象方法createSavepointLocation:

- resolveCheckpoint方法委托给resolveCheckpointPointer(其代码上面未列出)。它主要做两件事情:一是解析checkpoint/savepoint的path,二是解析checkpoint/savepoint的metadata path。获取它们的FileStatus之后,再创建FsCompletedCheckpointStorageLocation;
- initializeLocationForSavepoint方法为savepoint创建一个CheckpointStorageLocation,具体步骤如下:
  - 确定base path:优先使用externalLocationPointer作为savepoint的base path;该值为null时使用defaultSavepointDirectory;两者都为null时抛出IllegalArgumentException;
  - 创建目录:在base path下以"savepoint-" + jobId前6位 + "-"为前缀生成随机目录名,最多尝试10次创建目录;
  - 创建location:目录创建成功后,将路径转为qualified path(使其不依赖默认的scheme和authority),再调用抽象方法createSavepointLocation(由子类实现)来创建location;
  - 10次均失败时抛出IOException。

MemoryBackendCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java

/**
 * An implementation of a checkpoint storage for the {@link MemoryStateBackend}.
 * Depending on whether this is created with a checkpoint location, the setup supports
 * durable checkpoints (durable metadata) or not.
 */
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {

    /** The target directory for checkpoints (here metadata files only). Null, if not configured. */
    @Nullable
    private final Path checkpointsDirectory;

    /** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
    @Nullable
    private final FileSystem fileSystem;

    /** The maximum size of state stored in a state handle. */
    private final int maxStateSize;

    /**
     * Creates a new MemoryBackendCheckpointStorage.
     *
     * @param jobId The ID of the job writing the checkpoints.
     * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
     *                                 in which case this storage does not support durable persistence.
     * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
     * @param maxStateSize The maximum size of each individual piece of state.
     *
     * @throws IOException Thrown if a checkpoint base directory is given configured and the
     *                     checkpoint directory cannot be created within that directory.
     */
    public MemoryBackendCheckpointStorage(
            JobID jobId,
            @Nullable Path checkpointsBaseDirectory,
            @Nullable Path defaultSavepointLocation,
            int maxStateSize) throws IOException {

        super(jobId, defaultSavepointLocation);

        checkArgument(maxStateSize > 0);
        this.maxStateSize = maxStateSize;

        if (checkpointsBaseDirectory == null) {
            checkpointsDirectory = null;
            fileSystem = null;
        }
        else {
            this.fileSystem = checkpointsBaseDirectory.getFileSystem();
            this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);

            fileSystem.mkdirs(checkpointsDirectory);
        }
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    /**
     * Gets the size (in bytes) that a individual chunk of state may have at most.
     */
    public int getMaxStateSize() {
        return maxStateSize;
    }

    // ------------------------------------------------------------------------
    //  Checkpoint Storage
    // ------------------------------------------------------------------------

    @Override
    public boolean supportsHighlyAvailableStorage() {
        return checkpointsDirectory != null;
    }

    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        checkArgument(checkpointId >= 0);

        if (checkpointsDirectory != null) {
            // configured for durable metadata
            // prepare all the paths needed for the checkpoints
            checkState(fileSystem != null);

            final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

            // create the checkpoint exclusive directory
            fileSystem.mkdirs(checkpointDir);

            return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
        }
        else {
            // no durable metadata - typical in IDE or test setup case
            return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
        }
    }

    @Override
    public CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException {

        // no matter where the checkpoint goes, we always return the storage location that stores
        // state inline with the state handles.
        return new MemCheckpointStreamFactory(maxStateSize);
    }

    @Override
    public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
        return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "MemoryBackendCheckpointStorage {" +
                "checkpointsDirectory=" + checkpointsDirectory +
                ", fileSystem=" + fileSystem +
                ", maxStateSize=" + maxStateSize +
                '}';
    }
}

MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法,这里返回的是PersistentMetadataCheckpointStorageLocation。

MemoryBackendCheckpointStorage还实现了CheckpointStorage接口中、AbstractFsCheckpointStorage未实现的几个方法,即supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream:

- supportsHighlyAvailableStorage根据是否配置了checkpointsDirectory来判断;
- initializeLocationForCheckpoint根据checkpointsDirectory是否有设置来创建location:
  - 为null时,创建的是NonPersistentMetadataCheckpointStorageLocation;
  - 不为null时,先在checkpointsDirectory下创建该checkpoint专属的目录,再创建PersistentMetadataCheckpointStorageLocation;
- resolveCheckpointStorageLocation无论checkpoint写到哪里,创建的都是MemCheckpointStreamFactory,state会内联在state handle中;
- createTaskOwnedStateStream创建的是MemoryCheckpointOutputStream。

小结

- CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法;
- AbstractFsCheckpointStorage主要实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法,同时定义了一个抽象方法createSavepointLocation;
- MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法;同时还实现了CheckpointStorage接口中、AbstractFsCheckpointStorage未实现的几个方法,即supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream;
- 可以看到,MemoryBackendCheckpointStorage虽然名为memory,但并非所有内容都只保存在内存中:
  - checkpoint:如果配置了checkpointsDirectory(highly available storage),checkpoint location使用的是PersistentMetadataCheckpointStorageLocation,否则使用NonPersistentMetadataCheckpointStorageLocation;
  - savepoint:savepoint location总是使用PersistentMetadataCheckpointStorageLocation;
  - 也就是说,checkpoint的metadata可以选择是否使用文件存储,而savepoint的metadata只能使用文件存储;
  - 无论哪种情况,state数据本身都是通过MemCheckpointStreamFactory / MemoryCheckpointOutputStream内联在state handle中,并受maxStateSize限制。

doc

- The MemoryStateBackend