作者 | 梅源(Yuan Mei)& Roman Khachatryan
流解决零碎最重要的个性是端到端的提早,端到端提早是指开始解决输出数据到输入该数据产生的后果所需的工夫。Flink,作为流式计算的标杆,其端到端提早包含容错的快慢次要取决于检查点机制(Checkpointing),所以如何将 Checkpoint 做得高效稳固是 Flink 流计算的首要任务。咱们在“Flink 新一代流计算和容错——阶段总结和瞻望”[1] 一文中介绍了 Flink 从社区 1.12 版本开始所做的晋升 Checkpointing 机制的致力,本文将着重介绍其中刚刚在 Flink 1.15 版本公布的 Generic Log-Based Incremental Checkpointing 这个性能。
一、概述
Generic Log-Based Incremental Checkpointing 的设计初衷是咱们将全量的状态快照和增量的检查点机制分隔开,通过继续上传增量 Changelog 的办法,来确保每次 Checkpointing 能够稳固疾速的实现,从而减小 Checkpointing 之间的距离,晋升 Flink 零碎端到端的提早。拓展开来说,次要有如下三点晋升:
- 更短的端到端提早:尤其是对于 Transactional Sink。Transactional Sink 在 Checkpoint 实现的时候能力实现两阶段提交,因而减小 Checkpointing 的距离意味着能够更频繁的提交,达到更短的端到端的提早。
- 更稳固的 Checkpoint 实现工夫:目前 Checkpoint 实现工夫很大水平上取决于在 Checkpointing 时须要长久化的(增量)状态的大小。在新的设计中,咱们通过继续上传增量,以达到缩小 Checkpoint Flush 时所须要长久化的数据,来保障 Checkpoint 实现的稳定性。
- 容错复原须要回滚的数据量更少:Checkpointing 之间的距离越短,每次容错复原后须要重新处理的数据就越少。
那是怎么做到的呢?咱们晓得影响 Flink Checkpointing 工夫的次要因素有以下几点:
- Checkpoint Barrier 流动和对齐的速度;
- 将状态快照长久化到非易失性高可用存储(例如 S3)上所须要的工夫。
对 Flink Checkpoint 机制不太理解的读者能够参考 1。
Flink 1.12 版本引入的 Unaligned Checkpoint[2] 和 1.14 版本中引入的 Buffer Debloating[3] 次要解决了上述第 1 个问题,尤其是在反压的状况下。更早之前引入的 Incremental Checkpoint[4] 是为了缩小每次 Checkpointing 所须要长久化存储状态的大小,以减小第 2 个影响因素,但在理论中也不齐全能做到:现有 Incremental Checkpoint 是基于 RocksDB 来实现的,RocksDB 出于空间放大和读性能的思考会定期做 Compaction。Compaction 会产生新的、绝对较大的文件,会减少上传所须要的工夫。每一个执行 Flink 作业的物理节点(Task)至多有一个 RocksDB 实例,所以 Checkpoint 被提早的概率会随着物理节点增多而变大。这导致在 Flink 的大型作业中,简直每次实现 Checkpointing 时都有可能会因为某个节点而提早,如下图所示。
图 1: 每次 Checkpoint 都可能因为某个节点上传文件迟缓而提早
另外值得一提的是在现有的 Checkpointing 机制下,Task 只有在收到至多一个 Checkpoint Barrier 之后,才会做状态快照并且开始长久化状态快照到高可用存储,从而减少了 Checkpoint 实现工夫,如下图所示。
图 2: 在现有机制下,快照在 Checkpoint Barrier 达到之后才会开始上传
在新的设计中,咱们通过继续上传增量 Changelog 的办法,能够防止这个限度,减速 Checkpoint 实现工夫。上面咱们来看看具体的设计。
二、设计
Generic Log-Based Incremental Checkpointing 的核心思想是引入 State Changelog(状态变动日志),这样能够更细粒度地长久化状态:
- 算子在更新状态的时候写双份,一份更新写入状态表 State Table 中,一份增量写入 State Changelog 中。
- Checkpoint 变成由两个局部组成,第一个局部是以后曾经长久化的存在远端存储上的 State Table,第二个局部是增量的 State Changelog。
- State Table 的长久化和 Checkpointing 过程独立开来,会定期由 background thread 长久化,咱们称为 Materialization(物化)的过程。
- 在做 Checkpoint 的时候,只有保障新增的 State Changelog 被长久化就能够了。
新的设计中须要在做 Checkpoint 的时候上传的数据量变得很少,不仅能够把 Checkpoint 做得更稳固,还能够做得更高频。整个工作流程如下图所示:
图 3: Generic Log-Based Incremental Checkpointing 工作流程
Generic Log-Based Incremental Checkpointing 相似传统数据库系统的 WAL 机制:
- 数据的增量更改(插入 / 更新 / 删除)会被写入到 Transaction Log 中。一旦这部分更改的日志被同步到长久存储中,咱们就能够认为 Transaction 曾经实现了。这个过程相似于上述办法中的 Checkpointing 的过程。
- 同时,为了不便数据查问,数据的更改也会异步长久化在数据表(Table)中。一旦 Transaction Log 中的相干局部也在数据表中被长久化了,Transaction Log 中相干局部就能够删除了。这个过程相似于咱们办法中的 State Table 长久化过程。
这种和 WAL 相似的机制能够无效晋升 Checkpoint 实现的速度,但也带来一些额定的开销:
- 额定的网络 IO 和额定的 Changelog 长久存储开销;
- 缓存 Changelog 带来的额定的内存应用;
- 容错复原须要额定的重放 Changelog 带来的潜在的复原工夫的减少。
咱们在前面的 Benchmark 比照中,也会对这三方面的影响进行剖析。特地对于第 3 点,额定的重放 Changelog 所带来的容错复原工夫减少会在肯定水平上因为能够做更频繁的 Checkpoint 所补救,因为更频繁的 Checkpoint 意味着容错复原后须要回放的解决数据更少。
三、Changelog 存储(DSTL)
Generic Log-Based Incremental Checkpointing 的很重要的一个组件是 State Changelog 存储这个局部,咱们称之为 Durable Short-term Log(DSTL,短存 Log)。DSTL 须要满足以下几个个性:
-
短期长久化
State Changelog 是组成 Checkpoint 的一个局部,所以也须要能长久化存储。同时,State Changelog 只须要保留从最近一次长久化 State Table 到以后做 Checkpoint 时的 Changelog,因而只须要保留很短时间(几分钟)的数据。
-
写入频率远远大于读取频率
只有在 Restore 或者 Rescale 的状况下才须要读取 Changelog,大部分状况下只有 append 操作,并且一旦写入,数据就不能再被批改。
-
很短的写提早
引入 State Changelog 是为了能将 Checkpoint 做得更快(1s 以内)。因而,单次写申请须要至多能在冀望的 Checkpoint 工夫内实现。
-
保障一致性
如果咱们有多个 State Changelog 的正本,就会产生多正本之间的一致性问题。一旦某个正本的 State Changelog 被长久化并被 JM 确认,复原时须要以此正本为基准保障语义一致性。
从下面的个性也能够看出为什么咱们将 Changelog 存储命名为 DSTL 短存 Log。
3.1 DSTL 计划的抉择
DSTL 能够有多种形式实现,例如分布式日志(Kafka)、分布式文件系统(DFS),甚至是数据库。在 Flink 1.15 公布的 Generic Log-Based Incremental Checkpointing MVP 版本中,咱们抉择 DFS 来实现 DSTL,基于如下思考:
- 没有额定的内部依赖:目前 Flink Checkpoint 长久化在 DFS 中,所以以 DFS 来实现 DSTL 没有引入额定的内部组件。
- 没有额定的状态治理:目前的设计方案中 DSTL 的状态治理是和 Flink Checkpointing 机制整合在一起的,所以也不须要额定的状态治理。
- DFS 原生提供长久化和一致性保障:如果实现多正本分布式日志,这些都是额定须要思考的老本。
另一方面,应用 DFS 有以下毛病:
- 更高的提早:DFS 相比于写入本地盘的分布式日志零碎来讲一般来说有更高的提早。
- 网络 I/O 限度:大部分 DFS 供应商出于老本的思考都会对单用户 DFS 写入限流限速,极其状况有可能会造成网络过载。
通过一些初步试验,咱们认为目前大部分 DFS 实现(例如 S3,HDFS 等)的性能能够满足 80% 的用例,前面的 Benchmark 会提供更多数据。
3.2 DSTL 架构
下图以 RocksDB 为例展现了基于 DFS 的 DSTL 架构图。状态更新通过 Changelog State Backend 双写,一份写到 RocksDB,另一份写到 DSTL。RocksDB 会定期进行 Materialization,也就是将以后的 SST 文件 上传到 DFS;而 DSTL 会将 state change 继续写入 DFS,并在 Checkpointing 的时候实现 flush,这样 Checkpoint 实现工夫只取决于所需 flush 的数据量。须要留神的是 Materialization 齐全独立于 Checkpointing 的过程,并且 Materialization 也能够比 Checkpointing 的频率慢很多,零碎默认值是 10 分钟。
图 4: 以 RocksDB 为例基于 DFS 的 DSTL 架构图
这里还有几个问题值得补充讨论一下:
-
状态清理问题
后面有提到在新的架构中,一个 Checkpoint 由两局部组成:1)State Table 和 2)State Change Log。这两局部都须要按需清理。1)这个局部的清理复用 Flink 已有的 Checkpoint 机制;2)这个局部的清理绝对较简单,特地是 State Change Log 在以后的设计中为了防止小文件的问题,是以 TM 为粒度的。在以后的设计中,咱们分两个局部来清理 State Change Log:一是 Change Log 自身的数据须要在 State Table 物化后删除其绝对应的局部;二是 Change Log 中成为 Checkpoint 的局部的清理交融进已有的 Flink Checkpoint 清理机制[4]。
- DFS 相干问题
-
小文件问题
DFS 的一个问题是每个 Checkpoint 会创立很多小文件,并且因为 Changleog State Backend 能够提供更高频的 Checkpoint,小文件问题会成为瓶颈。为了缓解这种状况,咱们将同一个 Task Manager 上同一作业的所有 State Change 写到同一个文件中。因而,同一个 Task Manager 会共享同一个 State Change Log。
-
长尾提早问题
为了解决 DFS 高长尾提早问题,DFS 写入申请会在容许超时工夫(默认为 1 秒)内无奈实现时重试。
四、Benchmark 测试后果剖析
Generic Log-Based Incremental Checkpointing 对于 Checkpoint 速度和稳定性的晋升取决于以下几个因素:
- State Change Log 增量的局部与全量状态大小之比,增量越小越好。
- 不间断上传状态增量的能力。这个和状态拜访模式相干,极其状况下,如果算子只在 Checkpointing 前更新 Flink State Table 的话,Changelog 起不到太大作用。
- 可能对来自多个 Task 的 changelog 分组批量上传的能力。Changelog 分组批量写 DFS 能够缩小须要创立的文件数量并升高 DFS 负载,从而进步稳定性。
- 底层 State Backend 在刷磁盘前对同一个 key 的 更新的去重能力。因为 state change log 保留的是状态更新,而不是最终值,底层 State Backend 这种能力会增大 Changelog 增量与 State Table 全量状态大小之比。
- 写长久存储 DFS 的速度,写的速度越快 Changelog 所带来的晋升越不显著。
4.1 Benchmark 配置
在 Benchmark 试验中,咱们应用如下配置:
- 算子并行度:50
- 运行工夫:21h
- State Backend:RocksDB (Incremental Checkpoint Enabled)
- 长久存储:S3 (Presto plugin)
- 机器型号:AWS m5.xlarge(4 slots per TM)
- Checkpoint 距离: 10ms
- State Table Materialization 距离:3m
- Input Rate:50K Events /s
4.2 ValueState Workload
咱们第一局部的试验,次要针对每次更新的 Key 值都不一样的负载;这种负载因为上述第 2 点和第 4 点的起因,Changelog 的晋升是比拟显著的:Checkpoint 实现工夫缩短了 10 倍(99.9 pct),Checkpoint 大小减少 30%,复原工夫减少 66% – 225%,如下表所示。
表 1: 基于 ValueState Workload 的 Changelog 各项指标比照
上面咱们来更具体的看一下 Checkpoint Size 这个局部:
表 2: 基于 ValueState Workload 的 Changelog(开启 / 敞开)的 Checkpoint 相干指标比照
- Checkpointed Data Size 是指在收到 Checkpoint Barrier,Checkpointing 过程开始后上传数据的大小。对于 Changelog 来说,大部分数据在 Checkpointing 过程开始前就曾经上传了,所以这就是为什么开启 Changelog 时这个指标要比敞开时小得多的起因。
- Full Checkpoint Data Size 是形成 Checkpoint 的所有文件的总大小,也包含与之前 Checkpoint 共享的文件。与通常的 Checkpoint 相比,Changelog 的格局没有被压缩过也不够紧凑,因而占用更多空间。
4.3 Window Workload
这里应用的是 Sliding Window。如下表所示,Changelog 对 checkpoint 实现工夫减速 3 倍左右;但存储放大要高得多(耗费的空间靠近 45 倍):
表 3: 基于 Window Workload 的 Changelog(开启 / 敞开)的 Checkpoint 相干指标比照
Full Checkpoint Data 存储空间放大次要起因来自于:
- 对于 Sliding Window 算子,每条数据会加到多个滑动窗口中,因而为造成屡次更新。Changelog 的写放大问题会更大。
- 后面有提到,如果底层 State Backend(比方 RocksDB)在刷磁盘前对同一个 key 的 更新去重能力越强,则快照的大小绝对于 Changelog 会越小。在 Sliding Window 算子的极其状况下,滑动窗口会因为生效被清理。如果更新和清理产生在同一个 Checkpoint 之内,则很可能该窗口中的数据不蕴含在快照中。这也意味着革除窗口的速度越快,快照的大小就可能越小。
五、论断
Flink 1.15 版本实现了 Generic Log-Based Incremental Checkpointing 的 MVP 版本。这个版本基于 DFS 能够提供秒级左右的 Checkpoint 工夫,并极大的晋升了 Checkpoint 稳定性,但肯定水平上也减少了空间的老本,实质上是用空间换工夫。1.16 版本将进一步欠缺使其生产可用,比方咱们能够通过 Local Recovery 和文件缓存来减速复原工夫。另一个方面,Changelog State Backend 接口是通用的,咱们能够用同样的接口对接更快的存储来实现更短的提早,例如 Apache Bookkeeper。除此之外,咱们正在钻研 Changelog 的其余利用,例如将 Changelog 利用于 Sink 来实现通用的端到端的 exactly-once 等。
附录
如果您想试用 Generic Log-Based Incremental Checkpointing 的话,能够在 flink-conf.yaml 中进行如下简略的设置:
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: <location similar to state.checkpoints.dir>
残缺的设置文档能够参考 5
致谢
咱们感激 Stephan Ewen 提出了这个性能的最后构想,也感激 Piotr Nowojski, Yu Li 和 Yun Tang 的探讨和代码 Review。
[1] https://flink-learning.org.cn…
[2] https://cwiki.apache.org/conf…
[3] https://cwiki.apache.org/conf…
[4] https://flink.apache.org/feat…
[5] https://nightlies.apache.org/…
Flink CDC Meetup 视频回顾 & PPT 下载
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…