Author: 闻乃松

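When Flink consumes Kafka data in real time, it has to maintain offset state. To resume from where it left off after a job restart, or after an Operator-level failure retry at runtime, the job needs the support of Flink's Checkpoint. The question is: if we simply enable Flink's Checkpoint mechanism, with no extra coding, is that enough? To answer it, this article first studies how Flink processes a Checkpoint and then checks whether Flink stores Kafka state. It is organized into the following four parts: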

  • Flink Checkpoint state snapshot (snapshotState): main flow
  • Flink Checkpoint state initialization (initializeState): main flow
  • How the Kafka Source Operator supports Flink Checkpoint
  • Kafka Source Operator state recovery

To be precise, this article uses Flink 1.12.x and Kafka client 2.4.x as the reference versions.

Flink Checkpoint state snapshot (snapshotState): main flow

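As we know, a Flink Checkpoint is initiated periodically by the CheckpointCoordinator, which completes a checkpoint by sending trigger messages to the relevant tasks and collecting acknowledgements (Ack) from them. We skip the CheckpointCoordinator's triggering logic here and go straight to the message receiver, TaskExecutor, to follow the execution of a Checkpoint. TaskExecutor looks up the Task instance and calls triggerCheckpointBarrier on it: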

TaskExecutor.class
    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
        ...
        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
            return CompletableFuture.completedFuture(Acknowledge.get());
            ...
        }
    }

Task is the unit that TaskExecutor schedules and runs, and it also responds to Checkpoint requests:

Task.class
    public void triggerCheckpointBarrier(
            final long checkpointID,
            final long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {
        ...
        final CheckpointMetaData checkpointMetaData =
                new CheckpointMetaData(checkpointID, checkpointTimestamp);
        invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        ...
    }

The point of interest is invokable, an object of type AbstractInvokable that is instantiated dynamically from the name of the invokable class:

// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(
                userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);

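The nameOfInvokableClass parameter is passed in when the Task is initialized and is used to create the AbstractInvokable instance dynamically. For a SourceOperator, for example, the class name is: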

org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask

Its class definition shows that SourceOperatorStreamTask is in turn a subclass of StreamTask:

class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>>

The triggerCheckpointAsync call passes through SourceOperatorStreamTask and then StreamTask; the main logic lives in StreamTask's triggerCheckpointAsync:

StreamTask.class
    @Override
    public Future<Boolean> triggerCheckpointAsync(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        ...
        triggerCheckpoint(checkpointMetaData, checkpointOptions);
        ...
    }

    private boolean triggerCheckpoint(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
            throws Exception {
        // No alignment if we inject a checkpoint
        CheckpointMetricsBuilder checkpointMetrics =
                new CheckpointMetricsBuilder()
                        .setAlignmentDurationNanos(0L)
                        .setBytesProcessedDuringAlignment(0L);
        ...
        boolean success =
                performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        if (!success) {
            declineCheckpoint(checkpointMetaData.getCheckpointId());
        }
        return success;
    }

    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetricsBuilder checkpointMetrics)
            throws Exception {
        ...
        subtaskCheckpointCoordinator.checkpointState(
                checkpointMetaData,
                checkpointOptions,
                checkpointMetrics,
                operatorChain,
                this::isRunning);
        ...
    }

Here subtaskCheckpointCoordinator is an instance of SubtaskCheckpointCoordinatorImpl, which coordinates the checkpoint-related work of a subtask:

/**
 * Coordinates checkpointing-related work for a subtask (i.e. {@link
 * org.apache.flink.runtime.taskmanager.Task Task} and {@link StreamTask}). Responsibilities:
 *
 * <ol>
 *   <li>build a snapshot (invokable)
 *   <li>report snapshot to the JobManager
 *   <li>action upon checkpoint notification
 *   <li>maintain storage locations
 * </ol>
 */
@Internal
public interface SubtaskCheckpointCoordinator extends Closeable

Below is the main logic of checkpointState in the implementation class SubtaskCheckpointCoordinatorImpl:

SubtaskCheckpointCoordinatorImpl.class
    @Override
    public void checkpointState(
            CheckpointMetaData metadata,
            CheckpointOptions options,
            CheckpointMetricsBuilder metrics,
            OperatorChain<?, ?> operatorChain,
            Supplier<Boolean> isRunning)
            throws Exception {
        // All of the following steps happen as an atomic step from the perspective of barriers and
        // records/watermarks/timers/callbacks.
        // We generally try to emit the checkpoint barrier as soon as possible to not affect
        // downstream
        // checkpoint alignments
        if (lastCheckpointId >= metadata.getCheckpointId()) {
            LOG.info(
                    "Out of order checkpoint barrier (aborted previously?): {} >= {}",
                    lastCheckpointId,
                    metadata.getCheckpointId());
            channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
            checkAndClearAbortedStatus(metadata.getCheckpointId());
            return;
        }

        lastCheckpointId = metadata.getCheckpointId();
        if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
            // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
            // checkpoint barrier align.
            operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
            return;
        }

        // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
        //           The pre-barrier work should be nothing or minimal in the common case.
        operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

        // Step (2): Send the checkpoint barrier downstream
        operatorChain.broadcastEvent(
                new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
                options.isUnalignedCheckpoint());

        // Step (3): Prepare to spill the in-flight buffers for input and output
        if (options.isUnalignedCheckpoint()) {
            // output data already written while broadcasting event
            channelStateWriter.finishOutput(metadata.getCheckpointId());
        }

        // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
        // progress of the
        // streaming topology
        Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
                new HashMap<>(operatorChain.getNumberOfOperators());
        if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
            finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
        } else {
            cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
        }
    }

Step (1) calls prepareSnapshotPreBarrier, which does some lightweight preparation before the actual snapshot. It is implemented in OperatorChain, which calls prepareSnapshotPreBarrier on each StreamOperator in the chain in turn:

OperatorChain.class
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // go forward through the operator chain and tell each operator
        // to prepare the checkpoint
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
            if (!operatorWrapper.isClosed()) {
                operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
            }
        }
    }
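The ordering inside checkpointState can be illustrated with a small self-contained model. This is only a sketch: MiniSubtaskCoordinator, its string-based operator chain, and the log format are hypothetical names invented for illustration, not Flink classes. It shows two things from the code above: a barrier that is not newer than the last one seen is rejected, and the barrier is emitted before the operators are snapshotted.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the step ordering in checkpointState; not Flink's real classes. */
class MiniSubtaskCoordinator {
    // Operator names stand in for the chained StreamOperators (hypothetical).
    private final List<String> chain;
    private final List<String> log = new ArrayList<>();
    private long lastCheckpointId = -1L;

    MiniSubtaskCoordinator(List<String> chain) {
        this.chain = chain;
    }

    /** Returns true if the checkpoint was taken, false if it was rejected as out of order. */
    boolean checkpointState(long checkpointId) {
        // Reject barriers that are not newer than the last one seen.
        if (lastCheckpointId >= checkpointId) {
            log.add("skip:" + checkpointId);
            return false;
        }
        lastCheckpointId = checkpointId;

        // Step (1): let every operator do its lightweight pre-barrier work.
        for (String op : chain) {
            log.add("prepare:" + op + ":" + checkpointId);
        }
        // Step (2): emit the barrier downstream before taking the snapshot.
        log.add("barrier:" + checkpointId);
        // Step (4): snapshot each operator. Step (3) only applies to unaligned
        // checkpoints and is left out of this sketch.
        for (String op : chain) {
            log.add("snapshot:" + op + ":" + checkpointId);
        }
        return true;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        MiniSubtaskCoordinator c = new MiniSubtaskCoordinator(List.of("source", "map"));
        c.checkpointState(1L);
        c.checkpointState(1L); // duplicate barrier, rejected
        c.log().forEach(System.out::println);
    }
}
```

Emitting the barrier before the snapshot keeps downstream operators from waiting on this subtask's state write, which is the motivation stated in the comments of the original method.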

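After these snapshot checks, the pre-snapshot preparation, and the broadcast of the barrier downstream, the work ends up in the checkpointStreamOperator method of the same class: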

SubtaskCheckpointCoordinatorImpl.class
    private static OperatorSnapshotFutures checkpointStreamOperator(
            StreamOperator<?> op,
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory storageLocation,
            Supplier<Boolean> isRunning)
            throws Exception {
        try {
            return op.snapshotState(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions,
                    storageLocation);
        } catch (Exception ex) {
            if (isRunning.get()) {
                LOG.info(ex.getMessage(), ex);
            }
            throw ex;
        }
    }

That method in turn calls snapshotState of AbstractStreamOperator:

AbstractStreamOperator.class
    @Override
    public final OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory)
            throws Exception {
        return stateHandler.snapshotState(
                this,
                Optional.ofNullable(timeServiceManager),
                getOperatorName(),
                checkpointId,
                timestamp,
                checkpointOptions,
                factory,
                isUsingCustomRawKeyedState());
    }

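snapshotState in turn delegates the checkpoint logic to StreamOperatorStateHandler, whose logic is covered later. To summarize, the snapshot call chain traced above is: TaskExecutor.triggerCheckpoint -> Task.triggerCheckpointBarrier -> StreamTask.triggerCheckpointAsync -> StreamTask.performCheckpoint -> SubtaskCheckpointCoordinatorImpl.checkpointState -> checkpointStreamOperator -> AbstractStreamOperator.snapshotState -> StreamOperatorStateHandler.snapshotState.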

Flink Checkpoint state initialization (initializeState): main flow

As mentioned above, when a Task starts it calls the invoke method of its AbstractInvokable:

// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(
                userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
// run the invokable
invokable.invoke();

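invoke is implemented in the parent class StreamTask as a template method with three phases: the work before the invocation, running the event (mailbox) loop, and the work after the invocation: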

StreamTask.class
    @Override
    public final void invoke() throws Exception {
        beforeInvoke();

        // final check to exit early before starting to run
        if (canceled) {
            throw new CancelTaskException();
        }

        // let the task do its work
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        if (canceled) {
            throw new CancelTaskException();
        }

        afterInvoke();
    }
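The template-method shape of invoke can be reproduced in a few lines of plain Java. The sketch below is a simplified model under stated assumptions: MiniTaskDemo, MiniTask, MiniSourceTask, and the trace strings are hypothetical and only mirror the three phases; the real StreamTask does far more in each of them.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy template-method task; a simplified model, not Flink's StreamTask. */
class MiniTaskDemo {

    abstract static class MiniTask {
        protected final List<String> trace = new ArrayList<>();
        protected volatile boolean canceled;

        /** Fixed skeleton: subclasses customize the hooks, never the order. */
        public final void invoke() {
            beforeInvoke();
            if (canceled) {
                throw new IllegalStateException("canceled before running");
            }
            runLoop();
            if (canceled) {
                throw new IllegalStateException("canceled while running");
            }
            afterInvoke();
        }

        protected abstract void beforeInvoke();

        protected abstract void runLoop();

        protected abstract void afterInvoke();

        List<String> trace() {
            return trace;
        }
    }

    static class MiniSourceTask extends MiniTask {
        @Override
        protected void beforeInvoke() {
            // State initialization belongs to this phase.
            trace.add("initialize state");
        }

        @Override
        protected void runLoop() {
            trace.add("run mailbox loop");
        }

        @Override
        protected void afterInvoke() {
            trace.add("clean up");
        }
    }

    public static void main(String[] args) {
        MiniSourceTask task = new MiniSourceTask();
        task.invoke();
        task.trace().forEach(System.out::println);
    }
}
```

Because invoke is final, state initialization always runs before any record is processed, whatever the concrete task type.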

beforeInvoke initializes state through the operatorChain's initializeStateAndOpenOperators:

StreamTask.class
    protected void beforeInvoke() throws Exception {
        operatorChain = new OperatorChain<>(this, recordWriter);
        ...
        operatorChain.initializeStateAndOpenOperators(
                createStreamTaskStateInitializer());
        ...
    }

The operatorChain then triggers initialization on every StreamOperator in the current chain:

OperatorChain.class
    protected void initializeStateAndOpenOperators(
            StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }

Following the call into AbstractStreamOperator's initializeState:

AbstractStreamOperator.class
    @Override
    public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
            throws Exception {
        final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(
                        getOperatorID(),
                        getClass().getSimpleName(),
                        getProcessingTimeService(),
                        this,
                        keySerializer,
                        streamTaskCloseableRegistry,
                        metrics,
                        config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.STATE_BACKEND,
                                runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                                runtimeContext.getUserCodeClassLoader()),
                        isUsingCustomRawKeyedState());
        stateHandler =
                new StreamOperatorStateHandler(
                        context, getExecutionConfig(), streamTaskCloseableRegistry);
        timeServiceManager = context.internalTimerServiceManager();
        stateHandler.initializeOperatorState(this);
    }

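stateHandler.initializeOperatorState delegates the work to the StreamOperatorStateHandler class, which performs the state initialization of the concrete StreamOperator subclass. To summarize, the initialization call chain is: StreamTask.invoke -> StreamTask.beforeInvoke -> OperatorChain.initializeStateAndOpenOperators -> AbstractStreamOperator.initializeState -> StreamOperatorStateHandler.initializeOperatorState.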

How the Kafka Source Operator supports Flink Checkpoint

Putting the Checkpoint snapshot flow and the state-initialization flow side by side, we see that both converge on StreamOperatorStateHandler, which does the actual work.

The initializeOperatorState and snapshotState methods of StreamOperatorStateHandler are implemented as follows; they mainly build the arguments:

StreamOperatorStateHandler.class
    public void initializeOperatorState(CheckpointedStreamOperator streamOperator)
            throws Exception {
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs =
                context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
                context.rawOperatorStateInputs();
        StateInitializationContext initializationContext =
                new StateInitializationContextImpl(
                        context.isRestored(), // information whether we restore or start for
                        // the first time
                        operatorStateBackend, // access to operator state backend
                        keyedStateStore, // access to keyed state backend
                        keyedStateInputs, // access to keyed state stream
                        operatorStateInputs); // access to operator state stream
        streamOperator.initializeState(initializationContext);
    }

    public OperatorSnapshotFutures snapshotState(
            CheckpointedStreamOperator streamOperator,
            Optional<InternalTimeServiceManager<?>> timeServiceManager,
            String operatorName,
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory,
            boolean isUsingCustomRawKeyedState)
            throws CheckpointException {
        KeyGroupRange keyGroupRange =
                null != keyedStateBackend
                        ? keyedStateBackend.getKeyGroupRange()
                        : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
        StateSnapshotContextSynchronousImpl snapshotContext =
                new StateSnapshotContextSynchronousImpl(
                        checkpointId, timestamp, factory, keyGroupRange, closeableRegistry);
        snapshotState(
                streamOperator,
                timeServiceManager,
                operatorName,
                checkpointId,
                timestamp,
                checkpointOptions,
                factory,
                snapshotInProgress,
                snapshotContext,
                isUsingCustomRawKeyedState);
        return snapshotInProgress;
    }

    void snapshotState(
            CheckpointedStreamOperator streamOperator,
            Optional<InternalTimeServiceManager<?>> timeServiceManager,
            String operatorName,
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory,
            OperatorSnapshotFutures snapshotInProgress,
            StateSnapshotContextSynchronousImpl snapshotContext,
            boolean isUsingCustomRawKeyedState)
            throws CheckpointException {
        if (timeServiceManager.isPresent()) {
            checkState(
                    keyedStateBackend != null,
                    "keyedStateBackend should be available with timeServiceManager");
            final InternalTimeServiceManager<?> manager = timeServiceManager.get();
            if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
                checkState(
                        !isUsingCustomRawKeyedState,
                        "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
                manager.snapshotToRawKeyedState(
                        snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
            }
        }
        streamOperator.snapshotState(snapshotContext);
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(
                snapshotContext.getOperatorStateStreamFuture());
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(
                            checkpointId, timestamp, factory, checkpointOptions));
        }
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(
                            checkpointId, timestamp, factory, checkpointOptions));
        }
    }

Note that the stream operator parameter of both methods must be of type CheckpointedStreamOperator:

public interface CheckpointedStreamOperator {
    void initializeState(StateInitializationContext context) throws Exception;

    void snapshotState(StateSnapshotContext context) throws Exception;
}

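Compare this with StreamOperator, whose three Checkpoint-related methods, as seen in the calls above, are prepareSnapshotPreBarrier(long checkpointId), snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation), and initializeState(StreamTaskStateInitializer streamTaskStateManager). Two of the names coincide with those in CheckpointedStreamOperator, but the parameters differ. The details do not matter much: it is enough to know that StreamOperator delegates the snapshot logic to StreamOperatorStateHandler, and that the real snapshot logic is implemented through CheckpointedStreamOperator. So to implement custom snapshot logic, you only need to implement the CheckpointedStreamOperator interface. Take SourceOperator as an example; its class definition is: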

public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler, PushingAsyncDataInput<OUT>

And AbstractStreamOperator is declared as:

public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>,
                SetupableStreamOperator<OUT>,
                CheckpointedStreamOperator,
                Serializable

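AbstractStreamOperator already implements the relevant methods for us, so an operator only needs to extend AbstractStreamOperator. Again taking SourceOperator as the example, here is its implementation: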

SourceOperator.class
    private ListState<SplitT> readerState;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        final ListState<byte[]> rawState =
                context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
        readerState.update(sourceReader.snapshotState(checkpointId));
    }

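As we can see, SourceOperator keeps its snapshot state in an in-memory SimpleVersionedListState and hands the actual snapshotState work to the SourceReader. Here is how KafkaSourceReader, provided by the Flink Kafka Connector, implements snapshotState: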

KafkaSourceReader.class
    // KafkaSourceReader extends SourceReaderBase implements SourceReader
    @Override
    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
        if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
            offsetsToCommit.put(checkpointId, Collections.emptyMap());
        } else {
            Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                    offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
            // Put the offsets of the active splits.
            for (KafkaPartitionSplit split : splits) {
                // If the checkpoint is triggered before the partition starting offsets
                // is retrieved, do not commit the offsets for those partitions.
                if (split.getStartingOffset() >= 0) {
                    offsetsMap.put(
                            split.getTopicPartition(),
                            new OffsetAndMetadata(split.getStartingOffset()));
                }
            }
            // Put offsets of all the finished splits.
            offsetsMap.putAll(offsetsOfFinishedSplits);
        }
        return splits;
    }
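The bookkeeping in this method, one map of offsets per checkpoint id, can be modeled with plain collections. The sketch below is illustrative only: MiniOffsetTracker, its string partition keys, and the committed map are hypothetical stand-ins for the real Kafka types. The commit step in notifyCheckpointComplete is an assumption of the sketch about what happens to the map afterwards; the excerpt above only shows how the map is filled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy model of per-checkpoint offset bookkeeping; not the real KafkaSourceReader. */
class MiniOffsetTracker {
    // Next offset to read per partition (hypothetical "topic-partition" string keys).
    private final Map<String, Long> currentOffsets = new HashMap<>();
    // Offsets captured at each checkpoint, keyed by checkpoint id.
    private final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();
    // Stand-in for the offsets committed back to Kafka.
    private final Map<String, Long> committed = new HashMap<>();

    void advance(String partition, long nextOffset) {
        currentOffsets.put(partition, nextOffset);
    }

    /** Captures a copy of the current offsets under the given checkpoint id. */
    Map<String, Long> snapshotState(long checkpointId) {
        Map<String, Long> snapshot = new HashMap<>(currentOffsets);
        offsetsToCommit.put(checkpointId, snapshot);
        return snapshot;
    }

    /** Assumed follow-up step: commit a completed checkpoint, drop it and older entries. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = offsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already cleaned-up checkpoint
        }
        committed.putAll(offsets);
        // headMap is exclusive of its bound, so +1 also removes this checkpoint.
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }

    Long committedOffset(String partition) {
        return committed.get(partition);
    }

    int pendingCheckpoints() {
        return offsetsToCommit.size();
    }

    public static void main(String[] args) {
        MiniOffsetTracker tracker = new MiniOffsetTracker();
        tracker.advance("topic-0", 10L);
        tracker.snapshotState(1L);
        tracker.advance("topic-0", 25L);
        tracker.snapshotState(2L);
        tracker.notifyCheckpointComplete(1L);
        System.out.println(tracker.committedOffset("topic-0"));
    }
}
```

Keying the map by checkpoint id means an offset is only ever committed for a checkpoint that actually completed, never for one that was merely started.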

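So far the state is held in memory; persisting it needs the support of an external system. Looking further at the snapshotState logic of StreamOperatorStateHandler, it contains this fragment: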

if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(
                    checkpointId, timestamp, factory, checkpointOptions));
}

State data is persisted only when a persistent state backend is configured. Take the default OperatorStateBackend as an example:

DefaultOperatorStateBackend.class
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        long syncStartTime = System.currentTimeMillis();
        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
                snapshotStrategy.snapshot(
                        checkpointId, timestamp, streamFactory, checkpointOptions);
        return snapshotRunner;
    }

The snapshotStrategy.snapshot logic is implemented in DefaultOperatorStateBackendSnapshotStrategy:

DefaultOperatorStateBackendSnapshotStrategy.class
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull final CheckpointOptions checkpointOptions)
            throws IOException {
        ...
        for (Map.Entry<String, PartitionableListState<?>> entry :
                registeredOperatorStatesDeepCopies.entrySet()) {
            operatorMetaInfoSnapshots.add(
                    entry.getValue().getStateMetaInfo().snapshot());
        }
        ...
        // ... write them all in the checkpoint stream ...
        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
        OperatorBackendSerializationProxy backendSerializationProxy =
                new OperatorBackendSerializationProxy(
                        operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
        backendSerializationProxy.write(dov);
        ...
        for (Map.Entry<String, PartitionableListState<?>> entry :
                registeredOperatorStatesDeepCopies.entrySet()) {
            PartitionableListState<?> value = entry.getValue();
            long[] partitionOffsets = value.write(localOut);
        }
    }

State data consists of meta information and the state data itself; the latter is written to the file system through the write method of PartitionableListState:

PartitionableListState.class
    public long[] write(FSDataOutputStream out) throws IOException {
        long[] partitionOffsets = new long[internalList.size()];
        DataOutputView dov = new DataOutputViewStreamWrapper(out);
        for (int i = 0; i < internalList.size(); ++i) {
            S element = internalList.get(i);
            partitionOffsets[i] = out.getPos();
            getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
        }
        return partitionOffsets;
    }
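The write method records, for every element, the stream position at which that element starts. A stand-alone sketch with JDK streams shows why this is useful: with the offsets, any single element can be read back without deserializing the ones before it. MiniListStateWriter and its string elements are hypothetical; ByteArrayOutputStream and writeUTF stand in for Flink's FSDataOutputStream and type serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

/** Toy version of "write elements, remember where each one starts"; not Flink code. */
class MiniListStateWriter {

    /** Serializes each element and returns the byte offset at which it starts. */
    static long[] write(List<String> elements, ByteArrayOutputStream out) {
        long[] partitionOffsets = new long[elements.size()];
        DataOutputStream view = new DataOutputStream(out);
        try {
            for (int i = 0; i < elements.size(); i++) {
                // DataOutputStream does not buffer, so out.size() is the current position.
                partitionOffsets[i] = out.size();
                view.writeUTF(elements.get(i));
            }
            view.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partitionOffsets;
    }

    /** Reads back the single element that starts at the given offset. */
    static String readAt(byte[] bytes, long offset) {
        int start = (int) offset;
        try (DataInputStream in =
                new DataInputStream(
                        new ByteArrayInputStream(bytes, start, bytes.length - start))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long[] offsets = write(List.of("split-0", "split-1", "split-2"), out);
        System.out.println(readAt(out.toByteArray(), offsets[1]));
    }
}
```

Per-element offsets are also what make it possible to hand different elements of one list state to different subtasks on restore, which matches the SPLIT_DISTRIBUTE mode that appears in getListState later in this article.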

Kafka Source Operator state recovery

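The previous part covered how the Kafka Source Operator supports Flink Checkpoint. That support also involves both snapshotState and initializeState, but the discussion focused on the snapshot side. Now let us see how SourceOperator initializes its state: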

SourceOperator.class
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        final ListState<byte[]> rawState =
                context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
    }

context.getOperatorStateStore uses the getListState method of DefaultOperatorStateBackend:

DefaultOperatorStateBackend.class
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    private <S> ListState<S> getListState(
            ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode)
            throws StateMigrationException {
        ...
        PartitionableListState<S> partitionableListState =
                (PartitionableListState<S>) registeredOperatorStates.get(name);
        ...
        return partitionableListState;
    }

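getListState merely looks the state up in a Map<String, PartitionableListState<?>> named registeredOperatorStates. So where does registeredOperatorStates come from? To find the answer, this part uses a Kafka consumer example for demonstration. First, create the KafkaSource: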

// the builder's type argument must match the declared KafkaSource<MetaAndValue>
KafkaSource<MetaAndValue> kafkaSource =
        KafkaSource.<MetaAndValue>builder()
                .setBootstrapServers(Constants.kafkaServers)
                .setGroupId(KafkaSinkIcebergExample.class.getName())
                .setTopics(topic)
                .setDeserializer(recordDeserializer)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setProperties(properties)
                .build();

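Then set the restart strategy (restart once, immediately, after a StreamOperator failure) and the checkpoint interval (one checkpoint every 100 milliseconds):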

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(0)));
env.getCheckpointConfig().setCheckpointInterval(1 * 100L);

Then, in the Kafka deserializer, throw an exception once 100 records have been parsed:

public static class TestingKafkaRecordDeserializer
        implements KafkaRecordDeserializer<MetaAndValue> {
    private static final long serialVersionUID = -3765473065594331694L;
    // transient, so it is recreated lazily after this object has been deserialized
    private transient Deserializer<String> deserializer = new StringDeserializer();
    // number of records parsed so far by this instance
    int parseNum = 0;

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> record, Collector<MetaAndValue> collector) {
        if (deserializer == null) {
            deserializer = new StringDeserializer();
        }
        MetaAndValue metaAndValue =
                new MetaAndValue(
                        new TopicPartition(record.topic(), record.partition()),
                        deserializer.deserialize(record.topic(), record.value()),
                        record.offset());
        // fail once 100 records have been parsed, to exercise the restart path
        if (parseNum++ >= 100) {
            throw new RuntimeException("for test");
        }
        collector.collect(metaAndValue);
    }
}
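What the experiment is meant to show, resuming from the last checkpointed offset after a failure, can be simulated without a cluster. The following is a minimal sketch under stated assumptions: MiniResumeDemo, its parameters, and the record-count checkpoint trigger are all hypothetical (Flink checkpoints on a time interval, not every N records), and the injected failure fires only once to keep the model short.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy simulation of "fail, restart, resume from the last checkpoint"; not Flink code. */
class MiniResumeDemo {

    /**
     * Emits offsets [0, total). Checkpoints the next offset every `interval` records,
     * throws once when it reaches `failAt`, and restarts up to `maxRestarts` times from
     * the last checkpointed offset. Returns every emitted offset, replays included.
     */
    static List<Long> run(long total, long interval, long failAt, int maxRestarts) {
        List<Long> emitted = new ArrayList<>();
        long checkpointedOffset = 0L; // the only state that survives a restart
        boolean alreadyFailed = false;
        int restarts = 0;
        while (true) {
            // Plays the role of initializeState: restore the read position.
            long offset = checkpointedOffset;
            try {
                while (offset < total) {
                    if (offset == failAt && !alreadyFailed) {
                        alreadyFailed = true;
                        throw new RuntimeException("for test");
                    }
                    emitted.add(offset);
                    offset++;
                    if (offset % interval == 0) {
                        // Plays the role of snapshotState: remember the next offset.
                        checkpointedOffset = offset;
                    }
                }
                return emitted;
            } catch (RuntimeException e) {
                if (++restarts > maxRestarts) {
                    throw e; // restart budget exhausted, the "job" fails
                }
            }
        }
    }

    public static void main(String[] args) {
        // Checkpoint after every 4 records, fail at offset 6, allow one restart.
        System.out.println(run(10, 4, 6, 1));
    }
}
```

With these arguments the last checkpoint before the failure holds offset 4, so the restarted run begins at 4 and offsets 4 and 5 are emitted a second time. Records read after the last successful checkpoint are always read again on recovery.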

When the JobMaster initializes and creates the Scheduler, it initializes state from a Checkpoint; if no Checkpoint can be restored, it tries to restore from a Savepoint.

SchedulerBase.class
    private ExecutionGraph createAndRestoreExecutionGraph(
            JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
            ShuffleMaster<?> shuffleMaster,
            JobMasterPartitionTracker partitionTracker,
            ExecutionDeploymentTracker executionDeploymentTracker,
            long initializationTimestamp)
            throws Exception {
        ExecutionGraph newExecutionGraph =
                createExecutionGraph(
                        currentJobManagerJobMetricGroup,
                        shuffleMaster,
                        partitionTracker,
                        executionDeploymentTracker,
                        initializationTimestamp);
        final CheckpointCoordinator checkpointCoordinator =
                newExecutionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            // check whether we find a valid checkpoint
            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                    new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
                // check whether we can restore from a savepoint
                tryRestoreExecutionGraphFromSavepoint(
                        newExecutionGraph, jobGraph.getSavepointRestoreSettings());
            }
        }
        return newExecutionGraph;
    }

This brings us back to the familiar CheckpointCoordinator, whose restoreLatestCheckpointedStateInternal method loads the latest snapshot state from the Checkpoint directory:

CheckpointCoordinator.class
    public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks)
            throws Exception {
        final OptionalLong restoredCheckpointId =
                restoreLatestCheckpointedStateInternal(
                        tasks,
                        OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                        false, // initial checkpoints exist only on JobManager failover. ok if not
                        // present.
                        false); // JobManager failover means JobGraphs match exactly.
        return restoredCheckpointId.isPresent();
    }

    private OptionalLong restoreLatestCheckpointedStateInternal(
            final Set<ExecutionJobVertex> tasks,
            final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior,
            final boolean errorIfNoCheckpoint,
            final boolean allowNonRestoredState)
            throws Exception {
        ...
        // Recover the checkpoints, TODO this could be done only when there is a new leader, not
        // on each recovery
        completedCheckpointStore.recover();
        // Restore from the latest checkpoint
        CompletedCheckpoint latest =
                completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
        ...
        // re-assign the task states
        final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
        StateAssignmentOperation stateAssignmentOperation =
                new StateAssignmentOperation(
                        latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
        stateAssignmentOperation.assignStates();
        ...
    }

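That is the state-recovery logic when the application first starts. What happens when an Operator fails and restarts while the application is running? The JobMaster listens to task execution state and reacts to it. The following chain shows how a failure is handled: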

UpdateSchedulerNgOnInternalFailuresListener.class
    @Override
    public void notifyTaskFailure(
            final ExecutionAttemptID attemptId,
            final Throwable t,
            final boolean cancelTask,
            final boolean releasePartitions) {
        final TaskExecutionState state =
                new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t);
        schedulerNg.updateTaskExecutionState(
                new TaskExecutionStateTransition(state, cancelTask, releasePartitions));
    }

SchedulerBase.class
    @Override
    public final boolean updateTaskExecutionState(
            final TaskExecutionStateTransition taskExecutionState) {
        final Optional<ExecutionVertexID> executionVertexId =
                getExecutionVertexId(taskExecutionState.getID());
        boolean updateSuccess = executionGraph.updateState(taskExecutionState);
        if (updateSuccess) {
            checkState(executionVertexId.isPresent());
            if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
                updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
            }
            return true;
        } else {
            return false;
        }
    }

DefaultScheduler.class
    @Override
    protected void updateTaskExecutionStateInternal(
            final ExecutionVertexID executionVertexId,
            final TaskExecutionStateTransition taskExecutionState) {
        schedulingStrategy.onExecutionStateChange(
                executionVertexId, taskExecutionState.getExecutionState());
        maybeHandleTaskFailure(taskExecutionState, executionVertexId);
    }

    private void maybeHandleTaskFailure(
            final TaskExecutionStateTransition taskExecutionState,
            final ExecutionVertexID executionVertexId) {
        if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
            final Throwable error = taskExecutionState.getError(userCodeLoader);
            handleTaskFailure(executionVertexId, error);
        }
    }

    private void handleTaskFailure(
            final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
        setGlobalFailureCause(error);
        notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
        final FailureHandlingResult failureHandlingResult =
                executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
        maybeRestartTasks(failureHandlingResult);
    }

    private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
        if (failureHandlingResult.canRestart()) {
            // ends up calling restartTasks
            restartTasksWithDelay(failureHandlingResult);
        } else {
            failJob(failureHandlingResult.getError());
        }
    }

    private Runnable restartTasks(
            final Set<ExecutionVertexVersion> executionVertexVersions,
            final boolean isGlobalRecovery) {
        return () -> {
            final Set<ExecutionVertexID> verticesToRestart =
                    executionVertexVersioner.getUnmodifiedExecutionVertices(
                            executionVertexVersions);
            removeVerticesFromRestartPending(verticesToRestart);
            resetForNewExecutions(verticesToRestart);
            try {
                restoreState(verticesToRestart, isGlobalRecovery);
            } catch (Throwable t) {
                handleGlobalFailure(t);
                return;
            }
            schedulingStrategy.restartTasks(verticesToRestart);
        };
    }

SchedulerBase.class
    protected void restoreState(
            final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery)
            throws Exception {
        ...
        if (isGlobalRecovery) {
            final Set<ExecutionJobVertex> jobVerticesToRestore =
                    getInvolvedExecutionJobVertices(vertices);
            checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);
        } else {
            final Map<ExecutionJobVertex, IntArrayList> subtasksToRestore =
                    getInvolvedExecutionJobVerticesAndSubtasks(vertices);
            final OptionalLong restoredCheckpointId =
                    checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(
                            subtasksToRestore.keySet());
            // Ideally, the Checkpoint Coordinator would call OperatorCoordinator.resetSubtask, but
            // the Checkpoint Coordinator is not aware of subtasks in a local failover. It always
            // assigns state to all subtasks, and for the subtask execution attempts that are still
            // running (or not waiting to be deployed) the state assignment has simply no effect.
            // Because of that, we need to do the "subtask restored" notification here.
            // Once the Checkpoint Coordinator is properly aware of partial (region) recovery,
            // this code should move into the Checkpoint Coordinator.
            final long checkpointId =
                    restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT);
            notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, checkpointId);
        }
        ...
    }

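The whole chain above spans DefaultScheduler and SchedulerBase, but it actually runs inside a single object instance, because the two classes are related as follows: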

    public abstract class SchedulerBase implements SchedulerNG
    public class DefaultScheduler extends SchedulerBase implements SchedulerOperations

Finally, we arrive back at the familiar CheckpointCoordinator:

CheckpointCoordinator.class

    public OptionalLong restoreLatestCheckpointedStateToSubtasks(
            final Set<ExecutionJobVertex> tasks) throws Exception {
        // when restoring subtasks only we accept potentially unmatched state for the
        // following reasons
        //   - the set frequently does not include all Job Vertices (only the ones that are part
        //     of the restarted region), meaning there will be unmatched state by design.
        //   - because what we might end up restoring from an original savepoint with unmatched
        //     state, if there is was no checkpoint yet.
        return restoreLatestCheckpointedStateInternal(
                tasks,
                OperatorCoordinatorRestoreBehavior
                        .SKIP, // local/regional recovery does not reset coordinators
                false, // recovery might come before first successful checkpoint
                true); // see explanation above
    }
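The JobMaster-side chain traced so far (failure notification, restart decision, state restore, task restart) can be condensed into a toy model. The sketch below is not Flink code: `FailoverSketch`, `RestartBudget`, and the step strings are hypothetical names chosen for illustration. It only mirrors the ordering visible in `maybeRestartTasks` and `restartTasks`, including the case where no checkpoint has completed yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

public class FailoverSketch {

    /** Hypothetical stand-in for a restart strategy: permits a fixed number of restarts. */
    static final class RestartBudget {
        private int remaining;

        RestartBudget(int maxRestarts) {
            this.remaining = maxRestarts;
        }

        boolean canRestart() {
            if (remaining > 0) {
                remaining--;
                return true;
            }
            return false;
        }
    }

    /** Returns the ordered steps taken for one task failure (illustrative only). */
    static List<String> handleTaskFailure(RestartBudget budget, OptionalLong latestCheckpointId) {
        List<String> steps = new ArrayList<>();
        steps.add("record failure cause");
        if (!budget.canRestart()) {
            // Corresponds to the failJob branch of maybeRestartTasks.
            steps.add("fail job");
            return steps;
        }
        steps.add("reset vertices for new execution");
        if (latestCheckpointId.isPresent()) {
            steps.add("restore state from checkpoint " + latestCheckpointId.getAsLong());
        } else {
            // Recovery may happen before the first successful checkpoint.
            steps.add("no checkpoint yet, start with empty state");
        }
        // State is assigned before the tasks are handed back to the scheduling strategy.
        steps.add("restart tasks");
        return steps;
    }

    public static void main(String[] args) {
        RestartBudget budget = new RestartBudget(1);
        System.out.println(handleTaskFailure(budget, OptionalLong.of(42L)));
        System.out.println(handleTaskFailure(budget, OptionalLong.of(42L)));
    }
}
```

With a budget of one restart, the first failure goes through the restore-then-restart path and the second one fails the job, which is the same fork that `failureHandlingResult.canRestart()` decides in the real scheduler.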

In schedulingStrategy.restartTasks, the state assigned to each Task is wrapped in a JobManagerTaskRestore, which is shipped to the TaskExecutor as a field of the TaskDeploymentDescriptor. Once the TaskDeploymentDescriptor has been submitted to the TaskExecutor, the TaskExecutor uses a TaskStateManager to manage the state of the current Task. The TaskStateManager is created from the assigned JobManagerTaskRestore and the local state store, TaskLocalStateStore:

TaskExecutor.class

    @Override
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
        ...
        final TaskLocalStateStore localStateStore =
                localStateStoresManager.localStateStoreForSubtask(
                        jobId,
                        tdd.getAllocationId(),
                        taskInformation.getJobVertexId(),
                        tdd.getSubtaskIndex());
        final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
        final TaskStateManager taskStateManager =
                new TaskStateManagerImpl(
                        jobId,
                        tdd.getExecutionAttemptId(),
                        localStateStore,
                        taskRestore,
                        checkpointResponder);
        ...
        // start the Task
    }

Starting the Task calls the invoke method of StreamTask, which performs the following initialization in beforeInvoke:

StreamTask.class

    protected void beforeInvoke() throws Exception {
        ...
        operatorChain.initializeStateAndOpenOperators(
                createStreamTaskStateInitializer());
        ...
    }

    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        InternalTimeServiceManager.Provider timerServiceProvider =
                configuration.getTimerServiceProvider(getUserCodeClassLoader());
        return new StreamTaskStateInitializerImpl(
                getEnvironment(),
                stateBackend,
                TtlTimeProvider.DEFAULT,
                timerServiceProvider != null
                        ? timerServiceProvider
                        : InternalTimeServiceManagerImpl::create);
    }

Now back to the initializeStateAndOpenOperators method of operatorChain:

OperatorChain.class

    protected void initializeStateAndOpenOperators(
            StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }
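The ordering in this loop matters: each operator receives its state before it is opened, so a source can pick its start position from restored state. The following is a minimal sketch of that idea in plain Java. The `Operator` interface, `OffsetSource`, and the `USE_GROUP_OFFSET` sentinel are hypothetical and much simpler than Flink's real interfaces; the fallback when nothing was restored is an assumption that stands in for whatever startup mode the consumer is configured with.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

public class InitOrderSketch {

    /** Hypothetical operator contract: state first, then open. */
    interface Operator {
        void initializeState(OptionalLong restoredOffset);

        void open();
    }

    /** Toy source that derives its start offset from restored state, if any. */
    static final class OffsetSource implements Operator {
        // Hypothetical sentinel meaning "nothing restored, defer to the configured start position".
        static final long USE_GROUP_OFFSET = -1L;

        final List<String> calls = new ArrayList<>();
        long startOffset;

        @Override
        public void initializeState(OptionalLong restoredOffset) {
            calls.add("initializeState");
            startOffset = restoredOffset.orElse(USE_GROUP_OFFSET);
        }

        @Override
        public void open() {
            calls.add("open");
        }
    }

    /** Same loop shape as above: initialize state, then open, operator by operator. */
    static void initializeStateAndOpen(List<? extends Operator> chain, OptionalLong restoredOffset) {
        for (Operator op : chain) {
            op.initializeState(restoredOffset);
            op.open();
        }
    }

    public static void main(String[] args) {
        OffsetSource restored = new OffsetSource();
        OffsetSource fresh = new OffsetSource();
        initializeStateAndOpen(Collections.singletonList(restored), OptionalLong.of(1200L));
        initializeStateAndOpen(Collections.singletonList(fresh), OptionalLong.empty());
        System.out.println(restored.calls + " start=" + restored.startOffset);
        System.out.println(fresh.calls + " start=" + fresh.startOffset);
    }
}
```

Because `open()` always runs second, the toy source already knows whether it was restored by the time it would start reading.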

The StreamOperator.initializeState call in OperatorChain is dispatched to the implementation in AbstractStreamOperator, which creates a StreamOperatorStateContext:

AbstractStreamOperator.class

    @Override
    public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
            throws Exception {
        final TypeSerializer<?> keySerializer =
                config.getStateKeySerializer(getUserCodeClassloader());
        final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry =
                Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(
                        getOperatorID(),
                        getClass().getSimpleName(),
                        getProcessingTimeService(),
                        this,
                        keySerializer,
                        streamTaskCloseableRegistry,
                        metrics,
                        config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.STATE_BACKEND,
                                runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                                runtimeContext.getUserCodeClassLoader()),
                        isUsingCustomRawKeyedState());
        stateHandler =
                new StreamOperatorStateHandler(
                        context, getExecutionConfig(), streamTaskCloseableRegistry);
        timeServiceManager = context.internalTimerServiceManager();
        stateHandler.initializeOperatorState(this);
        runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
    }

The actual work of reading state data from the snapshot and restoring it is hidden in the creation of the StreamOperatorStateContext:

StreamTaskStateInitializerImpl.class

    @Override
    public StreamOperatorStateContext streamOperatorStateContext(...) {
        ...
        // -------------- Keyed State Backend --------------
        keyedStatedBackend =
                keyedStatedBackend(
                        keySerializer,
                        operatorIdentifierText,
                        prioritizedOperatorSubtaskStates,
                        streamTaskCloseableRegistry,
                        metricGroup,
                        managedMemoryFraction);

        // -------------- Operator State Backend --------------
        operatorStateBackend =
                operatorStateBackend(
                        operatorIdentifierText,
                        prioritizedOperatorSubtaskStates,
                        streamTaskCloseableRegistry);

        // -------------- Raw State Streams --------------
        rawKeyedStateInputs =
                rawKeyedStateInputs(
                        prioritizedOperatorSubtaskStates
                                .getPrioritizedRawKeyedState()
                                .iterator());
        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

        rawOperatorStateInputs =
                rawOperatorStateInputs(
                        prioritizedOperatorSubtaskStates
                                .getPrioritizedRawOperatorState()
                                .iterator());
        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

        // -------------- Internal Timer Service Manager --------------
        if (keyedStatedBackend != null) {
            // if the operator indicates that it is using custom raw keyed state,
            // then whatever was written in the raw keyed state snapshot was NOT written
            // by the internal timer services (because there is only ever one user of raw keyed
            // state);
            // in this case, timers should not attempt to restore timers from the raw keyed
            // state.
            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                    (prioritizedOperatorSubtaskStates.isRestored()
                                    && !isUsingCustomRawKeyedState)
                            ? rawKeyedStateInputs
                            : Collections.emptyList();
            timeServiceManager =
                    timeServiceManagerProvider.create(
                            keyedStatedBackend,
                            environment.getUserCodeClassLoader().asClassLoader(),
                            keyContext,
                            processingTimeService,
                            restoredRawKeyedStateTimers);
        } else {
            timeServiceManager = null;
        }

        // -------------- Preparing return value --------------
        return new StreamOperatorStateContextImpl(
                prioritizedOperatorSubtaskStates.isRestored(),
                operatorStateBackend,
                keyedStatedBackend,
                timeServiceManager,
                rawOperatorStateInputs,
                rawKeyedStateInputs);
    }

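At this point we have walked through how the Flink Kafka Source Operator restores its state from a Checkpoint. The logic falls roughly into two parts: state initialization based on StateInitializationContext, and restoring state from the Checkpoint to produce the StateInitializationContext. The latter is far more complex than described in this article. When the JobMaster detects a task failure, it loads the state metadata of the latest snapshot from the persisted Checkpoint data and then reassigns the state to the subtasks. Recovery at application-restart level in particular may also involve changes to the operator topology and parallelism. After restoring the state, the JobMaster submits the task restart request, and on the TaskManager side the state data may additionally be restored from a local snapshot (if enabled). State recovery on the TaskManager side is complete once the StreamOperatorStateContext has been created. It wraps the full data restored from the snapshot, after which the normal StreamOperator initializeState call flow resumes.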
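The "local snapshot first, then the state assigned by the JobMaster" idea behind the prioritized state objects can be pictured as picking the first available alternative from an ordered list. The sketch below is a simplified illustration under that assumption, not Flink's implementation; `PrioritizedRestoreSketch`, `firstAvailable`, and `restore` are hypothetical names, and state is reduced to a plain string.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class PrioritizedRestoreSketch {

    /** Returns the first alternative that is present, scanning in priority order. */
    static Optional<String> firstAvailable(List<Optional<String>> alternatives) {
        for (Optional<String> candidate : alternatives) {
            if (candidate.isPresent()) {
                return candidate;
            }
        }
        return Optional.empty();
    }

    /**
     * Prefers a local snapshot (cheap to read) over the state handle shipped by the
     * JobMaster, and falls back to empty state when neither exists.
     */
    static String restore(Optional<String> localSnapshot, Optional<String> jobManagerState) {
        return firstAvailable(Arrays.asList(localSnapshot, jobManagerState))
                .orElse("empty state");
    }

    public static void main(String[] args) {
        System.out.println(restore(Optional.of("local copy"), Optional.of("remote copy")));
        System.out.println(restore(Optional.empty(), Optional.of("remote copy")));
        System.out.println(restore(Optional.empty(), Optional.empty()));
    }
}
```

The last case corresponds to a task that starts before any checkpoint has completed, where the operator simply initializes with no restored state.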

Summary

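This article covered Flink's Checkpoint processing flow (both creating the snapshot and initializing state) and the Kafka source's support for Flink Checkpoints, and established from Flink's code that Flink Checkpoints do support maintaining the Kafka consumption state. That state, however, is only read from the StateInitializationContext object. To verify that the state in StateInitializationContext really comes from persisted Checkpoint data, the fourth part combined experiments with a trace of Flink's state recovery logic for both application restart and runtime Operator failure retry, confirming that Flink supports restoring state from a Checkpoint or Savepoint.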

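Finally, based on the analysis above, developers building Flink applications should note that although Flink can restore the Kafka consumption state to the latest Checkpoint snapshot, it cannot prevent records between two snapshots from being consumed again. A typical case is a Sink that is not idempotent, which may produce duplicate data; PrintSink, for example, cannot retract data it emitted between snapshots. In addition, when Flink Checkpointing is not enabled, state maintenance relies on the offsets committed by the Kafka client itself.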
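The duplicate-consumption caveat can be made concrete with a small simulation. The sketch below is plain Java with no Flink or Kafka dependency; `ReplaySketch`, `consume`, and the record names are hypothetical. It assumes the last completed checkpoint stored offset 2 and that the job failed after emitting the record at offset 3, then compares an append-only sink (like printing) with a sink that writes by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplaySketch {

    /** Emits records in [from, to) to both an append-only sink and a keyed (idempotent) sink. */
    static void consume(
            List<String> topic, int from, int to, List<String> appendSink, Set<String> keyedSink) {
        for (int offset = from; offset < to; offset++) {
            String record = topic.get(offset);
            appendSink.add(record); // like printing: output cannot be retracted
            keyedSink.add(record); // writing the same key twice leaves one entry
        }
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("r0", "r1", "r2", "r3", "r4", "r5");
        List<String> printed = new ArrayList<>();
        Set<String> upserted = new LinkedHashSet<>();

        int checkpointedOffset = 2; // assumed: last completed checkpoint recorded offset 2

        // First attempt: processes offsets 0..3, then fails before the next checkpoint.
        consume(topic, 0, 4, printed, upserted);

        // Restart: the source resumes from the checkpointed offset, replaying offsets 2 and 3.
        consume(topic, checkpointedOffset, topic.size(), printed, upserted);

        System.out.println("append-only sink: " + printed);
        System.out.println("keyed sink:       " + upserted);
    }
}
```

In this run the append-only sink ends up with eight entries (r2 and r3 appear twice) while the keyed sink holds the six distinct records, which is why an idempotent or transactional sink is needed on top of offset recovery when duplicates are unacceptable.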