关于flink:基于-Log-的通用增量-Checkpoint

40次阅读

共计 7454 个字符,预计需要花费 19 分钟才能阅读完成。

摘要:本文整顿自 Apache Flink Contributor 俞航翔 9 月 24 日在 Apache Flink Meetup 的演讲。次要内容包含:

  1. Checkpoint 性能优化之路
  2. 解析 Changelog
  3. 一览 State/Checkpoint 优化

点击查看直播回放 & 演讲 PPT

一、Checkpoint 性能优化之路

Flink 作为一个 Stateful 计算引擎,State 是其十分重要的概念,它反对 Stateful 算子通过 State 记录多个 Events 之间的信息,并在 Checkpoint 时做状态长久化,存储全局一致性快照,在复原时通过 Resume Checkpoint 以及 Replay 来实现不同语义的一致性保障。

容错是长期运行的流式零碎十分重要的局部,而 Checkpoint 的次要目标就是解决 Failover 问题。基于此指标,它的生命周期齐全由 Flink 治理。因而对于不同的 StateBackend 能够用特定的原生格局进行存储,利用 StateBackend 的外部机制比方 Incremental Checkpoint 做进一步优化等。

基于以上机制,Checkpoint 目前的设计指标就是更轻量以及更疾速的复原。

那 Flink 在 Chekpoint 性能上做了哪些优化呢?

最晚期,在反对轻量级异步 Snapshot 算法后,Flink Checkpoint 的性能往前迈了一大步。在该机制下,Flink 将 Barrier 作为非凡 Record 在 Graph 中流动,同时将耗时较大的文件上传等工作放到异步过程中进行,极大升高了对主流程的影响。

在 1.0 版本中,Flink 开始反对 RocksDB StateBackend,这对大状态下的存储提供了很好的反对。1.3 版本 Flink 实现了基于 RocksDB Incremental Checkpoint 的机制,进一步晋升了 Checkpoint 在异步阶段的性能。1.11 版本 Flink 引入了 Unaligned Checkpoint,并在 1.13 版本达到了 Production-ready 状态,对于 Barrier 对齐有瓶颈的作业,这个技术让作业在反压比较严重的状况下仍然能够做出 Checkpoint。1.14 版本引入的 Buffer Debloating 能够通过动静调整 Network Buffer 大小来减速 Barrier 流动,进一步减速 Aligned Checkpoint 实现,缩小 Unaligned Checkpoint 存储的数据量。1.15 和 1.16 中 Flink 引入了 Changelog StateBackend,它通过更通用的 Incremental Checkpoint 机制进一步晋升了 Checkpoint 的异步性能。

咱们能够通过这张图来看一下这些技术在作业执行链路中的作用。

当 Checkpoint 触发时,Barrier 会随着 Graph 流动。当关上 Buffer Debloating 后,Flink 会通过计算吞吐等形式动静调整 Network Buffer 的大小来减速 Barrier 传递。当 Barrier 达到 Stateful 节点时,如果是 Aligned Checkpoint,则算子会期待 Barrier 对齐再触发后续的 Checkpoint 行为;如果是 Unaligned Checkpoint,则会间接将 Barrier 传递给后续算子,同时触发 StateBackend 上的 Checkpoint,并将 Buffer 中的内容存储到 HDFS,不会阻塞,如图两头的实线所示。

在 StateBackend 外部触发 Checkpoint 时,基于异步 Checkpoint 算法,异步局部会进行文件上传,如图中实线所示。而在开启了 RocksDB Incremental Checkpoint 时,会做增量文件上传,如上图中,只有更新后的 S2、S4 须要上传到远端。

Changelog StateBackend 的外围机制如上图最下方虚线所示,会将原先的 StateBackend 异步上传和 Changelog 局部进行解耦,引入了额定独立上传到 HDFS 的过程,这个过程将稳固、继续地产生在作业运行过程中。

咱们能够通过 Checkpoint Metrics 相干信息来直观感触下这些技术的作用。

Metrics 上对于 Size 的要害指标有 Checkpointed DataSize 和 Full Checkpoint DataSize 两个,是 1.15 版本在 Flink UI 中透出的指标,咱们能够通过这两个指标间接看出增量 Checkpoint 的性能以及存储在远端的空间大小。

