A look at Flink's ListCheckpointed



This article takes a look at Flink's ListCheckpointed interface.
Example
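import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /** current offset for exactly once semantics */
    // initialized to 0 so the first collect/increment does not hit a null Long
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        // the offset is indivisible state, so the list has a single element
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        // an empty list leaves the offset at its initial value
        for (Long s : state) {
            offset = s;
        }
    }
}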
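CounterSource is a stateful RichParallelSourceFunction that implements the ListCheckpointed interface. Its snapshotState method returns the current offset, and its restoreState method restores the local offset from the state passed in. Note that to get exactly-once semantics on failure and recovery, emitting a record and updating the offset must be synchronized on the lock returned by SourceContext.getCheckpointLock.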
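To make the role of the checkpoint lock more concrete, here is a minimal stand-alone sketch with no Flink dependency. The class CheckpointLockSketch and its fields are hypothetical names made up for illustration: a plain Object stands in for the lock returned by getCheckpointLock, and a counter stands in for the records passed to ctx.collect. Because the emit step and the offset update happen under the same lock that the snapshot takes, a snapshot should never observe an offset that disagrees with the number of emitted records.

```java
/** Stand-alone sketch (not Flink code): emit and offset update share one lock. */
final class CheckpointLockSketch {
    private final Object lock = new Object(); // stands in for ctx.getCheckpointLock()
    private long emittedCount = 0L;           // stands in for records sent via ctx.collect(...)
    private long offset = 0L;
    private volatile boolean running = true;

    /** Source loop: emit and advance the offset as one atomic step. */
    void run() {
        while (running) {
            synchronized (lock) {
                emittedCount += 1;
                offset += 1;
            }
        }
    }

    void cancel() {
        running = false;
    }

    /** Snapshot taken under the same lock; returns {offset, emittedCount}. */
    long[] snapshot() {
        synchronized (lock) {
            return new long[] {offset, emittedCount};
        }
    }

    /** Runs the loop on a thread, takes several snapshots, reports whether all were consistent. */
    static boolean allSnapshotsConsistent(int snapshots) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread worker = new Thread(source::run);
        worker.start();
        boolean consistent = true;
        try {
            for (int i = 0; i < snapshots; i++) {
                long[] s = source.snapshot();
                consistent &= (s[0] == s[1]);
                Thread.sleep(1);
            }
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            source.cancel();
            throw new IllegalStateException(e);
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println("consistent = " + allSnapshotsConsistent(20));
    }
}
```

If the two updates in run were not inside the synchronized block, a snapshot could land between them and record an offset that does not match what was actually emitted, which is the situation the lock in CounterSource is there to prevent.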
ListCheckpointed
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@PublicEvolving
public interface ListCheckpointed<T extends Serializable> {

    /**
     * Gets the current state of the function. The state must reflect the result of all prior
     * invocations to this function.
     *
     * <p>The returned list should contain one entry for redistributable unit of state. See
     * the {@link ListCheckpointed class docs} for an illustration how list-style state
     * redistribution works.
     *
     * <p>As special case, the returned list may be null or empty (if the operator has no state)
     * or it may contain a single element (if the operator state is indivisible).
     *
     * @param checkpointId The ID of the checkpoint – a unique and monotonously increasing value.
     * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master.
     *
     * @return The operator state in a list of redistributable, atomic sub-states.
     *         Should not return null, but empty list instead.
     *
     * @throws Exception Thrown if the creation of the state object failed. This causes the
     *                   checkpoint to fail. The system may decide to fail the operation (and trigger
     *                   recovery), or to discard this checkpoint attempt and to continue running
     *                   and to try again with the next checkpoint attempt.
     */
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    /**
     * Restores the state of the function or operator to that of a previous checkpoint.
     * This method is invoked when the function is executed after a failure recovery.
     * The state list may be empty if no state is to be recovered by the particular parallel instance
     * of the function.
     *
     * <p>The given state list will contain all the <i>sub states</i> that this parallel
     * instance of the function needs to handle. Refer to the {@link ListCheckpointed class docs}
     * for an illustration how list-style state redistribution works.
     *
     * <p><b>Important:</b> When implementing this interface together with {@link RichFunction},
     * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}.
     *
     * @param state The state to be restored as a list of atomic sub-states.
     *
     * @throws Exception Throwing an exception in this method causes the recovery to fail.
     *                   The exact consequence depends on the configured failure handling strategy,
     *                   but typically the system will re-attempt the recovery, or try recovering
     *                   from a different checkpoint.
     */
    void restoreState(List<T> state) throws Exception;
}

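ListCheckpointed defines two methods: snapshotState and restoreState.
snapshotState takes a checkpointId parameter, a unique and monotonically increasing number, and a timestamp parameter, the wall-clock time at which the master triggered the checkpoint. It must return the current state as a List.
restoreState is called during failure recovery. It receives the state as a List, and the method can use it to restore the local state.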
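The snapshot and restore cycle described above can be walked through without a Flink cluster. The following is only a sketch: ListCheckpointedLike is a hypothetical stand-in that mirrors the two method signatures of ListCheckpointed, and CounterState and SnapshotRestoreSketch are made-up names. It simulates a function that advances, is checkpointed, advances further, and is then replaced by a fresh instance restored from the checkpoint.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in mirroring the two methods of ListCheckpointed (not the Flink type). */
interface ListCheckpointedLike<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

/** Keeps a single offset as its state, like CounterSource. */
final class CounterState implements ListCheckpointedLike<Long> {
    private long offset = 0L;

    void advance(int steps) {
        offset += steps;
    }

    long offset() {
        return offset;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // indivisible state: a single-element list
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        // an empty list means there is nothing to restore for this instance
        for (Long s : state) {
            offset = s;
        }
    }
}

final class SnapshotRestoreSketch {
    /** Simulates: run, checkpoint, run some more, fail, restore into a fresh instance. */
    static long offsetAfterRecovery() {
        CounterState running = new CounterState();
        running.advance(5);
        List<Long> checkpoint = running.snapshotState(1L, System.currentTimeMillis());
        running.advance(3); // progress made after the checkpoint is lost on failure

        CounterState recovered = new CounterState();
        recovered.restoreState(checkpoint);
        return recovered.offset();
    }

    public static void main(String[] args) {
        System.out.println("offset after recovery = " + offsetAfterRecovery());
    }
}
```

The recovered instance resumes from the checkpointed offset rather than from the last value the failed instance had reached, which is why the source must be able to replay from that offset.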

Summary

A stateful function can use managed operator state through either the CheckpointedFunction interface or the ListCheckpointed interface. Managed operator state currently supports only the list-style form: the state must be a List of serializable objects so that it can be redistributed when rescaling. There are currently two redistribution schemes: even-split redistribution (on restore/redistribution, each operator receives only a sublist of the whole state) and union redistribution (on restore/redistribution, each operator receives the complete list of state elements).
ListCheckpointed is a more limited variant of CheckpointedFunction: it supports only list-style state with the even-split redistribution scheme.
ListCheckpointed defines two methods, snapshotState and restoreState. snapshotState is called when the master triggers a checkpoint, and the user must return the current state. restoreState is called during failure recovery; it receives the state as a List, and the method can use it to restore the local state.
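The difference between the two redistribution schemes can be illustrated with plain Java lists. This is a sketch under stated assumptions, not Flink's implementation: RedistributionSketch and its methods are hypothetical names, and the choice to hand out contiguous chunks in the even-split case is an assumption made for the illustration; how Flink actually assigns elements to subtasks is an internal detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: list-style state redistribution modelled with plain lists. */
final class RedistributionSketch {

    /** Even-split: each new subtask receives a sublist of the combined state. */
    static <T> List<List<T>> evenSplit(List<List<T>> snapshots, int newParallelism) {
        List<T> all = flatten(snapshots);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            // the first `extra` subtasks take one additional element
            int to = from + base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(from, to)));
            from = to;
        }
        return result;
    }

    /** Union: each new subtask receives the complete combined list. */
    static <T> List<List<T>> union(List<List<T>> snapshots, int newParallelism) {
        List<T> all = flatten(snapshots);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    /** Concatenates the lists returned by the old subtasks' snapshots. */
    private static <T> List<T> flatten(List<List<T>> snapshots) {
        List<T> all = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            all.addAll(snapshot);
        }
        return all;
    }

    public static void main(String[] args) {
        List<List<Integer>> snapshots = new ArrayList<>();
        snapshots.add(List.of(1, 2));
        snapshots.add(List.of(3, 4));
        snapshots.add(List.of(5));
        System.out.println("even-split: " + evenSplit(snapshots, 2));
        System.out.println("union:      " + union(snapshots, 2));
    }
}
```

When the new parallelism exceeds the number of state elements, some subtasks in the even-split case receive an empty list, which matches the Javadoc note that the state list passed to restoreState may be empty. It also shows why restoreState should be written to handle more than one element when scaling in.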

doc

Working with State listcheckpointed
A look at Flink's CheckpointedFunction
