共计 3102 个字符,预计需要花费 8 分钟才能阅读完成。
作者: 施晓罡
本文来自 2018 年 8 月 11 日在北京举行的 Flink Meetup 会议,分享来自于施晓罡,目前在阿里大数据团队部从事 Blink 方面的研发,现在主要负责 Blink 状态管理和容错相关技术的研发
本文主要内容如下:
- 有状态的流数据处理;
- Flink 中的状态接口;
- 状态管理和容错机制实现;
- 阿里相关工作介绍;
一. 有状态的流数据处理
1.1 什么是有状态的计算
计算任务的结果不仅仅依赖于输入,还依赖于它的当前状态,其实大多数的计算都是有状态的计算。比如 wordcount, 给一些 word, 其计算它的 count, 这是一个很常见的业务场景。count 做为输出,在计算的过程中要不断的把输入累加到 count 上去,那么 count 就是一个 state。
1.2. 传统的流计算系统缺少对于程序状态的有效支持
- 状态数据的存储和访问;
- 状态数据的备份和恢复;
- 状态数据的划分和动态扩容;
在传统的批处理中,数据是划分为块分片去完成的,然后每一个 Task 去处理一个分片。当分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于 state 的需求还是比较小的。
对于流计算而言,对 State 有非常高的要求,因为在流系统中输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要将状态数据很好的管理起来。很不幸的是,在传统的流计算系统中,对状态管理支持并不是很完善。比如 storm, 没有任何程序状态的支持,一种可选的方案是 storm+hbase 这样的方式去实现,把这状态数据存放在 Hbase 中,计算的时候再次从 Hbase 读取状态数据,做更新在写入进去。这样就会有如下几个问题
- 流计算系统的任务和 Hbase 的数据存储有可能不在同一台机器上,导致性能会很差。这样经常会做远端的访问,走网络和存储;
- 备份和恢复是比较困难,因为 Hbase 是没有回滚的,要做到 Exactly onces 很困难。在分布式环境下,如果程序出现故障,只能重启 Storm,那么 Hbase 的数据也就无法回滚到之前的状态。比如广告计费的这种场景,Storm+Hbase 是是行不通的,出现的问题是钱可能就会多算,解决以上的办法是 Storm+mysql,通过 mysql 的回滚解决一致性的问题。但是架构会变得非常复杂。性能也会很差,要 commit 确保数据的一致性。
- 对于 storm 而言状态数据的划分和动态扩容也是非常难做,一个很严重的问题是所有用户都会在 strom 上重复的做这些工作,比如搜索,广告都要在做一遍,由此限制了部门的业务发展。
1.3.Flink 丰富的状态访问和高效的容错机制
Flink 在最早设计的时候就意识到了这个问题,并提供了丰富的状态访问和容错机制。如下图所示:
二.Flink 中的状态管理
2.1. 按照数据的划分和扩张方式,Flink 中大致分为 2 类:
- Keyed States
- Operator States
2.1.1.Keyed States
Keyed States 的使用
Flink 也提供了 Keyed States 多种数据结构类型
Keyed States 的动态扩容
2.1.2.Operator State
Operator States 的使用
Operator States 的数据结构不像 Keyed States 丰富,现在只支持 List
Operator States 多种扩展方式
Operator States 的动态扩展是非常灵活的,现提供了 3 种扩展,下面分别介绍:
- ListState: 并发度在改变的时候,会将并发上的每个 List 都取出,然后把这些 List 合并到一个新的 List, 然后根据元素的个数在均匀分配给新的 Task;
- UnionListState: 相比于 ListState 更加灵活,把划分的方式交给用户去做,当改变并发的时候,会将原来的 List 拼接起来。然后不做划分,直接交给用户;
- BroadcastState: 如大表和小表做 Join 时,小表可以直接广播给大表的分区,在每个并发上的数据都是完全一致的。做的更新也相同,当改变并发的时候,把这些数据 COPY 到新的 Task 即可
以上是 Flink Operator States 提供的 3 种扩展方式,用户可以根据自己的需求做选择。
使用 Checkpoint 提高程序的可靠性
用户可以根据的程序里面的配置将 checkpoint 打开,给定一个时间间隔后,框架会按照时间间隔给程序的状态进行备份。当发生故障时,Flink 会将所有 Task 的状态一起恢复到 Checkpoint 的状态。从哪个位置开始重新执行。
Flink 也提供了多种正确性的保障,包括:
- AT LEAST ONCE;
- Exactly once;
备份为保存在 State 中的程序状态数据
Flink 也提供了一套机制,允许把这些状态放到内存当中。做 Checkpoint 的时候,由 Flink 去完成恢复。
从已停止作业的运行状态中恢复
当组件升级的时候,需要停止当前作业。这个时候需要从之前停止的作业当中恢复,Flink 提供了 2 种机制恢复作业:
- Savepoint: 是一种特殊的 checkpoint,只不过不像 checkpoint 定期的从系统中去触发的,它是用户通过命令触发,存储格式和 checkpoint 也是不相同的,会将数据按照一个标准的格式存储,不管配置什么样,Flink 都会从这个 checkpoint 恢复,是用来做版本升级一个非常好的工具;
- External Checkpoint:对已有 checkpoint 的一种扩展,就是说做完一次内部的一次 Checkpoint 后,还会在用户给定的一个目录中,多存储一份 checkpoint 的数据;
三.状态管理和容错机制实现
下面介绍一下状态管理和容错机制实现方式,Flink 提供了 3 种不同的 StateBackend
- MemoryStateBackend
- FsStateBackend
- RockDBStateBackend
用户可以根据自己的需求选择,如果数据量较小,可以存放到 MemoryStateBackend 和 FsStateBackend 中,如果数据量较大,可以放到 RockDB 中。
下面介绍 HeapKeyedStateBackend 和 RockDBKeyedStateBackend
第一,HeapKeyedStateBackend
第二,RockDBKeyedStateBackend
Checkpoint 的执行流程
Checkpoint 的执行流程是按照 Chandy-Lamport 算法实现的。
Checkpoint Barrier 的对齐
全量 Checkpoint
全量 Checkpoint 会在每个节点做备份数据时,只需要将数据都便利一遍,然后写到外部存储中,这种情况会影响备份性能。在此基础上做了优化。
RockDB 的增量 Checkpoint
RockDB 的数据会更新到内存,当内存满时,会写入到磁盘中。增量的机制会将新产生的文件 COPY 持久化中,而之前产生的文件就不需要 COPY 到持久化中去了。通过这种方式减少 COPY 的数据量,并提高性能。
四. 阿里相关工作介绍
4.1.Flink 在阿里的成长路线
阿里是从 2015 年开始调研 Flink,2015 年 10 月启动 Blink 项目,并完善 Flink 在大规模生产下的一些优化和改进。2016 年双 11 采用了 Blink 系统,为搜索,推荐,广告业务提供服务。2017 年 5 月 Blink 已成为阿里的实时计算引擎。
4.2. 阿里在状态管理和容错相关的工作
正在做的工作,基于 State 重构 Window 方面的一些优化,阿里也正在将功能做完善。后续将包括 asynchronous Checkpoint 的功能完善,并和社区进一步沟通和合作。帮助 Flink 社区完善相关方面的工作。