Flink: Does Flink Checkpoint Support Maintaining Kafka Consumption State?

Author: 闻乃松

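When Flink consumes Kafka data in real time, it has to maintain offset state. For a job to resume where it left off after a restart, or after an operator-level failure retry at runtime, it needs the support of Flink's Checkpoint mechanism. The question is whether simply enabling Flink checkpointing, with no extra coding, is enough. To answer it, this article first studies how Flink handles checkpoints and then looks at whether Flink supports storing Kafka state. It has four parts:

  • Main flow of a Flink Checkpoint state snapshot (snapshotState)
  • Main flow of Flink Checkpoint state initialization (initializeState)
  • Kafka Source Operator support for Flink Checkpoint
  • Kafka Source Operator state recovery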

To keep the description precise, this article uses Flink 1.12.x and Kafka client 2.4.x as the reference versions.

Main flow of a Flink Checkpoint state snapshot (snapshotState)

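Flink checkpoints are initiated periodically by the CheckpointCoordinator, which completes a checkpoint by sending trigger messages to the relevant tasks and collecting acknowledgement messages (Acks) from them. This article skips how the CheckpointCoordinator issues the call and goes straight to the message receiver, TaskExecutor, to follow the execution flow. TaskExecutor looks up the Task instance and calls triggerCheckpointBarrier: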

TaskExecutor.class

    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
            ...
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

            return CompletableFuture.completedFuture(Acknowledge.get());
          ...
        }
    }

Task is the unit of scheduling and execution inside TaskExecutor, and it is also what responds to checkpoint requests:

Task.class
        public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
                    ...
                    final CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(checkpointID, checkpointTimestamp);
                    invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                    ...
        }

The interesting part is invokable, an object of type AbstractInvokable that is instantiated dynamically from the invokable class name:

// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);

The nameOfInvokableClass parameter is passed in when the Task is initialized and is used to create the AbstractInvokable instance dynamically. For a SourceOperator, for example, the class name is:

org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask

The class definition shows that SourceOperatorStreamTask is itself a subclass of StreamTask:

class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>>

The triggerCheckpointAsync call passes through SourceOperatorStreamTask and then StreamTask; the main logic is in StreamTask.triggerCheckpointAsync:

StreamTask.class

    @Override
    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
      ...
      triggerCheckpoint(checkpointMetaData, checkpointOptions)
      ...
      }


  private boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
            throws Exception {
            // No alignment if we inject a checkpoint
            CheckpointMetricsBuilder checkpointMetrics =
                    new CheckpointMetricsBuilder()
                            .setAlignmentDurationNanos(0L)
                            .setBytesProcessedDuringAlignment(0L);
            ...

            boolean success =
                    performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
            if (!success) {
                declineCheckpoint(checkpointMetaData.getCheckpointId());
            }
            return success;
  }

    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetricsBuilder checkpointMetrics)
            throws Exception {
            ...
        subtaskCheckpointCoordinator.checkpointState(
                                checkpointMetaData,
                                checkpointOptions,
                                checkpointMetrics,
                                operatorChain,
                                this::isRunning);
      ...
    }

Here subtaskCheckpointCoordinator is an instance of SubtaskCheckpointCoordinatorImpl, which coordinates the checkpoint-related work of a subtask:

/**
 * Coordinates checkpointing-related work for a subtask (i.e. {@link
 * org.apache.flink.runtime.taskmanager.Task Task} and {@link StreamTask}). Responsibilities:
 *
 * <ol>
 *   <li>build a snapshot (invokable)
 *   <li>report snapshot to the JobManager
 *   <li>action upon checkpoint notification
 *   <li>maintain storage locations
 * </ol>
 */
@Internal
public interface SubtaskCheckpointCoordinator extends Closeable

Below is the main logic of checkpointState in the SubtaskCheckpointCoordinatorImpl implementation class:

SubtaskCheckpointCoordinatorImpl.class

    @Override
    public void checkpointState(
            CheckpointMetaData metadata,
            CheckpointOptions options,
            CheckpointMetricsBuilder metrics,
            OperatorChain<?, ?> operatorChain,
            Supplier<Boolean> isRunning)
            throws Exception {
        // All of the following steps happen as an atomic step from the perspective of barriers and
        // records/watermarks/timers/callbacks.
        // We generally try to emit the checkpoint barrier as soon as possible to not affect
        // downstream
        // checkpoint alignments

        if (lastCheckpointId >= metadata.getCheckpointId()) {
            LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}",
                    lastCheckpointId,
                    metadata.getCheckpointId());
            channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
            checkAndClearAbortedStatus(metadata.getCheckpointId());
            return;
        }

        lastCheckpointId = metadata.getCheckpointId();
        if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
            // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
            // checkpoint barrier align.
            operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
            return;
        }

        // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
        //           The pre-barrier work should be nothing or minimal in the common case.
        operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

        // Step (2): Send the checkpoint barrier downstream
        operatorChain.broadcastEvent(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
                options.isUnalignedCheckpoint());

        // Step (3): Prepare to spill the in-flight buffers for input and output
        if (options.isUnalignedCheckpoint()) {
            // output data already written while broadcasting event
            channelStateWriter.finishOutput(metadata.getCheckpointId());
        }

        // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
        // progress of the streaming topology
        Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
                new HashMap<>(operatorChain.getNumberOfOperators());
        if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
            finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
        } else {
            cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
        }
    }

Step (1) calls prepareSnapshotPreBarrier, which does lightweight preparation before the actual snapshot. It is implemented in OperatorChain, which calls prepareSnapshotPreBarrier on each StreamOperator in the chain in turn:

OperatorChain.class

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // go forward through the operator chain and tell each operator
        // to prepare the checkpoint
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
            if (!operatorWrapper.isClosed()) {
                operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
            }
        }
    }
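
The ordering inside checkpointState (out-of-order guard, pre-barrier hook, barrier broadcast, then the snapshot) can be modeled in plain Java. This is only a sketch and not Flink code: the class CheckpointStepsSketch and its string log are hypothetical, and the unaligned-checkpoint and abort handling are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the step ordering in checkpointState. */
final class CheckpointStepsSketch {
    private long lastCheckpointId = -1L;
    final List<String> log = new ArrayList<>();

    /** Returns true if the checkpoint was processed, false if it was rejected as out of order. */
    boolean checkpointState(long checkpointId) {
        // Guard: a barrier whose id is not newer than the last one seen is ignored.
        if (lastCheckpointId >= checkpointId) {
            log.add("skip:" + checkpointId);
            return false;
        }
        lastCheckpointId = checkpointId;
        log.add("prepareSnapshotPreBarrier:" + checkpointId); // Step (1)
        log.add("broadcastBarrier:" + checkpointId);          // Step (2)
        log.add("snapshotState:" + checkpointId);             // Step (4)
        return true;
    }
}
```

Calling checkpointState(1) twice on the same instance processes the first call and rejects the second, which mirrors the "Out of order checkpoint barrier" branch in the listing above.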

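After the snapshot checks, the pre-snapshot preparation, and the barrier broadcast downstream, execution ends up in the checkpointStreamOperator method of SubtaskCheckpointCoordinatorImpl: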

SubtaskCheckpointCoordinatorImpl.class

    private static OperatorSnapshotFutures checkpointStreamOperator(
            StreamOperator<?> op,
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory storageLocation,
            Supplier<Boolean> isRunning)
            throws Exception {
        try {
            return op.snapshotState(checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions,
                    storageLocation);
        } catch (Exception ex) {
            if (isRunning.get()) {
                LOG.info(ex.getMessage(), ex);
            }
            throw ex;
        }
    }

That method in turn calls snapshotState on AbstractStreamOperator:

AbstractStreamOperator.class

    @Override
    public final OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory)
            throws Exception {
        return stateHandler.snapshotState(
                this,
                Optional.ofNullable(timeServiceManager),
                getOperatorName(),
                checkpointId,
                timestamp,
                checkpointOptions,
                factory,
                isUsingCustomRawKeyedState());
    }

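snapshotState in turn delegates the checkpoint logic to StreamOperatorStateHandler, which is covered later. Putting the steps above together, the snapshot call chain is:

TaskExecutor.triggerCheckpoint -> Task.triggerCheckpointBarrier -> StreamTask.triggerCheckpointAsync -> StreamTask.performCheckpoint -> SubtaskCheckpointCoordinatorImpl.checkpointState -> SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator -> AbstractStreamOperator.snapshotState -> StreamOperatorStateHandler.snapshotState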

Main flow of Flink Checkpoint state initialization (initializeState)

When the Task mentioned above starts up, it calls the invoke method of AbstractInvokable:

// now load and instantiate the task's invokable code
AbstractInvokable invokable =
        loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
// run the invokable
invokable.invoke();

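invoke is implemented in the parent class StreamTask as a template method with three phases: work before the invocation, running the event (mailbox) loop, and work after the invocation: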

StreamTask.class

    @Override
    public final void invoke() throws Exception {
        beforeInvoke();

        // final check to exit early before starting to run
        if (canceled) {
            throw new CancelTaskException();
        }

        // let the task do its work
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        if (canceled) {
            throw new CancelTaskException();
        }

        afterInvoke();
    }
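
The template-method shape of invoke can be sketched in plain Java, independent of Flink. The class names InvokableSketch and CountingTaskSketch are hypothetical, and the sketch throws IllegalStateException where the real StreamTask throws CancelTaskException.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for StreamTask: invoke() fixes the order, subclasses fill in the loop. */
abstract class InvokableSketch {
    protected volatile boolean canceled;
    final List<String> calls = new ArrayList<>();

    final void invoke() {
        beforeInvoke();                       // state initialization happens in this phase
        if (canceled) {
            throw new IllegalStateException("canceled before the loop started");
        }
        runLoop();                            // the task does its work here
        if (canceled) {
            throw new IllegalStateException("canceled while running");
        }
        afterInvoke();
    }

    protected void beforeInvoke() { calls.add("beforeInvoke"); }

    protected abstract void runLoop();

    protected void afterInvoke() { calls.add("afterInvoke"); }
}

/** A concrete task only supplies the loop body. */
final class CountingTaskSketch extends InvokableSketch {
    @Override
    protected void runLoop() { calls.add("runLoop"); }
}
```

Because invoke is final, a subclass cannot reorder the phases, so state initialization in the before-phase always precedes record processing.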

beforeInvoke initializes state through initializeStateAndOpenOperators on the operatorChain:

StreamTask.class

    protected void beforeInvoke() throws Exception {
        operatorChain = new OperatorChain<>(this, recordWriter);
        ...
        operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
      ...
    }

The operatorChain then triggers every StreamOperator in the current chain:

OperatorChain.class

    protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer)
            throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }

Following the call further, AbstractStreamOperator implements initializeState:

AbstractStreamOperator.class

   @Override
   public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
            throws Exception {
        final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(getOperatorID(),
                        getClass().getSimpleName(),
                        getProcessingTimeService(),
                        this,
                        keySerializer,
                        streamTaskCloseableRegistry,
                        metrics,
                        config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.STATE_BACKEND,
                                runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                                runtimeContext.getUserCodeClassLoader()),
                        isUsingCustomRawKeyedState());
        stateHandler =
                new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
        timeServiceManager = context.internalTimerServiceManager();
        stateHandler.initializeOperatorState(this);
    }

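Here stateHandler.initializeOperatorState delegates the initialization to the StreamOperatorStateHandler class, which performs the state initialization of the concrete StreamOperator subclass. Putting the steps above together, the initialization call chain is:

Task -> StreamTask.invoke -> StreamTask.beforeInvoke -> OperatorChain.initializeStateAndOpenOperators -> AbstractStreamOperator.initializeState -> StreamOperatorStateHandler.initializeOperatorState -> CheckpointedStreamOperator.initializeState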

Kafka Source Operator support for Flink Checkpoint

Placing the state snapshot flow and the state initialization flow side by side shows that both are ultimately delegated to StreamOperatorStateHandler.

The initializeOperatorState and snapshotState methods of StreamOperatorStateHandler are implemented as follows; they mostly build the arguments for the operator callbacks:

StreamOperatorStateHandler.class

    public void initializeOperatorState(CheckpointedStreamOperator streamOperator)
            throws Exception {
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs =
                context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
                context.rawOperatorStateInputs();

      StateInitializationContext initializationContext =
        new StateInitializationContextImpl(context.isRestored(), // information whether we restore or start for
        // the first time
        operatorStateBackend, // access to operator state backend
        keyedStateStore, // access to keyed state backend
        keyedStateInputs, // access to keyed state stream
        operatorStateInputs); // access to operator state stream

      streamOperator.initializeState(initializationContext);
    }


     public OperatorSnapshotFutures snapshotState(
            CheckpointedStreamOperator streamOperator,
            Optional<InternalTimeServiceManager<?>> timeServiceManager,
            String operatorName,
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory,
            boolean isUsingCustomRawKeyedState)
            throws CheckpointException {
        KeyGroupRange keyGroupRange =
                null != keyedStateBackend
                        ? keyedStateBackend.getKeyGroupRange()
                        : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        StateSnapshotContextSynchronousImpl snapshotContext =
                new StateSnapshotContextSynchronousImpl(checkpointId, timestamp, factory, keyGroupRange, closeableRegistry);

        snapshotState(
                streamOperator,
                timeServiceManager,
                operatorName,
                checkpointId,
                timestamp,
                checkpointOptions,
                factory,
                snapshotInProgress,
                snapshotContext,
                isUsingCustomRawKeyedState);

        return snapshotInProgress;
    }


        void snapshotState(
            CheckpointedStreamOperator streamOperator,
            Optional<InternalTimeServiceManager<?>> timeServiceManager,
            String operatorName,
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory,
            OperatorSnapshotFutures snapshotInProgress,
            StateSnapshotContextSynchronousImpl snapshotContext,
            boolean isUsingCustomRawKeyedState)
            throws CheckpointException {
            if (timeServiceManager.isPresent()) {
                checkState(
                        keyedStateBackend != null,
                        "keyedStateBackend should be available with timeServiceManager");
                final InternalTimeServiceManager<?> manager = timeServiceManager.get();

                if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
                    checkState(
                            !isUsingCustomRawKeyedState,
                            "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
                    manager.snapshotToRawKeyedState(snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
                }
            }
            streamOperator.snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        }

Note that both methods require their StreamOperator argument to be of type CheckpointedStreamOperator:

public interface CheckpointedStreamOperator {
    void initializeState(StateInitializationContext context) throws Exception;

    void snapshotState(StateSnapshotContext context) throws Exception;
}

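For comparison, StreamOperator itself declares three checkpoint-related methods: prepareSnapshotPreBarrier, snapshotState, and initializeState.

Although snapshotState and initializeState share their names with the CheckpointedStreamOperator methods, their parameters differ. The details do not matter here. It is enough to know that StreamOperator delegates the snapshot logic to StreamOperatorStateHandler and that the actual snapshot logic lives in the CheckpointedStreamOperator implementation. So to implement custom snapshot logic, you only need to implement the CheckpointedStreamOperator interface. Taking SourceOperator as an example, its class definition is: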

public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler, PushingAsyncDataInput<OUT>

And the class definition of AbstractStreamOperator is:

public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>,
                SetupableStreamOperator<OUT>,
                CheckpointedStreamOperator,
                Serializable
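
The delegation described above (a framework-facing snapshotState that hands off to a state handler, which calls back into the operator's own hook) can be modeled without Flink. All type names ending in Sketch are hypothetical, and a List<String> stands in for the real StateSnapshotContext.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for CheckpointedStreamOperator: the hook that holds operator-specific logic. */
interface CheckpointedSketch {
    void snapshotState(long checkpointId, List<String> out);
}

/** Stand-in for StreamOperatorStateHandler: builds the context, then calls the hook. */
final class StateHandlerSketch {
    List<String> snapshotState(CheckpointedSketch operator, long checkpointId) {
        List<String> snapshot = new ArrayList<>(); // plays the role of the snapshot context
        operator.snapshotState(checkpointId, snapshot);
        return snapshot;
    }
}

/** Stand-in for AbstractStreamOperator: the framework-facing method only delegates. */
abstract class AbstractOperatorSketch implements CheckpointedSketch {
    private final StateHandlerSketch stateHandler = new StateHandlerSketch();

    final List<String> snapshotState(long checkpointId) {
        return stateHandler.snapshotState(this, checkpointId);
    }
}

/** A concrete operator overrides only the hook. */
final class SourceOperatorSketch extends AbstractOperatorSketch {
    @Override
    public void snapshotState(long checkpointId, List<String> out) {
        out.add("splits@" + checkpointId);
    }
}
```

With this shape, new SourceOperatorSketch().snapshotState(7L) goes through the handler and returns the single entry written by the hook, which is the same round trip the real operator makes through stateHandler.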

AbstractStreamOperator already implements the relevant methods for us, so extending AbstractStreamOperator is enough. Again taking SourceOperator as the example, here is its implementation:

SourceOperator.class

    private ListState<SplitT> readerState;
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        final ListState<byte[]> rawState =
                context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
        readerState.update(sourceReader.snapshotState(checkpointId));
    }

So SourceOperator keeps the snapshot state in an in-memory SimpleVersionedListState and hands the actual snapshotState work to the SourceReader. Here is how KafkaSourceReader, provided by the Flink Kafka connector, implements snapshotState:

KafkaSourceReader.class

KafkaSourceReader extends SourceReaderBase implements SourceReader

    @Override
    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
        if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
            offsetsToCommit.put(checkpointId, Collections.emptyMap());
        } else {
            Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                    offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
            // Put the offsets of the active splits.
            for (KafkaPartitionSplit split : splits) {
                // If the checkpoint is triggered before the partition starting offsets
                // is retrieved, do not commit the offsets for those partitions.
                if (split.getStartingOffset() >= 0) {
                    offsetsMap.put(split.getTopicPartition(),
                            new OffsetAndMetadata(split.getStartingOffset()));
                }
            }
            // Put offsets of all the finished splits.
            offsetsMap.putAll(offsetsOfFinishedSplits);
        }
        return splits;
    }
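
The bookkeeping in KafkaSourceReader.snapshotState boils down to a map from checkpoint id to the per-partition offsets captured at that checkpoint. A minimal plain-Java sketch of that idea follows; OffsetsToCommitSketch is a hypothetical class, partitions are represented by plain strings instead of TopicPartition, and finished splits are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical model of the offsetsToCommit bookkeeping; not the connector's code. */
final class OffsetsToCommitSketch {
    // checkpointId -> (partition name -> offset recorded for that checkpoint)
    final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    /** Records the current offsets of the active splits under the given checkpoint id. */
    void snapshotState(long checkpointId, Map<String, Long> activeSplitOffsets) {
        Map<String, Long> offsets =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        for (Map.Entry<String, Long> e : activeSplitOffsets.entrySet()) {
            // A negative value means the starting offset has not been retrieved yet,
            // so nothing is recorded for that partition.
            if (e.getValue() >= 0) {
                offsets.put(e.getKey(), e.getValue());
            }
        }
    }
}
```

Keying the map by checkpoint id keeps the offsets of each checkpoint separate, so a later step can pick exactly the offsets that belong to one specific checkpoint.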

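The above is in-memory state storage; persistence needs the support of an external system. Looking further at the snapshotState method of StreamOperatorStateHandler, there is this fragment: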

if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
      operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
  }

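The state data is persisted only when an operator state backend is available (operatorStateBackend is not null). Take the default OperatorStateBackend as an example: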

DefaultOperatorStateBackend.class

   @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        long syncStartTime = System.currentTimeMillis();

        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
                snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
        return snapshotRunner;
    }

The logic behind snapshotStrategy.snapshot is implemented in DefaultOperatorStateBackendSnapshotStrategy:

DefaultOperatorStateBackendSnapshotStrategy.class

      @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull final CheckpointOptions checkpointOptions)
            throws IOException {

        ...
        for (Map.Entry<String, PartitionableListState<?>> entry :
                                registeredOperatorStatesDeepCopies.entrySet()) {
          operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
        }
            ...
        // ... write them all in the checkpoint stream ...
        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

        OperatorBackendSerializationProxy backendSerializationProxy =
        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

        backendSerializationProxy.write(dov);
         ...
        for (Map.Entry<String, PartitionableListState<?>> entry :
                registeredOperatorStatesDeepCopies.entrySet()) {
            PartitionableListState<?> value = entry.getValue();
            long[] partitionOffsets = value.write(localOut);
        }
    }

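The snapshot consists of metadata and the state data itself. The state data is written to the checkpoint output stream (an FSDataOutputStream) by the write method of PartitionableListState: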

PartitionableListState.class

    public long[] write(FSDataOutputStream out) throws IOException {
        long[] partitionOffsets = new long[internalList.size()];

        DataOutputView dov = new DataOutputViewStreamWrapper(out);

        for (int i = 0; i < internalList.size(); ++i) {
            S element = internalList.get(i);
            partitionOffsets[i] = out.getPos();
            getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
        }

        return partitionOffsets;
    }
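
The write method records the stream position before each element is serialized, which yields one start offset per list element. The same technique can be shown with JDK streams only. ListStateWriterSketch is a hypothetical class, and writeUTF stands in for the state's own element serializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

/** Hypothetical model of PartitionableListState.write using only JDK streams. */
final class ListStateWriterSketch {
    /** Serializes each element and returns the byte position at which each one starts. */
    static long[] write(List<String> elements, ByteArrayOutputStream out) {
        long[] partitionOffsets = new long[elements.size()];
        DataOutputStream dos = new DataOutputStream(out);
        try {
            for (int i = 0; i < elements.size(); i++) {
                partitionOffsets[i] = out.size(); // position before this element is written
                dos.writeUTF(elements.get(i));    // 2-byte length prefix + modified UTF-8 bytes
            }
            dos.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partitionOffsets;
    }
}
```

For the elements "a" and "bcd" written to an empty stream, the offsets are 0 and 3, because the first element occupies a 2-byte length prefix plus one byte of data. Offsets like these let a reader locate and redistribute individual list elements without deserializing the whole list.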

Kafka Source Operator state recovery

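The previous part covered the Kafka Source Operator's support for Flink Checkpoint. That support involves both snapshotState and initializeState, but the discussion focused on the snapshot logic. Now look at how SourceOperator initializes its state: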

SourceOperator.class

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        final ListState<byte[]> rawState =
                context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
    }

context.getOperatorStateStore() resolves to DefaultOperatorStateBackend, whose getListState method is used:

DefaultOperatorStateBackend.class

    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    private <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode)
            throws StateMigrationException {
      ...
       PartitionableListState<S> partitionableListState =
                (PartitionableListState<S>) registeredOperatorStates.get(name);
      ...
        return partitionableListState;
    }

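getListState merely looks the state up in a Map<String, PartitionableListState<?>> named registeredOperatorStates. So where does registeredOperatorStates come from? To find out, this part uses a Kafka consumption example. First create the KafkaSource: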

KafkaSource<MetaAndValue> kafkaSource =
        KafkaSource.<MetaAndValue>builder()
                .setBootstrapServers(Constants.kafkaServers)
                .setGroupId(KafkaSinkIcebergExample.class.getName())
                .setTopics(topic)
                .setDeserializer(recordDeserializer)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setProperties(properties)
                .build();

Then set the restart strategy (restart once, immediately, after a StreamOperator fails) and the checkpoint interval (once every 100 ms):

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(0)));
env.getCheckpointConfig().setCheckpointInterval(1 * 100L);

Then, in the Kafka record deserializer, throw an exception once more than 100 records have been parsed:

public static class TestingKafkaRecordDeserializer
        implements KafkaRecordDeserializer<MetaAndValue> {
    private static final long serialVersionUID = -3765473065594331694L;
    private transient Deserializer<String> deserializer = new StringDeserializer();

    // Number of records parsed so far by this instance.
    int parseNum = 0;

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<MetaAndValue> collector) {
        // The transient field is null after Java deserialization, so recreate it lazily.
        if (deserializer == null) {
            deserializer = new StringDeserializer();
        }
        MetaAndValue metaAndValue =
                new MetaAndValue(
                        new TopicPartition(record.topic(), record.partition()),
                        deserializer.deserialize(record.topic(), record.value()),
                        record.offset());
        // Simulate an operator failure once more than 100 records have been parsed.
        if (parseNum++ > 100) {
            Map<String, Object> metaData = metaAndValue.getMetaData();
            throw new RuntimeException("for test");
        }
        collector.collect(metaAndValue);
    }
}

When the JobMaster initializes and creates the Scheduler, it initializes state from a checkpoint. If no checkpoint can be restored, it tries to restore from a savepoint.

SchedulerBase.class

  private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws Exception {

    ExecutionGraph newExecutionGraph =
            createExecutionGraph(
                    currentJobManagerJobMetricGroup,
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp);

    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}

This leads back to the familiar CheckpointCoordinator, whose restoreLatestCheckpointedStateInternal method loads the latest snapshot state from the checkpoint directory:

CheckpointCoordinator.class

    public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks)
            throws Exception {
        final OptionalLong restoredCheckpointId =
                restoreLatestCheckpointedStateInternal(
                        tasks,
                        OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                        false, // initial checkpoints exist only on JobManager failover. ok if not
                        // present.
                        false); // JobManager failover means JobGraphs match exactly.

        return restoredCheckpointId.isPresent();
    }

    private OptionalLong restoreLatestCheckpointedStateInternal(
      final Set<ExecutionJobVertex> tasks,
      final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior,
      final boolean errorIfNoCheckpoint,
      final boolean allowNonRestoredState)
      throws Exception {
      ...
        // Recover the checkpoints, TODO this could be done only when there is a new leader, not
        // on each recovery
        completedCheckpointStore.recover();
      // Restore from the latest checkpoint
      CompletedCheckpoint latest =
        completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
      ...
        // re-assign the task states
        final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

      StateAssignmentOperation stateAssignmentOperation =
        new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);

      stateAssignmentOperation.assignStates();
      ...
   }
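
The restore step amounts to picking the completed checkpoint with the highest id and handing its operator states back to the tasks, or reporting that nothing was restored. A minimal plain-Java sketch of that selection follows; RestoreSketch is hypothetical, and string maps stand in for the real OperatorState objects and the StateAssignmentOperation.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical model of restoring from the latest completed checkpoint. */
final class RestoreSketch {
    // checkpointId -> (operator id -> state); stands in for the completed checkpoint store
    private final TreeMap<Long, Map<String, String>> completed = new TreeMap<>();

    /** The operator states assigned by the last successful restore, or null if none. */
    Map<String, String> assigned;

    void addCompleted(long checkpointId, Map<String, String> operatorStates) {
        completed.put(checkpointId, operatorStates);
    }

    /** Returns the id of the restored checkpoint, or empty if no checkpoint exists. */
    OptionalLong restoreLatest() {
        if (completed.isEmpty()) {
            return OptionalLong.empty(); // the caller may then fall back to a savepoint
        }
        Map.Entry<Long, Map<String, String>> latest = completed.lastEntry();
        assigned = latest.getValue();    // re-assign the task states
        return OptionalLong.of(latest.getKey());
    }
}
```

The empty OptionalLong corresponds to restoreInitialCheckpointIfPresent returning false, which is the case in which SchedulerBase tries a savepoint instead.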

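The above is the state recovery logic at application startup. What happens when an operator fails and restarts while the application is running? The JobMaster listens to task execution state and reacts to it, as in the following failure-handling chain: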

UpdateSchedulerNgOnInternalFailuresListener.class

    @Override
    public void notifyTaskFailure(
            final ExecutionAttemptID attemptId,
            final Throwable t,
            final boolean cancelTask,
            final boolean releasePartitions) {

        final TaskExecutionState state =
                new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t);
        schedulerNg.updateTaskExecutionState(new TaskExecutionStateTransition(state, cancelTask, releasePartitions));
    }

SchedulerBase.class

    @Override
    public final boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
        final Optional<ExecutionVertexID> executionVertexId =
                getExecutionVertexId(taskExecutionState.getID());

        boolean updateSuccess = executionGraph.updateState(taskExecutionState);

        if (updateSuccess) {
            checkState(executionVertexId.isPresent());

            if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
                updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
            }
            return true;
        } else {
            return false;
        }
    }

DefaultScheduler.class

        @Override
    protected void updateTaskExecutionStateInternal(
            final ExecutionVertexID executionVertexId,
            final TaskExecutionStateTransition taskExecutionState) {

        schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
        maybeHandleTaskFailure(taskExecutionState, executionVertexId);
    }

    private void maybeHandleTaskFailure(
            final TaskExecutionStateTransition taskExecutionState,
            final ExecutionVertexID executionVertexId) {
        if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
            final Throwable error = taskExecutionState.getError(userCodeLoader);
            handleTaskFailure(executionVertexId, error);
        }
    }

    private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
        setGlobalFailureCause(error);
        notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
        final FailureHandlingResult failureHandlingResult =
                executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
        maybeRestartTasks(failureHandlingResult);
    }

    private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
        if (failureHandlingResult.canRestart()) {
            // calls restartTasks
            restartTasksWithDelay(failureHandlingResult);
        } else {
            failJob(failureHandlingResult.getError());
        }
    }

    private Runnable restartTasks(
            final Set<ExecutionVertexVersion> executionVertexVersions,
            final boolean isGlobalRecovery) {
        return () -> {
            final Set<ExecutionVertexID> verticesToRestart =
                    executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);

            removeVerticesFromRestartPending(verticesToRestart);

            resetForNewExecutions(verticesToRestart);

            try {
                restoreState(verticesToRestart, isGlobalRecovery);
            } catch (Throwable t) {
                handleGlobalFailure(t);
                return;
            }

            schedulingStrategy.restartTasks(verticesToRestart);
        };
    }
SchedulerBase.class

  protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery)
            throws Exception {
      ...
      if (isGlobalRecovery) {
          final Set<ExecutionJobVertex> jobVerticesToRestore =
                  getInvolvedExecutionJobVertices(vertices);

          checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);

      } else {
          final Map<ExecutionJobVertex, IntArrayList> subtasksToRestore =
                  getInvolvedExecutionJobVerticesAndSubtasks(vertices);

          final OptionalLong restoredCheckpointId =
                  checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(subtasksToRestore.keySet());

          // Ideally, the Checkpoint Coordinator would call OperatorCoordinator.resetSubtask, but
          // the Checkpoint Coordinator is not aware of subtasks in a local failover. It always
          // assigns state to all subtasks, and for the subtask execution attempts that are still
          // running (or not waiting to be deployed) the state assignment has simply no effect.
          // Because of that, we need to do the "subtask restored" notification here.
          // Once the Checkpoint Coordinator is properly aware of partial (region) recovery,
          // this code should move into the Checkpoint Coordinator.
          final long checkpointId =
                  restoredCheckpointId.orElse(OperatorCoordinator.NO_CHECKPOINT);
          notifyCoordinatorsOfSubtaskRestore(subtasksToRestore, checkpointId);
      }
      ...
    }

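The call chain above spans both DefaultScheduler and SchedulerBase, but it actually runs inside a single object instance, because the two classes are related by inheritance: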

public abstract class SchedulerBase implements SchedulerNG
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations

Finally, we arrive back at the familiar CheckpointCoordinator:

CheckpointCoordinator.class

public OptionalLong restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception {
    // when restoring subtasks only we accept potentially unmatched state for the
    // following reasons
    //   - the set frequently does not include all Job Vertices (only the ones that are part
    //     of the restarted region), meaning there will be unmatched state by design.
    //   - because what we might end up restoring from an original savepoint with unmatched
    //     state, if there is was no checkpoint yet.
    return restoreLatestCheckpointedStateInternal(
            tasks,
            OperatorCoordinatorRestoreBehavior
                    .SKIP, // local/regional recovery does not reset coordinators
            false, // recovery might come before first successful checkpoint
            true); // see explanation above
    }
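
The failure path traced above can be condensed into a small, self-contained model. This is only a sketch: FailoverSketch, Recovery and the recorded step strings are hypothetical stand-ins chosen to mirror the method names in the excerpts. It does not call any Flink API and leaves out details such as restart delays and vertex versioning.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the scheduler failure path shown above.
// All types here are hypothetical; only the step names echo the Flink excerpts.
class FailoverSketch {
    enum Recovery { GLOBAL, REGIONAL }

    /** Returns, in order, the steps a scheduler would take for one task failure. */
    static List<String> handleTaskFailure(boolean canRestart, Recovery recovery) {
        List<String> steps = new ArrayList<>();
        if (!canRestart) {
            // Restart strategy exhausted: the whole job fails.
            steps.add("failJob");
            return steps;
        }
        steps.add("resetForNewExecutions");
        if (recovery == Recovery.GLOBAL) {
            // Global recovery: every involved job vertex gets checkpointed state.
            steps.add("restoreLatestCheckpointedStateToAll");
        } else {
            // Regional recovery: only the restarted region is restored, and the
            // operator coordinators are notified separately.
            steps.add("restoreLatestCheckpointedStateToSubtasks");
            steps.add("notifyCoordinatorsOfSubtaskRestore");
        }
        // State is assigned before the tasks are handed back for scheduling.
        steps.add("schedulingStrategy.restartTasks");
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(handleTaskFailure(true, Recovery.REGIONAL));
        System.out.println(handleTaskFailure(true, Recovery.GLOBAL));
        System.out.println(handleTaskFailure(false, Recovery.GLOBAL));
    }
}
```

The point the model makes is the ordering: state is restored from the latest checkpoint before the tasks are rescheduled, so a restarted task already carries its assigned state when it is deployed.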

Inside schedulingStrategy.restartTasks, the state assigned to each Task is wrapped in a JobManagerTaskRestore, which is shipped to the TaskExecutor as a field of the TaskDeploymentDescriptor. Once the TaskDeploymentDescriptor has been submitted to the TaskExecutor, the TaskExecutor uses a TaskStateManager to manage the state of the current Task. The TaskStateManager is created from the assigned JobManagerTaskRestore and the local state store, TaskLocalStateStore:

TaskExecutor.class

  @Override
  public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
      ...
      final TaskLocalStateStore localStateStore =
              localStateStoresManager.localStateStoreForSubtask(
                      jobId,
                      tdd.getAllocationId(),
                      taskInformation.getJobVertexId(),
                      tdd.getSubtaskIndex());

      final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

      final TaskStateManager taskStateManager =
              new TaskStateManagerImpl(
                      jobId,
                      tdd.getExecutionAttemptId(),
                      localStateStore,
                      taskRestore,
                      checkpointResponder);
      ...
      // start the Task
  }
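
Why the TaskStateManager is given both a local state store and the JobManager-assigned restore can be illustrated with a toy model. The sketch below rests on an assumption: that a local copy, when present, is tried first and the JobManager-assigned copy serves as the fallback. PrioritizedRestoreSketch and its methods are hypothetical names, and state handles are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of choosing between a task-local state copy and the copy assigned
// by the JobManager. Hypothetical names; this is not Flink's implementation.
class PrioritizedRestoreSketch {
    /** Orders the candidates: the local copy (if any) first, the JobManager copy last. */
    static List<String> prioritize(String localCopy, String jobManagerCopy) {
        List<String> ordered = new ArrayList<>();
        if (localCopy != null) {
            ordered.add(localCopy);
        }
        ordered.add(jobManagerCopy);
        return ordered;
    }

    /** Returns the first candidate that can actually be read. */
    static String restore(List<String> candidates, Predicate<String> readable) {
        for (String candidate : candidates) {
            if (readable.test(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("no readable state copy");
    }

    public static void main(String[] args) {
        List<String> candidates = prioritize("local-copy", "jobmanager-copy");
        // Healthy local disk: the local copy wins.
        System.out.println(restore(candidates, c -> true));
        // Local copy lost or corrupted: fall back to the JobManager-assigned copy.
        System.out.println(restore(candidates, c -> !c.equals("local-copy")));
    }
}
```

Under this assumption, local state is an optimization for recovery speed only. The copy referenced by the JobManager remains the source of truth, so losing the local copy does not affect correctness.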

Starting the Task calls the invoke method of StreamTask, which performs the following initialization in beforeInvoke:

StreamTask.class
    protected void beforeInvoke() throws Exception {
        ...
        operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
        ...
    }

    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        InternalTimeServiceManager.Provider timerServiceProvider =
                configuration.getTimerServiceProvider(getUserCodeClassLoader());
        return new StreamTaskStateInitializerImpl(getEnvironment(),
                stateBackend,
                TtlTimeProvider.DEFAULT,
                timerServiceProvider != null
                        ? timerServiceProvider
                        : InternalTimeServiceManagerImpl::create);
    }

Now back to the initializeStateAndOpenOperators method of operatorChain:

OperatorChain.class

  protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
      for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
          StreamOperator<?> operator = operatorWrapper.getStreamOperator();
          operator.initializeState(streamTaskStateInitializer);
          operator.open();
      }
  }

Here, the call to initializeState on StreamOperator is dispatched to the implementation in AbstractStreamOperator, which creates the StreamOperatorStateContext:

AbstractStreamOperator.class

  @Override
  public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
          throws Exception {

      final TypeSerializer<?> keySerializer =
              config.getStateKeySerializer(getUserCodeClassloader());

      final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());
      final CloseableRegistry streamTaskCloseableRegistry =
              Preconditions.checkNotNull(containingTask.getCancelables());

      final StreamOperatorStateContext context =
              streamTaskStateManager.streamOperatorStateContext(getOperatorID(),
                      getClass().getSimpleName(),
                      getProcessingTimeService(),
                      this,
                      keySerializer,
                      streamTaskCloseableRegistry,
                      metrics,
                      config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                              ManagedMemoryUseCase.STATE_BACKEND,
                              runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                              runtimeContext.getUserCodeClassLoader()),
                      isUsingCustomRawKeyedState());

      stateHandler =
              new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
      timeServiceManager = context.internalTimerServiceManager();
      stateHandler.initializeOperatorState(this);
      runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
  }

The actual work of reading state data from the snapshot and restoring it is hidden in the creation of the streamOperatorStateContext:

StreamTaskStateInitializerImpl.class

    @Override
    public StreamOperatorStateContext streamOperatorStateContext(...) {
        ...
        // -------------- Keyed State Backend --------------
        keyedStatedBackend =
                keyedStatedBackend(
                        keySerializer,
                        operatorIdentifierText,
                        prioritizedOperatorSubtaskStates,
                        streamTaskCloseableRegistry,
                        metricGroup,
                        managedMemoryFraction);

        // -------------- Operator State Backend --------------
        operatorStateBackend =
                operatorStateBackend(
                        operatorIdentifierText,
                        prioritizedOperatorSubtaskStates,
                        streamTaskCloseableRegistry);

        // -------------- Raw State Streams --------------
        rawKeyedStateInputs =
                rawKeyedStateInputs(
                        prioritizedOperatorSubtaskStates
                                .getPrioritizedRawKeyedState()
                                .iterator());
        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

        rawOperatorStateInputs =
                rawOperatorStateInputs(
                        prioritizedOperatorSubtaskStates
                                .getPrioritizedRawOperatorState()
                                .iterator());
        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

        // -------------- Internal Timer Service Manager --------------
        if (keyedStatedBackend != null) {

            // if the operator indicates that it is using custom raw keyed state,
            // then whatever was written in the raw keyed state snapshot was NOT written
            // by the internal timer services (because there is only ever one user of raw keyed
            // state);
            // in this case, timers should not attempt to restore timers from the raw keyed
            // state.
            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                    (prioritizedOperatorSubtaskStates.isRestored()
                                    && !isUsingCustomRawKeyedState)
                            ? rawKeyedStateInputs
                            : Collections.emptyList();

            timeServiceManager =
                    timeServiceManagerProvider.create(
                            keyedStatedBackend,
                            environment.getUserCodeClassLoader().asClassLoader(),
                            keyContext,
                            processingTimeService,
                            restoredRawKeyedStateTimers);
        } else {
            timeServiceManager = null;
        }

        // -------------- Preparing return value --------------

        return new StreamOperatorStateContextImpl(prioritizedOperatorSubtaskStates.isRestored(),
                operatorStateBackend,
                keyedStatedBackend,
                timeServiceManager,
                rawOperatorStateInputs,
                rawKeyedStateInputs);
     }

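This completes our walkthrough of how the Flink Kafka Source Operator restores state from a Checkpoint. The logic can be roughly divided into two parts: state initialization based on StateInitializationContext, and restoring state from a Checkpoint to produce that StateInitializationContext. The latter is far more complex than described in this article. In practice, once the JobMaster detects a task failure, it loads the state metadata of the latest snapshot from the persisted Checkpoint data and then reassigns the state to the subtasks. Recovery after an application restart, in particular, may also involve changes to the operator topology and parallelism. After the JobMaster has restored the state, it submits the request to restart the tasks, and on the TaskManager side the state data may additionally be restored from a local snapshot (if enabled). State recovery on the TaskManager side is complete once the StreamOperatorStateContext has been created. It wraps the full data recovered from the snapshot, after which control returns to the normal initializeState call flow of StreamOperator.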
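
The role of the restored flag that is passed into StreamOperatorStateContextImpl can be shown with a minimal model of a source choosing its start offsets. Everything here is a hypothetical stand-in: InitContext, checkpointedOffsets and startOffsets are not Flink or Kafka APIs, and the fallback to Kafka's committed offsets assumes the consumer is configured to start from group offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of how a source could pick its start offsets during state
// initialization. Hypothetical types; not the Flink Kafka connector itself.
class RestoredOffsetsSketch {
    /** Stand-in for the restored-state view handed to an operator. */
    interface InitContext {
        boolean isRestored();
        Map<String, Long> checkpointedOffsets();
    }

    /** Helper that builds a context for the demo. */
    static InitContext context(boolean restored, Map<String, Long> offsets) {
        return new InitContext() {
            @Override public boolean isRestored() { return restored; }
            @Override public Map<String, Long> checkpointedOffsets() { return offsets; }
        };
    }

    /**
     * Restored run: resume from the checkpointed offsets.
     * Fresh run: fall back to the offsets committed in Kafka (assumed start mode).
     */
    static Map<String, Long> startOffsets(InitContext ctx, Map<String, Long> committedInKafka) {
        return new HashMap<>(ctx.isRestored() ? ctx.checkpointedOffsets() : committedInKafka);
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("topic-0", 42L);
        Map<String, Long> committed = new HashMap<>();
        committed.put("topic-0", 40L);

        System.out.println(startOffsets(context(true, checkpoint), committed));  // {topic-0=42}
        System.out.println(startOffsets(context(false, checkpoint), committed)); // {topic-0=40}
    }
}
```

The checkpointed offsets take precedence whenever the context reports a restore, which is why enabling checkpointing is enough for the source to resume from where the last successful snapshot left off.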

Summary

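This article examined Flink's Checkpoint processing flow (both snapshot creation and state initialization) and the Kafka source's support for Flink Checkpoints, and confirmed from Flink's code that Flink Checkpoints do support maintaining Kafka consumption state. That state, however, is only read from the StateInitializationContext object. To verify that the state in StateInitializationContext really comes from persisted Checkpoint data, the fourth part combined an experiment with a walkthrough of Flink's state recovery logic for both application restarts and runtime Operator failure retries, and confirmed that Flink supports restoring state from a Checkpoint or Savepoint.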

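Finally, based on the analysis above, developers building Flink applications should note the following: although Flink can restore the Kafka consumption state to the latest Checkpoint snapshot, it cannot avoid re-consuming the records read between two snapshots. A typical case is a Sink that is not idempotent, which can lead to duplicated output. PrintSink, for example, cannot retract the data it printed between snapshots. In addition, when Flink Checkpointing is not enabled, state maintenance relies on the offsets committed by the Kafka Client itself.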
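
The duplication between two snapshots can be reproduced with a short simulation. This is a sketch under simplifying assumptions: a single partition, one checkpoint, one failure, and a sink modelled either as an append-only list or as a map keyed by offset. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulates replay after a failure: the source rewinds to the checkpointed
// offset, so records emitted after the checkpoint are emitted a second time.
class ReplayDuplicatesSketch {
    /** Append-only sink: every emitted record is kept, duplicates included. */
    static List<String> runAppendOnly(List<String> records, int checkpointAt, int failAt) {
        List<String> sink = new ArrayList<>();
        // First attempt: emit up to the failure point (offset checkpointed at checkpointAt).
        for (int offset = 0; offset < failAt; offset++) {
            sink.add(records.get(offset));
        }
        // Restart: resume from the checkpointed offset, not from failAt.
        for (int offset = checkpointAt; offset < records.size(); offset++) {
            sink.add(records.get(offset));
        }
        return sink;
    }

    /** Idempotent sink: writes are keyed by offset, so a replay overwrites itself. */
    static Map<Integer, String> runIdempotent(List<String> records, int checkpointAt, int failAt) {
        Map<Integer, String> sink = new LinkedHashMap<>();
        for (int offset = 0; offset < failAt; offset++) {
            sink.put(offset, records.get(offset));
        }
        for (int offset = checkpointAt; offset < records.size(); offset++) {
            sink.put(offset, records.get(offset));
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e");
        // Checkpoint taken at offset 2, failure after offset 3 was emitted.
        System.out.println(runAppendOnly(records, 2, 4)); // [a, b, c, d, c, d, e]
        System.out.println(runIdempotent(records, 2, 4)); // {0=a, 1=b, 2=c, 3=d, 4=e}
    }
}
```

Records c and d appear twice in the append-only sink because they were emitted after the checkpoint but before the failure. Keying writes by a stable identifier, such as the offset used here, is one way to make the replay harmless.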
