Author: Wen Naisong
When Flink consumes Kafka data in real time, maintaining the offset state becomes an issue. To guarantee that a job can "resume from where it left off" after a restart or an operator-level failure retry at runtime, Flink's checkpoint support is required. The question is: if we simply enable Flink's checkpoint mechanism, with no additional coding, is that enough? To answer it, this article first studies Flink's checkpoint handling mechanism, and then examines whether Flink supports storing Kafka's state, in the following four parts:
- Main flow of Flink Checkpoint state snapshotting (snapshotState)
- Main flow of Flink Checkpoint state initialization (initializeState)
- Kafka Source Operator support for Flink Checkpoint
- Kafka Source Operator state restore
For concreteness, the article uses Flink 1.12.x and Kafka client 2.4.x as examples.
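Before diving in, here is the job-level setup the question is about: a minimal sketch of "simply enabling checkpointing" (the interval, class name, and job wiring are illustrative placeholders, not from any particular production job):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The single job-level switch this article asks about: checkpoint every 60s.
        env.enableCheckpointing(60_000L);
        // ... attach a KafkaSource (see the builder example later in this article),
        // transformations and a sink, then submit the job:
        env.execute("checkpointed-kafka-job");
    }
}
```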
Main flow of Flink Checkpoint state snapshotting (snapshotState)
As we know, Flink checkpoints are initiated periodically by the CheckpointCoordinator, which completes a checkpoint by sending trigger messages to the relevant tasks and collecting acknowledgement (Ack) messages from them. Skipping the analysis of how the CheckpointCoordinator initiates the call, we go straight to the receiver of that message, the TaskExecutor, to follow the checkpoint execution flow. The TaskExecutor looks up the Task instance and triggers triggerCheckpointBarrier:
```java
// TaskExecutor.class
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    ...
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    ...
    }
}
```
Task is the scheduled execution unit inside the TaskExecutor; it responds to the checkpoint request as well:
```java
// Task.class
public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
    ...
    final CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(checkpointID, checkpointTimestamp);
    invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    ...
}
```
The interesting part here is invokable, an object of type AbstractInvokable, instantiated dynamically from the invokable class name:
```java
// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(
                userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
```
The nameOfInvokableClass parameter is passed in when the Task is initialized and is used to create the AbstractInvokable instance dynamically. For a SourceOperator, for example, the class name is:
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask
From its class definition, SourceOperatorStreamTask is in turn a subclass of StreamTask:
```java
class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>>
```
triggerCheckpointAsync is called on SourceOperatorStreamTask and then on StreamTask; the main logic sits in StreamTask's triggerCheckpointAsync:
```java
// StreamTask.class
@Override
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    ...
    triggerCheckpoint(checkpointMetaData, checkpointOptions)
    ...
}

private boolean triggerCheckpoint(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    // No alignment if we inject a checkpoint
    CheckpointMetricsBuilder checkpointMetrics =
            new CheckpointMetricsBuilder()
                    .setAlignmentDurationNanos(0L)
                    .setBytesProcessedDuringAlignment(0L);
    ...
    boolean success =
            performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    if (!success) {
        declineCheckpoint(checkpointMetaData.getCheckpointId());
    }
    return success;
}

private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {
    ...
    subtaskCheckpointCoordinator.checkpointState(
            checkpointMetaData,
            checkpointOptions,
            checkpointMetrics,
            operatorChain,
            this::isRunning);
    ...
}
```
Here subtaskCheckpointCoordinator is an instance of SubtaskCheckpointCoordinatorImpl, responsible for coordinating the subtask's checkpoint-related work:
```java
/**
 * Coordinates checkpointing-related work for a subtask (i.e. {@link
 * org.apache.flink.runtime.taskmanager.Task Task} and {@link StreamTask}). Responsibilities:
 *
 * <ol>
 *   <li>build a snapshot (invokable)
 *   <li>report snapshot to the JobManager
 *   <li>action upon checkpoint notification
 *   <li>maintain storage locations
 * </ol>
 */
@Internal
public interface SubtaskCheckpointCoordinator extends Closeable
```
Below is the main logic of checkpointState in the implementation class SubtaskCheckpointCoordinatorImpl:
```java
// SubtaskCheckpointCoordinatorImpl.class
@Override
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetricsBuilder metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isRunning)
        throws Exception {
    // All of the following steps happen as an atomic step from the perspective of barriers and
    // records/watermarks/timers/callbacks.
    // We generally try to emit the checkpoint barrier as soon as possible to not affect
    // downstream checkpoint alignments
    if (lastCheckpointId >= metadata.getCheckpointId()) {
        LOG.info(
                "Out of order checkpoint barrier (aborted previously?): {} >= {}",
                lastCheckpointId,
                metadata.getCheckpointId());
        channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
        checkAndClearAbortedStatus(metadata.getCheckpointId());
        return;
    }

    lastCheckpointId = metadata.getCheckpointId();

    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
        // checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        return;
    }

    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    // The pre-barrier work should be nothing or minimal in the common case.
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

    // Step (2): Send the checkpoint barrier downstream
    operatorChain.broadcastEvent(
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
            options.isUnalignedCheckpoint());

    // Step (3): Prepare to spill the in-flight buffers for input and output
    if (options.isUnalignedCheckpoint()) {
        // output data already written while broadcasting event
        channelStateWriter.finishOutput(metadata.getCheckpointId());
    }

    // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
    // progress of the streaming topology
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
            new HashMap<>(operatorChain.getNumberOfOperators());
    if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
        finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
    } else {
        cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
    }
}
```
Step (1) contains prepareSnapshotPreBarrier, which does some lightweight preparation before the actual snapshot. The concrete implementation lives in OperatorChain, which calls prepareSnapshotPreBarrier on every StreamOperator in the chain in order:
```java
// OperatorChain.class
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // go forward through the operator chain and tell each operator
    // to prepare the checkpoint
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
        if (!operatorWrapper.isClosed()) {
            operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
        }
    }
}
```
After a series of snapshot validity checks, pre-snapshot preparation, and broadcasting the barrier event downstream, everything lands in this class's checkpointStreamOperator method:
```java
// SubtaskCheckpointCoordinatorImpl.class
private static OperatorSnapshotFutures checkpointStreamOperator(
        StreamOperator<?> op,
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory storageLocation,
        Supplier<Boolean> isRunning)
        throws Exception {
    try {
        return op.snapshotState(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions,
                storageLocation);
    } catch (Exception ex) {
        if (isRunning.get()) {
            LOG.info(ex.getMessage(), ex);
        }
        throw ex;
    }
}
```
This method in turn calls AbstractStreamOperator's snapshotState:
```java
// AbstractStreamOperator.class
@Override
public final OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory)
        throws Exception {
    return stateHandler.snapshotState(
            this,
            Optional.ofNullable(timeServiceManager),
            getOperatorName(),
            checkpointId,
            timestamp,
            checkpointOptions,
            factory,
            isUsingCustomRawKeyedState());
}
```
snapshotState then delegates the checkpoint logic to StreamOperatorStateHandler, which is covered below. Summarized as a call chain, the snapshot flow traced above is: CheckpointCoordinator → TaskExecutor.triggerCheckpoint → Task.triggerCheckpointBarrier → StreamTask.triggerCheckpointAsync → SubtaskCheckpointCoordinatorImpl.checkpointState → AbstractStreamOperator.snapshotState → StreamOperatorStateHandler.snapshotState.
Main flow of Flink Checkpoint state initialization (initializeState)
As mentioned above, when a Task is initialized and started, it calls AbstractInvokable's invoke method:
```java
// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(
                userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
// run the invokable
invokable.invoke();
```
invoke is implemented in the parent class StreamTask as a template method: pre-invoke actions, the mailbox event loop, and post-invoke actions:
```java
// StreamTask.class
@Override
public final void invoke() throws Exception {
    beforeInvoke();

    // final check to exit early before starting to run
    if (canceled) {
        throw new CancelTaskException();
    }

    // let the task do its work
    runMailboxLoop();

    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    if (canceled) {
        throw new CancelTaskException();
    }

    afterInvoke();
}
```
In beforeInvoke, state initialization happens through operatorChain.initializeStateAndOpenOperators:
```java
// StreamTask.class
protected void beforeInvoke() throws Exception {
    operatorChain = new OperatorChain<>(this, recordWriter);
    ...
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
    ...
}
```
The OperatorChain triggers this for every StreamOperator in the current chain:
```java
// OperatorChain.class
protected void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
```
Following into AbstractStreamOperator's initializeState:
```java
// AbstractStreamOperator.class
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        throws Exception {
    final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    getProcessingTimeService(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics,
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.STATE_BACKEND,
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                            runtimeContext.getUserCodeClassLoader()),
                    isUsingCustomRawKeyedState());

    stateHandler =
            new StreamOperatorStateHandler(
                    context, getExecutionConfig(), streamTaskCloseableRegistry);
    timeServiceManager = context.internalTimerServiceManager();
    stateHandler.initializeOperatorState(this);
}
```
Here stateHandler.initializeOperatorState again delegates initializeOperatorState to the StreamOperatorStateHandler class, where the state initialization of the concrete StreamOperator subclass is carried out. Summarized as a call chain, the initialization flow is: Task → StreamTask.invoke → beforeInvoke → OperatorChain.initializeStateAndOpenOperators → AbstractStreamOperator.initializeState → StreamOperatorStateHandler.initializeOperatorState.
Kafka Source Operator support for Flink Checkpoint
Putting the checkpoint snapshot flow and the state initialization flow side by side, we can see that both ultimately delegate to StreamOperatorStateHandler for execution.
StreamOperatorStateHandler's initializeOperatorState and snapshotState methods are implemented as follows; their main job is constructing the arguments:
```java
// StreamOperatorStateHandler.class
public void initializeOperatorState(CheckpointedStreamOperator streamOperator)
        throws Exception {
    CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs =
            context.rawKeyedStateInputs();
    CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
            context.rawOperatorStateInputs();

    StateInitializationContext initializationContext =
            new StateInitializationContextImpl(
                    context.isRestored(), // information whether we restore or start for
                    // the first time
                    operatorStateBackend, // access to operator state backend
                    keyedStateStore, // access to keyed state backend
                    keyedStateInputs, // access to keyed state stream
                    operatorStateInputs); // access to operator state stream

    streamOperator.initializeState(initializationContext);
}

public OperatorSnapshotFutures snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    KeyGroupRange keyGroupRange =
            null != keyedStateBackend
                    ? keyedStateBackend.getKeyGroupRange()
                    : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

    StateSnapshotContextSynchronousImpl snapshotContext =
            new StateSnapshotContextSynchronousImpl(
                    checkpointId, timestamp, factory, keyGroupRange, closeableRegistry);

    snapshotState(
            streamOperator,
            timeServiceManager,
            operatorName,
            checkpointId,
            timestamp,
            checkpointOptions,
            factory,
            snapshotInProgress,
            snapshotContext,
            isUsingCustomRawKeyedState);

    return snapshotInProgress;
}

void snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        OperatorSnapshotFutures snapshotInProgress,
        StateSnapshotContextSynchronousImpl snapshotContext,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    if (timeServiceManager.isPresent()) {
        checkState(
                keyedStateBackend != null,
                "keyedStateBackend should be available with timeServiceManager");
        final InternalTimeServiceManager<?> manager = timeServiceManager.get();

        if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
            checkState(
                    !isUsingCustomRawKeyedState,
                    "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
            manager.snapshotToRawKeyedState(
                    snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
        }
    }

    streamOperator.snapshotState(snapshotContext);

    snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    snapshotInProgress.setOperatorStateRawFuture(
            snapshotContext.getOperatorStateStreamFuture());

    if (null != operatorStateBackend) {
        snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(
                        checkpointId, timestamp, factory, checkpointOptions));
    }

    if (null != keyedStateBackend) {
        snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(
                        checkpointId, timestamp, factory, checkpointOptions));
    }
}
```
It is worth noting that in both methods the StreamOperator parameter is required to be of type CheckpointedStreamOperator:
```java
public interface CheckpointedStreamOperator {

    void initializeState(StateInitializationContext context) throws Exception;

    void snapshotState(StateSnapshotContext context) throws Exception;
}
```
It is instructive to compare this with the StreamOperator interface itself and its three checkpoint-related methods.
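For reference, in Flink 1.12 those declarations look roughly like this (quoted from that version; the surrounding lifecycle and key-context members are elided):

```java
public interface StreamOperator<OUT>
        extends CheckpointListener, KeyContext, Disposable, Serializable {

    void prepareSnapshotPreBarrier(long checkpointId) throws Exception;

    OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory storageLocation)
            throws Exception;

    void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception;

    // ... other members elided
}
```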
Although the method names are the same, the parameters differ. In fact none of that matters much: all we need to know is that StreamOperator delegates the snapshot-related logic to StreamOperatorStateHandler, and the real snapshot logic is implemented against CheckpointedStreamOperator. So, to implement custom snapshot logic, it suffices to implement the CheckpointedStreamOperator interface. Taking SourceOperator as the example, its class is defined as:
```java
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler, PushingAsyncDataInput<OUT>
```
while AbstractStreamOperator is defined as:
```java
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>,
                SetupableStreamOperator<OUT>,
                CheckpointedStreamOperator,
                Serializable
```
AbstractStreamOperator already implements the relevant methods for us, so extending AbstractStreamOperator is enough. Staying with SourceOperator, here is its implementation:
```java
// SourceOperator.class
private ListState<SplitT> readerState;

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
    readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
    readerState.update(sourceReader.snapshotState(checkpointId));
}
```
So SourceOperator stores the snapshot state in an in-memory SimpleVersionedListState and hands the actual snapshotState work to the SourceReader. Let's look at how the KafkaSourceReader provided by the Flink Kafka connector implements snapshotState:
```java
// KafkaSourceReader extends SourceReaderBase implements SourceReader
@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
    if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
        offsetsToCommit.put(checkpointId, Collections.emptyMap());
    } else {
        Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        // Put the offsets of the active splits.
        for (KafkaPartitionSplit split : splits) {
            // If the checkpoint is triggered before the partition starting offsets
            // is retrieved, do not commit the offsets for those partitions.
            if (split.getStartingOffset() >= 0) {
                offsetsMap.put(
                        split.getTopicPartition(),
                        new OffsetAndMetadata(split.getStartingOffset()));
            }
        }
        // Put offsets of all the finished splits.
        offsetsMap.putAll(offsetsOfFinishedSplits);
    }
    return splits;
}
```
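One detail worth noting: snapshotState only records the offsets under the checkpoint id; KafkaSourceReader commits them back to Kafka in notifyCheckpointComplete, once the checkpoint is confirmed. Conceptually that step amounts to the following sketch using a plain Kafka consumer (an illustration of the behavior only, not the actual KafkaSourceReader code, which commits through its split fetcher manager):

```java
import java.util.Map;
import java.util.SortedMap;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Conceptual sketch: commit the offsets that were snapshotted for a
// now-completed checkpoint back to Kafka.
final class OffsetCommitSketch {
    static void onCheckpointComplete(
            KafkaConsumer<byte[], byte[]> consumer,
            SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit,
            long checkpointId) {
        Map<TopicPartition, OffsetAndMetadata> toCommit = offsetsToCommit.get(checkpointId);
        if (toCommit != null && !toCommit.isEmpty()) {
            // Asynchronous commit, mirroring the reader's non-blocking commit.
            consumer.commitAsync(toCommit, (offsets, exception) -> {
                if (exception != null) {
                    // a failed commit is not fatal: the offsets can be committed
                    // again together with a later checkpoint
                }
            });
        }
        // Entries up to and including this checkpoint are no longer needed.
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }
}
```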
The above is in-memory state storage; persistence additionally needs the support of an external system. Digging further into StreamOperatorStateHandler's snapshot logic, we find this fragment:
```java
if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
```
Only when a persistent state backend is configured does the state data get persisted. Take the default OperatorStateBackend as an example:
```java
// DefaultOperatorStateBackend.class
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions)
        throws Exception {

    long syncStartTime = System.currentTimeMillis();

    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

    return snapshotRunner;
}
```
The execution logic of snapshotStrategy.snapshot is implemented in DefaultOperatorStateBackendSnapshotStrategy:
```java
// DefaultOperatorStateBackendSnapshotStrategy.class
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions)
        throws IOException {
    ...
    for (Map.Entry<String, PartitionableListState<?>> entry :
            registeredOperatorStatesDeepCopies.entrySet()) {
        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
    }
    ...
    // ... write them all in the checkpoint stream ...
    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
    OperatorBackendSerializationProxy backendSerializationProxy =
            new OperatorBackendSerializationProxy(
                    operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
    backendSerializationProxy.write(dov);
    ...
    for (Map.Entry<String, PartitionableListState<?>> entry :
            registeredOperatorStatesDeepCopies.entrySet()) {
        PartitionableListState<?> value = entry.getValue();
        long[] partitionOffsets = value.write(localOut);
    }
}
```
The state data consists of metadata plus the state itself; the state payload is written to the file system through PartitionableListState's write method:
```java
// PartitionableListState.class
public long[] write(FSDataOutputStream out) throws IOException {

    long[] partitionOffsets = new long[internalList.size()];

    DataOutputView dov = new DataOutputViewStreamWrapper(out);

    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }

    return partitionOffsets;
}
```
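The per-element offsets returned by write are what make the persisted list splittable on restore: the serialized blob can be cut at element boundaries and redistributed across parallel subtasks. A toy illustration of this offset-indexed layout, using plain JDK streams rather than Flink's output views:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Toy sketch (not Flink code): record where each element starts so the blob
// can later be cut at element boundaries and handed to different subtasks.
final class OffsetIndexedWrite {
    static long[] write(List<String> elements, ByteArrayOutputStream out) throws IOException {
        long[] partitionOffsets = new long[elements.size()];
        DataOutputStream dov = new DataOutputStream(out);
        for (int i = 0; i < elements.size(); i++) {
            partitionOffsets[i] = out.size(); // byte position before element i
            dov.writeUTF(elements.get(i));
        }
        return partitionOffsets;
    }
}
```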
Kafka Source Operator state restore
The previous part covered the Kafka Source Operator's support for Flink Checkpoint, which also involves both snapshotState and initializeState, but focused mainly on the snapshot logic. Now let's look at how SourceOperator initializes its state:
```java
// SourceOperator.class
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
    readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}
```
context.getOperatorStateStore() uses DefaultOperatorStateBackend's getListState method:
```java
// DefaultOperatorStateBackend.class
private final Map<String, PartitionableListState<?>> registeredOperatorStates;

@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
    return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}

private <S> ListState<S> getListState(
        ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode)
        throws StateMigrationException {
    ...
    PartitionableListState<S> partitionableListState =
            (PartitionableListState<S>) registeredOperatorStates.get(name);
    ...
    return partitionableListState;
}
```
getListState merely reads from a Map<String, PartitionableListState<?>> named registeredOperatorStates. So the question becomes: where is registeredOperatorStates populated from? To find the answer, this part walks through a Kafka consumption example. First, create a KafkaSource:
```java
KafkaSource<MetaAndValue> kafkaSource =
        KafkaSource.<MetaAndValue>builder()
                .setBootstrapServers(Constants.kafkaServers)
                .setGroupId(KafkaSinkIcebergExample.class.getName())
                .setTopics(topic)
                .setDeserializer(recordDeserializer)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setProperties(properties)
                .build();
```
Also set a restart strategy that restarts once, immediately, after a StreamOperator failure, and a checkpoint interval of 100 ms:
```java
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(0)));
env.getCheckpointConfig().setCheckpointInterval(1 * 100L);
```
Then make the Kafka deserializer throw an exception after parsing 100 records:
```java
public static class TestingKafkaRecordDeserializer
        implements KafkaRecordDeserializer<MetaAndValue> {
    private static final long serialVersionUID = -3765473065594331694L;
    private transient Deserializer<String> deserializer = new StringDeserializer();
    int parseNum = 0;

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> record, Collector<MetaAndValue> collector) {
        if (deserializer == null) {
            deserializer = new StringDeserializer();
        }
        MetaAndValue metaAndValue =
                new MetaAndValue(
                        new TopicPartition(record.topic(), record.partition()),
                        deserializer.deserialize(record.topic(), record.value()),
                        record.offset());
        if (parseNum++ > 100) {
            Map<String, Object> metaData = metaAndValue.getMetaData();
            throw new RuntimeException("for test");
        }
        collector.collect(metaAndValue);
    }
}
```
When the JobMaster initializes and creates the Scheduler, it initializes state from the checkpoint; if no valid checkpoint is found, it attempts to restore from a savepoint instead.
```java
// SchedulerBase.class
private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws Exception {
    ExecutionGraph newExecutionGraph =
            createExecutionGraph(
                    currentJobManagerJobMetricGroup,
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp);

    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(
                    newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}
```
Finally we arrive at the familiar CheckpointCoordinator, whose restoreLatestCheckpointedStateInternal method loads the latest snapshot state from the checkpoint directory:
```java
// CheckpointCoordinator.class
public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks)
        throws Exception {
    final OptionalLong restoredCheckpointId =
            restoreLatestCheckpointedStateInternal(
                    tasks,
                    OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                    false, // initial checkpoints exist only on JobManager failover. ok if not
                    // present.
                    false); // JobManager failover means JobGraphs match exactly.

    return restoredCheckpointId.isPresent();
}

private OptionalLong restoreLatestCheckpointedStateInternal(
        final Set<ExecutionJobVertex> tasks,
        final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior,
        final boolean errorIfNoCheckpoint,
        final boolean allowNonRestoredState)
        throws Exception {
    ...
    // Recover the checkpoints, TODO this could be done only when there is a new leader, not
    // on each recovery
    completedCheckpointStore.recover();

    // Restore from the latest checkpoint
    CompletedCheckpoint latest =
            completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
    ...
    // re-assign the task states
    final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

    StateAssignmentOperation stateAssignmentOperation =
            new StateAssignmentOperation(
                    latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);

    stateAssignmentOperation.assignStates();
    ...
}
```
The above is the state-restore logic for the initial start of an application. What does the logic look like when an Operator fails and restarts while the application is running? The JobMaster in fact listens to task execution state and reacts accordingly. Here, for example, is one failure-handling chain:
```java
// UpdateSchedulerNgOnInternalFailuresListener.class
@Override
public void notifyTaskFailure(
        final ExecutionAttemptID attemptId,
        final Throwable t,
        final boolean cancelTask,
        final boolean releasePartitions) {

    final TaskExecutionState state =
            new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t);
    schedulerNg.updateTaskExecutionState(
            new TaskExecutionStateTransition(state, cancelTask, releasePartitions));
}

// SchedulerBase.class
@Override
public final boolean updateTaskExecutionState(
        final TaskExecutionStateTransition taskExecutionState) {
    final Optional<ExecutionVertexID> executionVertexId =
            getExecutionVertexId(taskExecutionState.getID());

    boolean updateSuccess = executionGraph.updateState(taskExecutionState);

    if (updateSuccess) {
        checkState(executionVertexId.isPresent());

        if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
            updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
        }
        return true;
    } else {
        return false;
    }
}

// DefaultScheduler.class
@Override
protected void updateTaskExecutionStateInternal(
        final ExecutionVertexID executionVertexId,
        final TaskExecutionStateTransition taskExecutionState) {
    schedulingStrategy.onExecutionStateChange(
            executionVertexId, taskExecutionState.getExecutionState());
    maybeHandleTaskFailure(taskExecutionState, executionVertexId);
}

private void maybeHandleTaskFailure(
        final TaskExecutionStateTransition taskExecutionState,
        final ExecutionVertexID executionVertexId) {
    if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
        final Throwable error = taskExecutionState.getError(userCodeLoader);
        handleTaskFailure(executionVertexId, error);
    }
}

private void handleTaskFailure(
        final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
    setGlobalFailureCause(error);
    notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
    final FailureHandlingResult failureHandlingResult =
            executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
    maybeRestartTasks(failureHandlingResult);
}

private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    if (failureHandlingResult.canRestart()) {
        // calls restartTasks
        restartTasksWithDelay(failureHandlingResult);
    } else {
        failJob(failureHandlingResult.getError());
    }
}

private Runnable restartTasks(
        final Set<ExecutionVertexVersion> executionVertexVersions,
        final boolean isGlobalRecovery) {
    return () -> {
        final Set<ExecutionVertexID> verticesToRestart =
                executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);

        removeVerticesFromRestartPending(verticesToRestart);

        resetForNewExecutions(verticesToRestart);

        try {
            restoreState(verticesToRestart, isGlobalRecovery);
        } catch (Throwable t) {
            handleGlobalFailure(t);
            return;
        }

        schedulingStrategy.restartTasks(verticesToRestart);
    };
}

// SchedulerBase.class
protected void restoreState(
        final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery)
        throws Exception {
    ...
    if (isGlobalRecovery) {
        final Set<ExecutionJobVertex> jobVerticesToRestore =
                getInvolvedExecutionJobVertices(vertices);

        checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);
    } else {
        final Map<ExecutionJobVertex, IntArrayList> subtasksToRestore =
                getInvolvedExecutionJobVerticesAndSubtasks(vertices);

        final OptionalLong restoredCheckpointId =
                checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(
                        subtasksToRestore.keySet());

        // Ideally, the Checkpoint Coordinator would call OperatorCoordinator.resetSubtask, but
        // the Checkpoint Coordinator is not aware of subtasks in a local failover. It always
        // assigns state to all subtasks, and for the subtask execution attempts that are still
        // running (or not waiting to be deployed) the state assignment has simply no effect.
        // Because of that, we need to do the "subtask restored" notification here.
        // Once the Checkpoint Coordinator is properly aware of partial (region) recovery,
        // this code should move into the Checkpoint Coordinator.
        final long checkpointId =
                restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT);
        notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, checkpointId);
    }
    ...
}
```
The whole chain above spans DefaultScheduler and SchedulerBase, but it actually runs within a single runtime object instance; the two classes are related as follows:
```java
public abstract class SchedulerBase implements SchedulerNG

public class DefaultScheduler extends SchedulerBase implements SchedulerOperations
```
And finally we are back at the familiar CheckpointCoordinator again:
```java
// CheckpointCoordinator.class
public OptionalLong restoreLatestCheckpointedStateToSubtasks(
        final Set<ExecutionJobVertex> tasks) throws Exception {
    // when restoring subtasks only we accept potentially unmatched state for the
    // following reasons
    //   - the set frequently does not include all Job Vertices (only the ones that are part
    //     of the restarted region), meaning there will be unmatched state by design.
    //   - because what we might end up restoring from an original savepoint with unmatched
    //     state, if there is was no checkpoint yet.
    return restoreLatestCheckpointedStateInternal(
            tasks,
            OperatorCoordinatorRestoreBehavior
                    .SKIP, // local/regional recovery does not reset coordinators
            false, // recovery might come before first successful checkpoint
            true); // see explanation above
}
```
In schedulingStrategy.restartTasks, the state assigned to each Task is wrapped in a JobManagerTaskRestore, which is sent down to the TaskExecutor as an attribute of the TaskDeploymentDescriptor. Once the TaskDeploymentDescriptor has been submitted to the TaskExecutor, the TaskExecutor uses a TaskStateManager to manage the current Task's state; the TaskStateManager object is created from the assigned JobManagerTaskRestore and the local state store TaskLocalStateStore:
```java
// TaskExecutor.class
@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
    ...
    final TaskLocalStateStore localStateStore =
            localStateStoresManager.localStateStoreForSubtask(
                    jobId,
                    tdd.getAllocationId(),
                    taskInformation.getJobVertexId(),
                    tdd.getSubtaskIndex());

    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

    final TaskStateManager taskStateManager =
            new TaskStateManagerImpl(
                    jobId,
                    tdd.getExecutionAttemptId(),
                    localStateStore,
                    taskRestore,
                    checkpointResponder);
    ...
    // start the Task
}
```
Starting the Task invokes StreamTask's invoke method, which performs the following initialization in beforeInvoke:
```java
// StreamTask.class
protected void beforeInvoke() throws Exception {
    ...
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
    ...
}

public StreamTaskStateInitializer createStreamTaskStateInitializer() {
    InternalTimeServiceManager.Provider timerServiceProvider =
            configuration.getTimerServiceProvider(getUserCodeClassLoader());
    return new StreamTaskStateInitializerImpl(
            getEnvironment(),
            stateBackend,
            TtlTimeProvider.DEFAULT,
            timerServiceProvider != null
                    ? timerServiceProvider
                    : InternalTimeServiceManagerImpl::create);
}
```
Back to OperatorChain's initializeStateAndOpenOperators method:
```java
// OperatorChain.class
protected void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
```
StreamOperator's initializeState here is the implementation in the subclass AbstractStreamOperator, which creates the StreamOperatorStateContext:
```java
// AbstractStreamOperator.class
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        throws Exception {

    final TypeSerializer<?> keySerializer =
            config.getStateKeySerializer(getUserCodeClassloader());

    final StreamTask<?, ?> containingTask =
            Preconditions.checkNotNull(getContainingTask());
    final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());

    final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    getProcessingTimeService(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics,
                    config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.STATE_BACKEND,
                            runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                            runtimeContext.getUserCodeClassLoader()),
                    isUsingCustomRawKeyedState());

    stateHandler =
            new StreamOperatorStateHandler(
                    context, getExecutionConfig(), streamTaskCloseableRegistry);
    timeServiceManager = context.internalTimerServiceManager();
    stateHandler.initializeOperatorState(this);
    runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
```
The actual work of reading the state data out of the snapshot and restoring it is hidden inside the creation of the StreamOperatorStateContext:
```java
// StreamTaskStateInitializerImpl.class
@Override
public StreamOperatorStateContext streamOperatorStateContext(...) {
    ...
    // -------------- Keyed State Backend --------------
    keyedStatedBackend =
            keyedStatedBackend(
                    keySerializer,
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry,
                    metricGroup,
                    managedMemoryFraction);

    // -------------- Operator State Backend --------------
    operatorStateBackend =
            operatorStateBackend(
                    operatorIdentifierText,
                    prioritizedOperatorSubtaskStates,
                    streamTaskCloseableRegistry);

    // -------------- Raw State Streams --------------
    rawKeyedStateInputs =
            rawKeyedStateInputs(
                    prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
    streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

    rawOperatorStateInputs =
            rawOperatorStateInputs(
                    prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
    streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

    // -------------- Internal Timer Service Manager --------------
    if (keyedStatedBackend != null) {

        // if the operator indicates that it is using custom raw keyed state,
        // then whatever was written in the raw keyed state snapshot was NOT written
        // by the internal timer services (because there is only ever one user of raw keyed
        // state);
        // in this case, timers should not attempt to restore timers from the raw keyed
        // state.
        final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState)
                        ? rawKeyedStateInputs
                        : Collections.emptyList();

        timeServiceManager =
                timeServiceManagerProvider.create(
                        keyedStatedBackend,
                        environment.getUserCodeClassLoader().asClassLoader(),
                        keyContext,
                        processingTimeService,
                        restoredRawKeyedStateTimers);
    } else {
        timeServiceManager = null;
    }

    // -------------- Preparing return value --------------
    return new StreamOperatorStateContextImpl(
            prioritizedOperatorSubtaskStates.isRestored(),
            operatorStateBackend,
            keyedStatedBackend,
            timeServiceManager,
            rawOperatorStateInputs,
            rawKeyedStateInputs);
}
```
At this point we have traced the state-restore flow of the Flink Kafka Source Operator checkpoint. The logic can be roughly divided into two parts: state initialization based on the StateInitializationContext, and restoring the state from a checkpoint to produce that StateInitializationContext. The latter is far more complex than this article shows. In practice, after the JobMaster detects a task failure, it loads the state metadata of the most recent snapshot from the persisted checkpoint data and then reassigns the state to the subtasks; a restart at the application level may additionally involve changes to the operator topology and parallelism. After restoring the state, the JobMaster submits the task-restart request, and on the TaskManager side the state may additionally be restored from a local snapshot (if enabled). State restore on the TaskManager side culminates in the creation of the StreamOperatorStateContext, which wraps the complete post-restore data, after which we are back in the normal StreamOperator initializeState call flow.
Summary
Starting from Flink's checkpoint handling flow (covering both snapshot creation and state initialization) and from the Kafka connector's support for Flink Checkpoint, this article established, from the perspective of Flink's code, that Flink checkpoints do support maintaining the Kafka consumption state. However, that state is only ever obtained from the StateInitializationContext object. To further verify that the StateInitializationContext's state really comes from the persisted checkpoint, the fourth part combined an experiment with code tracing, following Flink's state-restore logic both on application restart and on a runtime Operator failure retry, and confirming that Flink supports restoring state from a Checkpoint or Savepoint.
Finally, based on the analysis above, developers writing Flink applications should keep one thing in mind: although Flink can restore the Kafka consumption state to the most recent checkpoint snapshot, it cannot avoid duplicate consumption of data between two snapshots. A typical scenario is a sink that is not idempotent, where duplicated output may result; a PrintSink, for instance, cannot retract the data it printed between snapshots. Also, when Flink checkpointing is not enabled, state maintenance has to rely on the commit state of the Kafka client itself.
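On that last point: with checkpointing disabled, offset tracking can fall back to the Kafka client's auto-commit, configured through standard consumer properties passed to the source builder (the values here are illustrative):

```java
import java.util.Properties;

// Standard Kafka consumer properties; with Flink checkpointing disabled, these
// make the Kafka client itself periodically commit the consumed offsets.
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");
// pass to the builder shown earlier: KafkaSource.<...>builder().setProperties(properties)
```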