作为 Flink 最根底也是最要害的容错机制,Checkpoint 快照机制很好地保障了 Flink 利用从异样状态复原后的数据准确性。同时 Checkpoint 相干的 metrics 也是诊断 Flink 利用衰弱状态最为重要的指标,胜利且耗时较短的 Checkpoint 表明作业运行状况良好,没有异样或反压。然而,因为 Checkpoint 与反压的耦合,反压反过来也会作用于 Checkpoint,导致 Checkpoint 的种种问题。针对于此,Flink 在 1.11 引入 Unaligned Checkpint 来解耦 Checkpoint 机制与反压机制,优化高反压状况下的 Checkpoint 体现。
以后 Checkpoint 机制简述
置信不少读者对 Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照曾经比拟相熟,该节简略回顾下算法的根底逻辑,相熟算法的读者可释怀跳过。Chandy-Lamport 算法将分布式系统形象成 DAG(临时不思考有闭环的图),节点示意过程,边示意两个过程间通信的管道。分布式快照的目标是记录下整个零碎的状态,即能够分为节点的状态(过程的状态)和边的状态(信道的状态,即传输中的数据)。因为零碎状态是由输出的音讯序列驱动变动的,咱们能够将输出的音讯序列分为多个较短的子序列,图的每个节点或边先后解决完某个子序列后,都会进入同一个稳固的全局统状态。利用这个个性,零碎的过程和信道在子序列的边界点别离进行本地快照,即便各局部的快照工夫点不同,最终也能够组合成一个有意义的全局快照。
从实现上看,Flink 通过在 DAG 数据源定时向数据流注入名为 Barrier 的非凡元素,将间断的数据流切分为多个无限序列,对应多个 Checkpoint 周期。每当接管到 Barrier,算子进行本地的 Checkpoint 快照,并在实现后异步上传本地快照,同时将 Barrier 以播送形式发送至上游。当某个 Checkpoint 的所有 Barrier 达到 DAG 末端且所有算子实现快照,则标记着全局快照的胜利。
在有多个输出 Channel 的状况下,为了数据准确性,算子会期待所有流的 Barrier 都达到之后才会开始本地的快照,这种机制被称为 Barrier 对齐。在对齐的过程中,算子只会持续解决的来自未呈现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输出队列,直至在队列满后被阻塞。当所有 Barrier 达到后,算子进行本地快照,输入 Barrier 到上游并恢复正常解决。比起其余分布式快照,该算法的劣势在于辅以 Copy-On-Write 技术的状况下不须要“Stop The World”影响利用吞吐量,同时根本不必长久化解决中的数据,只用保留过程的状态信息,大大减小了快照的大小。
Checkpoint 与反压的耦合
目前的 Checkpoint 算法在大多数状况下运行良好,然而当作业呈现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业的不稳固。首先,Chandy-Lamport 分布式快照的完结依赖于 Marker 的流动,而反压则会限度 Marker 的流动,导致快照的实现工夫变长甚至超时。无论是哪种状况,都会导致 Checkpoint 的工夫点落后于理论数据流较多。这时作业的计算进度是没有被长久化的,处于一个比拟软弱的状态,如果作业出于异样被动重启或者被用户被动重启,作业会回滚失落肯定的进度。如果 Checkpoint 间断超时且没有很好的监控,回滚失落的进度可能高达一天以上,对于实时业务这通常是不可承受的。更蹩脚的是,回滚后的作业落后的 Lag 更大,通常带来更大的反压,造成一个恶性循环。其次,Barrier 对齐自身可能成为一个反压的源头,影响上游算子的效率,而这在某些状况下是不必要的。比方典型的状况是一个的作业读取多个 Source,别离进行不同的聚合计算,而后将计算完的后果别离写入不同的 Sink。通常来说,这些不同的 Sink 会复用公共的算子以缩小反复计算,但并不心愿不同 Source 间相互影响。
假如一个作业要别离统计 A 和 B 两个业务线的以天为粒度指标,同时还须要统计所有业务线以周为单位的指标,拓扑如上图所示。如果 B 业务线某天的业务量突涨,使得 Checkpoint Barrier 有提早,那么会导致专用的 Window Aggregate 进行 Barrier 对齐,进而阻塞业务 A 的 FlatMap,最终令业务 A 的计算也呈现提早。当然这种状况能够通过拆分作业等形式优化,但不免引入更多开发保护老本,而且更重要的是这原本就合乎 Flink 用户惯例的开发思路,应该在框架内尽量减小呈现用户意料之外的行为的可能性。
Unaligned Checkpoint
为了解决这个问题,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的个性。要了解 Unaligned Checkpoint 的原理,首先须要理解 Chandy-Lamport 论文中对于 Marker 解决规定的形容:
其中要害是 if q has not recorded its state,也就是接管到 Marker 时算子是否曾经进行过本地快照。始终以来 Flink 的 Aligned Checkpoint 通过 Barrier 对齐,将本地快照提早至所有 Barrier 达到,因此这个条件是永真的,从而奇妙地防止了对算子输出队列的状态进行快照,但代价是比拟不可控的 Checkpoint 时长和吞吐量的升高。实际上这和 Chandy-Lamport 算法是有肯定出入的。举个例子,假如咱们对两个数据流进行 equal-join,输入匹配上的元素。依照 Flink Aligned Checkpoint 的形式,零碎的状态变动如下(图中不同色彩的元素代表属于不同的 Checkpoint 周期):
- 图 a: 输出 Channel 1 存在 3 个元素,其中 2 在 Barrier 后面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 后面。
- 图 b: 算子别离读取 Channel 一个元素,输入 2。随后接管到 Channel 1 的 Barrier,进行解决 Channel 1 后续的数据,只解决 Channel 2 的数据。
- 图 c: 算子再生产 2 个自 Channel 2 的元素,接管到 Barrier,开始本地快照并输入 Barrier。
对于雷同的状况,Chandy-Lamport 算法的状态变动如下:
- 图 a: 同上。
- 图 b: 算子别离解决两个 Channel 一个元素,输入后果 2。尔后接管到 Channel 1 的 Barrier,算子开始本地快照记录本人的状态,并输入 Barrier。
- 图 c: 算子持续失常解决两个 Channel 的输出,输入 9。特地的中央是 Channel 2 后续元素会被保留下来,直到 Channel 2 的 Barrier 呈现(即 Channel 2 的 9 和 7)。保留的数据会作为 Channel 的状态成为快照的一部分。
两者的差别次要能够总结为两点:
- 快照的触发是在接管到第一个 Barrier 时还是在接管到最初一个 Barrier 时。
- 是否须要阻塞曾经接管到 Barrier 的 Channel 的计算。
从这两点来看,新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且勾销阻塞 Channel 的计算,算法上与 Chandy-Lamport 基本一致,同时在实现细节方面联合 Flink 的定位做了几个改良。首先,不同于 Chandy-Lamport 模型的只须要思考算子输出 Channel 的状态,Flink 的算子有输出和输入两种 Channel,在快照时两者的状态都须要被思考。其次,无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中,Barrier 都必须遵循其在数据流中的地位,算子须要期待 Barrier 被理论解决才开始快照。而 Unaligned Checkpoint 扭转了这个设定,容许算子优先摄入并优先输入 Barrier。如此一来,第一个达到 Barrier 会在算子的缓存数据队列(包含输出 Channel 和输入 Channel)中往前跳跃一段距离,而被”插队”的数据和其余输出 Channel 在其 Barrier 之前的数据会被写入快照中(图中黄色局部)。
这样的次要益处是,如果自身算子的解决就是瓶颈,Chandy-Lamport 的 Barrier 仍会被阻塞,但 Unaligned Checkpoint 则能够在 Barrier 进入输出 Channel 就马上开始快照。这能够从很大水平上放慢 Barrier 流经整个 DAG 的速度,从而升高 Checkpoint 整体时长。回到之前的例子,用 Unaligned Checkpoint 来实现,状态变动如下:
- 图 a: 输出 Channel 1 存在 3 个元素,其中 2 在 Barrier 后面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 后面。输入 Channel 已存在后果数据 1。
- 图 b: 算子优先解决输出 Channel 1 的 Barrier,开始本地快照记录本人的状态,并将 Barrier 插到输入 Channel 末端。
- 图 c: 算子持续失常解决两个 Channel 的输出,输入 2、9。同时算子会将 Barrier 越过的数据(即输出 Channel 1 的 2 和输入 Channel 的 1)写入 Checkpoint,并将输出 Channel 2 后续早于 Barrier 的数据(即 2、9、7)继续写入 Checkpoint。
比起 Aligned Checkpoint 中不同 Checkpoint 周期的数据以算子快照为界线分隔得很清晰,Unaligned Checkpoint 进行快照和输入 Barrier 时,局部本属于以后 Checkpoint 的输出数据还未计算(因而未反映到以后算子状态中),而局部属于以后 Checkpoint 的输入数据却落到 Barrier 之后(因而未反映到上游算子的状态中)。这也正是 Unaligned 的含意: 不同 Checkpoint 周期的数据没有对齐,包含不同输出 Channel 之间的不对齐,以及输出和输入间的不对齐。而这部分不对齐的数据会被快照记录下来,以在复原状态时重放。换句话说,从 Checkpoint 复原时,不对齐的数据并不能由 Source 端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被 Checkpoint 复原到对应 Channel 中,所以仍然能提供只计算一次的精确后果。当然,Unaligned Checkpoint 并不是百分百优于 Aligned Checkpoint,它会带来的已知问题就有:
- 因为要长久化缓存数据,State Size 会有比拟大的增长,磁盘负载会减轻。
- 随着 State Size 增长,作业复原工夫可能增长,运维治理难度减少。
目前看来,Unaligned Checkpoint 更适宜容易产生高反压同时又比拟重要的简单作业。对于像数据 ETL 同步等简略作业,更轻量级的 Aligned Checkpoint 显然是更好的抉择。
总结
Flink 1.11 的 Unaligned Checkpoint 次要解决在高反压状况下作业难以完成 Checkpoint 的问题,同时它以磁盘资源为代价,防止了 Checkpoint 可能带来的阻塞,有利于晋升 Flink 的资源利用率。随着流计算的遍及,将来的 Flink 利用大略会越来越简单,在将来通过实战打磨欠缺后 Unaligned Checkpoint 很有可能会取代 Aligned Checkpoint 成为 Flink 的默认 Checkpoint 策略。
参考
- FLIP-76: Unaligned Checkpoints
http://www.whitewood.me/2020/…
- Distributed Snapshots: Determining Global States of Distributed Systems
http://www.whitewood.me/2020/…
- Flink Docs: Data Streaming Fault Tolerance
https://ci.apache.org/project…
- Checkpointing Under Backpressure
http://apache-flink-mailing-l…
- Flink Checkpoint 问题排查实用指南
https://zhuanlan.zhihu.com/p/…
作者介绍:
林小铂,网易游戏高级开发工程师,负责游戏数据中心实时平台的开发及运维工作,目前专一于 Apache Flink 的开发及利用。探索问题原本就是一种乐趣。
原文链接:
http://www.whitewood.me/2020/…