残余的指标次要是对于耗时的,StartDelay 指标过大通常是因为作业反压,个别须要先排查作业逻辑。此外,StartDelay 过大时,如果作业逻辑容许,可开启 Unaligned Checkpoint 和 Buffer Debloating 以减速 Checkpoint 实现。如果 Aligned Duration 指标比拟长,能够思考开启 Unaligned Checkpoint。

同步 Duration 和异步 Duration 是整个 Checkpoint 是咱们通常最关注的两个局部,而异步局部的耗时通常是最常见的瓶颈点。异步局部的优化能够通过开启 Incremental Checkpoint 以缩小异步上传量,或者通过开启 Changelog StateBackend 来进一步缩短异步耗时。

二、解析 Changelog

Changelog 的外围指标如下:

  1. 更稳固的 Checkpoint:通过解耦 Compaction 和 Checkpoint 过程,使 Checkpoint 更稳固,大幅缩小 Checkpoint Duration 突增的状况,还可进一步缩小 CPU 抖动,使网络带宽变得更安稳。

在大规模、大状态作业上常常会呈现 CPU 随着 Checkpoint 周期性抖动,进而影响作业和集群稳定性的状况。Changelog 通过解耦 Checkpoint 触发 Compaction 的过程,能够使 CPU 变得更安稳。另外,在异步过程中,Compaction 导致的大量文件同时上传有时会将网络带宽打满,而 Changelog 是可能缓解该情况的。

  1. 更疾速的 Checkpoint:Checkpoint 期间会上传绝对固定的增量,能够达到秒级实现 Checkpoint 的指标。
  1. 更小的端到端提早:Flink 中实现端到端的 Exactly-once 语义次要依赖于 Checkpoint 的实现工夫。Checkpoint 实现越快,Transactional sink 能够提交得更频繁,保障更好的数据新鲜度。后续可与 Table Store 联合,保障 Table Store 上的数据更陈腐。
  1. 更少的数据回追:通过设置更小的 Checkpoint Interval 减速 Failover 过程,能够缩小数据回追。

尽管目前 Changelog 的机制下,Restore 时在 TM 上会有额定的 Replay 工夫开销,但总体来看,消耗的工夫还是绝对缩小的。

那 RocksDB Incremental Checkpoint 为什么做不到疾速且稳固呢?

咱们先看下 RocksDB 的拜访机制:当一条 Record 写到 RocksDB 时,首先会写到 Memtable,数据量达到 Memtable 阈值后会 Memtable 变为 Immutable Memtable;当数据量再达到整个 Memory 所有 Memtable 的阈值后,会 Flush 到磁盘,造成 SST Files。L0 的 SST files 之间是有 Overlap 的。Flink 默认应用 RocksDB 的 Level Compaction 机制,因而在 L0 达到阈值后,会持续触发 Level Compaction,与 L1 进行 Compaction,进一步可能触发后续 Level Compaction。

咱们再来看一下 Checkpoint 同步阶段和异步阶段做了些什么。在同步过程中,Checkpoint 首先会触发 Memtable 强制 Flush,这一过程可能会触发前面级联的 Level Compaction,该步骤可能导致大量文件须要从新上传。同时,同步过程中会做 Local Checkpoint,这是 RocksDB 本地的 Checkpoint 机制,对 Rocksdb 而言其实就是 Hard Link 一些 SST Files,是绝对轻量的。异步过程会将这些 SST Files 上传,同时写入 Meta 信息。

咱们能够看到两个重要局部:

  1. 数据量达到阈值,或者 cp 的同步阶段,是会触发 Memtable Flush,进一步触发级联 Level Compation,进一步导致大量文件须要从新上传的
  1. 在大规模作业中,每次 Checkpoint 可能都会因为某一个 Subtask 异步工夫过长而须要从新上传很多文件。端到端 Duration 会因为 Compaction 机制而变得很长。

那么 Changelog 做了什么改良呢?

在介绍 Changelog 的机制之前,我先介绍下几个外部的术语

