乐趣区

关于后端:基于-Log-的通用增量-Checkpoint

摘要:本文整顿自阿里巴巴开发工程师,Apache Flink Contributor 俞航翔,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容次要分为四个局部:

  1. Checkpoint 性能优化之路
  2. Changelog 机制解析
  3. Changelog 性能测试
  4. 总结与布局

点击查看原文视频 & 演讲 PPT

一、Checkpoint 性能优化之路

2.1 Checkpoint 总览

家喻户晓,Flink 是有状态的分布式计算引擎,状态是 Flink 中十分重要的概念,而在 Flink 中状态和 Checkpoint 机制是密不可分的,因而在探讨 Flink 在 Checkpoint 上优化历程之前,先来看下为什么 Checkpoint 这么重要,Checkpoint 到底做了些什么呢?

Checkpoint 概念并不生疏,它在各种零碎中都呈现过,其次要目标就是容错及保障利用在产生故障后仍然可能失常运行。故障对长期利用的零碎是无奈防止的,而数据处理提早是流计算零碎中十分重要的指标。如何在产生故障后保障利用尽快恢复并追上最新的数据是流计算零碎须要重点解决的问题。而相比基于容机制的故障复原,Checkpoint 机制会更轻量、更易用。

进一步,很多业务对故障复原后的数据一致性提出了更高的要求。Flink 的 Checkpoint 机制反对了 Exactly-once 语义,在 Source 反对回放和 Sink 反对事务后,能够做到端到端的 Exactly-once 语义。在 Checkpoint 和复原性能优化到肯定水平后,利用能够做到真正的、好像没有呈现故障似的长期运行。

Flink 是如何基于 Checkpoin 机制做到的呢?

在作业的运行过程中,Flink 的 Stateful 算子会通过 State 记录多个 events 之间的信息,Flink 会定期执行 Checkpoint 把这些状态长久化,将全局一致性快照上传到远端存储中,而在产生故障后,Flink 的每个 Task 将会下载长久化的状态数据到本地并从新构建本地的状态数据结构。如果 Source 反对重放,整个 Pipeline 会从记录的上一个位点开始重放,作业开始失常运行。

基于这两个局部的探讨,能够理解到 Checkpoint 须要围绕两个重要指标设计:轻量级和疾速的 Failover。

1.2 更轻量、更疾速的 Checkpoint

基于以上两个设计指标,Flink 在 Checkpoint 做了诸多优化,咱们能够联合 Checkpoint Metrics 来一览这些优化的作用。

在 0.9 这个版本中,Flink 引入了轻量的异步快照算法。这个算法有两个外围点:

  • 一是在作业粒度,将 Barrier 作为非凡的 Record 在 Graph 中传递,收到 Barrier 算子将执行 Checkpoint。
  • 二是在算子粒度,Checkpoint 的执行步骤分成了同步阶段和异步阶段,并将较重的操作,如文件上传等,放到了异步阶段。

在 Metrics 中咱们能够看到 Checkpoint 端到端的耗时被分成了多个时间段,其中咱们在遇到 Checkpoint 性能问题时,首先会查看的就是同步阶段的耗时和异步阶段的耗时。之后呈现的各项技术就是次要针对这两个阶段的优化。

在 1.0 这个版本中,Flink 反对了 RocksDB StateBackend,让大状态作业领有了更高的稳定性,然而大状态的 Checkpoint 却成为了瓶颈。随着状态增大,咱们能够看到 Full Checkpoint Data Size,即全量 Checkpoint 的数据量会有显著增大,进而导致 Checkpoint 异步局部的耗时减少显著。

因而在 1.3 版本中,Flink 反对了基于 RockDB 的 Incremental Checkpoint。在这种机制下,State Backend 在异步阶段只须要上传增量文件即可,大大减少了 Checkpoint 在异步阶段上传的文件量,从而缩短了 Checkpoint 的异步耗时。

通常,在 Metrics 中看到异步阶段耗时过长,同时 Full Checkpoint Data Size 较大时,能够首先思考开启该配置。开启之后,能够通过 Checkpointed Data Size 看到增量局部的大小。

那么同步阶段的耗时能够进一步放大吗?

