关于流计算:读Flink源码谈设计Exactly-Once

本文首发于泊浮目标语雀:https://www.yuque.com/17sing

版本 日期 备注
1.0 2022.2.2 文章首发

0.前言

将Flink利用至生产已有一段时间,刚上生产的时候有幸排查过因数据歪斜引起的Checkpoint超时问题——过后简略的理解了相干机制,最近正好在读Flink源码,不如趁这个机会搞清楚。

在这里,咱们首先要搞清楚两种Exactly-Once的区别:

  • Exactly Once:在计算引擎外部,数据不失落不反复。实质是通过Flink开启检查点进行Barrier对齐,即可做到。
  • End to End Exactly Once:这意味着从数据读取、引擎解决到写入内部存储的整个过程中,数据都是不失落不反复的。这要求数据源可重放,写入端反对事务的复原和回滚或幂等。

1. 数据歪斜为什么会引起Checkpoint超时

做Checkpoint时算子会有一个barrier的对齐机制(为何肯定要对齐前面会讲到)。以下图为例解说对齐过程:

当两条边下发barrier时,barrier1比barrier2先达到了算子,那么算子会将一条边输出的元素缓存起来,直到barrier2到了做Checkpoint当前才会下发元素。

每个算子对齐barrier后,会进行异步状态存储,而后下发barrier。每个算子做完Checkpoint时,会告诉CheckpointCoordinator。当CheckpointCoordinator得悉所有算子的Checkpoint都做完时,认为本次Checkpoint实现。

而在咱们的应用程序中,有一个map算子承受了大量数据,导致barrier始终没有下发,最终整个Checkpoint超时。

2. Checkpoint的原理

其具体原理能够参考Flink团队的论文:Lightweight Asynchronous Snapshots for Distributed Dataflow。简略来说,晚期流计算的容错计划都是周期性做全局状态的快照,但这有两个毛病:

  • 阻塞计算——做快照时是同步阻塞的。
  • 会将以后算子未解决以及正在解决的record一起做进快照,因而快照会变得特地大。

而Flink是基于Chandy-Lamport 算法来扩大的——该算法异步地执行快照,同时要求数据源可重放,但依然会存储上游数据。而Flink的计划提出的计划在无环图中并不会存储数据。

在Flink中(无环有向图),会周期性的插入Barrier这个标记,告知上游算子开始做快照。这个算法基于以下前提:

  • 网络传输牢靠,能够做到FIFO。这里会对算子进行blockedunblocked操作,如果一个算子是blocked,它会把从上游通道接管到的所有数据缓存起来,间接收到unblocked的信号才发送。
  • Task能够对它们的通道进行以下操作:block, unblock, send messages, broading messages
  • 对于Source节点来说,会被形象成Nil输出通道。

3. Checkpoint的实现

在Flink中,做Checkpoint大抵由以下几步组成:

  1. 可行性查看
  2. JobMaster告诉Task触发检查点
  3. TaskExecutor执行检查点
  4. JobMaster确认检查点

接下来,让咱们跟着源码来看一下外面的具体实现。

3.1 可行性查看

