
A Look at Flink's CheckpointedFunction


This article takes a look at Flink's CheckpointedFunction interface.
Example
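import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    // Handle to the managed operator state; assigned in initializeState
    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    // In-memory buffer of elements not yet sent to the sink
    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        // >= rather than ==: after a restore the buffer may already exceed the threshold
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot contents with the current buffer
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // Only refill the buffer when recovering from an earlier snapshot
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}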

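BufferingSink implements the CheckpointedFunction interface. It declares checkpointedState, of type ListState, and bufferedElements, a plain List.
The invoke method first adds the value to bufferedElements; once the number of buffered elements reaches the threshold, it sends them to the sink and then clears bufferedElements.
The snapshotState method takes a snapshot of bufferedElements by copying it into checkpointedState. The initializeState method first creates a ListStateDescriptor, then obtains the ListState through FunctionInitializationContext.getOperatorStateStore().getListState(descriptor). It then checks whether the state was restored from the snapshot of a previous execution and, if so, copies the contents of the ListState back into bufferedElements.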
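The buffer, snapshot, and restore cycle of BufferingSink can be tried out without a cluster using a small plain-Java model. This is only a sketch: BufferingSinkModel and all of its members are hypothetical stand-ins rather than Flink classes, a plain List plays the role of the ListState, and the boolean passed to initializeState plays the role of isRestored().

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the BufferingSink lifecycle. None of these names are Flink APIs.
class BufferingSinkModel {
    private final int threshold;
    // Stand-in for the ListState obtained from the operator state store.
    private final List<String> checkpointedState = new ArrayList<>();
    // In-memory buffer; its contents are lost on failure unless snapshotted.
    private final List<String> bufferedElements = new ArrayList<>();
    // Stand-in for the external system the sink writes to.
    final List<String> emitted = new ArrayList<>();

    BufferingSinkModel(int threshold) {
        this.threshold = threshold;
    }

    // Mirrors invoke(): buffer the value and flush once the threshold is reached.
    void invoke(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            emitted.addAll(bufferedElements);
            bufferedElements.clear();
        }
    }

    // Mirrors snapshotState(): replace the stored state with the current buffer
    // and hand back a copy, as if it had been written to a checkpoint.
    List<String> snapshotState() {
        checkpointedState.clear();
        checkpointedState.addAll(bufferedElements);
        return new ArrayList<>(checkpointedState);
    }

    // Mirrors initializeState(): refill the buffer only when restoring.
    void initializeState(boolean isRestored, List<String> restoredState) {
        if (isRestored) {
            bufferedElements.addAll(restoredState);
        }
    }

    int bufferedCount() {
        return bufferedElements.size();
    }
}
```

In this model, elements that were buffered but not yet flushed when the snapshot was taken reappear in the buffer of a new instance initialized with isRestored set to true, so the next invoke call can flush them together with the new element.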

CheckpointedFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;

}

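CheckpointedFunction is the core interface for stateful transformation functions, that is, functions that maintain state across individual stream records.
snapshotState is called when a checkpoint is taken and is where the function brings its checkpointed state up to date. It can also serve as a hook to flush, commit, or synchronize with external systems.
initializeState is called when the parallel function instance is created. Functions typically set up the data structures that hold their state in this method.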
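The second use of snapshotState, as a hook to flush to an external system, can be sketched with another plain-Java model. FlushOnSnapshotModel and its members are hypothetical and not part of Flink; the externalSystem list stands in for something like a database or message queue.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of using the snapshot hook to flush. Not a Flink API.
class FlushOnSnapshotModel {
    private final List<String> buffer = new ArrayList<>();
    // Stand-in for an external system such as a database or message queue.
    final List<String> externalSystem = new ArrayList<>();

    // Mirrors invoke(): only buffer, never write directly.
    void invoke(String value) {
        buffer.add(value);
    }

    // Mirrors snapshotState(): push everything out before the checkpoint is taken,
    // so nothing buffered needs to be stored in state.
    // Returns the number of elements left to checkpoint, which is always 0 here.
    int snapshotState() {
        externalSystem.addAll(buffer);
        buffer.clear();
        return buffer.size();
    }
}
```

Compared with BufferingSink, this variant trades state size for write timing: the buffer never needs to be checkpointed, but the external system only sees data at checkpoint boundaries. If a failure happens after the flush but before the checkpoint completes, the same records would likely be replayed and written again, so the sketch should be read as at-least-once at best.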

FunctionSnapshotContext
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionSnapshotContext.java
/**
 * This interface provides a context in which user functions that use managed state (i.e. state that is managed by state
 * backends) can participate in a snapshot. As snapshots of the backends themselves are taken by the system, this
 * interface mainly provides meta information about the checkpoint.
 */
@PublicEvolving
public interface FunctionSnapshotContext extends ManagedSnapshotContext {
}
FunctionSnapshotContext extends the ManagedSnapshotContext interface.
ManagedSnapshotContext
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedSnapshotContext.java
/**
 * This interface provides a context in which operators that use managed state (i.e. state that is managed by state
 * backends) can perform a snapshot. As snapshots of the backends themselves are taken by the system, this interface
 * mainly provides meta information about the checkpoint.
 */
@PublicEvolving
public interface ManagedSnapshotContext {

    /**
     * Returns the ID of the checkpoint for which the snapshot is taken.
     *
     * <p>The checkpoint ID is guaranteed to be strictly monotonously increasing across checkpoints.
     * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint
     * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state
     * than checkpoint <i>A</i>.
     */
    long getCheckpointId();

    /**
     * Returns timestamp (wall clock time) when the master node triggered the checkpoint for which
     * the state snapshot is taken.
     */
    long getCheckpointTimestamp();
}
ManagedSnapshotContext defines the getCheckpointId and getCheckpointTimestamp methods.
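The Javadoc's statement that a checkpoint with a higher ID subsumes those with lower IDs suggests one way a function might use getCheckpointId: tag work with the checkpoint it belongs to, and treat everything up to a completed ID as covered. The following is a plain-Java sketch of that idea only. PendingByCheckpoint, stage, and onCompleted are hypothetical names, and how a function actually learns that a checkpoint has completed is outside the scope of this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model; the class and method names are hypothetical, not Flink APIs.
class PendingByCheckpoint {
    // Work staged per checkpoint ID, kept sorted by ID.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    // Called at snapshot time with the value a snapshot context would
    // report from getCheckpointId().
    void stage(long checkpointId, List<String> items) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(items);
    }

    // Once checkpoint N is known to be complete, every staged entry with an
    // ID <= N is covered by it, because a higher ID subsumes the lower ones.
    void onCompleted(long checkpointId) {
        NavigableMap<Long, List<String>> covered = pending.headMap(checkpointId, true);
        for (List<String> items : covered.values()) {
            committed.addAll(items);
        }
        // Clearing the view also removes the entries from the backing map.
        covered.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The sorted map makes the subsumption rule a single range operation: completing checkpoint 2 also commits anything staged under checkpoint 1, even if no separate completion signal for checkpoint 1 was ever seen.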
FunctionInitializationContext
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionInitializationContext.java
/**
 * This interface provides a context in which user functions can initialize by registering to managed state (i.e. state
 * that is managed by state backends).
 *
 * <p>
 * Operator state is available to all functions, while keyed state is only available for functions after keyBy.
 *
 * <p>
 * For the purpose of initialization, the context signals if the state is empty or was restored from a previous
 * execution.
 *
 */
@PublicEvolving
public interface FunctionInitializationContext extends ManagedInitializationContext {
}
FunctionInitializationContext extends the ManagedInitializationContext interface.
ManagedInitializationContext
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedInitializationContext.java
/**
 * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that
 * is managed by state backends).
 *
 * <p>
 * Operator state is available to all operators, while keyed state is only available for operators after keyBy.
 *
 * <p>
 * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from
 * a previous execution of this operator.
 *
 */
public interface ManagedInitializationContext {

    /**
     * Returns true, if state was restored from the snapshot of a previous execution. This returns always false for
     * stateless tasks.
     */
    boolean isRestored();

    /**
     * Returns an interface that allows for registering operator state with the backend.
     */
    OperatorStateStore getOperatorStateStore();

    /**
     * Returns an interface that allows for registering keyed state with the backend.
     */
    KeyedStateStore getKeyedStateStore();

}
The ManagedInitializationContext interface defines the isRestored, getOperatorStateStore, and getKeyedStateStore methods.
Summary

Flink has two basic kinds of state: Keyed State and Operator State (non-keyed state). Keyed State can only be used in functions and operators on a KeyedStream. Each operator state is bound to one instance of a parallel operator. Operator State supports redistribution when the parallelism changes.
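Both Keyed State and Operator State come in two forms, managed and raw. Managed state is controlled by the Flink runtime, which encodes it and writes it into checkpoints. Raw state is managed by the operators themselves; the Flink runtime knows nothing about its data structure and treats it as raw bytes. All DataStream functions can use managed state, while raw state is generally only used when implementing custom operators.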
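A stateful function can use managed operator state through either the CheckpointedFunction interface or the ListCheckpointed interface. CheckpointedFunction defines two methods, snapshotState and initializeState. snapshotState is called whenever a checkpoint is taken. initializeState is called every time the user-defined function is initialized, whether for the first time or when recovering from an earlier checkpoint, so it serves both to initialize state and to hold the state recovery logic.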
Managed operator state currently (as of Flink 1.7, the version examined here) supports only a list-style form: the state must be a List of serializable objects, which makes it easy to redistribute when rescaling. There are currently two redistribution schemes: even-split redistribution (on restore/redistribution, each operator gets only a sublist of the whole state) and union redistribution (on restore/redistribution, each operator gets the complete list of state elements).
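FunctionSnapshotContext extends the ManagedSnapshotContext interface, which defines the getCheckpointId and getCheckpointTimestamp methods. FunctionInitializationContext extends the ManagedInitializationContext interface, which defines the isRestored, getOperatorStateStore, and getKeyedStateStore methods; these can be used to check whether the state was restored from the snapshot of a previous execution, and to obtain the OperatorStateStore and KeyedStateStore objects.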
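The difference between the two redistribution schemes in the summary is easiest to see on a concrete rescale. The sketch below models it in plain Java, assuming the state of every old parallel instance is available as a list. RedistributionSketch and its methods are hypothetical, and the exact assignment of elements to instances that Flink uses may differ; only the shape of the result (disjoint sublists versus full copies) is the point.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of list-state redistribution on rescale. Not a Flink API.
class RedistributionSketch {

    // Concatenate the per-instance lists from the old parallelism into one list.
    static <T> List<T> merge(List<List<T>> oldStates) {
        List<T> all = new ArrayList<>();
        for (List<T> state : oldStates) {
            all.addAll(state);
        }
        return all;
    }

    // Even-split: each new instance gets one disjoint sublist of the merged state.
    static <T> List<List<T>> evenSplit(List<List<T>> oldStates, int newParallelism) {
        List<T> all = merge(oldStates);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `remainder` instances take one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    // Union: each new instance gets its own copy of the full merged list.
    static <T> List<List<T>> union(List<List<T>> oldStates, int newParallelism) {
        List<T> all = merge(oldStates);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

For example, rescaling from two instances holding [1, 2, 3] and [4, 5] to three instances gives [1, 2], [3, 4], [5] under this even-split model, while the union model hands [1, 2, 3, 4, 5] to each of the three instances and leaves it to the function to pick what it needs.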

doc
Using Managed Operator State
