乐趣区

关于流计算:读Flink源码谈设计Exactly-Once

本文首发于泊浮目标语雀:https://www.yuque.com/17sing

版本 日期 备注
1.0 2022.2.2 文章首发

0. 前言

将 Flink 利用至生产已有一段时间,刚上生产的时候有幸排查过因数据歪斜引起的 Checkpoint 超时问题——过后简略的理解了相干机制,最近正好在读 Flink 源码,不如趁这个机会搞清楚。

在这里,咱们首先要搞清楚两种 Exactly-Once 的区别:

  • Exactly Once:在计算引擎外部,数据不失落不反复。实质是通过 Flink 开启检查点进行 Barrier 对齐,即可做到。
  • End to End Exactly Once:这意味着从数据读取、引擎解决到写入内部存储的整个过程中,数据都是不失落不反复的。这要求数据源可重放,写入端反对事务的复原和回滚或幂等。

1. 数据歪斜为什么会引起 Checkpoint 超时

做 Checkpoint 时算子会有一个 barrier 的对齐机制(为何肯定要对齐前面会讲到)。以下图为例解说对齐过程:

当两条边下发 barrier 时,barrier1 比 barrier2 先达到了算子,那么算子会将一条边输出的元素缓存起来,直到 barrier2 到了做 Checkpoint 当前才会下发元素。

每个算子对齐 barrier 后,会进行异步状态存储,而后下发 barrier。每个算子做完 Checkpoint 时,会告诉 CheckpointCoordinator。当CheckpointCoordinator 得悉所有算子的 Checkpoint 都做完时,认为本次 Checkpoint 实现。

而在咱们的应用程序中,有一个 map 算子承受了大量数据,导致 barrier 始终没有下发,最终整个 Checkpoint 超时。

2. Checkpoint 的原理

其具体原理能够参考 Flink 团队的论文:Lightweight Asynchronous Snapshots for Distributed Dataflow。简略来说,晚期流计算的容错计划都是周期性做全局状态的快照,但这有两个毛病:

  • 阻塞计算——做快照时是同步阻塞的。
  • 会将以后算子未解决以及正在解决的 record 一起做进快照,因而快照会变得特地大。

而 Flink 是基于 Chandy-Lamport 算法来扩大的——该算法异步地执行快照,同时要求数据源可重放,但依然会存储上游数据。而 Flink 的计划提出的计划在无环图中并不会存储数据。

在 Flink 中(无环有向图),会周期性的插入 Barrier 这个标记,告知上游算子开始做快照。这个算法基于以下前提:

  • 网络传输牢靠,能够做到 FIFO。这里会对算子进行 blockedunblocked操作,如果一个算子是 blocked,它会把从上游通道接管到的所有数据缓存起来,间接收到unblocked 的信号才发送。
  • Task 能够对它们的通道进行以下操作:block, unblock, send messages, broading messages
  • 对于 Source 节点来说,会被形象成 Nil 输出通道。

3. Checkpoint 的实现

在 Flink 中,做 Checkpoint 大抵由以下几步组成:

  1. 可行性查看
  2. JobMaster 告诉 Task 触发检查点
  3. TaskExecutor 执行检查点
  4. JobMaster 确认检查点

接下来,让咱们跟着源码来看一下外面的具体实现。

3.1 可行性查看