参考代码:CheckpointingCoordinator#startTriggeringCheckpoint

  1. 确保作业不是处于敞开中或未启动的状态(见CheckpointPlanCalculator#calculateCheckpointPlan)。
  2. 生成新的CheckpointingID,并创立一个PendingCheckpoint——当所有Task都实现了Checkpoint,则会转换成一个CompletedCheckpoint。同时也会注册一个线程去关注是否有超时的状况,如果超时则会Abort以后的Checkpoint(见CheckpointPlanCalculator#createPendingCheckpoint)。
  3. 触发MasterHook。局部内部零碎在触发检查点之前,须要做一些扩大逻辑,通过该实现MasterHook能够实现告诉机制(见CheckpointPlanCalculator#snapshotMasterState)。
  4. 反复步骤1,没问题的话告诉SourceStreamTask开始触发检查点(见CheckpointPlanCalculator#triggerCheckpointRequest)。

3.2 JobMaster告诉Task触发检查点

CheckpointPlanCalculator#triggerCheckpointRequest中,会通过triggerTasks办法调用到Execution#triggerCheckpoint办法。Execution对应了一个Task实例,因而JobMaster能够通过外面的Slot援用找到其TaskManagerGateway,发送近程申请触发Checkpoint。

3.3 TaskManager执行检查点

TaskManager在代码中的体现为TaskExecutor。当JobMaster触发近程申请至TaskExecutor时,handle的办法为TaskExecutor#triggerCheckpoint,之后便会调用Task#triggerCheckpointBarrier来做:

  1. 做一些查看,比方Task是否是Running状态
  2. 触发Checkpoint:调用CheckpointableTask#triggerCheckpointAsync
  3. 执行检查点:CheckpointableTask#triggerCheckpointAsync。以StreamTask实现为例,这里会思考上游曾经Finish时如何触发上游Checkpoint的状况——通过塞入CheckpointBarrier来触发;如果工作没有完结,则调用StreamTask#triggerCheckpointAsyncInMailbox。最终都会走入SubtaskCheckpointCoordinator#checkpointState来触发Checkpoint。
  4. 算子保留快照:调用OperatorChain#broadcastEvent:保留OperatorState与KeyedState。
  5. 调用SubtaskCheckpointCoordinatorImpl#finishAndReportAsync,:异步的汇报以后快照已实现。

3.4 JobMaster确认检查点

|-- RpcCheckpointResponder
  \-- acknowledgeCheckpoint
|-- JobMaster
  \-- acknowledgeCheckpoint
|-- SchedulerBase
  \-- acknowledgeCheckpoint
|-- ExecutionGraphHandler
  \-- acknowledgeCheckpoint
|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage

在3.1中,咱们提到过PendingCheckpoint。这外面保护了一些状来确保Task全副Ack、Master全副Ack。当确认实现后, CheckpointCoordinator将会告诉所有的Checkpoint曾经实现。

|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage
  \-- sendAcknowledgeMessages

3.5 检查点复原

该局部代码较为简单,有趣味的同学能够依据相干调用栈自行浏览代码。

|-- Task
  \-- run
  \-- doRun
|-- StreamTask
  \-- invoke
  \-- restoreInternal
  \-- restoreGates
|-- OperatorChain
  \-- initializeStateAndOpenOperators
|-- StreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- AbstractStreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- CheckpointedStreamOperator
  \-- initializeState #调用用户代码

3.6 End to End Exactly Once

端到端的精准一次实现其实是比拟艰难的——思考一个Source对N个Sink的场景。故此Flink设计了相应的接口来保障端到端的精准一次,别离是:

  • TwoPhaseCommitSinkFunction:想做精准一次的Sink必须实现此接口。
  • CheckpointedFunction:Checkpoint被调用时的钩子。
  • CheckpointListener:顾名思义,当Checkpoint实现或失败时会告诉此接口的实现者。

目前Source和Sink全副ExactlyOnce实现的只有Kafka——其上游反对断点读取,上游反对回滚or幂等。有趣味的同学能够浏览该接口的相干实现。

可能有同学会好奇为什么JDBC Sink没有实现ExactlyOnce。实质和这个接口的执行形式无奈兼容JDBC的事务应用形式——当一个算子象征退出时,是无奈再对之前的事务进行操作的。因而TwoPhaseCommitSinkFunction中的retryCommit以及retryRollback是无奈进行的——见https://github.com/apache/fli…。JDBC的Sink是基于XA实现的,尽可能保障一致性。这里可能又有同学会问了为什么不必Upset类的语句,因为这个形式并不通用——对于Upset须要一个惟一键,不然性能极差。

4. 小结

本文以问题视角切入Checkpoint的原理与实现,并对相干源码做了简略的跟踪。其实代码的线路是比拟清晰的,但波及大量的类——有心的同学可能曾经发现,这是繁多职责准则的体现。TwoPhaseCommitSinkFunction中的实现也是典型的模版办法设计模式。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理