Author: 闻乃松
When using Flink to consume Kafka data in real time, maintaining the offset state becomes an issue. To support "resuming from where we left off" after a Flink job restart or an operator-level failure retry at runtime, Flink's checkpoint support is required. The question is: if we simply enable Flink's checkpoint mechanism, without any additional coding, is that enough? To answer this question, this article first studies Flink's checkpoint processing mechanism and then looks at whether Flink supports storing the Kafka consumption state, so the article is organized into the following four parts:
- Main flow of the Flink Checkpoint state snapshot (snapshotState)
- Main flow of Flink Checkpoint state initialization (initializeState)
- Kafka Source Operator support for Flink Checkpoint
- Kafka Source Operator state restore
For precision, this article uses Flink 1.12.x and Kafka client 2.4.x as the reference versions.
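As a reference point for the discussion that follows, "simply enabling checkpointing" amounts to configuration along these lines. This is a minimal sketch; the 60-second interval is an illustrative placeholder, not taken from the article:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// "simply enabling" checkpointing: a periodic, exactly-once snapshot every 60 s, with no operator-level code changes
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);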
Main flow of the Flink Checkpoint state snapshot (snapshotState)
We know that Flink checkpoints are initiated periodically by the CheckpointCoordinator, which completes a checkpoint by sending trigger messages to the relevant tasks and collecting acknowledgement (Ack) messages from them. Skipping the analysis of how the CheckpointCoordinator initiates the call, we go straight to the recipient of the trigger message, the TaskExecutor, to look at the checkpoint execution flow: the TaskExecutor obtains the Task instance and triggers triggerCheckpointBarrier:
TaskExecutor.class

@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    ...
    final Task task = taskSlotTable.getTask(executionAttemptID);
    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }
    ...
}
Task is the scheduling and execution unit inside the TaskExecutor, and it also responds to the checkpoint request:
Task.class

public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
    ...
    final CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(checkpointID, checkpointTimestamp);
    invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    ...
}
The interesting part here is invokable, an object of type AbstractInvokable that is instantiated dynamically from the invokable class:
// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
The nameOfInvokableClass parameter is passed in when the Task is initialized, and the AbstractInvokable instance is created dynamically from it. Taking a SourceOperator as an example, the class name is:
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask
Looking at the class definition of SourceOperatorStreamTask, it is in turn a subclass of StreamTask:
class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>>
triggerCheckpointAsync is invoked successively on SourceOperatorStreamTask and StreamTask; the main logic lives in StreamTask's triggerCheckpointAsync method:
StreamTask.class

@Override
public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    ...
    triggerCheckpoint(checkpointMetaData, checkpointOptions)
    ...
}

private boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    // No alignment if we inject a checkpoint
    CheckpointMetricsBuilder checkpointMetrics =
            new CheckpointMetricsBuilder()
                    .setAlignmentDurationNanos(0L)
                    .setBytesProcessedDuringAlignment(0L);
    ...
    boolean success =
            performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    if (!success) {
        declineCheckpoint(checkpointMetaData.getCheckpointId());
    }
    return success;
}

private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {
    ...
    subtaskCheckpointCoordinator.checkpointState(
            checkpointMetaData,
            checkpointOptions,
            checkpointMetrics,
            operatorChain,
            this::isRunning);
    ...
}
Here subtaskCheckpointCoordinator is an instance of SubtaskCheckpointCoordinatorImpl, which coordinates the checkpoint-related work for a subtask:
/**
 * Coordinates checkpointing-related work for a subtask (i.e. {@link
 * org.apache.flink.runtime.taskmanager.Task Task} and {@link StreamTask}). Responsibilities:
 *
 * <ol>
 *   <li>build a snapshot (invokable)
 *   <li>report snapshot to the JobManager
 *   <li>action upon checkpoint notification
 *   <li>maintain storage locations
 * </ol>
 */
@Internal
public interface SubtaskCheckpointCoordinator extends Closeable
Below is the main logic of checkpointState in the SubtaskCheckpointCoordinatorImpl implementation class:
SubtaskCheckpointCoordinatorImpl.class

@Override
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetricsBuilder metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isRunning)
        throws Exception {
    // All of the following steps happen as an atomic step from the perspective of barriers and
    // records/watermarks/timers/callbacks.
    // We generally try to emit the checkpoint barrier as soon as possible to not affect
    // downstream checkpoint alignments
    if (lastCheckpointId >= metadata.getCheckpointId()) {
        LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}",
                lastCheckpointId,
                metadata.getCheckpointId());
        channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
        checkAndClearAbortedStatus(metadata.getCheckpointId());
        return;
    }
    lastCheckpointId = metadata.getCheckpointId();
    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
        // checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        return;
    }
    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    //           The pre-barrier work should be nothing or minimal in the common case.
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
    // Step (2): Send the checkpoint barrier downstream
    operatorChain.broadcastEvent(
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
            options.isUnalignedCheckpoint());
    // Step (3): Prepare to spill the in-flight buffers for input and output
    if (options.isUnalignedCheckpoint()) {
        // output data already written while broadcasting event
        channelStateWriter.finishOutput(metadata.getCheckpointId());
    }
    // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
    //           progress of the streaming topology
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
            new HashMap<>(operatorChain.getNumberOfOperators());
    if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
        finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
    } else {
        cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
    }
}
In Step (1) we see prepareSnapshotPreBarrier, which performs some lightweight preparation before the actual snapshot. The concrete implementation is in OperatorChain, which calls prepareSnapshotPreBarrier on every StreamOperator in the chain in turn:
OperatorChain.class

public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // go forward through the operator chain and tell each operator
    // to prepare the checkpoint
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
        if (!operatorWrapper.isClosed()) {
            operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
        }
    }
}
After a series of snapshot checks and validations, the pre-snapshot preparation and the broadcast of the barrier downstream, execution finally lands on the checkpointStreamOperator method of this class:
SubtaskCheckpointCoordinatorImpl.class

private static OperatorSnapshotFutures checkpointStreamOperator(
        StreamOperator<?> op,
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory storageLocation,
        Supplier<Boolean> isRunning)
        throws Exception {
    try {
        return op.snapshotState(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions,
                storageLocation);
    } catch (Exception ex) {
        if (isRunning.get()) {
            LOG.info(ex.getMessage(), ex);
        }
        throw ex;
    }
}
This method in turn calls snapshotState of AbstractStreamOperator:
AbstractStreamOperator.class

@Override
public final OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory)
        throws Exception {
    return stateHandler.snapshotState(
            this,
            Optional.ofNullable(timeServiceManager),
            getOperatorName(),
            checkpointId,
            timestamp,
            checkpointOptions,
            factory,
            isUsingCustomRawKeyedState());
}
snapshotState then delegates the checkpoint logic to StreamOperatorStateHandler, which is covered later in this article. Putting the pieces above together, the snapshot call chain runs from TaskExecutor.triggerCheckpoint through Task.triggerCheckpointBarrier, StreamTask.triggerCheckpointAsync and performCheckpoint, SubtaskCheckpointCoordinatorImpl.checkpointState and OperatorChain, down to AbstractStreamOperator.snapshotState, which hands off to StreamOperatorStateHandler.
Main flow of Flink Checkpoint state initialization (initializeState)
As mentioned above, when the Task is initialized and started, it calls the invoke method of AbstractInvokable:
// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
// run the invokable
invokable.invoke();
invoke is implemented in the parent class StreamTask, whose invoke method carries out the template-method style sequence of pre-invoke work, running the event loop, and post-invoke work:
StreamTask.class

@Override
public final void invoke() throws Exception {
    beforeInvoke();
    // final check to exit early before starting to run
    if (canceled) {
        throw new CancelTaskException();
    }
    // let the task do its work
    runMailboxLoop();
    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    if (canceled) {
        throw new CancelTaskException();
    }
    afterInvoke();
}
In beforeInvoke, state initialization is performed through initializeStateAndOpenOperators of the operatorChain:
StreamTask.class

protected void beforeInvoke() throws Exception {
    operatorChain = new OperatorChain<>(this, recordWriter);
    ...
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
    ...
}
The operatorChain then triggers all StreamOperators in the current chain:
OperatorChain.class

protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
Following the call further into AbstractStreamOperator's initializeState:
AbstractStreamOperator.class

@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        throws Exception {
    final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    getProcessingTimeService(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics,
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.STATE_BACKEND,
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                            runtimeContext.getUserCodeClassLoader()),
                    isUsingCustomRawKeyedState());
    stateHandler =
            new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
    timeServiceManager = context.internalTimerServiceManager();
    stateHandler.initializeOperatorState(this);
}
stateHandler.initializeOperatorState again delegates initializeOperatorState to the StreamOperatorStateHandler class, where the state initialization of the concrete StreamOperator subclass takes place. Putting the pieces together, the state-initialization call chain runs from Task (AbstractInvokable.invoke) through StreamTask.beforeInvoke, OperatorChain.initializeStateAndOpenOperators and AbstractStreamOperator.initializeState, down to StreamOperatorStateHandler.
Kafka Source Operator support for Flink Checkpoint
Putting the checkpoint snapshot flow and the state-initialization flow side by side, we can see that both ultimately delegate to StreamOperatorStateHandler.
The initializeOperatorState and snapshotState methods of StreamOperatorStateHandler are implemented as follows; they mainly build the parameters for the actual state operations:
StreamOperatorStateHandler.class

public void initializeOperatorState(CheckpointedStreamOperator streamOperator)
        throws Exception {
    CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs =
            context.rawKeyedStateInputs();
    CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
            context.rawOperatorStateInputs();
    StateInitializationContext initializationContext =
            new StateInitializationContextImpl(
                    context.isRestored(), // information whether we restore or start for the first time
                    operatorStateBackend, // access to operator state backend
                    keyedStateStore, // access to keyed state backend
                    keyedStateInputs, // access to keyed state stream
                    operatorStateInputs); // access to operator state stream
    streamOperator.initializeState(initializationContext);
}

public OperatorSnapshotFutures snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    KeyGroupRange keyGroupRange =
            null != keyedStateBackend
                    ? keyedStateBackend.getKeyGroupRange()
                    : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    StateSnapshotContextSynchronousImpl snapshotContext =
            new StateSnapshotContextSynchronousImpl(
                    checkpointId, timestamp, factory, keyGroupRange, closeableRegistry);
    snapshotState(
            streamOperator,
            timeServiceManager,
            operatorName,
            checkpointId,
            timestamp,
            checkpointOptions,
            factory,
            snapshotInProgress,
            snapshotContext,
            isUsingCustomRawKeyedState);
    return snapshotInProgress;
}

void snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        OperatorSnapshotFutures snapshotInProgress,
        StateSnapshotContextSynchronousImpl snapshotContext,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    if (timeServiceManager.isPresent()) {
        checkState(
                keyedStateBackend != null,
                "keyedStateBackend should be available with timeServiceManager");
        final InternalTimeServiceManager<?> manager = timeServiceManager.get();
        if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
            checkState(
                    !isUsingCustomRawKeyedState,
                    "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
            manager.snapshotToRawKeyedState(snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
        }
    }
    streamOperator.snapshotState(snapshotContext);
    snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
    if (null != operatorStateBackend) {
        snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
    if (null != keyedStateBackend) {
        snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
}
It is worth noting that the StreamOperator parameter of these two methods is required to be of type CheckpointedStreamOperator:
public interface CheckpointedStreamOperator {
    void initializeState(StateInitializationContext context) throws Exception;
    void snapshotState(StateSnapshotContext context) throws Exception;
}
For comparison, look at the three checkpoint-related methods defined on StreamOperator itself:
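The original listing seems to have been lost; for reference, the checkpoint-related signatures on StreamOperator in Flink 1.12 look roughly like this, reconstructed from the calls shown elsewhere in this article (verify against the source):

// StreamOperator (checkpoint-related methods)
void prepareSnapshotPreBarrier(long checkpointId) throws Exception;

OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory storageLocation) throws Exception;

void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception;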
Although the method names are the same, the parameters differ. In practice we do not need to worry about that; it is enough to know that StreamOperator delegates the snapshot-related logic to StreamOperatorStateHandler, and the real snapshot logic is implemented against CheckpointedStreamOperator. So to implement custom snapshot logic, one only needs to implement the CheckpointedStreamOperator interface (a minimal custom-operator sketch follows the SourceOperator excerpt below). Taking SourceOperator as an example, its class is defined as:
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler, PushingAsyncDataInput<OUT>
while AbstractStreamOperator is defined as:
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>,
                SetupableStreamOperator<OUT>,
                CheckpointedStreamOperator,
                Serializable
AbstractStreamOperator already implements the relevant methods for us, so extending AbstractStreamOperator is enough. Staying with SourceOperator as the example, here is its implementation:
SourceOperator.class

private ListState<SplitT> readerState;

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
    readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
    readerState.update(sourceReader.snapshotState(checkpointId));
}
As the SourceOperator excerpt shows, the snapshot state is kept in an in-memory SimpleVersionedListState, and the concrete snapshotState work is handed over to the SourceReader. Let us look at how the KafkaSourceReader provided by the Flink Kafka connector implements snapshotState:
KafkaSourceReader.class

KafkaSourceReader extends SourceReaderBase implements SourceReader

@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
    if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
        offsetsToCommit.put(checkpointId, Collections.emptyMap());
    } else {
        Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        // Put the offsets of the active splits.
        for (KafkaPartitionSplit split : splits) {
            // If the checkpoint is triggered before the partition starting offsets
            // is retrieved, do not commit the offsets for those partitions.
            if (split.getStartingOffset() >= 0) {
                offsetsMap.put(
                        split.getTopicPartition(),
                        new OffsetAndMetadata(split.getStartingOffset()));
            }
        }
        // Put offsets of all the finished splits.
        offsetsMap.putAll(offsetsOfFinishedSplits);
    }
    return splits;
}
The above is in-memory state storage; persistence also requires support from an external system. Digging further into the snapshot logic of StreamOperatorStateHandler, there is this fragment:
if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
Only when a persistent backend store is configured does the state data get persisted. Taking the default OperatorStateBackend as an example:
DefaultOperatorStateBackend.class

@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions)
        throws Exception {
    long syncStartTime = System.currentTimeMillis();
    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    return snapshotRunner;
}
The execution logic of snapshotStrategy.snapshot is implemented in DefaultOperatorStateBackendSnapshotStrategy:
DefaultOperatorStateBackendSnapshotStrategy.class

@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions)
        throws IOException {
    ...
    for (Map.Entry<String, PartitionableListState<?>> entry :
            registeredOperatorStatesDeepCopies.entrySet()) {
        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
    }
    ...
    // ... write them all in the checkpoint stream ...
    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
    OperatorBackendSerializationProxy backendSerializationProxy =
            new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
    backendSerializationProxy.write(dov);
    ...
    for (Map.Entry<String, PartitionableListState<?>> entry :
            registeredOperatorStatesDeepCopies.entrySet()) {
        PartitionableListState<?> value = entry.getValue();
        long[] partitionOffsets = value.write(localOut);
    }
}
The state consists of metadata plus the state data itself; the state data is written to the file system via the write method of PartitionableListState:
PartitionableListState.class

public long[] write(FSDataOutputStream out) throws IOException {
    long[] partitionOffsets = new long[internalList.size()];
    DataOutputView dov = new DataOutputViewStreamWrapper(out);
    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }
    return partitionOffsets;
}
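Where these bytes actually end up depends on the configured state backend and its checkpoint location. A minimal sketch of pointing them at a durable file system follows; the HDFS path and interval are placeholder assumptions, not values from the article:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);
// FsStateBackend keeps working state on the heap and writes checkpoint data to the given path
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));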
Kafka Source Operator state restore
The previous part covered the Kafka Source Operator's support for Flink Checkpoint, which also involves both the snapshotState and initializeState sides, but mainly described the snapshot logic. Now let us look at how SourceOperator initializes its state:
SourceOperator.class

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
    readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}
context.getOperatorStateStore uses the getListState method of DefaultOperatorStateBackend:
DefaultOperatorStateBackend.class

private final Map<String, PartitionableListState<?>> registeredOperatorStates;

@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
    return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}

private <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode)
        throws StateMigrationException {
    ...
    PartitionableListState<S> partitionableListState =
            (PartitionableListState<S>) registeredOperatorStates.get(name);
    ...
    return partitionableListState;
}
getListState merely reads from the Map<String, PartitionableListState<?>> named registeredOperatorStates. The question then becomes: where does registeredOperatorStates come from? To find the answer, this part uses a Kafka consumption example for demonstration. First, create the KafkaSource:
KafkaSource<MetaAndValue> kafkaSource =
        KafkaSource.<MetaAndValue>builder()
                .setBootstrapServers(Constants.kafkaServers)
                .setGroupId(KafkaSinkIcebergExample.class.getName())
                .setTopics(topic)
                .setDeserializer(recordDeserializer)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setProperties(properties)
                .build();
We also configure the restart strategy (restart once, immediately, after a failure) and a checkpoint interval of 100 ms:
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(0)));
env.getCheckpointConfig().setCheckpointInterval(1 * 100L);
Then, in the Kafka deserializer, we throw an exception after 100 records have been parsed:
public static class TestingKafkaRecordDeserializer
        implements KafkaRecordDeserializer<MetaAndValue> {

    private static final long serialVersionUID = -3765473065594331694L;
    private transient Deserializer<String> deserializer = new StringDeserializer();
    int parseNum = 0;

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<MetaAndValue> collector) {
        if (deserializer == null) {
            deserializer = new StringDeserializer();
        }
        MetaAndValue metaAndValue = new MetaAndValue(
                new TopicPartition(record.topic(), record.partition()),
                deserializer.deserialize(record.topic(), record.value()),
                record.offset());
        if (parseNum++ > 100) {
            Map<String, Object> metaData = metaAndValue.getMetaData();
            throw new RuntimeException("for test");
        }
        collector.collect(metaAndValue);
    }
}
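The article does not show how the source is attached to the job. For completeness, the wiring presumably looks roughly like the following; env.fromSource, the watermark strategy and the job name are assumptions on my part that match the new-source API, not code from the original experiment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(0)));

// attach the KafkaSource built above and print the records, so the forced failure after
// 100 records exercises restart plus state restore from the latest checkpoint
DataStream<MetaAndValue> stream =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
stream.print();
env.execute("kafka-checkpoint-restore-test");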
When the JobMaster is initialized and creates the Scheduler, it initializes state from a checkpoint; if no checkpoint is available to restore from, it tries to restore from a savepoint.
SchedulerBase.class

private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws Exception {
    ExecutionGraph newExecutionGraph =
            createExecutionGraph(
                    currentJobManagerJobMetricGroup,
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp);
    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }
    return newExecutionGraph;
}
Finally we come back to the familiar CheckpointCoordinator, whose restoreLatestCheckpointedStateInternal method loads the latest snapshot state from the checkpoint directory:
CheckpointCoordinator.class

public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks)
        throws Exception {
    final OptionalLong restoredCheckpointId =
            restoreLatestCheckpointedStateInternal(
                    tasks,
                    OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                    false, // initial checkpoints exist only on JobManager failover. ok if not present.
                    false); // JobManager failover means JobGraphs match exactly.
    return restoredCheckpointId.isPresent();
}

private OptionalLong restoreLatestCheckpointedStateInternal(
        final Set<ExecutionJobVertex> tasks,
        final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior,
        final boolean errorIfNoCheckpoint,
        final boolean allowNonRestoredState)
        throws Exception {
    ...
    // Recover the checkpoints, TODO this could be done only when there is a new leader, not
    // on each recovery
    completedCheckpointStore.recover();
    // Restore from the latest checkpoint
    CompletedCheckpoint latest =
            completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
    ...
    // re-assign the task states
    final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
    StateAssignmentOperation stateAssignmentOperation =
            new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
    stateAssignmentOperation.assignStates();
    ...
}
The above is the state-restore logic at initial application startup. What does the logic look like when an Operator fails and restarts while the application is running? In fact, the JobMaster listens to task execution state and handles it accordingly, for example through the following failure-handling chain:
UpdateSchedulerNgOnInternalFailuresListener.class

@Override
public void notifyTaskFailure(
        final ExecutionAttemptID attemptId,
        final Throwable t,
        final boolean cancelTask,
        final boolean releasePartitions) {
    final TaskExecutionState state =
            new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t);
    schedulerNg.updateTaskExecutionState(new TaskExecutionStateTransition(state, cancelTask, releasePartitions));
}

SchedulerBase.class

@Override
public final boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
    final Optional<ExecutionVertexID> executionVertexId =
            getExecutionVertexId(taskExecutionState.getID());
    boolean updateSuccess = executionGraph.updateState(taskExecutionState);
    if (updateSuccess) {
        checkState(executionVertexId.isPresent());
        if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
            updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
        }
        return true;
    } else {
        return false;
    }
}

DefaultScheduler.class

@Override
protected void updateTaskExecutionStateInternal(
        final ExecutionVertexID executionVertexId,
        final TaskExecutionStateTransition taskExecutionState) {
    schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
    maybeHandleTaskFailure(taskExecutionState, executionVertexId);
}

private void maybeHandleTaskFailure(
        final TaskExecutionStateTransition taskExecutionState,
        final ExecutionVertexID executionVertexId) {
    if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
        final Throwable error = taskExecutionState.getError(userCodeLoader);
        handleTaskFailure(executionVertexId, error);
    }
}

private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
    setGlobalFailureCause(error);
    notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
    final FailureHandlingResult failureHandlingResult =
            executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
    maybeRestartTasks(failureHandlingResult);
}

private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    if (failureHandlingResult.canRestart()) {
        // invokes restartTasks
        restartTasksWithDelay(failureHandlingResult);
    } else {
        failJob(failureHandlingResult.getError());
    }
}

private Runnable restartTasks(
        final Set<ExecutionVertexVersion> executionVertexVersions,
        final boolean isGlobalRecovery) {
    return () -> {
        final Set<ExecutionVertexID> verticesToRestart =
                executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
        removeVerticesFromRestartPending(verticesToRestart);
        resetForNewExecutions(verticesToRestart);
        try {
            restoreState(verticesToRestart, isGlobalRecovery);
        } catch (Throwable t) {
            handleGlobalFailure(t);
            return;
        }
        schedulingStrategy.restartTasks(verticesToRestart);
    };
}

SchedulerBase.class

protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery)
        throws Exception {
    ...
    if (isGlobalRecovery) {
        final Set<ExecutionJobVertex> jobVerticesToRestore =
                getInvolvedExecutionJobVertices(vertices);
        checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);
    } else {
        final Map<ExecutionJobVertex, IntArrayList> subtasksToRestore =
                getInvolvedExecutionJobVerticesAndSubtasks(vertices);
        final OptionalLong restoredCheckpointId =
                checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(subtasksToRestore.keySet());
        // Ideally, the Checkpoint Coordinator would call OperatorCoordinator.resetSubtask, but
        // the Checkpoint Coordinator is not aware of subtasks in a local failover. It always
        // assigns state to all subtasks, and for the subtask execution attempts that are still
        // running (or not waiting to be deployed) the state assignment has simply no effect.
        // Because of that, we need to do the "subtask restored" notification here.
        // Once the Checkpoint Coordinator is properly aware of partial (region) recovery,
        // this code should move into the Checkpoint Coordinator.
        final long checkpointId =
                restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT);
        notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, checkpointId);
    }
    ...
}
The whole chain above involves both DefaultScheduler and SchedulerBase, but it actually runs within a single object instance; the relationship between the two classes is:
public abstract class SchedulerBase implements SchedulerNG

public class DefaultScheduler extends SchedulerBase implements SchedulerOperations
Finally we are back at the familiar CheckpointCoordinator:
CheckpointCoordinator.class

public OptionalLong restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception {
    // when restoring subtasks only we accept potentially unmatched state for the
    // following reasons
    //   - the set frequently does not include all Job Vertices (only the ones that are part
    //     of the restarted region), meaning there will be unmatched state by design.
    //   - because what we might end up restoring from an original savepoint with unmatched
    //     state, if there is was no checkpoint yet.
    return restoreLatestCheckpointedStateInternal(
            tasks,
            OperatorCoordinatorRestoreBehavior.SKIP, // local/regional recovery does not reset coordinators
            false, // recovery might come before first successful checkpoint
            true); // see explanation above
}
In schedulingStrategy.restartTasks, the state assigned to each Task is wrapped in a JobManagerTaskRestore, which is delivered to the TaskExecutor as a property of the TaskDeploymentDescriptor. Once the TaskDeploymentDescriptor has been submitted to the TaskExecutor, the TaskExecutor uses a TaskStateManager to manage the state of the current Task; the TaskStateManager object is created from the assigned JobManagerTaskRestore and the local state store TaskLocalStateStore:
TaskExecutor.class

@Override
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
    ...
    final TaskLocalStateStore localStateStore =
            localStateStoresManager.localStateStoreForSubtask(
                    jobId,
                    tdd.getAllocationId(),
                    taskInformation.getJobVertexId(),
                    tdd.getSubtaskIndex());
    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
    final TaskStateManager taskStateManager =
            new TaskStateManagerImpl(
                    jobId,
                    tdd.getExecutionAttemptId(),
                    localStateStore,
                    taskRestore,
                    checkpointResponder);
    ...
    // start the Task
}
Starting the Task calls StreamTask's invoke method, and beforeInvoke performs the following initialization:
StreamTask.class

protected void beforeInvoke() throws Exception {
    ...
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
    ...
}

public StreamTaskStateInitializer createStreamTaskStateInitializer() {
    InternalTimeServiceManager.Provider timerServiceProvider =
            configuration.getTimerServiceProvider(getUserCodeClassLoader());
    return new StreamTaskStateInitializerImpl(
            getEnvironment(),
            stateBackend,
            TtlTimeProvider.DEFAULT,
            timerServiceProvider != null
                    ? timerServiceProvider
                    : InternalTimeServiceManagerImpl::create);
}
Back in operatorChain's initializeStateAndOpenOperators method:
OperatorChain.class

protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
Here the StreamOperator's initializeState call resolves to the implementation in AbstractStreamOperator, which creates the StreamOperatorStateContext:
AbstractStreamOperator.class

@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        throws Exception {
    final TypeSerializer<?> keySerializer =
            config.getStateKeySerializer(getUserCodeClassloader());
    final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
    final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
    final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    getProcessingTimeService(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics,
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.STATE_BACKEND,
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                            runtimeContext.getUserCodeClassLoader()),
                    isUsingCustomRawKeyedState());
    stateHandler =
            new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
    timeServiceManager = context.internalTimerServiceManager();
    stateHandler.initializeOperatorState(this);
    runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
The actual work of reading state data from the snapshot and restoring it is hidden in the creation of the streamOperatorStateContext:
StreamTaskStateInitializerImpl.class

@Override
public StreamOperatorStateContext streamOperatorStateContext(...) {
    ...
    // -------------- Keyed State Backend --------------
    keyedStatedBackend =
            keyedStatedBackend(
                    keySerializer,
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry,
                    metricGroup,
                    managedMemoryFraction);
    // -------------- Operator State Backend --------------
    operatorStateBackend =
            operatorStateBackend(
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry);
    // -------------- Raw State Streams --------------
    rawKeyedStateInputs =
            rawKeyedStateInputs(
                    prioritizedOperatorSubtaskStates
                            .getPrioritizedRawKeyedState()
                            .iterator());
    streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
    rawOperatorStateInputs =
            rawOperatorStateInputs(
                    prioritizedOperatorSubtaskStates
                            .getPrioritizedRawOperatorState()
                            .iterator());
    streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
    // -------------- Internal Timer Service Manager --------------
    if (keyedStatedBackend != null) {
        // if the operator indicates that it is using custom raw keyed state,
        // then whatever was written in the raw keyed state snapshot was NOT written
        // by the internal timer services (because there is only ever one user of raw keyed state);
        // in this case, timers should not attempt to restore timers from the raw keyed state.
        final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                (prioritizedOperatorSubtaskStates.isRestored()
                                && !isUsingCustomRawKeyedState)
                        ? rawKeyedStateInputs
                        : Collections.emptyList();
        timeServiceManager =
                timeServiceManagerProvider.create(
                        keyedStatedBackend,
                        environment.getUserCodeClassLoader().asClassLoader(),
                        keyContext,
                        processingTimeService,
                        restoredRawKeyedStateTimers);
    } else {
        timeServiceManager = null;
    }
    // -------------- Preparing return value --------------
    return new StreamOperatorStateContextImpl(
            prioritizedOperatorSubtaskStates.isRestored(),
            operatorStateBackend,
            keyedStatedBackend,
            timeServiceManager,
            rawOperatorStateInputs,
            rawKeyedStateInputs);
}
At this point we have walked through the state-restore flow of the Flink Kafka Source Operator checkpoint. The logic roughly splits into two parts: state initialization based on the StateInitializationContext, and restoring state from a checkpoint to produce that StateInitializationContext. The latter is far more involved than described here: after the JobMaster detects a task failure, it loads the state metadata of the most recent snapshot from the persisted checkpoint data and redistributes the state to the subtasks; an application-level restart may additionally involve changes to the operator topology and parallelism. Once the JobMaster has restored the state, it submits the task-restart request; on the TaskManager side, state may additionally be restored from a local snapshot (if local recovery is enabled). State restore on the TaskManager side is complete once the StreamOperatorStateContext has been created, wrapping all the data recovered from the snapshot, and from there we are back in the normal StreamOperator initializeState call flow.
Summary
This article examined the Flink checkpoint processing flow (both snapshot creation and state initialization) and the Kafka connector's support for Flink checkpoints, and confirmed from Flink's code that Flink checkpoints do support maintaining the Kafka consumption state. However, that state is only obtained from the StateInitializationContext object, so to further verify that the state in StateInitializationContext really comes from the persisted checkpoint, the fourth part combined an experiment with a walk-through of Flink's state-restore logic, both for application restart and for Operator failure retry at runtime, confirming that Flink supports restoring the state from a Checkpoint or Savepoint.
Finally, based on the analysis above, developers should keep the following in mind when building Flink applications: although Flink can restore the Kafka consumption state to the most recent checkpoint snapshot, it cannot avoid re-consuming the data between two snapshots. A typical scenario is a sink that is not idempotent, which may produce duplicate output; for example, a PrintSink cannot retract the records emitted between snapshots. In addition, when Flink checkpointing is not enabled, offset maintenance has to rely on the Kafka client's own commit mechanism.
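As a closing illustration of that last point, offset handling differs depending on whether checkpointing is enabled. The sketch below shows the knobs involved; the property keys are taken from the Kafka client and Flink Kafka connector documentation as I recall them, so treat them as assumptions to verify for your versions:

Properties props = new Properties();
// without Flink checkpoints, offset tracking falls back to the Kafka client's own commits
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "5000");
// with checkpoints enabled, the new KafkaSource also commits offsets back to Kafka when a
// checkpoint completes, unless this connector option is switched off
props.setProperty("commit.offsets.on.checkpoint", "true");

KafkaSource<MetaAndValue> source = KafkaSource.<MetaAndValue>builder()
        .setBootstrapServers(Constants.kafkaServers)
        .setTopics(topic)
        .setDeserializer(recordDeserializer)
        .setProperties(props)
        .build();

Note that even with offsets committed back to Kafka, Flink's own restore path uses the checkpointed split state rather than the committed group offsets, as the analysis above showed.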