共计 1979 个字符,预计需要花费 5 分钟才能阅读完成。
当我们使用 Flink 进行流式计算时,通常会产生各种形式的中间结果,我们称之为 State。有状态产生,就必然涉及到状态的存储,那么 Flink 中定义了哪些形式的状态存储呢,下面一一给大家介绍一下。
State Backends
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
MemoryStateBackend
顾名思义,MemoryStateBackend 状态后端是将状态数据以 Object 的形式存放于 Java Heap 中。
当执行检查点时,MemoryStateBackend 会为当前的状态生成 snapshot,然后将快照信息作为检查点 ack 消息的一部分发送给 JobManager(master 节点),JobManager 会将收到的快照数据存放于自己的堆内存中。
MemoryStateBackend 默认采用异步 snapshots 的方式来避免数据流管道阻塞,这是一种比较推荐的方式。当然,我们也可以通过配置来禁用这种方式。
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false); // MAX_MEM_STATE_SIZE 表示最大允许的状态容量
MemoryStateBackend 的使用限制
- 每个状态的大小默认限制为 5MB,可以通过构造函数设置状态大小
- 不管如何配置最大状态大小,都不能超过 akka 帧大小
- 聚合状态大小必须合乎 JobManager 的内存大小
基于以上这些限制,我们通常建议在如下场景中使用 MemoryStateBackend:
- 本地开发调试
- 无状态作业或者保存少量状态的作业
此外,官方建议将托管内存(Managed Memory
)设置为 0,这样可以确保为 JVM 上的用户程序分配最大的内存。
FsStateBackend
FsStateBackend 需要配置一个文件系统 URL,如:“hdfs://namenode:40010/flink/checkpoints”or“file:///data/flink/checkpoints”。
FsStateBackend 将作业执行过程中的动态数据存放在 TaskManager 的内存当中,当执行检查点时,状态快照数据会被存储在配置的文件系统目录中,还有一部分 metadata 数据会被存储在 JobManager 的内存当中。
同样的,FsStateBackend 也是默认采用异步 snapshot 的方式。我们可以通过实例化 FsStateBackend 来更改快照生成方式。
new FsStateBackend(path, false);
官方建议在以下场景中使用 FsStateBackend:
- 作业中包含大状态、长窗口以及大键值状态
- 高可用应用场景
同样官方建议将托管内存(Managed Memory
)设置为 0,这样可以确保为 JVM 上的用户程序分配最大的内存。
RocksDBStateBackend
RocksDBStateBackend 同样需要配置一个文件系统 URL:“hdfs://namenode:40010/flink/checkpoints”or“file:///data/flink/checkpoints”。
RocksDBStateBackend 将作业执行过程中的动态数据存放在 RocksDB 数据库中,RocksDB 数据库默认存储在 TaskManager 的数据目录下。当执行检查点时,整个 RocksDB 数据库会被存档到配置的文件系统目录下。只有少量的 metadata 数据存储在 JobManager 的内存当中。
同样地,RocksDBStateBackend 通常也采用异步 snapshot 的方式。
使用上的一些限制:
- 由于 RocksDB 的 JNI bridge API 是基于 byte[] 的,因此可支持的最大 key 值大小是 2^31 byte。这个限制一般情况下不会有问题,但当作业中的状态是基于不断地 merge 操作生成时,很容易超过这个大小限制,这个时候就会出现检索失败的错误。
官方建议在以下场景中使用 RocksDBStateBackend:
- 作业中包含大状态、长窗口以及大键值状态
- 高可用应用场景
乍一看,好像跟 FsStateBackend 没啥区别?其实不是,这里需要注意的是,当我们使用 RocksDBStateBackend 作为状态存储时,可以维护的状态大小仅仅受限于程序可访问的磁盘空间大小。这就使得我们可以维护比 FsStateBackend 更大的作业状态。
当然,这也带来一个问题:由于与状态后端之间的所有读写操作都要经过 de-/serialization,因此这种方式牺牲了一定的吞吐量。
总结
- MemoryStateBackend、FsStateBackend 都是基于堆的状态存储
- RocksDBStateBackend 是目前唯一的一种支持增量 checkpoint 的状态后端