文章转载自:https://ververica.cn/develope…
作者:邱从贤(山智)
Apache Flink 是一个有状态的流计算框架,状态是作业算子中曾经解决过的内存状态,供后续解决时应用。状态在流计算很多简单场景中十分重要,比方:
- 保留所有历史记录,用来寻找某种记录模式
- 保留最近一分钟的所有记录,用于对每分钟的记录进行聚合统计
- 保留以后的模型参数,用于进行模型训练
有状态的流计算框架必须有很好的容错性,能力在生产环境中施展用途。这里的容错性是指,不论是产生硬件故障,还是程序异样,最终的后果不丢也不重。
Flink 的容错性从一开始就是一个十分弱小的个性,在遇到故障时,可能保障不丢不重,且对失常逻辑解决的性能影响很小。
这外面的外围就是 checkpoint 机制,Flink 应用 checkpoint 机制来进行状态保障,在 Flink 中 checkpoint 是一个定时触发的全局异步快照,并长久化到长久存储系统上(通常是分布式文件系统)。产生故障后,Flink 抉择从最近的一个快照进行复原。有用户的作业状态达到 GB 甚至 TB 级别,对这么大的作业状态做一次 checkpoint 会十分耗时,耗资源,因而咱们在 Flink 1.3 中引入了增量 checkpoint 机制。
在增量 checkpoint 之前,Flink 的每个 checkpoint 都蕴含作业的所有状态。咱们在察看到状态在 checkpoint 之间的变动并没有那么大之后,反对了增量 checkpoint。增量 checkpoint 仅蕴含上次 checkpoint 和本次 checkpoint 之间状态的差别(也就是“增量”)。
对于状态十分大的作业,增量 checkpoint 对性能的晋升非常明显。有生产用户反馈对于 TB 级别的作业,应用增量 checkpoint 后能将 checkpoint 的整体工夫从 3 分钟降到 30 秒。这些工夫节俭次要归功于不须要在每次 checkpoint 都将所有状态写到长久化存储系统。
如何应用
以后,仅可能在 RocksDB StateBackend 上应用增量 checkpoint 机制,Flink 依赖 RocksDB 外部的备份机制来生成 checkpoint 文件。Flink 会主动清理掉之前的 checkpoint 文件, 因而增量 checkpoint 的历史记录不会有限增长。
为了在作业中开启增量 checkpoint,倡议具体浏览 Apache Flink 的 checkpoint 文档,简略的说,你能够像之前一样开启 checkpoint,而后将构造函数的第二个参数设置为 true 来启用增量 checkpoint。
Java 示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(filebackend, true));
Scala 示例
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new RocksDBStateBackend(filebackend, true))
Flink 默认保留一个胜利的 checkpoint,如果你须要保留多个的话,能够通过上面的配置进行设置:
state.checkpoints.num-retained
原理解析
Flink 的增量 checkpoint 以 RocksDB 的 checkpoint 为根底。RocksDB 是一个 LSM 构造的 KV 数据库,把所有的批改保留在内存的可变缓存中(称为 memtable),所有对 memtable 中 key 的批改,会笼罩之前的 value,以后 memtable 满了之后,RocksDB 会将所有数据以有序的写到磁盘。当 RocksDB 将 memtable 写到磁盘后,整个文件就不再可变,称为有序字符串表(sstable)。
RocksDB 的后盾压缩线程会将 sstable 进行合并,就反复的键进行合并,合并后的 sstable 蕴含所有的键值对,RocksDB 会删除合并前的 sstable。
在这个根底上,Flink 会记录上次 checkpoint 之后所有新生成和删除的 sstable,另外因为 sstable 是不可变的,Flink 用 sstable 来记录状态的变动。为此,Flink 调用 RocksDB 的 flush,强制将 memtable 的数据全副写到 sstable,并硬链到一个长期目录中。这个步骤是在同步阶段实现,其余剩下的局部都在异步阶段实现,不会阻塞失常的数据处理。
Flink 将所有新生成的 sstable 备份到长久化存储(比方 HDFS,S3),并在新的 checkpoint 中援用。Flink 并不备份前一个 checkpoint 中曾经存在的 sstable,而是援用他们。Flink 还可能保障所有的 checkpoint 都不会援用曾经删除的文件,因为 RocksDB 中文件删除是由压缩实现的,压缩后会将原来的内容合并写成一个新的 sstable。因而,Flink 增量 checkpoint 可能切断 checkpoint 历史。
为了追踪 checkpoint 间的差距,备份合并后的 sstable 是一个绝对冗余的操作。然而 Flink 会增量的解决,减少的开销通常很小,并且能够放弃一个更短的 checkpoint 历史,复原时从更少的 checkpoint 进行读取文件,因而咱们认为这是值得的。
举个栗子
上图以一个有状态的算子为例,checkpoint 最多保留 2 个,上图从左到右别离记录每次 checkpoint 时本地的 RocksDB 状态文件,援用的长久化存储上的文件,以及以后 checkpoint 实现后文件的援用计数状况。
- Checkpoint 1 的时候,本地 RocksDB 蕴含两个 sstable 文件,该 checkpoint 会把这两个文件备份到长久化存储,当 checkpoint 实现后,对这两个文件的援用计数进行加 1,援用计数应用键值对的形式保留,其中键由算子的以后并发以及文件名所组成。咱们同时会保护一个援用计数中键到对应文件的隐射关系。
- Checkpoint 2 的时候,RocksDB 生成两个新的 sstable 文件,并且两个旧的文件还存在。Flink 会把两个新的文件进行备份,而后援用两个旧的文件,当 checkpoint 实现时,Flink 对这 4 个文件都进行援用计数 +1 操作。
- Checkpoint 3 的时候,RocksDB 将 sstable-(1),sstable-(2) 以及 sstable-(3) 合并成 sstable-(1,2,3),并且删除了三个旧文件,新生成的文件蕴含了三个删除文件的所有键值对。sstable-(4) 还持续存在,生成一个新的 sstable-(5) 文件。Flink 会将 sstable-(1,2,3) 和 sstable-(5) 备份到长久化存储,而后减少 sstable-4 的援用计数。因为保留的 checkpoint 数达到下限(2 个),因而会删除 checkpoint 1,而后对 checkpoint 1 中援用的所有文件(sstable-(1) 和 sstable-(2))的援用计数进行 -1 操作。
- Checkpoint 4 的时候,RocksDB 将 sstable-(4),sstable-(5) 以及新生成的 sstable-(6) 合并成一个新的 sstable-(4,5,6)。Flink 将 sstable-(4,5,6) 备份到长久化存储,并对 sstabe-(1,2,3) 和 sstable-(4,5,6) 进行援用计数 +1 操作,而后删除 checkpoint 2,并对 checkpoint 援用的文件进行援用计数 -1 操作。这个时候 sstable-(1),sstable-(2) 以及 sstable-(3) 的援用计数变为 0,Flink 会从长久化存储删除这三个文件。
竞争问题以及并发 checkpoint
Flink 反对并发 checkpoint,有时晚触发的 checkpoint 会先实现,因而增量 checkpoint 须要抉择一个正确的基准。Flink 仅会援用胜利的 checkpoint 文件,从而避免援用一些被删除的文件。
从 checkpoint 复原以及性能
开启增量 checkpoint 之后,不须要再进行其余额定的配置。如果 Job 异样,Flink 的 JobMaster 会告诉所有 task 从上一个胜利的 checkpoint 进行复原,不论是全量 checkpoint 还是增量 checkpoint。每个 TaskManager 会从长久化存储下载他们须要的状态文件。
只管增量 checkpoint 能缩小大状态下的 checkpoint 工夫,然而天下没有收费的午餐,咱们须要在其余方面进行舍弃。增量 checkpoint 能够缩小 checkpoint 的总工夫,然而也可能导致复原的时候须要更长的工夫 。 如果集群的故障频繁,Flink 的 TaskManager 须要从多个 checkpoint 中下载须要的状态文件(这些文件中蕴含一些曾经被删除的状态),作业复原的整体工夫可能比不应用增量 checkpoint 更长。
另外在增量 checkpoint 状况下,咱们不能删除旧 checkpoint 生成的文件,因为新的 checkpoint 会持续援用它们,这可能导致须要更多的存储空间,并且复原的时候可能耗费更多的带宽。
对于管制便捷性与性能之间均衡的策略能够参考此文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/large_state_tuning.html