摘要:本文整顿自美团计算引擎工程师王不凡,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容次要分为四个局部:
- Log based Checkpoint 基本原理介绍
- 美团利用场景及测试成果
- Changelog Restore 性能优化
- Changelog 存储选型摸索
点击查看原文视频 & 演讲 PPT
一、Log based Checkpoint 基本原理介绍
1.1 一般 Checkpoint 制作形式
Log based Checkpoint 又被称为通用的增量 Checkpoint,那就要先看一下在此之前的增量 Checkpoint 是怎么制作的,以及存在什么样的问题。这里次要是指 RocksDB 的增量 Checkpoint。为了简化形容,前面局部也都以 RocksDBStateBackend Checkpoint 为例。
如上图所示,RocksDBStateBackend Checkpoint 在制作时间接触发底层的 RocksDB 快照。在此过程中 RocksDB 须要将内存中的 MemTable Flush 到磁盘上,造成 L0 层的 SST 文件,以保障 SST 文件中保留了全量统一的状态数据。
RocksDBStateBackend 的增量 Checkpoint 就是在制作新的 Checkpoint 时不再反复上传上次 Checkpoint 曾经传过的 SST 文件,通过缩小反复上传文件来进步制作速度。但因为每次仍须要 RocksDB 将 MemTable 刷盘,导致高频制作 Checkpoint 时会存在一些问题。
首先是 Memtable 频繁刷盘会导致 L0 文件产生的过快,进而导致频繁的 Compaction。其次是 Checkpoint 制作时,所有 RocksDB 实例同步执行快照会导致 CPU 使用率、磁盘 IO 和网络流量的尖刺。
上图为 Log based Checkpoint 的制作形式。为了反对该性能,Flink 引入了 ChangelogStateBackend 组件。这个组件在代理状态写操作的过程中,通过背地的 Changelog Writer 组件将 State Change 数据写入到 Changelog Storage 中。在 Checkpoint 制作时,只须要 Changelog Writer 保障 Flush State Change 即可。而底层 RocksDB 的快照被称为物化,并将以较低的频率异步执行。
咱们联合例子来具体看下 Checkpoint 是如何制作的,上图中也形容了随工夫推动产生的事件:
- T0 时刻作业启动,随之开始解决数据。数据处理过程中一直地有 State Change 产生。T1 时刻 Checkpoint-1 触发,此时从 T0 到 T1 所有 State Change 就是 Checkpoint-1 中的所有状态数据,也就是图中的 Change-Set-1。
- T2 时刻触发了一次物化,即异步制作 RocksDB 快照并上传,咱们简称为 m1。同时在 T2 时刻将在 State Changelog 中记录同步点,使得 T1 到 T2 之间的 State Change 形成 Change-Set-2。
- T3 时刻触发了 Checkpoint-2,因为此时 m1 还没有实现,Checkpoint-2 依然只由 Changelog 形成,即蕴含了 Change-Set-1、Change-Set-2 和 Change-Set-3。在 Checkpoint-2 制作实现之后,m1 也制作实现了。
- T4 时刻触发了 Checkpoint-3,这时候能够发现存在一个最近实现的 m1,Checkpoint-3 能够间接由 m1 及其之后的 Change-Set-3 和 Change-Set-4 形成。
通过这种形式,Flink 的 Checkpoint 与底层的状态存储快照进行理解耦,使得 Flink Checkpoint 可能以较高的频率和速度执行。
1.2 Log based Checkpoint 劣势
劣势有四点:
- 第一,能够带来更轻量的 Recovery。Checkpoint 的距离越短,Recovery 时须要回溯的数据就越少,作业复原的速度也就会越快。
- 第二,能够缩小事务化 Sink 的端到端提早。事务化 Sink 在 Checkpoint 实现时进行 commit,更快的 Checkpoint 意味着能够进行更频繁的 commit。
- 第三,更可预测的 Checkpoint 距离。不须要等 DB Flush,compaction 的影响也更小,Checkpoint 制作时长仅取决于须要长久化到 Durable Storage 上的 Changelog 数据的多少。
- 第四,更敌对的资源应用。比拟重的 DB 快照操作被扩散执行,能够防止 CPU 使用率、磁盘 IO 和网卡流量 随 Checkpoint 制作而产生的尖刺。
二、美团利用场景和测试成果
咱们想要尝试应用 Log based Checkpoint 反对的业务需要,是流量数据天级回溯不超过千分之五。这种类型作业的特点是数据量大、作业规模大、单作业的规模有 5000 slot 左右。以后大部分作业可能稳固运行的 Checkpoint 距离为十分钟,一天内 Failover 一次,回溯的数据量就会突破千分之五以内的要求。
针对这种需要,咱们最后有两种解决思路:一个是缩小数据回溯的工夫,另一个是缩小回溯的 Kafka 分区数。缩小回溯数据工夫能够通过缩短 Checkpoint 距离来实现。这方面有 Flink 社区的 Flip-158 即咱们下面介绍的 Log based Checkpoint 来反对。而缩小回溯分区数能够通过只重启故障的节点来实现,这方面有 Flink 社区的 Flip-135 即 Approximate Task Local Recovery 来提供反对。
思考到 Approximate Task Local Recovery 的计划会导致数据失落,而这是咱们的业务所不能承受的,因而咱们抉择了 Log based Checkpoint。因为过后该性能还是实验室阶段,因而咱们首先对其进行了测试验证。
测试表明,应用 Log based Checkpoint 能够将 Checkpoint 的稳固制作距离从十分钟降落到十秒,齐全可能满足流量业务对天级数据回溯不超过千分之五的要求。
同时如上图三张资源使用率的监控所示,也验证了 Log based Checkpoint 可能优化资源应用。图中蓝线是应用传统的 Checkpoint 形式,黄线是应用了 Log based Checkpoint。具体的:CPU 使用率的峰值降落了 40%,磁盘 IO 使用率峰值降落了 29%,网卡出流量峰值降落了 61%。根本打消了传统 Checkpoint 制作过程中的资源应用尖刺。
但同时测试也暴露出 Flink 社区基于 DFS 实现的 Changelog 存储的一些问题。
首先是 Changelog Restore 反复下载文件问题。为了防止产生过多小文件,同一 TaskManager 内的 State Changelog 会尽量聚合到同一个文件中。而在 Restore 时这些文件会被同一个 TM 内的 operator 反复下载,导致 Restore 性能差。
其次是即便通过 TM 粒度的聚合,小文件问题依然重大,HDFS NN 压力微小。以咱们一个 4800 并发的作业为例,默认配置下会产生 130 万左右个 Changelog 文件,单个作业带来的 NN 申请高达 18000 次 / 秒左右。
最初是 Changelog 文件写提早太高,影响 Checkpoint 制作速度。还是以咱们的 4800 并发作业为例,写 Changelog 文件 p99 提早在 3 秒左右,最大提早甚至达到 2 分钟,导致 Checkpoint 的制作工夫十分不稳固。
针对这些问题,咱们别离进行了剖析和解决,这将会在前面的篇幅开展。
三、Changelog Restore 性能优化
3.1 Restore 时 Changelog 反复下载的问题
Flink 社区基于 DFS 的 Changelog Storage 在实现 Changelog 上传时,为了缩小小文件问题,抉择在同一 TM 内对所有 Operator Changelog 进行聚合和压缩,而后再上传到 DFS 上的文件中。而产生 Restore 时每个 Operator 实例的 Changelog Reader 都须要反复地读一次 Changelog 文件,造成重大的读放大,进而导致 Restore 的速度过慢。
缩小同一个 TM 内的 slot 数能够加重这个问题,然而又会减轻 DFS 小文件的问题。
其实每个 Operator 实例的 Changelog 数据在文件中是间断的一段,并且 Checkpoint 元数据中记录了 Offset,为什么不能间接 Seek 到相应的 Offset 后再开始读操作呢,这样就能够做到每个 Operator 只读属于本人的局部,不是么?
问题就在于,为了缩小 Changelog 文件的体积,Changelog 文件是通过压缩后再上传到 DFS 的。压缩导致 Offset 对压缩后的文件生效了,只能从头开始解压缩后再 Seek 到相应的 Offset 上。这样就导致了同一个 Changelog 文件的重复下载和解压缩。
针对这个问题,咱们提出在 TM 上减少一个 Changelog File Cache 组件,代理 Changelog 文件的下载。Cache 组件会在须要的时候将 Changelog 文件下载并间接解压缩后存储到本地,当 Changelog Reader 发动申请时,能够间接在本地缓存的文件上 Seek 到相应的 Offset 后读取。
这样在 Restore 的过程中,每个 Changelog 文件都只须要下载和解压各一次。Changelog File Cache 组件会在外部对每个本地缓存文件记录援用计数,通过援用计数和可配置的 TTL 来清理本地缓存。这个优化曾经在 Flink-1.16 上公布了。
四、Changelog 存储选型摸索
上图总结了在美团业务场景下 State Changelog 数据对存储的需要。根本的功能性需要是不丢、不重和保序,以保证数据的正确性。同时在性能方面须要满足较低的写入提早,以保障 Checkpoint 的疾速的实现。最初联合美团的 Flink 作业现状,还须要可能撑持百万级的并发写入。
针对以上的各项需要,咱们对候选的存储进行了比照。首先咱们将存储分为两类,一类是批式上传的 HDFS 和 S3,这一类是以后实现曾经反对的;另一类是流式上传,例如 BookKeeper、Kafka 和 Pulsar 等。具体细节见如下表格:
补充阐明下 BookKeeper,因为在设计之初就思考到为 WAL Log 服务,从性能保障、提早和并发规模上来看,都比拟适宜用来存储 Flink 的 State Changelog。而 HDFS 和 S3 这种 DFS 类的存储,提早和并发反对能力均不太满足需要。Kafka 因为面向分区的设计,在分区数过多的状况下体现不佳。Puslar 在能力上是满足需要的,然而绝对于 BookKeeper 来看没有显著劣势,又比 BookKeeper 多了 1 层 proxy 的开销。因而咱们初步抉择应用 BookKeeper 作为新的 Changelog 存储。
在实现 BookKeeper Changelog Storage 之前,咱们先来看一下 Changelog 波及的组件和他们的作用。如上图所示,Changelog 中组织了三个组件,别离是:
- 负责代理状态读写的 ChangelogBackend。
- 负责 Changelog 读写的 Changelog Storage。
- 负责协调以上两个组件的物化 Manager。
咱们要实现的就是 BookKeeper 的 Changelog Storage,其中蕴含 Writer 和 Reader 两个次要组件。
联合 Checkpoint 的制作流程来看一下 Changelog Writer 的角色:
- Coordinator 触发 Checkpoint 之后,Changelog Writer 须要将曾经传给本人的 State Change 数据都 Flush 到 BookKeeper 上。而后向 Coordinator ACK 一个 BookKeeper Changelog State Handle。这个 Handle 中会蕴含 BookKeeper 的地址、数据所在的 Ledger ID 和 Offset 等信息。
- Coordinator 在收到 ACK 之后,将 Metadata 写入 Durable Storage 上,Checkpoint 就制作实现了。
再来看一下 BookKeeper Changelog Writer 对三个外围办法的实现:
首先是 Append 办法,用于向 Changelog Writer 传递 State Change。思考到要缩小到 BookKeeper 的申请次数,咱们会在 Writer 中去攒 Batch,Batch 满之后再将 Batch 中的数据发送给 BookKeeper,并递增 Sequence Number。nextSequence 办法用来获取物化的同步点,因而须要间接将 Batch 发送给 BookKeeper,同时也要递增 Sequence Number。
Persist 的办法用于在 Checkpoint 制作时,保障 Changelog 数据被长久化到 BookKeeper 上。因而除了须要将 Batch 发送给 BookKeeper 并递增 Sequence Number 之外,因为后面的操作思考到优化 BookKeeper 写入性能而开启了 Deferred Sink,没有要求 BookKeeper 刷盘,Persist 时还须要调用 Force 办法强制 BookKeeper 将后面收到的 entry 刷盘。最初,Persist 办法还须要收集并整顿 Ledger ID 和相干的 Offset 组装成 BookKeeper Changelog State Handle 返回给 Checkpoint Coordinator。
BookKeeper Changelog State Handle 中记录了存储在 BookKeeper 上的 Changelog 数据的元信息:一部分是 BookKeeper 的地址、数据摘要类型和数据加密密钥,另一部分蕴含了 Ledger ID 和相干 Offset。
如上图所示,暗影局部示意了此次 Checkpoint 须要蕴含的 State Change,笼罩了两个 Ledger,因而须要别离记录两个 Ledger ID、Start Offset 和 End Offset。
介绍完 Changelog 的写操作,再联合 Checkpoint Restore 的流程来看下读操作。
首先 Checkpoint Coordinator 会从 Durable Storage 上读取 Metadata 并解析出其中的 BookKeeper Changelog State Handle,而后将这些 Handle 分发给不同的 BookKeeper Changelog Reader,由 Reader 负责发动读申请从 BookKeeper 中读取数据并 Apply 到 State Table 上。
如何实现 BookKeeper Changelog State Handle 在 Checkpoint Metadata 中的序列化和反序列化呢?
因为目前所有的 State Handle 类型的序列化和反序列化都是硬编码的,十分不便于实现第三方的 Changelog Storage,咱们以后的计划是减少一个 Customer Keyed State Handle 的接口,容许 Keyed State Handle 本人实现序列化和反序列化办法。
Customer Keyed State Handle 在序列化时会首先将应用的序列化器的类名写入到 Metadata。再应用序列化器将 Handle 写入 Metadata。反序列化时,首先从 Metadata 中读取出序列化器类名,再依据类名应用 ClassLoader Load 出序列化器,最初应用序列化器读出 Customer Handle。
上图是 BookKeeper Changelog Storage 的配置项。除 BookKeeper 的地址外分为三类,别离用于管制 Ledger 的正本配置、Ledger 的滚动配置和批量上传配置,并且都提供了默认值。
须要阐明一下,因为以后的 Shared State Registry 不反对 BookKeeper Changelog State Handle 这类并非基于 Stream State Handle 的实现,因而咱们临时通过 TTL+ 内部服务的形式去清理 Ledger。
咱们在 State/Checkpoint 方向上的将来布局如下:
- 持续欠缺 BookKeeper Changelog Storage,补充相干的指标,实现引擎外部的 Ledger 清理,并实现 Benchmark 测试和性能的剖析,把握能力边界。
- 持续推动 Changelog 性能的落地,推动 POC 业务线上落地 Changelog 性能,在事务化 Sink 场景推广 Changelog 性能。
点击查看原文视频 & 演讲 PPT
更多内容
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://click.aliyun.com/m/1000372333/