乐趣区

关于flink:Flink-State-Backend-Improvements-and-Evolution-in-2021

摘要:本文整顿自 ASF Member、Apache Flink & HBase PMC、阿里巴巴资深技术专家李钰 (绝顶),Apache Flink Committer、阿里巴巴技术专家唐云 (茶干) 在 Flink Forward Asia 2021 核心技术专场的演讲。次要内容包含:

  1. State Backend Improvement
  2. Snapshot Improvement
  3. Future Work

FFA 2021 直播回放 & 演讲 PDF 下载

一、State Backend improvement

在过来一年中,Flink 社区 state-backend 模块有了很大的倒退。在 1.13 版本之前,用户对于状态相干算子的性能不足监控伎俩,也没有很好的方法能够获悉状态读写操作的提早。

咱们引入了状态拜访的延时监控,原理就是在每次状态拜访前后,应用 system.nowTime 去统计拜访延时,而后将其存储在一个 histgram 类型的指标中,监控性能关上对其性能的影响,尤其是 state 拜访的性能影响是比拟小的。

对于状态拜访提早监控相干的配置,须要特别强调的是采样距离和保留历史数据这两个配置,采样距离越小,数据后果就越精确,然而对日常拜访的性能影响也稍大一些;历史数据保留的个数越多,数据后果越准确,然而内存占用也会稍大一些。

在 Flink 1.14 版本中,咱们终于将 RocksDB 从 5.17 降级到 6.20 版本,除了 RocksDB 本身若干 bug 的修复,新版 RocksDB 也减少了一些个性,能够用于 Flink 1.14 和 1.15 中。首先它反对了 ARM 平台,能够保障 Flink 作业可能在 arm 根底上运行,其次提供了更细粒度的 WriteBuffer 内存管控,晋升内存管控的稳定性。此外,还提供了 deleteRange 接口,为之后在扩容场景下的性能晋升带来十分大的帮忙。

随着云原生的愈发风行,通过 K8s 调入在容器环境中运行 Flink 作业曾经成为越来越多厂商的抉择,这其中不可避免须要思考受限资源如何稳固运行,尤其是内存应用方面的管控。而诞生在 2010 年度的 RocksDB 在这方面的能力先天有些有余,Flink 1.10 才首次引入了内存管控。在过来的一年中,RocksDB 在内存管控方面又有了一些提高和改善。

首先回顾一下 RocksDB 内存方面的问题,谈这个问题之前要理解 Flink 是如何应用 state 和 RocksDB 的。

  • Flink 每申明一个 state,都会对应 RocksDB 中的一个 column family,column family 是 RocksDB 中独立的内存调配,它们之间通过物理资源来隔离;
  • 其次,Flink 并不限度用户在一个 operator 内申明的 state 数目,所以它也没有限度 column family 数目;
  • 最初,Flink 在 slot-sharing 机制下一个 slot 内能够存在多个蕴含 keyed state 的 operator。

基于以上三个起因,即便不思考 RocksDB 本身在内存治理上的限度,实践上来说 Flink 的应用形式就有可能导致不受限的内存应用。

上图定义了一个 SQL 类的多个 RocksDB 的实例,共享了一个 writeBuffer manager 及其对应的 block cache,其中治理多个 writeBuffer 的 manager 将它所申请的内存在 block cache 中进行记账,数据相干的 block 在 block cache 中进行缓存,缓存包含数据相干的 data block,索引相干的 index block 和过滤用的 filter block,能够简略地了解成写缓存和读缓存。

由此可见,writeBuffer manager 与 block cache 协同工作的形式就是 manager 在 block cache 中进行记账。

manager 在 buffer 申请流程后,会以 io blocks 为根本单位在 block cache 中进行内存降级。默认 io block 是单个 writeBuffer 的 1/8,writeBuffer 的配置是 64Mb,所以 io block 的 size 是 8Mb,而这 8Mb 内存申请会再次拆成若干 dummy entry,调配到 Block 若干的 shard 中。须要特地阐明的一点是,Flink 降级 RocksDB 之后,dummy entry 的最小单元降到了 256KB,升高了内存申请超额的概率。

因为 RocksDB 自身的设计是为多线程思考的,所以在一个 cache 中会存在多个 shard,所以它的内存申请就会比较复杂。

WriteBuffer manager 外部实现产生的内存中,可变的 WriteBuffer 何时转化为不可变的 WriteBuffer。Immutable table 刷到磁盘上的过程中,默认 mutable writebuffer 的使用量是有下限的,达到下限之后就会提前 flush 这些 WriteBuffer。这会导致一个问题,即便写入的数据量不大,一旦申请 arena block,尤其是 arena block 比拟多的状况下,就会提前触发 member table flush 的问题。从用户角度来说,会发现本地存在大量很小的 SST 文件,整体的读写性能也很差,因而 Flink 社区专门对此做了 arena block size 的配置校验性能。

目前 RocksDB 本身存在内存管控的有余和限度,所以须要在特定场景下预留一部分对外内存给 RocksDB 超额应用。对照上图 Flink process 内存模型,能够看到须要在 jvm-overhead 上对内存进行适当的保留,避免 RocksDB 超用。右边的表格展现了 jvm-overhead 相干的默认配置值,如果想要将 jvm-overhead 配置成 512Mb,只有将 mini 和 max 都配置成 512Mb 即可。