在生产实践过程中,在方才的算法下,Alignment Duration,即对齐工夫,通常是同步阶段耗时较长的局部。例如,作业两头的一个算子可能会收到多个上游算子的输出,而在刚刚提及的算法中,为了保障 Exactly-once 语义,须要等到多个 Barries 对齐后,算子才会触发整个 Checkpoint,这会导致整体 Checkpoint 可能会因为链路的算子解决过慢而让整个 Checkpoint 做不进去。这个时候对齐工夫就会变长。因而在 1.11 中,Flink 反对了 Unaligned Checkpoint,并在 1.13 中该性能达到 Production Ready 状态。

该性能开启后,在单个 Barrier 达到某个算子时,能够间接被传递到 Output Buffer 最初,同时间接触发 State Backend 层的 Checkpoint,并把承受最初一个 Barrier 前的 Input Buffer 和 Output Buffer 中的 inflate data 存储到远端。而在复原时,额定复原这一部分的数据,并回放 Input Buffer 中的数据到算子上。这种形式在缩小同步对齐工夫的同时,还提供了 Exactly-once 的语义保障。

这个时候咱们能够在 Flink UI 上看到 Unaligned Checkpoint 已被置为 true,同时能够看到 persisted in-flight data 有显著的回升。

但 Unaligned Checkpoint 的关上会因为要存储额定的 in-flight data 导致空间放大问题,有没有方法能够缩小这部分空间的开销呢?同时 Checkpoint Metrics 中还有个指标——Start Delay。这个指标是 Barrier 从创立到达到该算子的工夫,有没有方法能够进一步减速整个作业的 Barrier 流动,让 Barrier 更快可能达到某个算子,从而更快触发 Checkpoint 呢?

为了解决这个问题,1.14 中引入了 Buffer Debloating 机制。该性能能够通过检测网络流量状况来动静调节 Network Buffer 的大小,从而放慢 Aligned Checkpoint 触发,缩小 Unaligned Checkpoint 的额定空间开销。

以上各项技术能够优化 Checkpoint 的同步耗时和异步耗时,其中 Unaligned Checkpoint 和 Buffer Debloating 机制的联合能够绝对较小的代价来大大减少同步阶段的耗时。

那么异步阶段的耗时是否有进一步改良的空间呢?1.15 中引入了通用增量 Checkpoint,即 Changelog StateBackend,该性能在 1.16 中达到 Production Ready 的状态,也就是这次分享的配角。

二、Changelog 机制解析

2.1 RocksDB Incremental Checkpoint 机制

首先,正如咱们方才提到,基于 RocksDB 的 Incremental Checkpoint 机制能够缩小异步阶段须要上传的文件量,然而他没法从根本上保障 Checkpoint 过程的疾速和稳固。为什么呢?

咱们先来回顾下它的机制。在写入流程中,Record 会先被写入到 RocksDB 的内存数据结构,即 MemTable 中。而后 MemTable 在满了之后会变成 ImmuTable MemTable。在达到阈值时,ImmuTable MemTable 会被 Flush 到磁盘中造成 SST Files。

这个过程可能会触发 SST 的 Compaction,在 Level Compaction 机制下,可能会触发多层的级联 Compaction,进一步产生大量的新文件。如图所示,黄色标注的新文件和旧文件就造成了全量的 SST。

在 Checkpoint 的同步流程中,会先强制触发 MemTable 的 Flush,相似于方才的过程是可能会触发 SST 的 Compaction 的。而后会执行一个本地的 Checkpoint。这个其实是一个 Hard Link 的过程,是绝对轻量的。而在 Checkpoint 的异步流程中,则会把增量局部的文件上传到远端存储中,并写入一些 Meta 信息。

这个过程中存在的问题:

首先,Flush 可能会触发多层 Level Compaction,进一步导致大量文件须要从新上传。

其次,除了数据量达到阈值时,Checkpoint 的同步阶段也是会强制 Flush MemTable。在大规模作业下,会导致多个 Task 同时触发 Compaction 和文件上传,进一步导致资源和 Checkpoint 耗时随 Checkpoint 周期抖动。

最初,Checkpoint 的端到端耗时取决于最长的链路。而对于大规模作业,每次 Checkpoint 都可能因为某个 Task 的异步工夫过长而导致整体耗时变长,最终导致 Checkpoint 耗时长且不稳固。

