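Author: 闻乃松
When Flink consumes Kafka data in real time, it has to maintain the consumed offsets as state. Flink relies on its Checkpoint mechanism to "resume from where it left off" after a job restart, or after an Operator-level failure and retry at runtime. The question is whether simply enabling Flink checkpointing, with no extra code, is enough to achieve this. To answer it, this article first studies how Flink processes checkpoints and then checks whether Flink supports storing Kafka state. The article therefore has four parts:
- Main flow of the Flink Checkpoint state snapshot (snapshotState)
- Main flow of Flink Checkpoint state initialization (initializeState)
- Kafka Source Operator support for Flink Checkpoint
- Kafka Source Operator state recovery
For precision, the discussion is based on Flink 1.12.x and Kafka client 2.4.x.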
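Main flow of the Flink Checkpoint state snapshot (snapshotState)
Flink checkpoints are triggered periodically by the CheckpointCoordinator, which completes a checkpoint by sending trigger messages to the relevant tasks and collecting acknowledgements (Acks) from them. Skipping how the CheckpointCoordinator initiates the call, we go straight to the receiving side, the TaskExecutor, to follow how a checkpoint is executed. The TaskExecutor looks up the Task instance and calls its triggerCheckpointBarrier: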
TaskExecutor.class
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    ...
    final Task task = taskSlotTable.getTask(executionAttemptID);
    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
        return CompletableFuture.completedFuture(Acknowledge.get());
        ...
    }
}
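To make the trigger/Ack handshake described above concrete, here is a minimal sketch of the coordinator-side bookkeeping, assuming a checkpoint counts as complete once every task it was sent to has acknowledged it. PendingCheckpointModel and its methods are hypothetical names chosen for illustration; this is not Flink's implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model (not Flink's code): a checkpoint is complete
// only after every task it was sent to has acknowledged it.
class PendingCheckpointModel {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;

    PendingCheckpointModel(long checkpointId, Set<String> tasksToAcknowledge) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    /** Records an Ack; returns false for unknown or duplicate acknowledgements. */
    boolean acknowledge(String taskId) {
        return notYetAcknowledged.remove(taskId);
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    long checkpointId() {
        return checkpointId;
    }
}
```

A duplicate Ack is simply ignored here, so the completion check stays idempotent.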
Task is the unit of scheduling and execution inside a TaskExecutor, and it also handles checkpoint requests:
Task.class
public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
    ...
    final CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(checkpointID, checkpointTimestamp);
    invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    ...
}
The key object here is invokable, an AbstractInvokable that is instantiated dynamically from a class name:
// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
The nameOfInvokableClass parameter is passed in when the Task is constructed and determines which AbstractInvokable is created. For a source operator (SourceOperator), for example, the class name is:
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask
As its class definition shows, SourceOperatorStreamTask is a subclass of StreamTask:
class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>>
The triggerCheckpointAsync call passes through SourceOperatorStreamTask.triggerCheckpointAsync to StreamTask.triggerCheckpointAsync, and the main logic lives in StreamTask:
StreamTask.class
@Override
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    ...
    triggerCheckpoint(checkpointMetaData, checkpointOptions);
    ...
}

private boolean triggerCheckpoint(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    // No alignment if we inject a checkpoint
    CheckpointMetricsBuilder checkpointMetrics =
            new CheckpointMetricsBuilder()
                    .setAlignmentDurationNanos(0L)
                    .setBytesProcessedDuringAlignment(0L);
    ...
    boolean success =
            performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    if (!success) {
        declineCheckpoint(checkpointMetaData.getCheckpointId());
    }
    return success;
}

private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {
    ...
    subtaskCheckpointCoordinator.checkpointState(
            checkpointMetaData,
            checkpointOptions,
            checkpointMetrics,
            operatorChain,
            this::isRunning);
    ...
}
Here subtaskCheckpointCoordinator is an instance of SubtaskCheckpointCoordinatorImpl, which coordinates the checkpoint-related work of a subtask:
/**
* Coordinates checkpointing-related work for a subtask (i.e. {@link
* org.apache.flink.runtime.taskmanager.Task Task} and {@link StreamTask}). Responsibilities:
*
* <ol>
* <li>build a snapshot (invokable)
* <li>report snapshot to the JobManager
* <li>action upon checkpoint notification
* <li>maintain storage locations
* </ol>
*/
@Internal
public interface SubtaskCheckpointCoordinator extends Closeable
The main logic of checkpointState in the implementation class SubtaskCheckpointCoordinatorImpl is as follows:
SubtaskCheckpointCoordinatorImpl.class
@Override
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetricsBuilder metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isRunning)
        throws Exception {
    // All of the following steps happen as an atomic step from the perspective of barriers and
    // records/watermarks/timers/callbacks.
    // We generally try to emit the checkpoint barrier as soon as possible to not affect
    // downstream checkpoint alignments
    if (lastCheckpointId >= metadata.getCheckpointId()) {
        LOG.info(
                "Out of order checkpoint barrier (aborted previously?): {} >= {}",
                lastCheckpointId,
                metadata.getCheckpointId());
        channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
        checkAndClearAbortedStatus(metadata.getCheckpointId());
        return;
    }
    lastCheckpointId = metadata.getCheckpointId();
    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
        // checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        return;
    }

    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    // The pre-barrier work should be nothing or minimal in the common case.
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

    // Step (2): Send the checkpoint barrier downstream
    operatorChain.broadcastEvent(
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
            options.isUnalignedCheckpoint());

    // Step (3): Prepare to spill the in-flight buffers for input and output
    if (options.isUnalignedCheckpoint()) {
        // output data already written while broadcasting event
        channelStateWriter.finishOutput(metadata.getCheckpointId());
    }

    // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
    // progress of the streaming topology
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
            new HashMap<>(operatorChain.getNumberOfOperators());
    if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
        finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
    } else {
        cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
    }
}
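The ordering enforced by checkpointState can be reduced to a small model: a stale (out-of-order) checkpoint id is rejected; otherwise the operator chain prepares, the barrier is emitted, and only then is the snapshot taken. The sketch below is a simplified, hypothetical CheckpointStepsModel that only records the steps. It leaves out Step (3), which applies only to unaligned checkpoints, and the cancel-marker branch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the step ordering in checkpointState (not Flink's code).
class CheckpointStepsModel {
    private long lastCheckpointId = -1L;
    final List<String> log = new ArrayList<>();

    /** Returns true if the checkpoint was taken, false if rejected as out of order. */
    boolean checkpointState(long checkpointId) {
        if (lastCheckpointId >= checkpointId) {
            log.add("abort " + checkpointId); // stale or previously aborted barrier
            return false;
        }
        lastCheckpointId = checkpointId;
        log.add("prepareSnapshotPreBarrier " + checkpointId); // Step (1)
        log.add("broadcast barrier " + checkpointId);         // Step (2): emit early for downstream alignment
        log.add("snapshot " + checkpointId);                  // Step (4)
        return true;
    }
}
```

Emitting the barrier before the snapshot is what lets downstream tasks start aligning while this task is still writing its state.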
Step (1), prepareSnapshotPreBarrier, performs some lightweight preparation before the actual snapshot is taken. It is implemented in OperatorChain, which calls prepareSnapshotPreBarrier on every StreamOperator in the chain in turn:
OperatorChain.class
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // go forward through the operator chain and tell each operator
    // to prepare the checkpoint
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
        if (!operatorWrapper.isClosed()) {
            operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
        }
    }
}
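After the checkpoint checks, the pre-barrier preparation and the broadcast of the barrier downstream, Step (4) (takeSnapshotSync) finally reaches the checkpointStreamOperator method of the same class, which snapshots a single operator: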
SubtaskCheckpointCoordinatorImpl.class
private static OperatorSnapshotFutures checkpointStreamOperator(
        StreamOperator<?> op,
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory storageLocation,
        Supplier<Boolean> isRunning)
        throws Exception {
    try {
        return op.snapshotState(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions,
                storageLocation);
    } catch (Exception ex) {
        if (isRunning.get()) {
            LOG.info(ex.getMessage(), ex);
        }
        throw ex;
    }
}
This method in turn calls AbstractStreamOperator.snapshotState:
AbstractStreamOperator.class
@Override
public final OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory)
        throws Exception {
    return stateHandler.snapshotState(
            this,
            Optional.ofNullable(timeServiceManager),
            getOperatorName(),
            checkpointId,
            timestamp,
            checkpointOptions,
            factory,
            isUsingCustomRawKeyedState());
}
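snapshotState then delegates the checkpoint logic to StreamOperatorStateHandler, which is covered below. In summary, the snapshot call chain is:
TaskExecutor.triggerCheckpoint → Task.triggerCheckpointBarrier → StreamTask.triggerCheckpointAsync → StreamTask.triggerCheckpoint → StreamTask.performCheckpoint → SubtaskCheckpointCoordinatorImpl.checkpointState → SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator → AbstractStreamOperator.snapshotState → StreamOperatorStateHandler.snapshotState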
Main flow of Flink Checkpoint state initialization (initializeState)
As mentioned above, when a Task is initialized and started, it calls the invoke method of its AbstractInvokable:
// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
// run the invokable
invokable.invoke();
invoke is implemented in the parent class StreamTask as a template method: it runs beforeInvoke, then the mailbox event loop, then afterInvoke:
StreamTask.class
@Override
public final void invoke() throws Exception {
    beforeInvoke();
    // final check to exit early before starting to run
    if (canceled) {
        throw new CancelTaskException();
    }
    // let the task do its work
    runMailboxLoop();
    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    if (canceled) {
        throw new CancelTaskException();
    }
    afterInvoke();
}
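invoke() is a textbook template method: the final method fixes the order of the steps and subclasses supply them. A minimal sketch of that shape follows. InvokableTemplate and TracingTask are hypothetical names, and IllegalStateException stands in for Flink's CancelTaskException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method shape of StreamTask.invoke().
abstract class InvokableTemplate {
    final List<String> trace = new ArrayList<>();
    volatile boolean canceled;

    // final: subclasses cannot change the order of the steps, only their content
    final void invoke() {
        beforeInvoke();        // e.g. initialize state and open operators
        if (canceled) {
            throw new IllegalStateException("canceled before running");
        }
        runMailboxLoop();      // process input until it ends
        if (canceled) {
            throw new IllegalStateException("canceled while running");
        }
        afterInvoke();         // e.g. close operators cleanly
    }

    protected abstract void beforeInvoke();

    protected abstract void runMailboxLoop();

    protected abstract void afterInvoke();
}

class TracingTask extends InvokableTemplate {
    @Override
    protected void beforeInvoke() { trace.add("beforeInvoke"); }

    @Override
    protected void runMailboxLoop() { trace.add("runMailboxLoop"); }

    @Override
    protected void afterInvoke() { trace.add("afterInvoke"); }
}
```

Because state initialization sits in beforeInvoke, restored state is always in place before the first record is processed.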
In beforeInvoke, state is initialized through operatorChain.initializeStateAndOpenOperators:
StreamTask.class
protected void beforeInvoke() throws Exception {
    operatorChain = new OperatorChain<>(this, recordWriter);
    ...
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
    ...
}
OperatorChain then initializes and opens every StreamOperator in the current chain:
OperatorChain.class
protected void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
Following the call into AbstractStreamOperator.initializeState:
AbstractStreamOperator.class
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        throws Exception {
    ...
    final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    getProcessingTimeService(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics,
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.STATE_BACKEND,
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                            runtimeContext.getUserCodeClassLoader()),
                    isUsingCustomRawKeyedState());
    stateHandler =
            new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
    timeServiceManager = context.internalTimerServiceManager();
    stateHandler.initializeOperatorState(this);
}
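stateHandler.initializeOperatorState delegates the work to StreamOperatorStateHandler, which initializes the state of the concrete StreamOperator subclass. In summary, the state initialization call chain is:
StreamTask.invoke → StreamTask.beforeInvoke → OperatorChain.initializeStateAndOpenOperators → AbstractStreamOperator.initializeState → StreamOperatorStateHandler.initializeOperatorState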
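Kafka Source Operator support for Flink Checkpoint
Putting the snapshot flow and the initialization flow side by side, both end up delegating to StreamOperatorStateHandler.
StreamOperatorStateHandler's initializeOperatorState and snapshotState are implemented as follows; they mainly assemble the arguments passed to the operator: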
StreamOperatorStateHandler.class
public void initializeOperatorState(CheckpointedStreamOperator streamOperator)
        throws Exception {
    CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs =
            context.rawKeyedStateInputs();
    CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
            context.rawOperatorStateInputs();
    StateInitializationContext initializationContext =
            new StateInitializationContextImpl(
                    context.isRestored(), // information whether we restore or start for
                    // the first time
                    operatorStateBackend, // access to operator state backend
                    keyedStateStore, // access to keyed state backend
                    keyedStateInputs, // access to keyed state stream
                    operatorStateInputs); // access to operator state stream
    streamOperator.initializeState(initializationContext);
}

public OperatorSnapshotFutures snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    KeyGroupRange keyGroupRange =
            null != keyedStateBackend
                    ? keyedStateBackend.getKeyGroupRange()
                    : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    StateSnapshotContextSynchronousImpl snapshotContext =
            new StateSnapshotContextSynchronousImpl(
                    checkpointId, timestamp, factory, keyGroupRange, closeableRegistry);
    snapshotState(
            streamOperator,
            timeServiceManager,
            operatorName,
            checkpointId,
            timestamp,
            checkpointOptions,
            factory,
            snapshotInProgress,
            snapshotContext,
            isUsingCustomRawKeyedState);
    return snapshotInProgress;
}

void snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        OperatorSnapshotFutures snapshotInProgress,
        StateSnapshotContextSynchronousImpl snapshotContext,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    if (timeServiceManager.isPresent()) {
        checkState(
                keyedStateBackend != null,
                "keyedStateBackend should be available with timeServiceManager");
        final InternalTimeServiceManager<?> manager = timeServiceManager.get();
        if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
            checkState(
                    !isUsingCustomRawKeyedState,
                    "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
            manager.snapshotToRawKeyedState(
                    snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
        }
    }
    streamOperator.snapshotState(snapshotContext);
    snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
    if (null != operatorStateBackend) {
        snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
    if (null != keyedStateBackend) {
        snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
}
Note that in both methods the operator parameter has type CheckpointedStreamOperator:
public interface CheckpointedStreamOperator {
    void initializeState(StateInitializationContext context) throws Exception;

    void snapshotState(StateSnapshotContext context) throws Exception;
}
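Compare this with StreamOperator. Its three checkpoint-related methods are prepareSnapshotPreBarrier(long), snapshotState(long, long, CheckpointOptions, CheckpointStreamFactory) and initializeState(StreamTaskStateInitializer), all of which appear in the call chains above.
Although snapshotState and initializeState share their names with the CheckpointedStreamOperator methods, their parameters differ. The difference matters less than it seems: StreamOperator delegates snapshot handling to StreamOperatorStateHandler, and an operator's actual state logic lives in its CheckpointedStreamOperator methods. To implement custom snapshot logic, therefore, an operator only needs to implement the CheckpointedStreamOperator interface. Taking SourceOperator as an example, its class definition is: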
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler, PushingAsyncDataInput<OUT>
AbstractStreamOperator, in turn, is defined as:
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>,
                SetupableStreamOperator<OUT>,
                CheckpointedStreamOperator,
                Serializable
AbstractStreamOperator already implements these methods, so an operator only needs to extend AbstractStreamOperator and override initializeState and snapshotState for its own state. Again taking SourceOperator as the example:
SourceOperator.class
private ListState<SplitT> readerState;

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
    readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
    readerState.update(sourceReader.snapshotState(checkpointId));
}
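The pattern SourceOperator follows, binding a named list state in initializeState and overwriting it in snapshotState, can be sketched without Flink. In this hypothetical model, ListStateStore stands in for the operator state store, SplitTrackingOperator for SourceOperator, and splits are plain strings. Reading the restored splits back in a separate open() step is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an operator state store holding named list states.
class ListStateStore {
    private final Map<String, List<String>> states = new HashMap<>();

    List<String> getListState(String name) {
        return states.computeIfAbsent(name, k -> new ArrayList<>());
    }
}

// Hypothetical operator shaped like SourceOperator.
class SplitTrackingOperator {
    static final String SPLITS_STATE = "splits"; // hypothetical state name
    private final List<String> assignedSplits = new ArrayList<>();
    private List<String> readerState;

    void initializeState(ListStateStore store) {
        // Only binds the named state; nothing is read yet.
        readerState = store.getListState(SPLITS_STATE);
    }

    void open() {
        // Empty on a fresh start; holds the previously snapshotted splits after a restore.
        assignedSplits.addAll(readerState);
    }

    void addSplit(String split) {
        assignedSplits.add(split);
    }

    void snapshotState(long checkpointId) {
        // Equivalent of readerState.update(...): replace, do not append.
        readerState.clear();
        readerState.addAll(assignedSplits);
    }

    List<String> splits() {
        return assignedSplits;
    }
}
```

Reusing the same ListStateStore for a second operator instance plays the role of state restored from a checkpoint: the new instance sees exactly the splits written by the last snapshotState.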
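So SourceOperator keeps its snapshot state in a SimpleVersionedListState (held in memory at this stage) and hands the actual snapshotState work to its SourceReader. Here is how KafkaSourceReader, provided by the Flink Kafka connector, implements snapshotState: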
KafkaSourceReader.class
// KafkaSourceReader extends SourceReaderBase implements SourceReader (hierarchy simplified)
@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
    if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
        offsetsToCommit.put(checkpointId, Collections.emptyMap());
    } else {
        Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        // Put the offsets of the active splits.
        for (KafkaPartitionSplit split : splits) {
            // If the checkpoint is triggered before the partition starting offsets
            // is retrieved, do not commit the offsets for those partitions.
            if (split.getStartingOffset() >= 0) {
                offsetsMap.put(
                        split.getTopicPartition(),
                        new OffsetAndMetadata(split.getStartingOffset()));
            }
        }
        // Put offsets of all the finished splits.
        offsetsMap.putAll(offsetsOfFinishedSplits);
    }
    return splits;
}
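KafkaSourceReader only records offsets per checkpoint id in offsetsToCommit. A simplified, hypothetical model of that bookkeeping is shown below. It assumes, beyond what the excerpt shows, that the recorded offsets are committed to Kafka once the corresponding checkpoint completes and that older pending entries are then dropped. OffsetBookkeeping is hypothetical, and its committed map merely stands in for the broker-side commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of per-checkpoint offset bookkeeping (not the connector's code).
class OffsetBookkeeping {
    // checkpointId -> (partition -> offset recorded at snapshot time)
    private final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();
    final Map<String, Long> committed = new HashMap<>(); // stands in for the broker-side commit

    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        Map<String, Long> offsets =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        currentOffsets.forEach((partition, offset) -> {
            // Mirrors the guard above: skip partitions whose starting offset is not known yet.
            if (offset >= 0) {
                offsets.put(partition, offset);
            }
        });
    }

    // Assumption: offsets are committed only when their checkpoint completes.
    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = offsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return;
        }
        committed.putAll(offsets);
        offsetsToCommit.headMap(checkpointId + 1).clear(); // drop this and all older entries
    }

    int pendingCheckpoints() {
        return offsetsToCommit.size();
    }
}
```

Keying by checkpoint id is what keeps the committed offsets consistent with a checkpoint that actually completed, rather than with whatever the reader has consumed so far.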
So far the state lives only in memory; making it durable requires external storage. Looking further into StreamOperatorStateHandler.snapshotState, we find this fragment:
if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
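Managed operator state is written out only when an operator state backend is present (operatorStateBackend is non-null). Taking the default implementation, DefaultOperatorStateBackend, as an example: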
DefaultOperatorStateBackend.class
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions)
        throws Exception {
    long syncStartTime = System.currentTimeMillis();
    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    return snapshotRunner;
}
The logic behind snapshotStrategy.snapshot is implemented in DefaultOperatorStateBackendSnapshotStrategy:
DefaultOperatorStateBackendSnapshotStrategy.class
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions)
        throws IOException {
    ...
    for (Map.Entry<String, PartitionableListState<?>> entry :
            registeredOperatorStatesDeepCopies.entrySet()) {
        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
    }
    ...
    // ... write them all in the checkpoint stream ...
    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
    OperatorBackendSerializationProxy backendSerializationProxy =
            new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
    backendSerializationProxy.write(dov);
    ...
    for (Map.Entry<String, PartitionableListState<?>> entry :
            registeredOperatorStatesDeepCopies.entrySet()) {
        PartitionableListState<?> value = entry.getValue();
        long[] partitionOffsets = value.write(localOut);
        ...
    }
    ...
}
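The snapshot path separates a synchronous part (copying the registered states, as registeredOperatorStatesDeepCopies suggests) from a part that can run later and write the copy out, hence the RunnableFuture return type. Below is a minimal sketch of that idea using java.util.concurrent.FutureTask. CopyThenWriteSnapshot is a hypothetical class, and the returned String merely stands in for a state handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of "synchronous copy, deferred write" (not Flink's code).
class CopyThenWriteSnapshot {
    private final List<String> liveState = new ArrayList<>();

    void add(String element) {
        liveState.add(element);
    }

    RunnableFuture<String> snapshot(long checkpointId) {
        // Synchronous part: copy the state (strings are immutable, so copying the list suffices).
        final List<String> copy = new ArrayList<>(liveState);
        // Deferred part: "persist" the copy; later changes to liveState cannot leak in.
        return new FutureTask<>(() -> "checkpoint-" + checkpointId + ":" + String.join(",", copy));
    }

    static String runAndGet(RunnableFuture<String> future) {
        future.run(); // in a real system this would typically run on a separate thread
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Records processed after the synchronous copy do not change what the deferred write persists, which is why processing can resume before the write finishes.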
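A snapshot consists of state metadata plus the state data itself. The state data is written to the checkpoint output stream by PartitionableListState.write, which also records the stream position of each element: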
PartitionableListState.class
public long[] write(FSDataOutputStream out) throws IOException {
    long[] partitionOffsets = new long[internalList.size()];
    DataOutputView dov = new DataOutputViewStreamWrapper(out);
    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }
    return partitionOffsets;
}
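The partition offsets returned by write record where each list element starts in the stream, so individual elements can later be located and reassigned on their own. The sketch below illustrates the idea with java.io streams. OffsetRecordingWriter is hypothetical, and DataOutputStream.writeUTF stands in for the partition state serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical sketch of recording one stream offset per list element.
class OffsetRecordingWriter {
    static long[] write(List<String> elements, ByteArrayOutputStream bytes) {
        DataOutputStream out = new DataOutputStream(bytes); // unbuffered: bytes.size() is exact
        long[] partitionOffsets = new long[elements.size()];
        try {
            for (int i = 0; i < elements.size(); i++) {
                partitionOffsets[i] = bytes.size(); // like out.getPos() in the original
                out.writeUTF(elements.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partitionOffsets;
    }

    // Reads a single element directly at its recorded offset.
    static String readAt(byte[] data, long offset) {
        int start = (int) offset;
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(data, start, data.length - start))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For the elements "a", "bb" and "ccc", write returns offsets 0, 3 and 7, since writeUTF emits a 2-byte length before each string's bytes.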
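Kafka Source Operator state recovery
The previous part covered how the Kafka Source Operator supports Flink checkpoints. That support has two halves, snapshotState and initializeState, but the discussion focused on snapshotState. Now let's look at how SourceOperator initializes its state: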
SourceOperator.class
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
    readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}
context.getOperatorStateStore() returns the operator state backend, so the call ends up in DefaultOperatorStateBackend.getListState:
DefaultOperatorStateBackend.class
private final Map<String, PartitionableListState<?>> registeredOperatorStates;

@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
    return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}

private <S> ListState<S> getListState(
        ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode)
        throws StateMigrationException {
    ...
    PartitionableListState<S> partitionableListState =
            (PartitionableListState<S>) registeredOperatorStates.get(name);
    ...
    return partitionableListState;
}
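getListState merely looks the state up in a Map<String, PartitionableListState<?>> named registeredOperatorStates. So where do the contents of registeredOperatorStates come from? To answer that, this part walks through a Kafka consumption example. First, create a KafkaSource: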
KafkaSource<MetaAndValue> kafkaSource =
        KafkaSource.<MetaAndValue>builder()
                .setBootstrapServers(Constants.kafkaServers)
                .setGroupId(KafkaSinkIcebergExample.class.getName())
                .setTopics(topic)
                .setDeserializer(recordDeserializer)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setProperties(properties)
                .build();
Then configure the restart strategy (restart a failed task once, immediately) and the checkpoint interval (one checkpoint every 100 ms):
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(0)));
env.getCheckpointConfig().setCheckpointInterval(1 * 100L);
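fixedDelayRestart(1, Time.seconds(0)) allows exactly one restart with no delay, so a second failure fails the job; this corresponds to the canRestart()/failJob branch in the failure-handling code later in this part. The semantics can be modelled as follows. FixedDelayRestartModel is a hypothetical class, not Flink's restart strategy implementation.

```java
// Hypothetical model of a fixed-delay restart strategy: at most maxRestarts restarts,
// each after the same delay; once exhausted, the job fails.
class FixedDelayRestartModel {
    private final int maxRestarts;
    private final long delayMillis;
    private int restartsSoFar;

    FixedDelayRestartModel(int maxRestarts, long delayMillis) {
        this.maxRestarts = maxRestarts;
        this.delayMillis = delayMillis;
    }

    boolean canRestart() {
        return restartsSoFar < maxRestarts;
    }

    /** Consumes one restart attempt and returns the delay to wait before restarting. */
    long notifyFailureAndGetDelay() {
        if (!canRestart()) {
            throw new IllegalStateException("restart attempts exhausted: fail the job");
        }
        restartsSoFar++;
        return delayMillis;
    }
}
```

With new FixedDelayRestartModel(1, 0L), the first failure is retried immediately and any further failure is fatal, which is exactly what the experiment relies on.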
Finally, make the Kafka record deserializer throw an exception once more than 100 records have been parsed:
public static class TestingKafkaRecordDeserializer
        implements KafkaRecordDeserializer<MetaAndValue> {
    private static final long serialVersionUID = -3765473065594331694L;
    private transient Deserializer<String> deserializer = new StringDeserializer();
    int parseNum = 0;

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<MetaAndValue> collector) {
        if (deserializer == null) {
            deserializer = new StringDeserializer();
        }
        MetaAndValue metaAndValue =
                new MetaAndValue(
                        new TopicPartition(record.topic(), record.partition()),
                        deserializer.deserialize(record.topic(), record.value()),
                        record.offset());
        if (parseNum++ > 100) {
            Map<String, Object> metaData = metaAndValue.getMetaData();
            throw new RuntimeException("for test");
        }
        collector.collect(metaAndValue);
    }
}
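When the JobMaster initializes and creates its Scheduler, it restores state from the latest checkpoint; if no checkpoint is available, it tries to restore from a savepoint, if one is configured: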
SchedulerBase.class
private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws Exception {
    ExecutionGraph newExecutionGraph =
            createExecutionGraph(
                    currentJobManagerJobMetricGroup,
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp);
    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(
                    newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }
    return newExecutionGraph;
}
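The decision made in createAndRestoreExecutionGraph reduces to a simple precedence rule: the latest checkpoint first, then a configured savepoint, otherwise a fresh start. A hypothetical sketch of that rule, with RestoreSourceChooser as an illustrative name only:

```java
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical sketch of the startup restore precedence (not Flink's code).
class RestoreSourceChooser {
    static String choose(OptionalLong latestCheckpointId, Optional<String> savepointPath) {
        if (latestCheckpointId.isPresent()) {
            return "checkpoint " + latestCheckpointId.getAsLong();
        }
        // Only consulted when no checkpoint could be restored.
        return savepointPath.map(path -> "savepoint " + path).orElse("fresh start");
    }
}
```

A checkpoint therefore wins even when a savepoint path is also configured, which matches the order of the two calls above.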
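This brings us back to the familiar CheckpointCoordinator. Its restoreLatestCheckpointedStateInternal method loads the latest completed checkpoint from the completed-checkpoint store and reassigns its state to the tasks: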
CheckpointCoordinator.class
public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks)
        throws Exception {
    final OptionalLong restoredCheckpointId =
            restoreLatestCheckpointedStateInternal(
                    tasks,
                    OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                    false, // initial checkpoints exist only on JobManager failover. ok if not
                    // present.
                    false); // JobManager failover means JobGraphs match exactly.
    return restoredCheckpointId.isPresent();
}

private OptionalLong restoreLatestCheckpointedStateInternal(
        final Set<ExecutionJobVertex> tasks,
        final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior,
        final boolean errorIfNoCheckpoint,
        final boolean allowNonRestoredState)
        throws Exception {
    ...
    // Recover the checkpoints, TODO this could be done only when there is a new leader, not
    // on each recovery
    completedCheckpointStore.recover();
    // Restore from the latest checkpoint
    CompletedCheckpoint latest =
            completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
    ...
    // re-assign the task states
    final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
    StateAssignmentOperation stateAssignmentOperation =
            new StateAssignmentOperation(
                    latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
    stateAssignmentOperation.assignStates();
    ...
}
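StateAssignmentOperation is where state is redistributed when the parallelism changes. For list state registered through getListState (mode SPLIT_DISTRIBUTE, as shown earlier), the elements of all old subtasks are logically concatenated and divided evenly among the new subtasks. The following is a simplified, hypothetical sketch of such an even split; Flink's actual repartitioner may order and assign elements differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of even-split redistribution for list-style operator state.
class EvenSplitRedistributor {
    static <T> List<List<T>> redistribute(List<List<T>> oldPartitions, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Logically concatenate the lists of all old subtasks.
        List<T> all = new ArrayList<>();
        oldPartitions.forEach(all::addAll);
        // Deal out contiguous chunks whose sizes differ by at most one.
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }
}
```

For Kafka splits this means a partition's offset state can move to a different subtask after rescaling without being lost or duplicated.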
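That covers state recovery at application startup. What happens when an Operator fails and is restarted while the application is running? The JobMaster listens to task state changes and reacts to them; a task failure, for example, is handled along the following chain: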
UpdateSchedulerNgOnInternalFailuresListener.class
@Override
public void notifyTaskFailure(
        final ExecutionAttemptID attemptId,
        final Throwable t,
        final boolean cancelTask,
        final boolean releasePartitions) {
    final TaskExecutionState state =
            new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t);
    schedulerNg.updateTaskExecutionState(
            new TaskExecutionStateTransition(state, cancelTask, releasePartitions));
}
SchedulerBase.class
@Override
public final boolean updateTaskExecutionState(
        final TaskExecutionStateTransition taskExecutionState) {
    final Optional<ExecutionVertexID> executionVertexId =
            getExecutionVertexId(taskExecutionState.getID());
    boolean updateSuccess = executionGraph.updateState(taskExecutionState);
    if (updateSuccess) {
        checkState(executionVertexId.isPresent());
        if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
            updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
        }
        return true;
    } else {
        return false;
    }
}
DefaultScheduler.class
@Override
protected void updateTaskExecutionStateInternal(
        final ExecutionVertexID executionVertexId,
        final TaskExecutionStateTransition taskExecutionState) {
    schedulingStrategy.onExecutionStateChange(
            executionVertexId, taskExecutionState.getExecutionState());
    maybeHandleTaskFailure(taskExecutionState, executionVertexId);
}

private void maybeHandleTaskFailure(
        final TaskExecutionStateTransition taskExecutionState,
        final ExecutionVertexID executionVertexId) {
    if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
        final Throwable error = taskExecutionState.getError(userCodeLoader);
        handleTaskFailure(executionVertexId, error);
    }
}

private void handleTaskFailure(
        final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
    setGlobalFailureCause(error);
    notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
    final FailureHandlingResult failureHandlingResult =
            executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
    maybeRestartTasks(failureHandlingResult);
}

private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    if (failureHandlingResult.canRestart()) {
        // calls restartTasks (below) after the restart delay
        restartTasksWithDelay(failureHandlingResult);
    } else {
        failJob(failureHandlingResult.getError());
    }
}

private Runnable restartTasks(
        final Set<ExecutionVertexVersion> executionVertexVersions,
        final boolean isGlobalRecovery) {
    return () -> {
        final Set<ExecutionVertexID> verticesToRestart =
                executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
        removeVerticesFromRestartPending(verticesToRestart);
        resetForNewExecutions(verticesToRestart);
        try {
            restoreState(verticesToRestart, isGlobalRecovery);
        } catch (Throwable t) {
            handleGlobalFailure(t);
            return;
        }
        schedulingStrategy.restartTasks(verticesToRestart);
    };
}
SchedulerBase.class
protected void restoreState(
        final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery)
        throws Exception {
    ...
    if (isGlobalRecovery) {
        final Set<ExecutionJobVertex> jobVerticesToRestore =
                getInvolvedExecutionJobVertices(vertices);
        checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);
    } else {
        final Map<ExecutionJobVertex, IntArrayList> subtasksToRestore =
                getInvolvedExecutionJobVerticesAndSubtasks(vertices);
        final OptionalLong restoredCheckpointId =
                checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(
                        subtasksToRestore.keySet());
        // Ideally, the Checkpoint Coordinator would call OperatorCoordinator.resetSubtask, but
        // the Checkpoint Coordinator is not aware of subtasks in a local failover. It always
        // assigns state to all subtasks, and for the subtask execution attempts that are still
        // running (or not waiting to be deployed) the state assignment has simply no effect.
        // Because of that, we need to do the "subtask restored" notification here.
        // Once the Checkpoint Coordinator is properly aware of partial (region) recovery,
        // this code should move into the Checkpoint Coordinator.
        final long checkpointId =
                restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT);
        notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, checkpointId);
    }
    ...
}
The chain above spans DefaultScheduler and SchedulerBase, but it all runs within a single object, because DefaultScheduler is a subclass of SchedulerBase:
public abstract class SchedulerBase implements SchedulerNG
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations
And once again we end up in the familiar CheckpointCoordinator:
CheckpointCoordinator.class
public OptionalLong restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks)
        throws Exception {
    // when restoring subtasks only we accept potentially unmatched state for the
    // following reasons
    // - the set frequently does not include all Job Vertices (only the ones that are part
    // of the restarted region), meaning there will be unmatched state by design.
    // - because what we might end up restoring from an original savepoint with unmatched
    // state, if there is was no checkpoint yet.
    return restoreLatestCheckpointedStateInternal(
            tasks,
            OperatorCoordinatorRestoreBehavior
                    .SKIP, // local/regional recovery does not reset coordinators
            false, // recovery might come before first successful checkpoint
            true); // see explanation above
}
In schedulingStrategy.restartTasks, the state assigned to each Task is wrapped in a JobManagerTaskRestore, which is shipped to the TaskExecutor as a field of the TaskDeploymentDescriptor. Once the TaskDeploymentDescriptor has been submitted to the TaskExecutor, the TaskExecutor uses a TaskStateManager to manage the state of the Task. The TaskStateManager is created from the assigned JobManagerTaskRestore and the local state store, TaskLocalStateStore:
TaskExecutor.class
@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
    ...
    final TaskLocalStateStore localStateStore =
            localStateStoresManager.localStateStoreForSubtask(
                    jobId,
                    tdd.getAllocationId(),
                    taskInformation.getJobVertexId(),
                    tdd.getSubtaskIndex());
    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
    final TaskStateManager taskStateManager =
            new TaskStateManagerImpl(
                    jobId,
                    tdd.getExecutionAttemptId(),
                    localStateStore,
                    taskRestore,
                    checkpointResponder);
    ...
    // start the Task
}
Starting the Task calls StreamTask.invoke, which performs the following initialization in beforeInvoke:
StreamTask.class
protected void beforeInvoke() throws Exception {
    ...
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
    ...
}

public StreamTaskStateInitializer createStreamTaskStateInitializer() {
    InternalTimeServiceManager.Provider timerServiceProvider =
            configuration.getTimerServiceProvider(getUserCodeClassLoader());
    return new StreamTaskStateInitializerImpl(
            getEnvironment(),
            stateBackend,
            TtlTimeProvider.DEFAULT,
            timerServiceProvider != null
                    ? timerServiceProvider
                    : InternalTimeServiceManagerImpl::create);
}
Back in OperatorChain.initializeStateAndOpenOperators:
OperatorChain.class
protected void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
StreamOperator.initializeState is implemented by AbstractStreamOperator, which creates the StreamOperatorStateContext:
AbstractStreamOperator.class
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        throws Exception {
    final TypeSerializer<?> keySerializer =
            config.getStateKeySerializer(getUserCodeClassloader());
    final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
    final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
    final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    getProcessingTimeService(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics,
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.STATE_BACKEND,
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                            runtimeContext.getUserCodeClassLoader()),
                    isUsingCustomRawKeyedState());
    stateHandler =
            new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
    timeServiceManager = context.internalTimerServiceManager();
    stateHandler.initializeOperatorState(this);
    runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
The actual reading and restoring of state from the snapshot happens while the StreamOperatorStateContext is being created:
StreamTaskStateInitializerImpl.class
@Override
public StreamOperatorStateContext streamOperatorStateContext(
        ... /* parameters omitted */)
        throws Exception {
    ...
    // -------------- Keyed State Backend --------------
    keyedStatedBackend =
            keyedStatedBackend(
                    keySerializer,
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry,
                    metricGroup,
                    managedMemoryFraction);
    // -------------- Operator State Backend --------------
    operatorStateBackend =
            operatorStateBackend(
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry);
    // -------------- Raw State Streams --------------
    rawKeyedStateInputs =
            rawKeyedStateInputs(
                    prioritizedOperatorSubtaskStates
                            .getPrioritizedRawKeyedState()
                            .iterator());
    streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
    rawOperatorStateInputs =
            rawOperatorStateInputs(
                    prioritizedOperatorSubtaskStates
                            .getPrioritizedRawOperatorState()
                            .iterator());
    streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
    // -------------- Internal Timer Service Manager --------------
    if (keyedStatedBackend != null) {
        // if the operator indicates that it is using custom raw keyed state,
        // then whatever was written in the raw keyed state snapshot was NOT written
        // by the internal timer services (because there is only ever one user of raw keyed
        // state);
        // in this case, timers should not attempt to restore timers from the raw keyed
        // state.
        final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState)
                        ? rawKeyedStateInputs
                        : Collections.emptyList();
        timeServiceManager =
                timeServiceManagerProvider.create(
                        keyedStatedBackend,
                        environment.getUserCodeClassLoader().asClassLoader(),
                        keyContext,
                        processingTimeService,
                        restoredRawKeyedStateTimers);
    } else {
        timeServiceManager = null;
    }
    // -------------- Preparing return value --------------
    return new StreamOperatorStateContextImpl(
            prioritizedOperatorSubtaskStates.isRestored(),
            operatorStateBackend,
            keyedStatedBackend,
            timeServiceManager,
            rawOperatorStateInputs,
            rawKeyedStateInputs);
}
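This completes our walk-through of how a Flink Kafka Source Operator restores its state from a checkpoint. The logic falls roughly into two parts: state initialization based on a StateInitializationContext, and restoring state from a checkpoint to produce that StateInitializationContext. The second part is far more complex than described here. After the JobMaster detects a task failure, it loads the state metadata of the latest snapshot from the checkpoint's persisted data and then redistributes the state among the subtasks; an application-level restart may additionally involve changes to the operator topology and parallelism. Once it has restored the state, the JobMaster submits the task restart requests, and on the TaskManager side state data may also be restored from a local snapshot (if local recovery is enabled). TaskManager-side recovery is complete once the StreamOperatorStateContext has been created: it wraps the fully restored data, after which the normal StreamOperator initializeState call flow takes over.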
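The optional local-recovery step mentioned above follows a priority rule: use a task-local copy of the state if one is available, otherwise the state assigned by the JobManager. Below is a minimal, hypothetical sketch of picking the first available alternative; PrioritizedRestore is an illustrative name, and the real restore procedure is more involved.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of prioritized restore alternatives (not Flink's code):
// alternatives are ordered by preference, e.g. [task-local copy, JobManager-assigned state].
class PrioritizedRestore {
    static <S> Optional<S> pickFirstAvailable(List<Optional<S>> alternativesInPriorityOrder) {
        for (Optional<S> alternative : alternativesInPriorityOrder) {
            if (alternative.isPresent()) {
                return alternative;
            }
        }
        return Optional.empty(); // nothing to restore: the operator starts fresh
    }
}
```

Preferring the local copy avoids re-reading state from remote checkpoint storage when the task is restarted on the same TaskManager.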
Summary
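Covering the Flink checkpoint processing flow (snapshot creation and state initialization) and the Kafka source's support for Flink checkpoints, this article has confirmed from Flink's code that Flink checkpoints do maintain Kafka consumption state. That state, however, is only obtained from the StateInitializationContext object. To further verify that the StateInitializationContext state actually comes from the persisted checkpoint, the fourth part combined an experiment with a walk through Flink's state recovery logic for both application restarts and runtime Operator failure retries, confirming that Flink can restore state from a Checkpoint or a Savepoint.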
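Finally, based on the analysis above, Flink application developers should keep one thing in mind: although Flink can restore Kafka consumption state to the most recent checkpoint, it cannot prevent records consumed between two checkpoints from being processed again. A typical case is a Sink that is not idempotent, which can produce duplicate data; PrintSink, for example, cannot retract what it printed between checkpoints. In addition, when Flink checkpointing is not enabled, state maintenance has to rely on the Kafka client's own offset commits.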
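The duplicate-processing caveat can be reproduced with a small simulation. ReplaySimulation is hypothetical: it checkpoints the read position every checkpointEvery records, fails once at failAtOffset, rewinds to the last checkpoint, and records everything a non-idempotent sink receives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of checkpoint-based recovery with a non-idempotent sink.
class ReplaySimulation {
    /**
     * @param totalRecords    records at offsets 0..totalRecords-1
     * @param checkpointEvery complete a checkpoint after every N emitted records (N > 0)
     * @param failAtOffset    fail once, just before emitting this offset
     * @return every record the sink received, in order
     */
    static List<Integer> run(int totalRecords, int checkpointEvery, int failAtOffset) {
        if (checkpointEvery <= 0) {
            throw new IllegalArgumentException("checkpointEvery must be positive");
        }
        List<Integer> sinkOutput = new ArrayList<>();
        int checkpointedOffset = 0; // next offset to read, as of the last completed checkpoint
        int offset = 0;
        boolean failed = false;
        while (offset < totalRecords) {
            if (!failed && offset == failAtOffset) {
                failed = true;
                offset = checkpointedOffset; // restore from the last checkpoint and replay
                continue;
            }
            sinkOutput.add(offset); // e.g. printing: cannot be taken back
            offset++;
            if (offset % checkpointEvery == 0) {
                checkpointedOffset = offset;
            }
        }
        return sinkOutput;
    }
}
```

With 10 records, a checkpoint every 4 records and a failure at offset 6, the sink receives 0 through 5, then 4 and 5 again, then 6 through 9: offsets 4 and 5 were emitted after the last completed checkpoint and are therefore replayed.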