乐趣区

关于Flink:Flink-113State-Backend-优化及生产实践分享

本文由社区志愿者佳伟整顿,唐云 (茶干) 在 5 月 22 日北京站 Flink Meetup 分享的《State Backend Flink – 1.13 优化及生产实践分享》。内容包含:

  1. 鸟瞰 Flink 1.13 state-backend 变动
  2. RocksDB state-backend 内存管控优化
  3. Flink state-backend 模块倒退布局

一、鸟瞰 Flink 1.13 state-backend 变动

1. State 拜访的性能监控

首先,Flink 1.13 中引入了 State 拜访的性能监控,即 latency trackig state。

通过对每次拜访前后的 System#nanoTime 求差,失去 state 拜访提早值 (latency)。此性能不局限于 State Backend 的类型,自定义实现的 State Backend 也能够复用此性能。State 拜访的性能监控会产生肯定的性能影响,所以,默认每 100 次做一次取样 (sample)。上图即监控后果展现。

State 拜访的性能监控开启后,对不同的 State Backend 性能损失影响不同:

  • 对于 RocksDB State Backend,性能损失大略在 1% 左右;
  • 而对于 Heap State Backend,性能损失最多可达 10%。

上图所示是三个相干的配置项,默认状况下此性能是敞开的,需通过指定参数 state.backend.latency-track.keyed-state-enabled=true 来手动开启。

2. 对立的 Savepoint 格局

Flink 1.13 之后,Savepoint 反对切换 State Backend,极大晋升了零碎应用性。创立 Savepoint 后,可批改作业拓扑中 State Backend 的类型,如从 RocksDB 切换成 Heap,或从 Heap 切换成 RocksDB,但切换仅限于 Savepoint。Checkpoint 所存储的文件格式与 State Backend 类型相干,而非通用格局,Checkpoint 目前暂不反对该性能。

3. 更清晰的 API

还有一个比拟重要的改变,就是对于概念上的清晰化。Flink 1.13 中将状态和检查点两者辨别开来。

在 Flink 中,State Backend 有两个性能:

  • 提供状态的拜访、查问;
  • 如果开启了 Checkpoint,会周期向近程的 Durable storage 上传数据和返回元数据 (meta) 给 Job Manager (以下简称 JM)。

在之前的 Flink 版本中,以上两个性能是混在一起的,即把状态存储和检查点的创立概念抽象得混在一起,导致初学者对此局部感觉很凌乱,很难了解。

目前,State Backend 的品种如上图所示,因为概念的凌乱,导致之前的写法中,RocksDB State Backend 中是能够嵌入 Memory State Backend 或 Heap State Backend 的。实际上,RocksDB 外面嵌入的 State Backend,形容的是其外部 Checkpoint 数据传输方向。

对于 Memory State Backend,在原始构建下,未指定任何的 filepath。且在不开启 HA 的模式下,会将所有 Checkpoint 数据返回给 JM。当 Memory State Backend 指定 filepath,满足上传条件时,Checkpoint 数据间接上传到指定 filepath 下,数据内容不会返回给 JM。

对于 Fs State Backend,数据会间接上传到所定义的 filepath 下。

当然,大家线上用的最多的还是 RocksDB State Backend 搭配上一个近程 fs 地址,旧的写法对于应用 Flink 的用户来说,容易造成状态和检查点了解凌乱。

Flink 1.13 中两个概念被拆开:

  1. 其中,State Backend 的概念变窄,只形容状态拜访和存储;
  2. 另外一个概念是 Checkpoint storage,形容的是 Checkpoint 行为,如 Checkpoint 数据是发回给 JM 内存还是上传到近程。所以,绝对应的配置项也被拆开。

以后不仅须要指定 State Backend,还须要指定 Checkpoint Storage。以下就是新老接口的对应关系:

当然,尽管旧接口目前依然保留,但还是举荐大家应用新接口,向新形式迁徙,从概念上也更清晰一些。

4. RocksDB partitioned Index & filter

上面要提的就是对于 RocksDB 的优化:

Flink 1.13 中对 RocksDB 减少了分区索引性能。如上图所示,RocksDB Block Cache 中存储的数据蕴含三局部:

  1. Data Block (实在数据)
  2. Index Block (每条数据的索引)
  3. Filter Block (对文件的 Bloom Filter)

能够通过方块大小显著看出块大小,Index 和 Filter 是显著大于 Data 的。以 256M SSD 文件为例,Index Block 大略是 0.5M,Filter Block 大略是 5M,Data Block ze 则默认是 4KB。当 Cache Block 是几百 MB 的时候,如果文件数特地多,Index 和 Filter 一直的替出换入,性能会十分差,尤其是在默认开启了内存管控后。比拟显著的景象是,IO 特地频繁,性能始终上不去。

Flink 1.13 中,复用了 RocksDB 的 partitioned Index & filter 性能,简略来说就是对 RocksDB 的 partitioned Index 做了多级索引。也就是将内存中的最上层常驻,上层依据须要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,绝对于内存比拟小的场景中,性能晋升 10 倍左右。所以,如果在内存管控下 Rocksdb 性能不如预期的话,这也能成为一个性能优化点。

目前共有两个参数可管制这个性能:

  1. state.backend.rocksdb.memory.partitioned-index-filters:true (默认 false)
  2. state.backend.rocksdb.block.metadata-blocksize (多级索引内存配置)

5. 默认行为变动

Flink 1.13 中,默认行为产生如上图所示的变动。

  • 不再反对 state.backend.async 配置项,所有的 Checkpoint 均是异步的 (同步 Checkpoint 场景很少,已去除);
  • state.backend.rocksdb.checkpoint.transfer.thread.num 默认值增大到 4 RocksDB 增量 Checkpoint 时,4 个线程多线程上传文件 RocksDB 从增量 Checkpoint 复原数据时,采纳 4 个线程多线程下载。