因而咱们引入了 Changelog 机制,其指标就是为了进一步提供稳固且疾速的 Checkpoint。

首先,RocksDB 因为 Compaction 导致上传文件量的不稳固,进一步会导致 Checkpoint 耗时突增,而开启 Changelog 就能够大幅缩小 Checkpoint 耗时突增的状况。同时在生产实践中,咱们有时会发现 CPU 和网络带宽随着 Checkpoint 周期性抖动。这通常是因为 Checkpoint 触发了多个 Task 的 Compaction,同时导致大量文件须要从新上传,这是可能会影响到作业自身的稳定性甚至其余作业的稳定性的。而 Changelog 是能够改善这个状况的。

其次,Changelog 通过上传绝对固定增量的形式,极大缩小异步局部耗时,保障在状态量较大时仍然能在秒级 / 亚秒级实现 Checkpoint。

而更疾速的 Chekpoint 能够带来什么益处呢?

一个是更小的端到端提早。Transactional Sink 的提交依赖于 Checkpoint 的实现,而 Checkpoint 实现得越快,能够让 Sink 的数据越陈腐。另一个是更少的数据回追,咱们晓得在 Failover 时会从最近的一个 Checkpoint 复原并回追位点后的数据,而更快的 Checkpoint 也就意味着更少的数据回追。

当然,Changelog 会带来额定的开销,如空间开销、复原耗时开销等,但总体上这些开销是可控的,且相比其带来的收益,开销是绝对较小的,前面咱们会通过试验来进一步阐明。

那 Changelog 是如何做到这些晋升的呢?

Changelog 的机制其实很像 DB 中的 Checkpoint + WAL 机制,因而在介绍 Changelog 机制之前,咱们先看一下 DB 中的相似机制。

为了能疾速从故障中复原,DB 中通常会开启 WAL 性能。当开启该性能并写入数据到 DB 中时,会将 Record 以操作日志的模式先写入到磁盘的 WAL 构造中,而后再更新内存的数据结构,同时在内存和磁盘间同步。因为 WAL 是程序写入,因而该过程是十分快的。而在触发 Checkpoint 时,DB 会在本地做一个全量快照,同时在实现后将历史的 WAL Truncate 掉。而在故障复原时,会首先从 Checkpoint 中从新构建 DB,并将 WAL 回放到 DB 中,从而复原到故障前的状态。

为什么会须要这两种机制呢?首先,因为相比 WAL,Checkpoint 的操作绝对较重,无奈做到频繁且细力度的快照,而程序写的 WAL 能够做到。其次,WAL 有较大的空间放大问题,这在复原时须要额定的回放开销,因而定期的 Checkpoint 能够缩小这部分开销。

2.2 Changelog Incremental Checkpoint 术语

咱们能够基于 DB 中的机制引申到 Changelog 中的一些概念。State Table 是算子本地状态数据的读写构造,比方 RocksDB,其相似于上图左侧的 DB 数据结构。

State Changelog 是以 Append-only Log 模式存储的状态记录。DSTL 则是 Changelog 局部的存储组件,这两个局部相似于左图中的 WAL 构造。Materialization 是 State Table 的长久化过程,会定时触发,并且在胜利之后会穿 Turncate 掉 Changelog,相似于上图左侧中的 Checkpoint 的过程。

因而,简而言之,Changelog 就是通过额定的 DSTL 继续上传增量,并利用 Materialization 来实现定期刷新 State Table 全量快照的过程。

2.3 Changelog Incremental Checkpoint 外围机制

咱们能够进一步来看它的一些外围机制。首先,在作业粒度,端到端的整个过程是在 Flink Checkpoint 的机制下实现的,是与之前相似的。而在算子粒度,咱们能够同样通过三条链路来看整体过程:

  1. 在读写链路上,新写入的 Record 会以 State Changelog 的模式写入 DSTL 和 State Table 中,而读取时只会从 State Table 中读取。
  1. 在 Checkpoint 链路上,蕴含四个次要局部:
  • 一是在作业运行时,Changelog 局部会被 DSTL 定期间断上传到远端存储中。
  • 二是在 Flink Checkpoint 触发时,Changelog 局部会被间接上传。
  • 三是 Materialization 会被定时触发,同时在实现后会 Truncate 掉历史 Changelog。
  • 四是基于上述三个过程,JM 将获取到由 Materialization 局部和 Changelog 局部组成的 Handle 保留到 Meta 信息中。
  1. 在复原链路上,首先会复原 State Table,包含从远端下载文件和从新构建 State Table,而后再从远端下载并从新回放 Changelog 局部。