State Table 是本地状态数据读写构造,比方 RocksDB。咱们更偏向于将 Changelog 了解为 StateBackend 上的性能加强,已有的 StateBackend(HashmapStateBackend/RocksDBStateBackend,或者自定义的一种 StateBackend)均能够关上该性能。而且咱们在 1.16 中实现了 Changelog 开到关和关到开的兼容性,用户能够十分不便地在存量作业中应用。

Materialization 是 State Table 长久化的过程,能够了解为 RocksDBStateBackend 或 HashmapStateBackend 做 Checkpoint 的过程。目前会定时触发,实现一次胜利的 Materialization 后会 Truncate 掉 Changelog,即做 Changelog 的清理。

DSTL 是 Changelog 的存储组件。Changelog 的写入须要提供长久化、低提早、一致性及并发反对。目前基于 DFS 实现了 DSTL,后续咱们将持续摸索其余实现形式。

Changelog 的机制很像 WAL 的机制。

如图所示,图中上面局部为 State Table,下面为 Changelog 的存储局部即 DSTL。

首先,在状态写入时,会同时写到 State Table 和 DSTL,如果 State Table 是 Rocksdb,那么它的后续流程就像咱们方才提到的一样,包含写 Memtable,Flush,触发 Compaction 等等过程。DSTL 这个局部会以操作日志的形式追加写入 DSTL,咱们也反对了不同 State 类型的各种操作的写入。

其中 DSTL 会有一套残缺的定时长久化机制长久化到远端存储中,所有 Changelog 将会在运行过程中间断上传,同时在 Checkpoint 上传较小的增量。

State Table 会定时进行 Materialization,在实现一次残缺的 Materialization 后将会对 Changelog 进行 Truncate,清理掉生效的 Changelog,而后新的 Checkpoint 将以这个 Materialization 为基准持续继续上传增量。

咱们按读写流程、Checkpoint 流程、Restore 流程再总结下这个过程。

在状态写入时,会双写到 State Table 和 Dstl,读取时会从 State Table 中读取,即读写链路上只是多了个双写,且 Dstl 的局部是 append-only 的写入,会十分轻量级。

在 Checkpoint 时,依赖于定时 Materilize State Table,以及定期 Persist Changelog,须要上传的增量会十分小,在上传小增量后只须要把 Materialization 局部的 Handle 和 Changelog 局部的 Handle 组合返回给 jm 即可。同时咱们也会依赖于 Truncate 机制去及时清理有效 Changelog。

在 Restore 时,咱们拿到了 State Table 的 Handle 和 Changelog 的 Handle,State Table 局部会按之前的形式进行 Retsore,比方是 Rocksdb,那么在没开启 Local Recovery 时,会先下载远端 SST Files,再 Rebuild。Changelog 局部再没有开启 Local Recovery 时,会先下载远端 State Change,而后从新 Apply 到 State Table,Rebuild Changelog 局部。

Changelog 开启后,Checkpoint 文件上传过程会变得十分平滑。此前,只有在 Checkpoint 时才会上传文件;而当初有了 Materialization 以及定期做 Changelog 增量上传,理论做 Checkpoint 时须要上传的增量变得十分小。

上图为相干罕用参数的含意和应用办法。

Changelog 可能使 Checkpoint 更疾速以及更稳固,然而会存在三个额定开销:

  • 额定的存储空间。Truncate 之前,State Changelog 会始终占用额定的存储空间。
  • 额定的复原开销。Restore 过程须要额定 Apply 以及额定下载,因而也须要额定的复原,复原过程会占用耗时。
  • 额定的性能开销。State Changelog 会做定时上传,存在肯定的性能开销。

咱们基于 RocksDB incremental 与 Changelog 做了 Benchmark。Changelog 下应用的 State Table 为 RocksDB,开启 Incremental,应用 OSS 作为存储介质;将 Checkpoint Interval 设置为 1 秒,对 RocksDB 而言意味着尽可能快地执行 Checkpoint;将 Materialization Interval 设置为 3 分钟,Source Rate 设置为 10k/s,该速率对于两者而言都是比拟日常的流量。