当然,性能晋升的同时,对 HDFS 底层压力更大些,如果降级后 HDFS 不稳固,可思考是否与此处相干。

二、RocksDB state-backend 内存管控优化

Flink 1.10 开始做 state-backend 内存优化,在之后的每个版本中都有相干改良。

对 RocksDB State Backend 做内存管控的最根本起因在于 Flink state 与 RocksDB 的 Column Family (独立内存) 一一对应。

在 Flink 1.10 之前,如果申明两个 state,会各自享受本人的 Write Buffer 和 Cache 内存,Flink 并没有对一个 operator 中的 state 数量限度,实践上用户能够设置几千个、几万个 state,可能导致容器内存撑爆。另外,Flink 在 slot-sharing 机制下,一个 slot 内能够存在多个蕴含 keyed state 的 operator,也很难保障 state 个数不超。

多个 RocksDB 会有多个 Write Buffer Manager。如上图所示,以单个 Write Buffer Manager 为例,它将本人的内存 reserve 到 Block Cache 中,依据本人的内存管控逻辑来实现记账,Block Cache 内有 LRU Handle,超出预算时,会被踢出。

上图提到的 arena block,是 Write Buffer 最小内存调配单元,默认是 Write buffer 默认配置的 1/8,内存默认为 8MB。但在极其状况下,磁盘上会呈现小文件过多的景象,导致性能十分差。如当整体内存调配过小时,Write Buffer 所管控的内存数量也就会比拟少,刚开始申请内存时,默认申请 8MB 内存,当已用内存达到总内存的 7/8 时,会被置为 Immutable (置为不可变),之后这部分数据被替出到磁盘上。

如果单个 arena block 内存占比过大,可能会呈现临界 arena block 只写了几 KB,但触发了 Write Buffer 的内存行为管控,将 arena block 置为了 Immutable,之后的数据就会被刷进来,产生小文件,导致性能十分差。对于 LSM DB 来说,提前 flush,对读放大性能产生很大影响,Write Buffer 无奈缓存更多读申请。

咱们引入对 arena block 大小有强校验,当 arena block 大小不适合时,会打印 Warning 级别日志,认为以后须要对 arena block 大小作出相应调整。即须要升高 arena block 大小,从而解决数据提前被 flush 的问题,进而晋升性能。

RocksDB Block Cache 为了进步并发性能,将 arena block 分成了若干个分片 (shards)。本质上是 Write Buffer Manager 在做 reserve 时,将 arena block 拆成了若干个 dummy entry,实际上只做了记账,会占据 block cache 的逻辑容量。目前 Flink 应用的 RocksDB 版本中,shards 默认是 1MB,可能会有 shards 的数据超过预算的危险。起初的 RocksDB 高版本中,将 1MB 调成了 256KB 来解决这个危险。因为 Flink 1.13 中没有对 RocksDB 版本升级,所以这个问题仍然存在。此外,Flink 1.13 中,没有将 RocksDB Block Cache 内存管控设置成严格模式 (Strict Mode)。

目前社区用的 RocksDB 的版本是 5.17.2,与 RocksDB 社区最新的 6.17+ 版本,相差大略一两千个 commit。社区在尝试降级 RocksDB 版本时,发现高版本有一些性能回退,即便尽力解决,也只是解决了其中一部分,在局部拜访接口下,还是有大概不到 10% 的性能降落。所以,Flink 1.13 决定暂不降级 RocksDB 版本,社区预计会在 Flink 1.14 中做相应降级,引入 RocksDB 一些新的 future,借此补救目前已知的 10% 性能回退的 Gap。

综上各种问题,RocksDB 内存管控不欠缺,加上 Writer Buffer 对 Data Block 不严格的管控,在实践上还是存在肯定小几率内存超用的。但就目前来看,整体还是比较稳定,超用的局部不会太多。如果想手动多分一部分内存给 RocksDB 来避免超用,预防在云原生的环境因 OOM 被 K8S kill,可手动将 JVM OverHead 内存调大,如上图所示。

之所以不调大 Task Off-Heap,是因为目前 Task Off-Heap 是和 Direct Memeory 混在一起的,即便调大整体,也并不一定会分给 RocksDB 来做 Buffer,所以咱们举荐通过调整 JVM OverHead 来解决内存超用的问题。同理,如果 Flink 中用到其余相干库,遇到类似问题,也能够尝试将 JVM OverHead 调大来解决。如果想查明内存透露起因,也能够联合相应 jemalloc + jeprof 等剖析工具排查解决。

三、Flink state-backend 模块倒退布局

以下为 state-backend 模块在 Flink1.14、1.15 中的倒退布局:

要阐明的是,目前只有 RocksDB 反对增量 Checkpoint。

对于 Changelog,在 Apache Kafka 和 Apache Pulsar 中都有这个概念。Changelog 的引入,是 Flink 作为流式计算零碎,对传统消息中间件的借鉴。即在数据上传的同时,做一个 proxy,将数据定期写到内部的 log 里,每次做 Checkpoint 时不须要等数据上传,进而使 Checkpoint 的工夫更加可控。

Flink 1.13 曾经实现了 proxy 代理层,理论的逻辑层还没有实现,在 Flink 1.14 中会做具体实现,包含相干 log 清理逻辑。心愿在 Flink 1.14 中对状态和检查点性能有更好的晋升,尤其是目前二阶段提交依赖于 Checkpoint commit,Changelog State Backend 的引入,预计在 Flink 1.14 能够尽快解决相干痛点。

退出移动版