在该机制下,Checkpoint 的过程会变得十分平滑。每个 Task 会定时触发 Materialization 去长久化全量数据,而在作业运行时和 Flink Checkpoint 触发时,都会上传绝对较小的增量数据。

在这个机制下,每个 Task 的 Materialization 过程是绝对独立且不会影响到 Checkpoint 上传的数据量,因而 State Table 的具体机制和 Incremental Checkpoint 过程是解耦开的,Checkpoint 的过程能够变得十分稳固且疾速。

同样在复原时,每个 Task 将基于最近的 Materialization 构建 State Table,并在其上回放 Materialization 和最近 Checkpoint 之间的 Changelog 历史即可。

随同着 Checkpoint 变得疾速且稳固,Changelog 是会带来一些开销的,次要分为三个方面:

  • 一是额定的存储空间开销。咱们晓得 Changelog 局部是以操作日志的模式写入的,目前没有 Merge 机制,在 Truncate 之前是会持续增长的。因而绝对原生的如 Rocksdb 的机制,是会有空间放大的问题的。

    值得一提的是,远端存储的费用是绝对便宜的,比方 Aliyun OSS 规范存储大概在 0.12 元 /GB/ 月,咱们能够思考用大量增长的远端存储的费用来换取更稳固且疾速的 Checkpoint。

  • 二是额定的复原开销。在复原时除了 State Table 局部的复原,咱们还须要下载并回放 Changelog 局部,这部分耗时就是额定的下载和回放带来的。
  • 三是额定的性能开销。Changelog 机制引入了额定的双写步骤,是会对作业 TPS 下限产生影响的。值得注意的是,这里影响的是作业的 TPS 下限,而从日常运行的 TPS per core 来看,Changelog 是可能达到更高的。

在前面的试验中,咱们能够看到整体的开销是可控的。

三、Changelog 性能测试

这里将通过三个试验来验证 Changelog 机制的稳定性和执行性能,并察看它在空间放大、复原性能和极限 TPS 上的开销。

3.1 Changelog Incremental Checkpoint 的应用

首先介绍一下 Changelog 的应用形式,以及联合事例介绍在试验中的相干根底配置。

  • 第一个参数是启用 Changelog 的参数,咱们只有把它设置成 true 即可开启 Changelog,1.16 中咱们也反对了该参数的兼容性。
  • 第二个参数是 Materialization 的距离,它能够肯定水平上管制空间放大,在试验中,咱们把它设置为 3 分钟。
  • 第三个参数是 Changelog 局部的存储介质,生产实践中咱们能够设置成 File System 来将 Changelog 存储在 DFS 中;DFS 也是目前惟一反对的 Changelog 存储,之后咱们也会思考推出其余存储模式。
  • 第四个参数是 Changelog 在 DFS 上的存储门路,在第三个参数设置为 Filesystem 时须要设置,配置形式相似 Checkpoint 门路。

3.2 Benchmark 实验设计

整个测试分为三个试验来比照测试 RocksDB 和开启 Changelog 时的作业指标,来察看 Changelog 为作业带来的益处和额定的开销,同时阐明开销的可控性。

试验 A:比照测试在日常流量下,不同状态大小下的 CP 稳定性、速度和空间放大率。其中咱们设置了单并发 100MB 和 1.2GB 两种设置,别离对应状态全内存和落盘后的场景。

试验 B:在试验 A 的根底上,通过手动制作异样来触发 Failover,以比照测试在日常流量下作业复原的性能。

试验 C:让作业反压,比照测试状态算子在反压时的 TPS,以察看 Changelog 双写对 TPS 的影响。

这里咱们次要记录了 Value State 的测试后果,咱们也测试了不同类型 State 和不同 Operator 的性能,如 Window State,这部分数据将在后续的 Blog Post 中进一步分享给大家。

首先咱们能够从 Checkpoint 耗时图来看 Checkpoint 的稳定性。

