通过本文你能 get 到以下几点:
- Flink 内应用大状态时,该如何配置?
- 常见的负载平衡策略有哪些?
- Flink 源码中在抉择 RocksDB 状态磁盘时,存在的问题。
- 一些解决方案,并剖析了每种计划的利弊。
一、为什么要优化?(优化背景)
Flink 反对多种 StateBackend,当状态比拟大时目前只有 RocksDBStateBackend 可供选择。
RocksDB 是基于 LSM 树原理实现的 KV 数据库,LSM 树读放大问题比较严重,因而对磁盘性能要求比拟高,强烈建议生产环境应用 SSD 作为 RocksDB 的存储介质。然而有些集群可能并没有配置 SSD,仅仅是一般的机械硬盘,当 Flink 工作比拟大,且对状态拜访比拟频繁时,机械硬盘的磁盘 IO 可能成为性能瓶颈。在这种状况下,该如何解决此瓶颈呢?
应用多块硬盘来分担压力
RocksDB 应用内存加磁盘的形式存储数据,当状态比拟大时,磁盘占用空间会比拟大。如果对 RocksDB 有频繁的读取申请,那么磁盘 IO 会成为 Flink 工作瓶颈。
强烈建议在 flink-conf.yaml 中配置 state.backend.rocksdb.localdir 参数来指定 RocksDB 在磁盘中的存储目录。当一个 TaskManager 蕴含 3 个 slot 时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间互相争抢同一个磁盘 io,这样必然导致三个并行度的吞吐量都会降落。
庆幸的是 Flink 的 state.backend.rocksdb.localdir 参数能够指定多个目录,个别大数据服务器都会挂载很多块硬盘,咱们冀望同一个 TaskManager 的三个 slot 应用不同的硬盘从而缩小资源竞争。具体参数配置如下所示:
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb
留神:务必将目录配置到多块不同的磁盘上,不要配置单块磁盘的多个目录,这里配置多个目录是为了让多块磁盘来分担压力。
如下图所示是笔者测试过程中磁盘的 IO 使用率,能够看出三个大状态算子的并行度别离对应了三块磁盘,这三块磁盘的 IO 均匀使用率都放弃在 45% 左右,IO 最高使用率简直都是 100%,而其余磁盘的 IO 均匀使用率为 10% 左右,绝对低很多。由此可见应用 RocksDB 做为状态后端且有大状态的频繁读写操作时,对磁盘 IO 性能耗费的确比拟大。
上述属于现实状况,当设置多个 RocksDB 本地磁盘目录时,Flink 会随机抉择要应用的目录,所以就可能存在三个并行度共用同一目录的状况。
如下图所示,其中两个并行度共用了 sdb 磁盘,一个并行度应用 sdj 磁盘。能够看到 sdb 磁盘的 IO 均匀使用率曾经达到了 91.6%,此时 sdb 的磁盘 IO 必定会成为整个 Flink 工作的瓶颈,会导致 sdb 磁盘对应的两个并行度吞吐量大大降低,从而使得整个 Flink 工作吞吐量升高。
如果服务器挂载的硬盘数量较多,个别不会呈现该状况,然而如果工作重启后吞吐量较低,能够查看是否产生了多个并行度共用同一块磁盘的状况。
Flink 可能会呈现多个并行度共用同一块磁盘的问题,那该如何解决呢?
二、罕用的负载平衡策略
从景象来看,为 RocksDB 调配了 12 块磁盘,仅仅有 3 个并行度须要应用 3 块磁盘,然而有肯定几率 2 个并行度共用同一块磁盘,甚至可能会有很小的几率 3 个并行度共用同一块磁盘。这样咱们的 Flink 工作很容易因为磁盘 IO 成为瓶颈。
上述调配磁盘的策略,实际上就是业界的负载平衡策略。通用的负载平衡策略有 hash、随机以及轮循等策略。
Hash 策略
工作自身通过某种 hash 策略后,将压力分担到多个 Worker 上。对应到上述场景,就是将多个 slot 应用的 RocksDB 目录压力分担到多块磁盘上。然而 hash 可能会有抵触的状况,hash 抵触示意多个不同的 Flink 并行度,通过 hash 后失去的 hashCode 一样,或者 hashCode 对硬盘数量求余后被调配到同一块硬盘。
Random 策略
随机策略是每来一个 Flink 工作,生成一个随机数,将压力随机调配到某个 Worker 上,也就是将压力随机调配到某块磁盘。然而随机数也会存在抵触的状况。
Round Robin 策略
轮循策略比拟容易了解,多个 Worker 轮流接收数据即可,Flink 工作第一次申请 RocksDB 目录时应用目录 1,第二次申请目录时应用目录 2,顺次申请即可。该策略是分配任务数最平均的策略,如果应用该策略会保障所有硬盘调配到的工作数相差最大为 1。
最低负载策略 / Least Response Time(最短响应工夫)策略
依据 Worker 的响应工夫来分配任务,响应工夫短阐明负载能力强,应该多调配一些工作。对应到上述场景就是检测各个磁盘的 IO 使用率,使用率低示意磁盘 IO 比拟闲暇,应该多分配任务。
指定权重策略
为每个 Worker 调配不同的权重值,权重值高的任务分配更多的工作,个别调配的工作数与权重值成正比。
例如 Worker0 权重值为 2,Worker1 权重为 1,则分配任务时 Worker0 调配的工作数尽量调配成 Worker1 工作数的两倍。该策略可能并不适宜以后业务场景,个别雷同服务器上每个硬盘的负载能力相差不会很大,除非 RocksDB 的 local dir 既蕴含 SSD 也蕴含 HDD。
三、源码中如何调配磁盘?
笔者线上应用 Flink 1.8.1 版本,呈现了有些硬盘调配了多个并行度,有些硬盘一个并行度都没有调配。能够大胆的猜想一下,源码中应用 hash 或者 random 的概率比拟高,因为大多数状况下,每个硬盘只分到一个工作,小几率调配多个工作(要解决的就是这个小几率调配多个工作的问题)。
如果应用轮循策略,必定会保障每个硬盘都调配一个并行度当前,才会呈现单硬盘调配两个工作的状况。而且轮循策略能够保障调配的硬盘是间断的。
间接看 RocksDBStateBackend 类的局部源码:
/** Base paths for RocksDB directory, as initialized.
这里就是咱们上述设置的 12 个 rocksdb local dir */
private transient File[] initializedDbBasePaths;
/** The index of the next directory to be used from {@link #initializedDbBasePaths}.
下一次要应用 dir 的 index,如果 nextDirectory = 2,则应用 initializedDbBasePaths 中下标为 2 的那个目录做为 rocksdb 的存储目录 */
private transient int nextDirectory;
// lazyInitializeForJob 办法中,通过这一行代码决定下一次要应用 dir 的 index,// 依据 initializedDbBasePaths.length 生成随机数,// 如果 initializedDbBasePaths.length = 12,生成随机数的范畴为 0-11
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
剖析完简略的源码后,咱们晓得了源码中应用了 random 的策略来调配 dir,跟咱们所看到的景象可能匹配。随机调配有小概率会呈现抵触。(写这篇文章时,Flink 最新的 master 分支代码依然是上述策略,尚未做任何改变)
四、应用哪种策略更正当?(各种策略带来的挑战)
random 和 hash 策略在工作数量比拟大时,能够保障每个 Worker 承当的任务量根本一样,然而如果任务量比拟小,例如将 20 个工作通过随机算法调配给 10 个 Worker 时,就会呈现有的 Worker 调配不到工作,有的 Worker 可能调配到 3 或 4 个工作。所以 random 和 hash 策略不能解决 rocksdb 调配磁盘不均的痛点,那轮循策略和最低负载策略呢?
轮循策略
轮循策略能够解决上述问题,解决形式如下:
// 在 RocksDBStateBackend 类中定义了
private static final AtomicInteger DIR_INDEX = new AtomicInteger(0);
// nextDirectory 的调配策略变成了如下代码,每次将 DIR_INDEX + 1,而后对 dir 的总数求余
nextDirectory = DIR_INDEX.getAndIncrement() % initializedDbBasePaths.length;
通过上述即可实现轮循策略,申请磁盘时,从 0 号磁盘开始申请,每次应用下一块磁盘即可。
■ 带来的问题:
Java 中动态变量属于 JVM 级别的,每个 TaskManager 属于独自的 JVM,所以 TaskManager 外部保障了轮循策略。如果同一台服务器上运行多个 TaskManager,那么多个 TaskManager 都会从 index 为 0 的磁盘开始应用,所以导致 index 较小的磁盘会被常常应用,而 index 较大的磁盘可能常常不会被应用到。
■ 解决方案 1:
DIR_INDEX 初始化时,不要每次初始化为 0,能够生成一个随机数,这样能够保障不会每次应用 index 较小的磁盘,实现代码如下所示:
// 在 RocksDBStateBackend 类中定义了
private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));
然而上述计划不能齐全解决磁盘抵触的问题,同一台机器上 12 块磁盘,TaskManager0 应用 index 为 0、1、2 的三块磁盘,TaskManager1 可能应用 index 为 1、2、3 的三块磁盘。后果就是 TaskManager 外部来看,实现了轮循策略保障负载平衡,然而全局来看,负载并不平衡。
■ 解决方案 2:
为了全局负载平衡,所以多个 TaskManager 之间必须通信能力做到相对的负载平衡,能够借助第三方的存储进行通信,例如在 Zookeeper 中,为每个服务器生成一个 znode,znode 命名能够是 host 或者 ip。应用 Curator 的 DistributedAtomicInteger 来保护 DIR_INDEX 变量,存储在以后服务器对应的 znode 中,无论是哪个 TaskManager 申请磁盘,都能够应用 DistributedAtomicInteger 将以后服务器对应的 DIR_INDEX + 1,从而就能够实现全局的轮循策略。
DistributedAtomicInteger 的 increment 的思路 :先应用 Zookeeper 的 withVersion api 进行 +1 操作(也就是 Zookeeper 提供的 CAS api),如果胜利则胜利;如果失败,则应用分布式互斥锁进行 +1 操作。
基于上述形容,咱们失去两种策略来实现轮循,AtomicInteger 只能保障 TaskManager 外部的轮循,不能保障全局轮循。如果要基于全局轮循,须要借助 Zookeeper 或其余组件来实现。如果对轮循策略要求比拟刻薄,能够应用基于 Zookeeper 的轮循策略,如果不想依赖内部组件则只能应用 AtomicInteger 来实现。
最低负载策略
思维就是 TaskManager 启动时,监测所有 rocksdb local dir 对应的磁盘最近 1 分钟或 5 分钟的 IO 均匀使用率,筛掉 IO 使用率较高的磁盘,优先选择 IO 均匀使用率较低的磁盘,同时在 IO 均匀使用率较低的磁盘中,仍然要应用轮循策略来实现。
■ 面临的问题
- Flink 工作启动时,只能拿到磁盘以后的 IO 使用率,是一个瞬时值,会不会不靠谱?
- Flink 工作启动,不可能期待工作先采集 1 分钟 IO 使用率当前,再去启动。
- 不想依赖内部监控零碎去拿这个 IO 使用率,要思考通用性。
- 假如曾经拿到了所有硬盘最近 1 分钟的 IO 使用率,该如何去决策呢?
- 对于 IO 均匀使用率较低的磁盘中,仍然要应用轮循策略来实现。
- IO 均匀使用率较低,这里的较低不好评判,相差 10% 算低,还是 20%、30%。
- 而且不同的新工作对于磁盘的使用率要求也是不一样的,所以评判难度较大。
■ 新思路(discussing)
启动阶段不采集硬盘的负载压力,应用之前的 DistributedAtomicInteger 根本就能够保障每个硬盘负载平衡。然而工作启动后一段时间,如果因为 Flink 工作导致某个磁盘 IO 的均匀使用率绝对其余磁盘而言十分高。咱们能够抉择迁徙高负载硬盘的数据到低负载硬盘。
基于此剖析,最低负载策略比拟麻烦,笔者目前尚未实现此策略。
五、总结
本文剖析了目前 Flink 应用大状态时遇到的问题,并给出了多种解决方案。
目前笔者曾经实现了随机、TaskManager 内轮循、基于 Zookeeper 的全局轮循三种策略,并利用到生产环境,能够间接在 flink-conf.yaml 文件中配置策略。目前来看基于 Zookeeper 的全局轮循策略十分好。之后尽量会回馈给社区。