结果显示,RocksDB 侧 Duration 不稳固,时少时多。Checkpoint Datasize 存在周期性特色,每隔 4 个会减少,这是因为 Checkpoint 期间会 Flush Memtable 触发 Compaction 导致,且 RocksDB 配置 L0 的阈值为 4。而 Changelog 局部执行很稳固。

上图展现了 Checkpoint Duration 状况,咱们截取了 P99 的数据做了两组参数,次要针对不同单并发 State Size。结果显示,CP 端到端的延时,Changelog 能够在 1s 以内实现,RocksDB 提早约 17 秒。

大 State 在 1GB 单并发场景下,空间开销约为 1.2-1.5 倍。在内存中会更多一些,因为 RocksDB 在内存中数据会变得更紧致。

理论测试中,Sliding Window 场景下的空间耗费会更显著,可能会超出两倍。因为 Sliding Window 时,每个 Windows 之间 State 不共享,会存储多份。另外,Sliding Window 在 Checkpoint 期间会一直触发 Purge 操作,Sliding Size 设置越小,Purge 越多,RocksDB 绝对能更好地合并 Push 操作。另外,因为 Changelog 应用了操作日志形式存储,Truncate 比较慢,局部全量会放得很大。

针对于 Sliding Window 的优化也始终在探讨中,比方反对 Changelog 之间的 Merge 形式。目前机制下,是否启用 Changelog 是空间放大与 Checkpoint 稳定性和更快速度之间的取舍。目前云上的空间绝对便宜,因而在大多数状况下,就义空间换取性能和稳定性的计划是能够承受的。

将 Local Recovery 敞开后,RocksDB 与 Changelog 之间存在 9 秒的时间差。联合 Local Recovery 性能将 download 局部的差距抹掉,最终工夫差距为 3 秒左右。在极限 TPS 上,Changelog 会有 10% 的损耗。测试时将整个 Benchmark 打满,打到反压后测试其极限状态。后续咱们将针对 DSTL 上传局部做优化。

值得注意的是,这里测试的是极限状况。日常状况下,两者的 TPS 性能相差不多,甚至 Changelog 更优。因为在 同样的 Interval 设置下,RocksDB Compaction 会变得更频繁。

将来,咱们的优化方向次要蕴含以下三个:

  • 第一,缩小空间耗费,以及优化极限 TPS 场景。
  • 第二,联合 Failover 2.0,使 Checkpoint 做得更快、使 Recovery 变得更快以及实现 Reactive Mode 性能。因为在 Reactive Mode 下,减少了资源后会依赖于该机制做 Failover 后重启。如果可能将 Checkpoint 做得更快,状态复原也会变得更快。
  • 第三,让 Table Store 取得更好的数据新鲜度。

三、一览 State/Checkpoint 优化

1.16 版本在 State 和 Checkpoint 方面也做了不少优化。

可用性方面,做了针对于 RocksDB 监控和可用性的晋升,导出了 Database Level 监控。同时也进步了 Unaligned Checkpoint 和 Aligned Checkpoint 之间的切换可用性。

性能方面,基于 DeleteRange 将 RocksDB Rescale 性能晋升 2-10 倍,基于 Overdraft Buffer 晋升了 Checkpoint 性能。

前面在 Flink Forward Asia 2022 核心技术专场分享的文章中,会有更加残缺的展现!

Q&A

Q:RocksDB 与 Changelog 两种存储,相当于在 HDFS 上存了两份,文件系统容量 1.2 是如何计算得出的?

A:按 Full Checkpoint Size 计算得出。

Q:在 HDFS 上的容量应该相当于两倍?

A:不肯定是两倍。做完一次 Materialization 后,Changelog 局部的增量是会变动的。HDFS 上也会做定时清理,防止收缩。

Q:HDFS 有两份,在复原时也下载了两份数据,须要做数据版本合并吗?

A:是的。RocksDB 先复原,而后将 Changelog Apply,相似于合并操作。

Q:Changelog 数据应该曾经比拟精确,再做合并操作是否冗余?

A:Changelog 机制是基于 State Table 上做增量,因而作业复原时还是须要全量数据。

点击查看直播回放 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…

正文完
 0