由上图能够看到,RocksDB 的 Checkpoint duration 是绝对不稳固的,如果关上 SubTask 详情和 Rocksdb Metric 能够进一步看到,这个工夫的长短是和 Compaction 相干的。而 Changelog 是绝对稳固的,继续上传增量的形式能够让 Changelog 的 Checkpoint 执行得十分稳固。

另外,咱们能够看到 Checkpoint Duration 的变动是和 Checkpoint 周期有相关性的,比方每四个会久一些,这个是因为后面说的 Checkpoint 同步阶段时会触发 MemTable Flush,而 RocksDB 的 LO 层默认每 4 个会触发一次 Compaction 从而导致更多文件更新,这会进一步加剧 Rocksdb Checkpoint 的不稳定性。

对于 Checkpoint 的执行性能,从途中能够看到,随着 State Size 增大,关上 Changelog 后 Checkpoint 的 p99 耗时能够稳固在 1s 左右,而 Rocksdb 则从 8 秒减少到近 20 秒。这是因为 Changelog 继续上传固定增量的形式,能够让触发 Checkpoint 时的增量大小变得十分稳固且疾速。

在试验 A 中能够看到,开启 Changelog 是会有额定空间开销的,相比 Rocksdb 的空间放大大概在 1.2-2 倍,其中状态较小,比方在内存中时会更大。这是因为 RocksDB 的合并机制相比 Changelog 记录操作日志的形式能更好地缩小总数据量。特地地,当状态在内存中时,RocksDB 能有更好的合并成果。

针对这个状况,咱们能够通过反对 Changelog 局部的 Merge 来进一步减小空间上的开销。但就目前现状而言,如上文所说,远端存储的费用是绝对便宜的,咱们齐全能够思考用大量增长的远端存储的的费用来换取更稳固且疾速的 Checkpoint。

能够通过反对 Changelog 局部的 Merge 来减小空间上的开销。但就目前现状而言,远端存储的费用是绝对便宜的。如上文所说,能够思考用大量增长的远端存储费用来换取更稳固且疾速的 Checkpoint。

试验 B 次要测试额定复原开销。能够看到,在不关上 Local Recovery 时,关上 Changelog 相比 Rocksdb 约有 40% 的额定耗时开销,关上 Local Recovery 后,两者的复原开销都能够靠近疏忽。同时,联合 Checkpoint 端到端耗时的晋升,思考作业整体的 Failover 流程,关上 Changelog 后还是会更快的。

Changelog 复原时的开销次要来自哪里呢?由上面的公式能够看到,额定开销次要来自 Changelog 局部的下载和回放。而在开启 local recovery 后下载工夫被抹去,只有回放上还有额定较小的开销。这个局部能够进一步优化,比方通过 Remote Apply,即远端事后 Merge 的形式,运行时将 Changelog 局部继续 Apply 到 State Table 的文件中,进而放大须要回放的数据。同时这种形式也能缩小远端存储开销。

但目前来看开销上影响不是那么大,也欢送大家应用反馈,以便咱们能关注并开发最外围的优化点。

试验 C 令作业反压,同时通过分步启用 Checkpoint 和 Local Recovery 来察看 Changelog 和开启 Local Recovery 后对极限 TPS 的影响。下图中能够看出,因为双写的额定开销相比于 RocksDB,Changelog 的极限 TPS 降落约 10%~20%。而因为三写的额定开销,关上 Local Recovery 后极限 TPS 约降落 5% 左右。这个局部的性能改善曾经在 FLINK-30345 中实现,进一步的测试详情能够参考后续的 Blog Post。

四、总结与布局

本篇分享首先从 Checkpoint 的基本概念登程,介绍了多项 Checkpoint 的优化技术和他们在 Flink UI 中的体现,而后从 RocksDB Incremental Checkpoint 机制存在的问题登程,引出了 Changelog 的设计指标,并解析了 Changelog 的 Checkpoint 机制。最初通过三组试验直观的感触了 Changelog 的劣势和开销,同时也介绍了 Changelog 的应用。

将来咱们会围绕三个方向进行优化:

  • 针对下面三个问题进行进一步的性能优化。
  • 作为 Fault Tolerance 2.0 中的一员,让整个容错过程变得更轻量、更易用。
  • 联合 Table Store,为 Table Store 提供更高的数据新鲜度。

点击查看原文视频 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://click.aliyun.com/m/1000372333/

退出移动版