在内存无限的场景下,data block,index Block 以及 fliter blocks 是存在竞争问题的。上图的 Block 实例是依照理论大小进行绘制的,以 256Mb 文件的 SST 举例,其中 index block 大概是 0.5Mb,fliter block 大略是 5Mb,data block 个别是 4KB-64KB,能够看到 block 的竞争会导致大量的换入换出,极大影响读性能。

为了上述问题,咱们将 RocksDB 的 partition-index 和 partition-filter 性能进行封装,优化了内存受限状况下的性能。就是将索引 index 和过滤 filter 进行分层存储,从而能够在无限内存中尽可能存储数据 block,缩小磁盘的读取概率,从而晋升整体性能。

除了对于稳定性相干的改善,Flink 还着重重构了 state 相干的 API,对老手的了解会更敌对。

以前的 API 是混合了状态读写的 statebackend 和负责容错备份的 checkpoint 两者的概念。以 MemoryStatebackend 和 FsStateBackend 为例,二者在状态读写、拜访对象方面是完全相同的,区别仅在于容错备份,所以初学者很容易混同其中的区别。

上图展现了更新之后的 Flink 状态读写和容错点查 API 与更新前的区别。

新版里咱们将状态拜访与容错备份进行了别离的设置,上图是新版 API 与旧版 API 的对照表格。能够看到,MemoryStatebackend 和 FsStateBackend 负责状态读写的都是 HashMaoStateBackend 的状态存储。

二者的最大区别就是在 checkpoint 容错方面,一个是对应全内存的 ManagercCheckpointStorage,而另一个对应的是基于文件的 FileSystemSCheckpointStorage。置信通过对 API 的重构,可能给开发者以更粗浅的了解。

二、Snashot Improvement

SavePoint 自身是与 state-backend 结耦的,并不局限于是通过什么样的 state-backend 实现。然而以前的 Flink 版本中,不同的 state-backend 的 SavePoint 格局是不同的,然而在新版 Flink 中,社区对立了相干的 SavePoint 格局,对于同样的作业能够在不失落状态的状况下无缝切换 state-backend。

此外,社区还进一步加强了 unaligned checkpoint 的稳定性。将 channel 中的 buffer 作为 in-flight 数据,看作 operator state 的一部分进行提前长久化,防止 barrier 对齐的工夫。

此外,在新版 Flink 中,社区反对了传统的 aligned 与 unaligned 之间的主动切换,只有设置一个全局的超时工夫,Flink 的 checkpoint 达到阈值之后,就会主动的进行切换,置信这个性能的引入也能够进一步帮忙开发者取得更好的 checkpoint 性能。

三、Future Work

将来,咱们会进一步提高 RocksDB backend 的生产易用性。一方面咱们会将 RocksDB 外部一些要害的性能指标,例如 block cache 命中率等增加到规范监控指标中,从而能够更加不便地对 RocksDB 的性能进行调优。另一方面,咱们打算将 RocksDB 的日志文件重定向到 TM 日志目录下或者 TM 日志中,可能更不便地查看 RocksDB 日志信息,来定位问题和调优。

其次咱们会进一步梳理明确 Flink 的快照语义,目前在 Flink 中有三种状态的快照,别离是 checkpoint,savepoint 和 retained checkpoint。

  • 其中 checkpoint 是零碎快照,其数据生命周期齐全由 Flink 框架管制,用于在异样产生时进行 fail over,一旦作业进行,将被主动删除;
  • savepoint 负责对立格局的数据备份,其生命周期与 Flink 作业解耦,齐全由用户管制,能够用来实现 Flink 作业的版本升级、跨集群迁徙、state-backend 的切换等需要;
  • 而 retained checkpoint 的语义和生命周期目前都比拟含糊,它能够独立于 Flink 作业生命周期之外存在,但当基于它复原并且关上增量快照的时候,新作业的 checkpoint 会依赖其中的数据,从而导致用户很难判断何时能够平安地将其删除。

为了解决这一问题,社区提出了 FLIP-193,要求使用者基于 retained checkpoint 启动作业的时候,申明是采纳 claim 还是 no-claim 模式。

  • 如果采纳 claim 模式,则该 retained checkpoint 的数据生命周期齐全由新作业掌控,即随着数据 Compaction 的产生,当新快照不再依赖于 retained checkpoint 当中的数据时,新作业能够将其平安删除;
  • 而如果采纳 no-claim 模式,则新作业不能批改 retained checkpoint 的数据,这意味着新作业在第一次快照时须要做物理拷贝,不能引用 retained checkpoint 当中的数据。这样,在须要的时候能够随时手动将 retained checkpoint 的删除,而不须要放心影响基于其复原的作业。

此外,后续咱们打算对用户管制的快照赋予更清晰的语义,引入 native format 的 savepoint 的概念来代替 retained checkpoint。

最初介绍一下正在进行中的 FLIP-158 的工作。它引入了 Changelog based state backend 来实现更疾速安稳的增量快照,相当于引入了一种基于 log 打点形式的快照。相比于目前已有的 snapshot 持重的增量快照机制,它有更短的快照距离,但同时会就义一些状态数据处理延时。这其实就是在提早和容错之间的取舍和平衡。


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

退出移动版