A look at Flink's OperatorStateBackend


This article takes a look at Flink's OperatorStateBackend.
OperatorStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateBackend.java
/**
* Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface
* {@link Snapshotable}
*
*/
public interface OperatorStateBackend extends
OperatorStateStore,
Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>,
Closeable,
Disposable {

@Override
void dispose();
}
The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces.
OperatorStateStore
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/OperatorStateStore.java
/**
* This interface contains methods for registering operator state with a managed store.
*/
@PublicEvolving
public interface OperatorStateStore {

<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

Set<String> getRegisteredStateNames();

Set<String> getRegisteredBroadcastStateNames();

// -------------------------------------------------------------------------
// Deprecated methods
// -------------------------------------------------------------------------

@Deprecated
<S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;

@Deprecated
<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}
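OperatorStateStore defines getBroadcastState, getListState and getUnionListState. These create a BroadcastState or a ListState, or return the restored one after a restore. It also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the currently registered states.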
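The minimal in-memory sketch below, in plain Java, shows how regular and broadcast state names are kept apart. ToyOperatorStateStore, ToyListState and the name-only lookup are hypothetical simplifications with no descriptors, serializers or checkpointing. Only three ideas come from the interface above:

- list states are registered by name;
- getListState and getUnionListState differ only in their distribution mode;
- broadcast state names are tracked separately.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, in-memory stand-in for OperatorStateStore (NOT Flink's API):
// states are looked up by name only, without descriptors or serializers.
class ToyOperatorStateStore {

    enum Mode { SPLIT_DISTRIBUTE, UNION }

    // A named list state that remembers the mode it was registered with.
    static final class ToyListState<T> {
        final Mode mode;
        final List<T> elements = new ArrayList<>();

        ToyListState(Mode mode) {
            this.mode = mode;
        }

        void add(T value) {
            elements.add(value);
        }
    }

    private final Map<String, ToyListState<?>> listStates = new HashMap<>();
    private final Map<String, Map<?, ?>> broadcastStates = new HashMap<>();

    // Both list-state getters share one registry; only the mode differs.
    <T> ToyListState<T> getListState(String name) {
        return getOrCreate(name, Mode.SPLIT_DISTRIBUTE);
    }

    <T> ToyListState<T> getUnionListState(String name) {
        return getOrCreate(name, Mode.UNION);
    }

    // Broadcast states live in their own registry, keyed by name.
    @SuppressWarnings("unchecked")
    <K, V> Map<K, V> getBroadcastState(String name) {
        return (Map<K, V>) broadcastStates.computeIfAbsent(name, n -> new HashMap<K, V>());
    }

    Set<String> getRegisteredStateNames() {
        return listStates.keySet();
    }

    Set<String> getRegisteredBroadcastStateNames() {
        return broadcastStates.keySet();
    }

    // Returns the existing state for the name, or registers a new one;
    // re-requesting a name with a different mode is rejected.
    @SuppressWarnings("unchecked")
    private <T> ToyListState<T> getOrCreate(String name, Mode mode) {
        ToyListState<?> state = listStates.computeIfAbsent(name, n -> new ToyListState<T>(mode));
        if (state.mode != mode) {
            throw new IllegalStateException("State " + name + " is registered with mode " + state.mode);
        }
        return (ToyListState<T>) state;
    }
}
```

Calling getListState("offsets") twice returns the same instance. Asking for the same name as a union state fails. The real backend performs a comparable mode check through checkStateNameAndMode in getListState.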
Snapshotable
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Snapshotable.java
/**
* Interface for operators that can perform snapshots of their state.
*
* @param <S> Generic type of the state object that is created as handle to snapshots.
* @param <R> Generic type of the state object that used in restore.
*/
@Internal
public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> {

/**
* Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
* handles from which the old state is read.
*
* @param state the old state to restore.
*/
void restore(@Nullable R state) throws Exception;
}
The Snapshotable interface extends SnapshotStrategy and additionally defines a restore method, which restores state from previously taken snapshots.
SnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotStrategy.java
/**
* Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at
* least threadsafe, i.e. this is a functional interface and is can be called in parallel by multiple checkpoints.
*
* @param <S> type of the returned state object that represents the result of the snapshot operation.
*/
@Internal
public interface SnapshotStrategy<S extends StateObject> {

/**
* Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
* returns a @{@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation if
* the operation is performed synchronous or asynchronous. In the later case, the returned Runnable must be executed
* first before obtaining the handle.
*
* @param checkpointId The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
* @param checkpointOptions Options for how to perform this checkpoint.
* @return A runnable future that will yield a {@link StateObject}.
*/
@Nonnull
RunnableFuture<S> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception;
}
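SnapshotStrategy defines the snapshot method, which the different snapshot strategies implement. It returns a RunnableFuture<S> whose result type S must be a StateObject. Whether the work runs synchronously or asynchronously is up to the implementation.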
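The contract lets snapshot() either finish eagerly or hand back work that still has to be run. This can be illustrated with java.util.concurrent.FutureTask, which implements RunnableFuture. The sketch assumes a hypothetical ToySnapshotStrategy whose state is just a List<String>. It is not Flink code; it only illustrates the synchronous and asynchronous cases described in the Javadoc.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical stand-in for SnapshotStrategy: snapshot() hands back a RunnableFuture.
interface ToySnapshotStrategy<S> {
    RunnableFuture<S> snapshot(long checkpointId);
}

// Copies the live list synchronously; the returned task produces the snapshot.
class ListCopySnapshotStrategy implements ToySnapshotStrategy<List<String>> {
    private final List<String> liveState;
    private final boolean asynchronous;

    ListCopySnapshotStrategy(List<String> liveState, boolean asynchronous) {
        this.liveState = liveState;
        this.asynchronous = asynchronous;
    }

    @Override
    public RunnableFuture<List<String>> snapshot(long checkpointId) {
        // Synchronous part: take the copy before returning to the caller.
        final List<String> copy = new ArrayList<>(liveState);
        FutureTask<List<String>> task =
            new FutureTask<>(() -> Collections.unmodifiableList(copy));
        if (!asynchronous) {
            task.run(); // synchronous mode: the result is ready on return
        }
        // Asynchronous mode: the caller must run() the task before get().
        return task;
    }
}
```

In asynchronous mode the caller must run() the future before get() can return. Because the list was copied inside snapshot(), later updates to the live list do not leak into the result.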
AbstractSnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractSnapshotStrategy.java
/**
* Abstract base class for implementing {@link SnapshotStrategy}, that gives a consistent logging across state backends.
*
* @param <T> type of the snapshot result.
*/
public abstract class AbstractSnapshotStrategy<T extends StateObject> implements SnapshotStrategy<SnapshotResult<T>> {

private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotStrategy.class);

private static final String LOG_SYNC_COMPLETED_TEMPLATE = "{} ({}, synchronous part) in thread {} took {} ms.";
private static final String LOG_ASYNC_COMPLETED_TEMPLATE = "{} ({}, asynchronous part) in thread {} took {} ms.";

/** Descriptive name of the snapshot strategy that will appear in the log outputs and {@link #toString()}. */
@Nonnull
protected final String description;

protected AbstractSnapshotStrategy(@Nonnull String description) {
this.description = description;
}

/**
* Logs the duration of the synchronous snapshot part from the given start time.
*/
public void logSyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
logCompletedInternal(LOG_SYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
}

/**
* Logs the duration of the asynchronous snapshot part from the given start time.
*/
public void logAsyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
logCompletedInternal(LOG_ASYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
}

private void logCompletedInternal(
@Nonnull String template,
@Nonnull Object checkpointOutDescription,
long startTime) {

long duration = (System.currentTimeMillis() - startTime);

LOG.debug(
template,
description,
checkpointOutDescription,
Thread.currentThread(),
duration);
}

@Override
public String toString() {
return "SnapshotStrategy {" + description + "}";
}
}
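AbstractSnapshotStrategy is an abstract class and does not implement the snapshot method defined by SnapshotStrategy. It only stores a description and provides logSyncCompleted and logAsyncCompleted, which log the duration of the synchronous and asynchronous snapshot phases at debug level.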
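The division of labour follows a plain template-method arrangement: the base class owns the description and the timing logs, and subclasses own the actual snapshot. In this hypothetical sketch, LoggingSnapshotBase records log lines in a list instead of calling an SLF4J logger, and NoopSnapshot is an invented subclass. The message format follows LOG_SYNC_COMPLETED_TEMPLATE above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical analogue of AbstractSnapshotStrategy: it owns the description and
// the duration logging, but leaves snapshot() to subclasses.
abstract class LoggingSnapshotBase {
    protected final String description;
    final List<String> logLines = new ArrayList<>(); // stands in for LOG.debug

    protected LoggingSnapshotBase(String description) {
        this.description = description;
    }

    void logSyncCompleted(Object checkpointOutDescription, long startTime) {
        logCompleted("synchronous part", checkpointOutDescription, startTime);
    }

    void logAsyncCompleted(Object checkpointOutDescription, long startTime) {
        logCompleted("asynchronous part", checkpointOutDescription, startTime);
    }

    // Same fields as the Flink template: description, output, phase, thread, duration.
    private void logCompleted(String phase, Object out, long startTime) {
        long duration = System.currentTimeMillis() - startTime;
        logLines.add(String.format("%s (%s, %s) in thread %s took %d ms.",
            description, out, phase, Thread.currentThread(), duration));
    }

    abstract byte[] snapshot(long checkpointId);

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}

// Invented subclass: does no real work, but uses the inherited sync-phase logging.
class NoopSnapshot extends LoggingSnapshotBase {
    NoopSnapshot() {
        super("Noop snapshot");
    }

    @Override
    byte[] snapshot(long checkpointId) {
        long start = System.currentTimeMillis();
        byte[] result = new byte[0];
        logSyncCompleted("memory", start);
        return result;
    }
}
```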
StateObject
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateObject.java
/**
* Base of all handles that represent checkpointed state in some form. The object may hold
* the (small) state directly, or contain a file path (state is in the file), or contain the
* metadata to access the state stored in some external database.
*
* <p>State objects define how to {@link #discardState() discard state} and how to access the
* {@link #getStateSize() size of the state}.
*
* <p>State Objects are transported via RPC between <i>JobManager</i> and
* <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to support that.
*
* <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
* compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
* but through custom serializers.
*/
public interface StateObject extends Serializable {

void discardState() throws Exception;

long getStateSize();
}
StateObject extends Serializable because state handles are transported via RPC between the JobManager and the TaskManager. The interface defines two methods. discardState releases the resources that back the state, and getStateSize returns the size of the state.
StreamStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StreamStateHandle.java
/**
* A {@link StateObject} that represents state that was written to a stream. The data can be read
* back via {@link #openInputStream()}.
*/
public interface StreamStateHandle extends StateObject {

/**
* Returns an {@link FSDataInputStream} that can be used to read back the data that
* was previously written to the stream.
*/
FSDataInputStream openInputStream() throws IOException;
}
StreamStateHandle extends StateObject and adds an openInputStream method, which reads back the data previously written to the stream.
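A StreamStateHandle is essentially something that can reopen the bytes written earlier, report their size and delete them. The sketch below mirrors the two interfaces with hypothetical Toy* names and plain java.io types, and keeps the bytes directly in the handle. Flink's openInputStream returns an FSDataInputStream; this sketch uses InputStream instead.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;

// Hypothetical mirrors of StateObject / StreamStateHandle using plain java.io types.
interface ToyStateObject extends Serializable {
    void discardState() throws Exception;

    long getStateSize();
}

interface ToyStreamStateHandle extends ToyStateObject {
    InputStream openInputStream() throws IOException;
}

// Keeps the serialized state in the handle itself; discarding drops the bytes.
final class InMemoryStreamStateHandle implements ToyStreamStateHandle {
    private static final long serialVersionUID = 1L;
    private byte[] data;

    InMemoryStreamStateHandle(byte[] data) {
        this.data = data.clone();
    }

    @Override
    public InputStream openInputStream() throws IOException {
        if (data == null) {
            throw new IOException("state already discarded");
        }
        return new ByteArrayInputStream(data);
    }

    @Override
    public void discardState() {
        data = null;
    }

    @Override
    public long getStateSize() {
        return data == null ? 0L : data.length;
    }
}
```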
OperatorStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateHandle.java
/**
* Interface of a state handle for operator state.
*/
public interface OperatorStateHandle extends StreamStateHandle {

/**
* Returns a map of meta data for all contained states by their name.
*/
Map<String, StateMetaInfo> getStateNameToPartitionOffsets();

/**
* Returns an input stream to read the operator state information.
*/
@Override
FSDataInputStream openInputStream() throws IOException;

/**
* Returns the underlying stream state handle that points to the state data.
*/
StreamStateHandle getDelegateStateHandle();

//……
}
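OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle.

- getStateNameToPartitionOffsets maps each state name to a StateMetaInfo, which holds the offsets of that state's partitions within the stream plus its distribution mode.
- getDelegateStateHandle returns the underlying StreamStateHandle that points to the actual data.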
OperatorStreamStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
/**
* State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
* map that contains the offsets to the partitions of named states in the stream.
*/
public class OperatorStreamStateHandle implements OperatorStateHandle {

private static final long serialVersionUID = 35876522969227335L;

/**
* unique state name -> offsets for available partitions in the handle stream
*/
private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
private final StreamStateHandle delegateStateHandle;

public OperatorStreamStateHandle(
Map<String, StateMetaInfo> stateNameToPartitionOffsets,
StreamStateHandle delegateStateHandle) {

this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);
}

@Override
public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
return stateNameToPartitionOffsets;
}

@Override
public void discardState() throws Exception {
delegateStateHandle.discardState();
}

@Override
public long getStateSize() {
return delegateStateHandle.getStateSize();
}

@Override
public FSDataInputStream openInputStream() throws IOException {
return delegateStateHandle.openInputStream();
}

@Override
public StreamStateHandle getDelegateStateHandle() {
return delegateStateHandle;
}

//……
}
OperatorStreamStateHandle implements OperatorStateHandle. It holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets simply returns. It delegates discardState, getStateSize and openInputStream to delegateStateHandle.
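stateNameToPartitionOffsets lets several named states share one stream while the handle remembers where each piece starts. The hypothetical sketch below illustrates this in three steps:

- it writes named lists of strings into one byte array with DataOutputStream.writeUTF;
- it records the offset of every element, much like the long[] partitionOffsets that the snapshot code later stores in StateMetaInfo;
- it reads one state back by seeking to those offsets.

The encoding is invented for illustration and is not Flink's on-disk format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: several named list states share ONE byte stream,
// and for each name we remember the offset at which every element starts.
final class OffsetIndexedStream {
    final byte[] bytes;
    final Map<String, long[]> stateNameToOffsets;

    private OffsetIndexedStream(byte[] bytes, Map<String, long[]> offsets) {
        this.bytes = bytes;
        this.stateNameToOffsets = offsets;
    }

    static OffsetIndexedStream write(Map<String, List<String>> states) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        Map<String, long[]> offsets = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : states.entrySet()) {
            List<String> elements = entry.getValue();
            long[] elementOffsets = new long[elements.size()];
            for (int i = 0; i < elements.size(); i++) {
                elementOffsets[i] = out.size(); // stream position before this element
                out.writeUTF(elements.get(i));
            }
            offsets.put(entry.getKey(), elementOffsets);
        }
        out.flush();
        return new OffsetIndexedStream(buffer.toByteArray(), offsets);
    }

    // Reads back only the elements of one state by seeking to each recorded offset.
    List<String> read(String stateName) throws IOException {
        List<String> result = new ArrayList<>();
        for (long offset : stateNameToOffsets.get(stateName)) {
            int start = (int) offset;
            DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes, start, bytes.length - start));
            result.add(in.readUTF());
        }
        return result;
    }
}
```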
SnapshotResult
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotResult.java
/**
* This class contains the combined results from the snapshot of a state backend:
* <ul>
* <li>A state object representing the state that will be reported to the Job Manager to acknowledge the checkpoint.</li>
* <li>A state object that represents the state for the {@link TaskLocalStateStoreImpl}.</li>
* </ul>
*
* Both state objects are optional and can be null, e.g. if there was no state to snapshot in the backend. A local
* state object that is not null also requires a state to report to the job manager that is not null, because the
* Job Manager always owns the ground truth about the checkpointed state.
*/
public class SnapshotResult<T extends StateObject> implements StateObject {

private static final long serialVersionUID = 1L;

/** An singleton instance to represent an empty snapshot result. */
private static final SnapshotResult<?> EMPTY = new SnapshotResult<>(null, null);

/** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
private final T jobManagerOwnedSnapshot;

/** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
private final T taskLocalSnapshot;

/**
* Creates a {@link SnapshotResult} for the given jobManagerOwnedSnapshot and taskLocalSnapshot. If the
* jobManagerOwnedSnapshot is null, taskLocalSnapshot must also be null.
*
* @param jobManagerOwnedSnapshot Snapshot for report to job manager. Can be null.
* @param taskLocalSnapshot Snapshot for report to local state manager. This is optional and requires
* jobManagerOwnedSnapshot to be not null if this is not also null.
*/
private SnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {

if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != null) {
throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
}

this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
this.taskLocalSnapshot = taskLocalSnapshot;
}

public T getJobManagerOwnedSnapshot() {
return jobManagerOwnedSnapshot;
}

public T getTaskLocalSnapshot() {
return taskLocalSnapshot;
}

@Override
public void discardState() throws Exception {

Exception aggregatedExceptions = null;

if (jobManagerOwnedSnapshot != null) {
try {
jobManagerOwnedSnapshot.discardState();
} catch (Exception remoteDiscardEx) {
aggregatedExceptions = remoteDiscardEx;
}
}

if (taskLocalSnapshot != null) {
try {
taskLocalSnapshot.discardState();
} catch (Exception localDiscardEx) {
aggregatedExceptions = ExceptionUtils.firstOrSuppressed(localDiscardEx, aggregatedExceptions);
}
}

if (aggregatedExceptions != null) {
throw aggregatedExceptions;
}
}

@Override
public long getStateSize() {
return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
}

@SuppressWarnings("unchecked")
public static <T extends StateObject> SnapshotResult<T> empty() {
return (SnapshotResult<T>) EMPTY;
}

public static <T extends StateObject> SnapshotResult<T> of(@Nullable T jobManagerState) {
return jobManagerState != null ? new SnapshotResult<>(jobManagerState, null) : empty();
}

public static <T extends StateObject> SnapshotResult<T> withLocalState(
@Nonnull T jobManagerState,
@Nonnull T localState) {
return new SnapshotResult<>(jobManagerState, localState);
}
}
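SnapshotResult implements StateObject and wraps the result of a snapshot: jobManagerOwnedSnapshot and taskLocalSnapshot. A non-null taskLocalSnapshot requires a non-null jobManagerOwnedSnapshot. Its discardState calls discardState on both snapshots, collects any exceptions and rethrows them at the end. getStateSize returns the size of jobManagerOwnedSnapshot only.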
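The discard logic above tries both snapshots even if the first one fails. It keeps the first exception and attaches later ones as suppressed. Below is a standalone sketch of that pattern. TwoPartResult and Discardable are hypothetical names, and firstOrSuppressed is re-implemented locally, following how ExceptionUtils.firstOrSuppressed is used in the listing.

```java
// Hypothetical re-creation of SnapshotResult's two-part discard logic.
final class TwoPartResult {
    interface Discardable {
        void discard() throws Exception;
    }

    private final Discardable jobManagerOwned; // may be null
    private final Discardable taskLocal;       // may be null, but only if jobManagerOwned is null too

    TwoPartResult(Discardable jobManagerOwned, Discardable taskLocal) {
        if (jobManagerOwned == null && taskLocal != null) {
            throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
        }
        this.jobManagerOwned = jobManagerOwned;
        this.taskLocal = taskLocal;
    }

    // Keep the first exception; attach any later one to it as suppressed.
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        if (previous != newException) {
            previous.addSuppressed(newException);
        }
        return previous;
    }

    // Attempts both discards, then rethrows the aggregated failure, if any.
    void discardState() throws Exception {
        Exception aggregated = null;
        if (jobManagerOwned != null) {
            try {
                jobManagerOwned.discard();
            } catch (Exception e) {
                aggregated = e;
            }
        }
        if (taskLocal != null) {
            try {
                taskLocal.discard();
            } catch (Exception e) {
                aggregated = firstOrSuppressed(e, aggregated);
            }
        }
        if (aggregated != null) {
            throw aggregated;
        }
    }
}
```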
DefaultOperatorStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
/**
* Default implementation of OperatorStateStore that provides the ability to make snapshots.
*/
@Internal
public class DefaultOperatorStateBackend implements OperatorStateBackend {

private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

/**
* The default namespace for state in cases where no state name is provided
*/
public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

/**
* Map for all registered operator states. Maps state name -> state
*/
private final Map<String, PartitionableListState<?>> registeredOperatorStates;

/**
* Map for all registered operator broadcast states. Maps state name -> state
*/
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

/**
* CloseableRegistry to participate in the tasks lifecycle.
*/
private final CloseableRegistry closeStreamOnCancelRegistry;

/**
* Default serializer. Only used for the default operator state.
*/
private final JavaSerializer<Serializable> javaSerializer;

/**
* The user code classloader.
*/
private final ClassLoader userClassloader;

/**
* The execution configuration.
*/
private final ExecutionConfig executionConfig;

/**
* Flag to de/activate asynchronous snapshots.
*/
private final boolean asynchronousSnapshots;

/**
* Map of state names to their corresponding restored state meta info.
*
* <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos;

/**
* Map of state names to their corresponding restored broadcast state meta info.
*/
private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos;

/**
* Cache of already accessed states.
*
* <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated
* with restored state, this map is always empty at the beginning.
*
* <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
*/
private final HashMap<String, PartitionableListState<?>> accessedStatesByName;

private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy;

public DefaultOperatorStateBackend(
ClassLoader userClassLoader,
ExecutionConfig executionConfig,
boolean asynchronousSnapshots) {

this.closeStreamOnCancelRegistry = new CloseableRegistry();
this.userClassloader = Preconditions.checkNotNull(userClassLoader);
this.executionConfig = executionConfig;
this.javaSerializer = new JavaSerializer<>();
this.registeredOperatorStates = new HashMap<>();
this.registeredBroadcastStates = new HashMap<>();
this.asynchronousSnapshots = asynchronousSnapshots;
this.accessedStatesByName = new HashMap<>();
this.accessedBroadcastStatesByName = new HashMap<>();
this.restoredOperatorStateMetaInfos = new HashMap<>();
this.restoredBroadcastStateMetaInfos = new HashMap<>();
this.snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy();
}

@Override
public Set<String> getRegisteredStateNames() {
return registeredOperatorStates.keySet();
}

@Override
public Set<String> getRegisteredBroadcastStateNames() {
return registeredBroadcastStates.keySet();
}

@Override
public void close() throws IOException {
closeStreamOnCancelRegistry.close();
}

@Override
public void dispose() {
IOUtils.closeQuietly(closeStreamOnCancelRegistry);
registeredOperatorStates.clear();
registeredBroadcastStates.clear();
}

// -------------------------------------------------------------------------
// State access methods
// -------------------------------------------------------------------------

@SuppressWarnings("unchecked")
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
//……
}

@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}

@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}

@Nonnull
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception {

long syncStartTime = System.currentTimeMillis();

RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
return snapshotRunner;
}

//……
}

DefaultOperatorStateBackend implements the OperatorStateBackend interface.
getRegisteredStateNames returns registeredOperatorStates.keySet(), and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet(). Both are backed by in-memory HashMaps.
close calls closeStreamOnCancelRegistry.close(). dispose also closes closeStreamOnCancelRegistry (quietly) and additionally clears registeredOperatorStates and registeredBroadcastStates.
getListState and getUnionListState both delegate to the private getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode). They pass Mode.SPLIT_DISTRIBUTE and Mode.UNION respectively.
The snapshotStrategy used by snapshot is a DefaultOperatorStateBackendSnapshotStrategy. snapshot itself only times the synchronous part and logs it via logSyncCompleted.
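The mode passed by getListState and getUnionListState matters when state is restored with a changed parallelism. As Flink's documentation describes operator list state, SPLIT_DISTRIBUTE spreads the list elements across the new subtasks, while UNION gives every subtask the complete list. The sketch below models only the resulting element distribution, and ListStateRedistribution is a hypothetical name. Handing out elements round-robin is an assumption made for simplicity; Flink's own repartitioner works on state handles and offsets and may group elements differently.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of redistributing restored operator list state to a new parallelism.
final class ListStateRedistribution {
    enum Mode { SPLIT_DISTRIBUTE, UNION }

    static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism, Mode mode) {
        // Gather the union of all elements from the old subtasks.
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }
        // UNION: every new subtask starts with the full list; SPLIT_DISTRIBUTE: empty lists.
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(mode == Mode.UNION ? new ArrayList<T>(all) : new ArrayList<T>());
        }
        // SPLIT_DISTRIBUTE: each element goes to exactly one subtask (round-robin here).
        if (mode == Mode.SPLIT_DISTRIBUTE) {
            for (int i = 0; i < all.size(); i++) {
                result.get(i % newParallelism).add(all.get(i));
            }
        }
        return result;
    }
}
```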

DefaultOperatorStateBackend.getListState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
private <S> ListState<S> getListState(
ListStateDescriptor<S> stateDescriptor,
OperatorStateHandle.Mode mode) throws StateMigrationException {

Preconditions.checkNotNull(stateDescriptor);
String name = Preconditions.checkNotNull(stateDescriptor.getName());

@SuppressWarnings("unchecked")
PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
if (previous != null) {
checkStateNameAndMode(
previous.getStateMetaInfo().getName(),
name,
previous.getStateMetaInfo().getAssignmentMode(),
mode);
return previous;
}

// end up here if its the first time access after execution for the
// provided state name; check compatibility of restored state, if any
// TODO with eager registration in place, these checks should be moved to restore()

stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

@SuppressWarnings("unchecked")
PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);

if (null == partitionableListState) {
// no restored state for the state name; simply create new state holder

partitionableListState = new PartitionableListState<>(
new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));

registeredOperatorStates.put(name, partitionableListState);
} else {
// has restored state; check compatibility of new state access

checkStateNameAndMode(
partitionableListState.getStateMetaInfo().getName(),
name,
partitionableListState.getStateMetaInfo().getAssignmentMode(),
mode);

StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

// check compatibility to determine if state migration is required
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

@SuppressWarnings("unchecked")
TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
(TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

TypeSerializerSchemaCompatibility<S> stateCompatibility =
stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
}

partitionableListState.setStateMetaInfo(
new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
}

accessedStatesByName.put(name, partitionableListState);
return partitionableListState;
}
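getListState first checks the accessedStatesByName cache. On a hit, it verifies that the name and assignment mode match and returns the cached state. Otherwise it looks up the PartitionableListState in registeredOperatorStates. If there is none, it creates one and registers it. If a restored one exists, it does three things in order:

- it checks the assignment mode;
- it resolves the compatibility of the new serializer against the restored serializer snapshot, throwing a StateMigrationException if they are incompatible;
- it sets the new stateMetaInfo on the partitionableListState.

Finally, the state is put into accessedStatesByName and returned.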
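The lookup order in getListState can be modelled in a few lines: per-run cache first, then restored or previously registered states, then creation. In this hypothetical ListStateLookup, serializer compatibility is reduced to an integer version check. That check is an assumption standing in for TypeSerializerSnapshot.resolveSchemaCompatibility. Replacing the immutable Meta object stands in for setStateMetaInfo.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the lookup order in getListState.
final class ListStateLookup {
    enum Mode { SPLIT_DISTRIBUTE, UNION }

    static final class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }
    }

    static final class Meta {
        final String name;
        final Mode mode;
        final int serializerVersion; // toy stand-in for a real serializer
        Meta(String name, Mode mode, int serializerVersion) {
            this.name = name;
            this.mode = mode;
            this.serializerVersion = serializerVersion;
        }
    }

    final Map<String, Meta> registered = new HashMap<>(); // restored or created states
    final Map<String, Meta> accessed = new HashMap<>();   // cache for this run only
    private final Map<String, Integer> restoredVersions = new HashMap<>();

    void restore(String name, Mode mode, int serializerVersion) {
        registered.put(name, new Meta(name, mode, serializerVersion));
        restoredVersions.put(name, serializerVersion);
    }

    Meta getListState(String name, Mode mode, int serializerVersion) throws StateMigrationException {
        Meta cached = accessed.get(name);
        if (cached != null) { // 1) already accessed in this run
            checkMode(cached, mode);
            return cached;
        }
        Meta state = registered.get(name);
        if (state == null) { // 3) nothing restored: create a new state
            state = new Meta(name, mode, serializerVersion);
            registered.put(name, state);
        } else { // 2) restored state: check mode and (toy) serializer compatibility
            checkMode(state, mode);
            if (serializerVersion < restoredVersions.get(name)) {
                throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
            }
            state = new Meta(name, mode, serializerVersion); // stands in for setStateMetaInfo
            registered.put(name, state);
        }
        accessed.put(name, state);
        return state;
    }

    private static void checkMode(Meta meta, Mode expected) {
        if (meta.mode != expected) {
            throw new IllegalStateException("Incompatible state assignment modes. Was [" + meta.mode + "], registered with [" + expected + "].");
        }
    }
}
```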
DefaultOperatorStateBackendSnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
/**
* Snapshot strategy for this backend.
*/
private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {

protected DefaultOperatorStateBackendSnapshotStrategy() {
super(“DefaultOperatorStateBackend snapshot”);
}

@Nonnull
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
final long checkpointId,
final long timestamp,
@Nonnull final CheckpointStreamFactory streamFactory,
@Nonnull final CheckpointOptions checkpointOptions) throws IOException {

if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
return DoneFuture.of(SnapshotResult.empty());
}

final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
new HashMap<>(registeredOperatorStates.size());
final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
new HashMap<>(registeredBroadcastStates.size());

ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(userClassloader);
try {
// eagerly create deep copies of the list and the broadcast states (if any)
// in the synchronous phase, so that we can use them in the async writing.

if (!registeredOperatorStates.isEmpty()) {
for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
PartitionableListState<?> listState = entry.getValue();
if (null != listState) {
listState = listState.deepCopy();
}
registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
}
}

if (!registeredBroadcastStates.isEmpty()) {
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
if (null != broadcastState) {
broadcastState = broadcastState.deepCopy();
}
registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
}
}
} finally {
Thread.currentThread().setContextClassLoader(snapshotClassLoader);
}

AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

@Override
protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {

CheckpointStreamFactory.CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
registerCloseableForCancellation(localOut);

// get the registered operator state infos …
List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
new ArrayList<>(registeredOperatorStatesDeepCopies.size());

for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {
operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}

// … get the registered broadcast operator state infos …
List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {
broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}

// … write them all in the checkpoint stream …
DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

OperatorBackendSerializationProxy backendSerializationProxy =
new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

backendSerializationProxy.write(dov);

// … and then go for the states …

// we put BOTH normal and broadcast state metadata here
int initialMapCapacity =
registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
new HashMap<>(initialMapCapacity);

for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {

PartitionableListState<?> value = entry.getValue();
long[] partitionOffsets = value.write(localOut);
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}

// … and the broadcast states themselves …
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {

BackendWritableBroadcastState<?, ?> value = entry.getValue();
long[] partitionOffsets = {value.write(localOut)};
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}

// … and, finally, create the state handle.
OperatorStateHandle retValue = null;

if (unregisterCloseableFromCancellation(localOut)) {

StreamStateHandle stateHandle = localOut.closeAndGetHandle();

if (stateHandle != null) {
retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
}

return SnapshotResult.of(retValue);
} else {
throw new IOException("Stream was already unregistered.");
}
}

@Override
protected void cleanupProvidedResources() {
// nothing to do
}

@Override
protected void logAsyncSnapshotComplete(long startTime) {
if (asynchronousSnapshots) {
logAsyncCompleted(streamFactory, startTime);
}
}
};

final FutureTask<SnapshotResult<OperatorStateHandle>> task =
snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

if (!asynchronousSnapshots) {
task.run();
}

return task;
}
}

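DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy. Its snapshot method returns an empty result immediately if no states are registered. Otherwise it creates registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies in the synchronous phase, with the user classloader set as the context classloader. The actual writing is done through an AsyncSnapshotCallable. If asynchronousSnapshots is false, the resulting task is run immediately before it is returned.
The abstract class AsyncSnapshotCallable implements the call method of the Callable interface. call invokes callInternal and then logAsyncSnapshotComplete.
callInternal returns a SnapshotResult<OperatorStateHandle>. It works in these steps:

- it opens a CheckpointStateOutputStream from the CheckpointStreamFactory (for example MemCheckpointStreamFactory);
- it writes the meta info snapshots of all deep-copied states, followed by the state data itself;
- it records each state's partition offsets and assignment mode in writtenStatesMetaData;
- it builds an OperatorStreamStateHandle from writtenStatesMetaData and the stateHandle returned by CheckpointStateOutputStream.closeAndGetHandle(), and returns it.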
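The core idea of the two phases is that deep copies are taken synchronously and serialization is deferred to a task that may run later. It is sketched below with a FutureTask. TwoPhaseSnapshotter and its byte layout are hypothetical. The sketch has no CheckpointStreamFactory, meta-info section or offset bookkeeping. It keeps only the empty-state shortcut, the copy-then-write structure and the run-immediately branch used when asynchronous snapshots are disabled.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical, heavily simplified model of the synchronous copy / asynchronous write split.
final class TwoPhaseSnapshotter {
    final Map<String, List<String>> registeredStates = new HashMap<>();
    private final boolean asynchronousSnapshots;

    TwoPhaseSnapshotter(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    RunnableFuture<byte[]> snapshot() {
        // Nothing registered: hand back an already completed, empty result.
        if (registeredStates.isEmpty()) {
            FutureTask<byte[]> done = new FutureTask<>(() -> new byte[0]);
            done.run();
            return done;
        }
        // Synchronous phase: copy the lists so later updates cannot leak into the checkpoint.
        final Map<String, List<String>> deepCopies = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : registeredStates.entrySet()) {
            deepCopies.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        // Asynchronous phase: serialize only the copies (name, size, elements).
        FutureTask<byte[]> task = new FutureTask<>(() -> {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            for (Map.Entry<String, List<String>> entry : deepCopies.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue().size());
                for (String element : entry.getValue()) {
                    out.writeUTF(element);
                }
            }
            out.flush();
            return buffer.toByteArray();
        });
        if (!asynchronousSnapshots) {
            task.run(); // synchronous snapshots: do the writing right away
        }
        return task;
    }
}
```

The copy happens before snapshot() returns. An element added to the live list after that point therefore does not appear in the written bytes, even though the writing happens later.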

Summary

The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces.
OperatorStateStore defines getBroadcastState, getListState and getUnionListState for creating or restoring a BroadcastState or ListState. It also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the registered states. DefaultOperatorStateBackend implements OperatorStateStore. Its getRegisteredStateNames returns registeredOperatorStates.keySet(), and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet(); both maps are in-memory HashMaps. getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode).
Snapshotable extends SnapshotStrategy and adds a restore method for restoring state. SnapshotStrategy defines the snapshot method that the various snapshot strategies implement, and requires the snapshot result type to be a StateObject. AbstractSnapshotStrategy is an abstract class that does not implement snapshot; it only provides logSyncCompleted and logAsyncCompleted for debug logging.
DefaultOperatorStateBackend implements Snapshotable, and its snapshot method uses a DefaultOperatorStateBackendSnapshotStrategy. That strategy extends AbstractSnapshotStrategy and works in two phases. First, it creates registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies synchronously. Then, via an AsyncSnapshotCallable, it writes their data to a CheckpointStateOutputStream obtained from the CheckpointStreamFactory (for example MemCheckpointStreamFactory), recording the per-state offsets in writtenStatesMetaData.
Snapshotable<S, R> requires the snapshot type S to be a StateObject. StateObject extends Serializable because it is transported via RPC between the JobManager and the TaskManager. When OperatorStateBackend extends Snapshotable, it binds S (the snapshot type) to SnapshotResult<OperatorStateHandle> and R (the restore type) to Collection<OperatorStateHandle>.
StreamStateHandle extends StateObject and adds openInputStream. OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle; getStateNameToPartitionOffsets maps each state name to the offsets of its partitions in the stream. OperatorStreamStateHandle implements OperatorStateHandle. It holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets simply returns.
SnapshotResult implements StateObject and wraps the snapshot result, which consists of jobManagerOwnedSnapshot and taskLocalSnapshot. Its discardState calls discardState on both, and getStateSize returns the state size of jobManagerOwnedSnapshot.

doc
State Backends
