Preface
This article looks at the initializeState method of Flink's StreamOperator and traces how it is reached from Task.run.
Task.run
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {
    public void run() {
        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
            else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            }
            else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }
        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;
        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------
            //……
            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------
            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------
            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;
            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }
            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);
            // run the invokable
            invokable.invoke();
            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------
            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }
            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {
            //……
        }
        finally {
            //……
        }
    }
    //……
}
Task's run method moves the task from CREATED through DEPLOYING to RUNNING and then calls invokable.invoke(); in this walkthrough the invokable is a StreamTask.
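The state handshake at the top of run() can be read as a retry loop around an atomic transition. The following is a minimal sketch of that pattern, assuming transitionState behaves like a compare-and-set on the current state; TaskStateSketch, its State enum, and beginDeployment are hypothetical names for illustration, not Flink classes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the state handshake at the top of Task.run().
final class TaskStateSketch {

    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    // Assumption: the transition succeeds only if the state is still `expected`.
    boolean transitionState(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    State current() {
        return state.get();
    }

    // Returns true if the task may go on to deploy, false if it was failed or canceled first.
    boolean beginDeployment() {
        while (true) {
            State current = state.get();
            if (current == State.CREATED) {
                if (transitionState(State.CREATED, State.DEPLOYING)) {
                    return true;
                }
                // lost a race with a concurrent cancel/fail: loop and re-read the state
            } else if (current == State.FAILED) {
                return false;
            } else if (current == State.CANCELING) {
                if (transitionState(State.CANCELING, State.CANCELED)) {
                    return false;
                }
            } else {
                throw new IllegalStateException("Invalid state for beginning of operation: " + current);
            }
        }
    }

    public static void main(String[] args) {
        TaskStateSketch fresh = new TaskStateSketch();
        System.out.println(fresh.beginDeployment() + " " + fresh.current());

        TaskStateSketch canceled = new TaskStateSketch();
        canceled.transitionState(State.CREATED, State.CANCELING);
        System.out.println(canceled.beginDeployment() + " " + canceled.current());
    }
}
```

The loop matters because a cancel or fail request can arrive between reading the state and updating it; a failed transition re-reads the state and takes the matching branch on the next pass.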
StreamTask.invoke
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
    @Override
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());
            asyncOperationsThreadPool = Executors.newCachedThreadPool();
            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());
            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());
                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }
            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();
            // task specific initialization
            init();
            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }
            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());
            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {
                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.
                initializeState();
                openAllOperators();
            }
            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }
            // let the task do its work
            isRunning = true;
            run();
            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Finished task {}", getName());
            // make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();
                // make sure no new timers can come
                timerService.quiesce();
                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }
            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();
            LOG.debug("Closed operators for task {}", getName());
            // make sure all buffered data is flushed
            operatorChain.flushOutputs();
            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            //……
        }
    }
    private void initializeState() throws Exception {
        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }
    //……
}
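StreamTask's invoke method calls its private initializeState method while holding the checkpoint lock. That method iterates over all operators (StreamOperator instances) in the operatorChain, skips null entries, and calls initializeState on each one; in this example the operator is a StreamSource.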
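The ordering guarantee described in the comments of invoke() (every operator has its state initialized before any operator is opened, all under one lock) can be sketched as follows. This is a simplified model, not Flink code: InvokeOrderSketch, Operator, and RecordingOperator are hypothetical names, and the body of openAllOperators is assumed here to be a plain loop over the same array, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "initializeState(); openAllOperators();" step in StreamTask.invoke().
final class InvokeOrderSketch {

    interface Operator {
        void initializeState();
        void open();
    }

    // Records each lifecycle call so the resulting order can be inspected.
    static final class RecordingOperator implements Operator {
        private final String name;
        private final List<String> log;

        RecordingOperator(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void initializeState() {
            log.add(name + ".initializeState");
        }

        @Override
        public void open() {
            log.add(name + ".open");
        }
    }

    private final Object lock = new Object();

    void initializeAndOpen(Operator[] allOperators) {
        // Both phases run under the same lock, so a timer callback that also
        // takes this lock cannot run between initializeState() and open().
        synchronized (lock) {
            for (Operator operator : allOperators) {
                if (operator != null) {
                    operator.initializeState();
                }
            }
            // Assumption: opening is a second pass over the same operators.
            for (Operator operator : allOperators) {
                if (operator != null) {
                    operator.open();
                }
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Operator[] chain = {
            new RecordingOperator("source", log),
            null, // null slots are skipped, as in the null check in StreamTask.initializeState()
            new RecordingOperator("map", log)
        };
        new InvokeOrderSketch().initializeAndOpen(chain);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch, demo() records both initializeState calls before either open call, which is the property the lock comment in invoke() relies on.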
StreamOperator.initializeState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
    /**
     * Provides a context to initialize all state in the operator.
     */
    void initializeState() throws Exception;
    //……
}
The StreamOperator interface declares the no-argument initializeState method, which initializes the operator's state.
StreamSource.initializeState
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
    //……
}
StreamSource extends AbstractUdfStreamOperator and does not override the no-argument initializeState(). AbstractUdfStreamOperator does not override it either, so the call resolves to initializeState() in AbstractStreamOperator, the parent class of AbstractUdfStreamOperator.
AbstractStreamOperator.initializeState
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {
    @Override
    public final void initializeState() throws Exception {
        final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
        final StreamTask<?, ?> containingTask =
            Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamTaskStateInitializer streamTaskStateManager =
            Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
        final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                metrics);
        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();
        if (keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
        }
        timeServiceManager = context.internalTimerServiceManager();
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
        try {
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                context.isRestored(), // information whether we restore or start for the first time
                operatorStateBackend, // access to operator state backend
                keyedStateStore, // access to keyed state backend
                keyedStateInputs, // access to keyed state stream
                operatorStateInputs); // access to operator state stream
            initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }
    /**
     * Stream operators with state which can be restored need to override this hook method.
     *
     * @param context context that allows to register different states.
     */
    public void initializeState(StateInitializationContext context) throws Exception {
    }
    //……
}
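AbstractStreamOperator implements the initializeState method declared by StreamOperator and marks it final. The method obtains the operator and keyed state backends, builds a StateInitializationContext, and passes it to the overloaded hook initializeState(initializationContext), closing the raw state inputs in a finally block. The hook is empty in AbstractStreamOperator; its subclass AbstractUdfStreamOperator overrides it.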
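The split between a final no-argument method and an overridable hook is a template-method arrangement. A minimal sketch of that shape, with hypothetical names (TemplateHookSketch, InitContext, BaseOperator, UdfOperator) standing in for the Flink classes and log entries standing in for the real backend setup and cleanup:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the final initializeState() / hook initializeState(context) split.
final class TemplateHookSketch {

    // Stand-in for StateInitializationContext; only carries the "restored" flag here.
    static final class InitContext {
        final boolean restored;

        InitContext(boolean restored) {
            this.restored = restored;
        }
    }

    static abstract class BaseOperator {
        final List<String> log = new ArrayList<>();
        private final boolean restored;

        BaseOperator(boolean restored) {
            this.restored = restored;
        }

        // Fixed skeleton: subclasses cannot change the setup/cleanup order.
        public final void initializeState() {
            log.add("acquire raw state inputs");
            try {
                initializeState(new InitContext(restored));
            } finally {
                // runs even if the hook throws
                log.add("close raw state inputs");
            }
        }

        // Hook: empty by default, overridden by operators that hold state.
        public void initializeState(InitContext context) {
        }
    }

    static final class UdfOperator extends BaseOperator {
        UdfOperator(boolean restored) {
            super(restored);
        }

        @Override
        public void initializeState(InitContext context) {
            super.initializeState(context);
            log.add("restore user function, restored=" + context.restored);
        }
    }

    static List<String> demo() {
        UdfOperator operator = new UdfOperator(true);
        operator.initializeState();
        return operator.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the outer method final means every operator gets the same resource handling, while the hook stays the only extension point for state registration.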
AbstractUdfStreamOperator.initializeState(initializationContext)
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        StreamingFunctionUtils.restoreFunctionState(context, userFunction);
    }
    //……
}
The overridden initializeState(initializationContext) first calls the parent hook and then hands the context and the user function to StreamingFunctionUtils.restoreFunctionState.
StreamingFunctionUtils.restoreFunctionState
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
public static void restoreFunctionState(
        StateInitializationContext context,
        Function userFunction) throws Exception {
    Preconditions.checkNotNull(context);
    while (true) {
        if (tryRestoreFunction(context, userFunction)) {
            break;
        }
        // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
        if (userFunction instanceof WrappingFunction) {
            userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
        } else {
            break;
        }
    }
}
private static boolean tryRestoreFunction(
        StateInitializationContext context,
        Function userFunction) throws Exception {
    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).initializeState(context);
        return true;
    }
    if (context.isRestored() && userFunction instanceof ListCheckpointed) {
        @SuppressWarnings("unchecked")
        ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
        ListState<Serializable> listState = context.getOperatorStateStore().
            getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
        List<Serializable> list = new ArrayList<>();
        for (Serializable serializable : listState.get()) {
            list.add(serializable);
        }
        try {
            listCheckpointedFun.restoreState(list);
        } catch (Exception e) {
            throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
        }
        return true;
    }
    return false;
}
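restoreFunctionState mainly delegates to tryRestoreFunction; if that returns false and the user function is a WrappingFunction, it unwraps the function and tries again. tryRestoreFunction checks two cases in order. If userFunction implements CheckpointedFunction, its initializeState method is called, whether or not the job is being restored. Otherwise, if userFunction implements ListCheckpointed and context.isRestored() is true, it obtains a ListState from the OperatorStateStore, copies its values into a List, and calls ListCheckpointed.restoreState with that list.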
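The unwrap-and-retry loop and the two-way dispatch can be modelled without Flink on the classpath. The sketch below is an illustration under stated assumptions: UserFunction, Checkpointed, ListRestorable, and Wrapper are hypothetical stand-ins for Function, CheckpointedFunction, ListCheckpointed, and WrappingFunction, and unlike the original (which returns void) this restoreFunctionState returns whether any function handled the call so the outcome can be checked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of StreamingFunctionUtils.restoreFunctionState / tryRestoreFunction.
final class RestoreDispatchSketch {

    interface UserFunction { }

    // Stand-in for CheckpointedFunction: always initialized, restored or not.
    interface Checkpointed extends UserFunction {
        void initializeState(boolean restored);
    }

    // Stand-in for ListCheckpointed: only called when restoring.
    interface ListRestorable extends UserFunction {
        void restoreState(List<String> state);
    }

    // Stand-in for WrappingFunction.
    static final class Wrapper implements UserFunction {
        final UserFunction inner;

        Wrapper(UserFunction inner) {
            this.inner = inner;
        }
    }

    static final class RecordingFunction implements ListRestorable {
        List<String> restoredState; // stays null until restoreState is called

        @Override
        public void restoreState(List<String> state) {
            this.restoredState = state;
        }
    }

    static boolean restoreFunctionState(UserFunction fn, boolean restored, List<String> saved) {
        while (true) {
            if (tryRestore(fn, restored, saved)) {
                return true;
            }
            // peel off one wrapper layer and retry on the inner function
            if (fn instanceof Wrapper) {
                fn = ((Wrapper) fn).inner;
            } else {
                return false;
            }
        }
    }

    private static boolean tryRestore(UserFunction fn, boolean restored, List<String> saved) {
        if (fn instanceof Checkpointed) {
            ((Checkpointed) fn).initializeState(restored);
            return true;
        }
        if (restored && fn instanceof ListRestorable) {
            // copy the stored values into a fresh list before handing them over
            ((ListRestorable) fn).restoreState(new ArrayList<>(saved));
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RecordingFunction fn = new RecordingFunction();
        UserFunction wrapped = new Wrapper(new Wrapper(fn));
        boolean handled = restoreFunctionState(wrapped, true, Collections.singletonList("offset=42"));
        System.out.println(handled + " " + fn.restoredState);
    }
}
```

Because the Checkpointed branch is tested first, a function implementing both interfaces in this model is handled through initializeState only, mirroring the order of the two if statements in tryRestoreFunction.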
Summary
Task's run method triggers invokable.invoke(); here the invokable is a StreamTask. StreamTask's invoke method calls initializeState, which iterates over all operators (StreamOperator instances) in the operatorChain and calls initializeState on each. In this example the operator is a StreamSource, which extends AbstractUdfStreamOperator.
The StreamOperator interface declares initializeState for initializing an operator's state. The abstract class AbstractStreamOperator implements that method, and the implementation in turn calls the hook initializeState(initializationContext), which its subclass AbstractUdfStreamOperator overrides.
AbstractUdfStreamOperator's initializeState(initializationContext) calls StreamingFunctionUtils.restoreFunctionState. That method checks whether userFunction implements CheckpointedFunction and, if so, calls its initializeState method. Otherwise, if userFunction implements ListCheckpointed and context.isRestored() is true, it obtains a ListState from the OperatorStateStore, copies the values into a List, and calls ListCheckpointed.restoreState.
doc
Working with State