参考代码:CheckpointingCoordinator#startTriggeringCheckpoint

  1. 确保作业不是处于敞开中或未启动的状态(见CheckpointPlanCalculator#calculateCheckpointPlan)。
  2. 生成新的 CheckpointingID,并创立一个 PendingCheckpoint——当所有 Task 都实现了 Checkpoint,则会转换成一个 CompletedCheckpoint。同时也会注册一个线程去关注是否有超时的状况,如果超时则会 Abort 以后的 Checkpoint(见CheckpointPlanCalculator#createPendingCheckpoint)。
  3. 触发 MasterHook。局部内部零碎在触发检查点之前,须要做一些扩大逻辑,通过该实现 MasterHook 能够实现告诉机制(见CheckpointPlanCalculator#snapshotMasterState)。
  4. 反复步骤 1,没问题的话告诉 SourceStreamTask 开始触发检查点(见CheckpointPlanCalculator#triggerCheckpointRequest)。

3.2 JobMaster 告诉 Task 触发检查点

CheckpointPlanCalculator#triggerCheckpointRequest 中,会通过 triggerTasks 办法调用到 Execution#triggerCheckpoint 办法。Execution 对应了一个 Task 实例,因而 JobMaster 能够通过外面的 Slot 援用找到其TaskManagerGateway,发送近程申请触发 Checkpoint。

3.3 TaskManager 执行检查点

TaskManager 在代码中的体现为 TaskExecutor。当 JobMaster 触发近程申请至 TaskExecutor 时,handle 的办法为TaskExecutor#triggerCheckpoint,之后便会调用Task#triggerCheckpointBarrier 来做:

  1. 做一些查看,比方 Task 是否是 Running 状态
  2. 触发 Checkpoint:调用CheckpointableTask#triggerCheckpointAsync
  3. 执行检查点:CheckpointableTask#triggerCheckpointAsync。以 StreamTask 实现为例,这里会思考上游曾经 Finish 时如何触发上游 Checkpoint 的状况——通过塞入 CheckpointBarrier 来触发;如果工作没有完结,则调用 StreamTask#triggerCheckpointAsyncInMailbox。最终都会走入SubtaskCheckpointCoordinator#checkpointState 来触发 Checkpoint。
  4. 算子保留快照:调用OperatorChain#broadcastEvent:保留 OperatorState 与 KeyedState。
  5. 调用SubtaskCheckpointCoordinatorImpl#finishAndReportAsync,:异步的汇报以后快照已实现。

3.4 JobMaster 确认检查点

|-- RpcCheckpointResponder
  \-- acknowledgeCheckpoint
|-- JobMaster
  \-- acknowledgeCheckpoint
|-- SchedulerBase
  \-- acknowledgeCheckpoint
|-- ExecutionGraphHandler
  \-- acknowledgeCheckpoint
|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage

在 3.1 中,咱们提到过 PendingCheckpoint。这外面保护了一些状来确保 Task 全副 Ack、Master 全副 Ack。当确认实现后,CheckpointCoordinator将会告诉所有的 Checkpoint 曾经实现。

|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage
  \-- sendAcknowledgeMessages

3.5 检查点复原

该局部代码较为简单,有趣味的同学能够依据相干调用栈自行浏览代码。

|-- Task
  \-- run
  \-- doRun
|-- StreamTask
  \-- invoke
  \-- restoreInternal
  \-- restoreGates
|-- OperatorChain
  \-- initializeStateAndOpenOperators
|-- StreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- AbstractStreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- CheckpointedStreamOperator
  \-- initializeState #调用用户代码

3.6 End to End Exactly Once

端到端的精准一次实现其实是比拟艰难的——思考一个 Source 对 N 个 Sink 的场景。故此 Flink 设计了相应的接口来保障端到端的精准一次,别离是:

  • TwoPhaseCommitSinkFunction:想做精准一次的 Sink 必须实现此接口。
  • CheckpointedFunction:Checkpoint 被调用时的钩子。
  • CheckpointListener:顾名思义,当 Checkpoint 实现或失败时会告诉此接口的实现者。

目前 Source 和 Sink 全副 ExactlyOnce 实现的只有 Kafka——其上游反对断点读取,上游反对回滚 or 幂等。有趣味的同学能够浏览该接口的相干实现。

可能有同学会好奇为什么 JDBC Sink 没有实现 ExactlyOnce。实质和这个接口的执行形式无奈兼容 JDBC 的事务应用形式——当一个算子象征退出时,是无奈再对之前的事务进行操作的。因而 TwoPhaseCommitSinkFunction 中的 retryCommit 以及 retryRollback 是无奈进行的——见 https://github.com/apache/fli…。JDBC 的 Sink 是基于 XA 实现的,尽可能保障一致性。这里可能又有同学会问了为什么不必 Upset 类的语句,因为这个形式并不通用——对于 Upset 须要一个惟一键,不然性能极差。

4. 小结

本文以问题视角切入 Checkpoint 的原理与实现,并对相干源码做了简略的跟踪。其实代码的线路是比拟清晰的,但波及大量的类——有心的同学可能曾经发现,这是繁多职责准则的体现。TwoPhaseCommitSinkFunction中的实现也是典型的模版办法设计模式。

退出移动版