乐趣区

关于后端:蚂蚁流场景状态演进和优化

本文整顿自蚂蚁团体实时计算组技术专家闵文俊在 FFA 2023 核心技术(一)中 的分享,内容对于蚂蚁流场景状态演进和优化的钻研,次要分为以下三局部:

  1. 状态后端的演进
  2. 优化与适配
  3. 将来瞻望

一. 状态后端的演进

1. 蚂蚁状态后端的演进

在蚂蚁的生产环境中,最早有一套流计算零碎外部的我的项目,代号为 Kepler。在 Kepler 零碎实现了一套可插拔的状态后端存储系统。在咱们生产环境中应用最宽泛的是基于 Hbase 和 OceanBase 这种近程存储的状态后端,它的劣势在于 checkpoint 十分的快,因为流工作在计算的过程中,它的数据曾经写到近程存储中,并且它的 failover 也会比拟快,因为在 failover 的过程中,它不波及到数据的下载以及复原的过程。它也不依赖于本地磁盘,对于云化环境会比拟敌对。

并且在分布式存储系统中,个别都提供了拜访这些数据的 API,所以对于这些曾经存储在状态后端外面的数据是能够通过这些 API 间接拜访、查问进去,比方咱们线上会通过 Hbase 的 API 间接将咱们存储在这些零碎中的流工作的状态的 offset 间接裸露进去,这就相似于 Flink 外面的 qureyable state 这种能力。

它的劣势也比拟显著。首先,它的性能会比拟差。因为对于这种分布式存储系统,它所能提供的拜访提早,根本都是在毫秒级,而本地存储个别可能达到微秒级,所以这个时延就比拟高,它的整体的拜访 TPS 就比拟受限,个别不太适宜于大状态的工作。第二,它的稳定性会有余,因为它会受限于第三方服务的稳定性影响,它整体的 SLA 就不如本地存储。

2. 基于 RocksDB 本地状态后端

在咱们陆续从蚂蚁外部资源的流计算,逐渐切到社区的 Flink 的过程中,状态后端也就天然切换到 Flink 自带的基于 RocksDB 的本地状态存储中。

RocksDB 状态存储也是当初应用的十分多的状态存储,它的劣势次要是,它是基于 RocksDB 这种高性能的本地 KVstore,所以它的状态拜访的延时非常低,它能满足工夫敏感比拟高的工作吞吐的需要。稳定性更好。

它的劣势在于,第一因为 RocksDB 自身是基于 LSMtree 这样的构造的,就义了肯定的读的性能,来满足它的写的性能。然而当咱们写入的前台流量比拟大的时候,它的后盾的 compaction 会去逐渐占用更多的机器的 IO 和 CPU 资源,而影响到前台的写入以及拜访的需要,体现的比拟显著. 在线上的工作中会看察看到的景象,工作在高峰期的时候,会看到它的吞吐反而会有降落的体现。

第二,这种基于 RocksDB 的本地状态后端,它的 Rescale 和 Failover 也会比较慢。

第三,它的 Checkpoint 的代价会比拟高,因为它波及到本地的状态文件的上传. 为了更好的解决这些问题,蚂蚁外部自研了一套基于 RocksDB 的本地的 KV 存储去尝试优化以上的几个问题。

3. RocksDB compaction 过程

在介绍本地 KV 存储之前,咱们先看一下 RocksDB 的 compaction 的过程。它在 compaction 过程中,会将某一层级校验出它所须要进行 compaction 的文件,而后去进行 compact。在 compact 的过程中,它会挑选出下一层级中与这一层的文件,将 key 存在 overlap 的文件进行 Merge。在 Merge 的过程中,它的目标是去升高空间放大,以及通过文件的合并来缩小后续查问中的文件的开销。然而在合并的过程中,因为咱们自身 KV 的格局,并且一般来说 value 的是比拟大的。然而在合并的过程中,其实这个 value 是不须要随着 KV 合并的过程而合并的。所以,基于这样的一个思维,提出了 KV 拆散的 RocksDB 的加强的实现。

4. AntKV: 基于 RocksDB KV 拆散的本地存储引擎

社区或者工业界有十分多的 KV 拆散的思路,比拟典型的是论文 whiskey 外面的一个 KV 拆散的实际。咱们的 KV 整体的思路也是靠近于这种 KV 拆散的思维,这个图外面显示的是一个 KV value 的写入的过程。

首先,它的 value 局部会被写入到 value log 里,同时会将 value log 所对应的文件 offset 替换成原始的 KV 数据外面 value 局部,而后再写到 LSM tree 里。在后续的 compaction 的过程中,只有 SST file 外面的数据才会参加 compaction,value log 这个数据不会参加 compaction。

留神这外面, 并不是所有的 KV 都参加 KV 拆散的实现。因为从察看到的景象来看,流计算里也并不是所有的数据都满足 KV 拆散的 pattern。因为从 KV 拆散的 pattern 的次要受害场景来说,value 比拟大的时候,通过缩小 value 的 compaction 的 IO 操作来晋升整体吞吐。如果 value 并不大,就会导致后续查问的时候,须要先通过 LSM 查到对应的 key 所对应的 value 的 address,而后再跳转到 value log 里去查问这个 value 所对应的实在的数据,这样就减少了查问的开销。所以,为了解决这样的问题,在 KV 拆散的时候,可能让它自适应的抉择 KV 拆散的策略,简略来说就是会通过 value 的阈值是否达到配置的大小,来抉择它是否 KV 拆散。在此之外,还反对了其余的一些性能,包含 value log 的空间回收 GC、TTL 和 check point、Merge Op、AsyncRestore 还有 Table API 以及其余的一些优化。

这里次要讲一下它在 GC 下面的反对。因为在非 KV 拆散的模式下,它的 compaction 的过程中,就曾经会将各个 SST 文件中反复的 key 所对应的数据只保留最新的那一份,所以在 compaction 过程中,这个空间就曾经被自然而然的回收。

然而在拆散的模式下,在参加 compaction 过程中,它其实在 LSM tree 只有 key 和 value Address 的局部去参加 compaction。所以,当这个 value 不再被援用的时候,它就曾经成为垃圾被放在 value log 外面,而且须要一个清理机制将其清理。

这里的思路有很多种,咱们当初的计划外面采纳的是一种被动回收的策略,就是在 compaction 的过程中,咱们会去记录每一个 value log 所对应的 value 占用的垃圾比例。因为咱们在 compaction 的过程中就能够晓得哪些 key 被 compact 掉,所以咱们晓得这个 key 所对应的 value log 的文件,因而咱们就能齐全统计出这样的数据。当 value log 垃圾占比达到百分之百的时候,就能够平安的将这个 value log 间接清理掉。

然而还会存在另外一个问题,value log 外面有可能某一些 key 迟迟的没有参加 compaction 过程,就会造成 value log 残留下来,导致肯定水平的空间放大。对于这个问题,咱们次要是通过设置一个清理的阈值解决,比方当达到百分之九十的时候,咱们就须要通过这些垃圾占比过高的文件去反向查找 SST 文件,而后将这些 SST 文件作为下一次 compaction 的候选,而后将它进行 compaction。在这个过程中,它就能将这些 value log 的空间回收掉。

从以上来看,这套 AntKV 的存储系统的次要的劣势在于,首先升高了 compaction 的读写放大,对于线上的成果来看,整体的 get put 的耗时更加稳固,也相比于之前更低。第二点是更高效的缓存,因为它升高的 LSM tree 自身有大小,所以它更多的 key 能够被缓存在 block cache 外面,以及自适应的 KV 拆散, 实用多种场景。

它的次要劣势在于 scan 的效率会受影响,因为 scan 在 KV 拆散模式中,可能会变成一个随机 IO,以及它的数据清理会延后,于是带来额定的磁盘的开销。

从 AntKV 这个角度来去测试整体的吞吐读写测试的话,相比于 RocksDB 来说,达到了两到四倍左右的晋升,比拟可观。

二.优化与适配

从引擎侧去和 AntKV 适配,咱们次要实现了 AntKV 的 statebackend,Backend 实现了插件化革新,避免和 RocksDB 的 JNI 的命名抵触,并且在 checkpoint 阶段去对 AntKV 所产生的额定的 value log 的数据文件进行长久化治理。其余的优化包含 scan 的 prefetch 优化,为了解决在它 scan 产生随机 IO 的状况下如何晋升它 scan 的性能的问题、以及基于 learned index 的这种 O(1) 的查问优化可能极大的晋升它的 get 的时效性、以及在 Statebackend 外面内置反对 TTL 的性能。

1. No Unique Key Join 优化

以上是从 AntKV 的角度来看存储引擎。从引擎的角度来看,以上是一个十分典型的 No Unique Key Join 的一个 case。咱们晓得在 Flink 外面,在它 Join 的场景中,它会依据你 Join Input 流是否蕴含 Unique Key 以及 Join Key 是否蕴含 Unique Key 去抉择适合的状态存储的形式。

以图为例,Input1 是一个没有 Unique Key 的流,Input2 是一个 Join key 蕴含了 Unique Key 的流。当 Input1 进来的时候,如果它去查问 Input2 的数据,它的查问的模式是以 Join Key 去查找这种模式。所以它的模式是比拟合乎于 AntKV 这样的优化,它是一个小 Key 大 Value 的 Pattern,它的查问过程也只须要 get 点查。而当我 Input2 的数据来的时候,它的存储模式是依照 Join Key 加 Record 拼接成一个大 Key,而它的 Value 是一个示意这个 Record 的行数,所以在整体存储的 Pattern 下,它看起来就会比拟奇怪。而后它在查问的过程中,就须要依照 Join key 先去 seek 到某一个前缀,而后在 iterator 去遍历出这个 Pattern 下所有的 Record。首先,这个查问效率在非 KV 拆散模式或者在 RocksDB 的场景中,它的成果就不好。这是因为,首先这种大 Key 在查找的过程中就波及比拟大的一个字符串的比拟以及它的缓存效率会比拟低。

那么有没有方法将这种 Pattern 和 KV 拆散的优化匹配下来呢?咱们就想方法去将存储格局改成一个比拟适合的形式。如图的 Join Key 拼对应的一个 List Record,就是一种设计形式。而后,咱们在线上也采纳了多种形式去测试这种场景。

计划如图中显示的,它应用的是一个 Value State,而后 Join Key 就对应到一个 List Record,示意这个 Join Key 下所有呈现的 Record。计划二是 Map State,它也是以 Join Key 作为前缀,再加上 Record 的 MD5 值拼接成一个 Key,而后它的 Value 是 Record 以及它呈现的次数。计划三是应用的一个 List State 的 Value,就是在我写入的过程中,有一条 Record,Append 写进去的这样的成果。从咱们线上测试的成果来看,就是这三种计划,都看到它的 Join Key 的 Value Pattern 都是小 Key 带 Value 的这种模式。

所以,计划一、计划二、计划三的 TPS 就是优于原始计划的。而计划一的整体吞吐应该能达到原始计划的四倍以上,所以咱们最终抉择了计划一的实现计划。以上就是蚂蚁在状态后端上的演进。

2. 状态问题优化

2.1 小文件问题

小文件的问题在社区也是一个比拟有名的问题,它次要在咱们外部可能会导致两个问题。第一个问题是会导致咱们线上呈现一些 Task 继续 deploying,它没有方法复原。第二个问题是小文件导致咱们的文件数扩充继续有余,继续影响 DFS 的稳定性。

咱们先来看一下为什么会有小文件的问题。小文件的问题可能次要和增量 Checkpoint 的计划无关,因为增量 Checkpoint 自身就是问题。

2.2 增量 Checkpoint 问题

增量 Checkpoint 的过程中要上传全量文件的优化计划,然而它在增量 Checkpoint 的过程中,会将每一个 Task 在 Checkpoint 之前,将 RocksDB 先 Flush 生成本次 Checkpoint 之间的新增的 SST 文件,而后再将这些本次和上次之间 Diff 进去的 SST 文件上传到近程,然而这些 SST 文件通常不会很大。在继续的过程中,这些 SST 文件就会导致产生很多的小文件。

3. 对状态复原的影响

而后咱们再来看为什么这种小文件会导致 Task deploy 的问题。首先,咱们能够看到在 Checkpoint 的过程中,它是一个增量的过程,比方咱们图中的 Subtask Manager 在做 Checkpoint 的过程中,它可能在 value logs 里有一个小于 20KB 的小文件,那么它在现有的机制里就会间接将这个数据传到 Job Manager 中,由 Job Manager 写入到 Metadata 文件外面,这也是一种优化机制,为了无需独自生成一个文件,将那些特地小的文件间接放到 Job Manager 的 MataData。而在这个继续的过程中,它可能会产生比拟多的小文件,并记录在 Matadata Data 外面。因为做 Checkpoint 是一个增量的过程,然而在复原的过程中,是一个 FullRestore 的过程,就可能会存在提交 Task 到 Task Executor 上,部署的这个 Descriptor 蕴含了 ByteStream Statehandle 以及 File Stream Handle。这些 ByteStream Statehandle 在继续的增量、Checkpoint 的过程中,可能累加超过 RPC 限度的大小,最终导致工作在跑的过程中,因为挂一次而导致再也复原不了。因为要传输的 State Handle 曾经超过了 RPC 的限度。这个问题可能会和咱们的这套 KV 拆散的架构有肯定的关系。因为在非 KV 拆散的模式下,它的 SST 会继续的做 compaction。然而在 KV 拆散模式下的 Value Log 的 Compaction 的频率会低于 SST 的 Compaction 的频率,因为自身的目标是为了缩小 Value Log 的 Compaction 的频率。

为了解决这样的问题,咱们参考了社区之前分享的一套基于 Segment 增量的 Checkpoint 计划。这套计划的次要思路就在于每一个 Task 在做 Checkpoint 过程中,并不会为每一个 SST 文件都生成一个近程的 DFS 文件,而是让每一个 Task 上 Checkpoint 过程中的要上传的文件,尽可能复用一个远端的文件,并且记录每一个 SST 文件。在这个文件中所占用的 Offset 和对应的 Length,去找到它所对应的 Segment。而后通过这种形式,就曾经可能十分好的缓解小文件的问题。

咱们上线之后发现小文件的问题尽管解决了,然而它还是会有空间放大的问题。如下面的工作外面,它从工作侧汇报上来,这个工作只是占用了 38 G 左右的状态大小。但实际上,从 DFS 测试, Checkpoint 目录所占用的空间可能达到了它原始大小的七百多倍左右,并且它的文件数也比拟多。所以,这种 case 并没有达到咱们所预期的成果。

4. 基于 Segment 增量 Checkpoint 计划

有一个问题在于,可能在某一些 DFS 的文件外面,它的文件继续在被援用。比方咱们这个文件外面的第一个文件的四号 SST,它继续被前面的 checkpoint 援用,并没有被 GC 或者被合并成另外一个 SST 文件。这样的话就导致整个文件的大小无奈开释,最终导致它的文件空间放大和文件数无奈消减的问题。

当初要解决这个问题,我的思路非常简单,相似于咱们之前在 AntKV 外面 Value Log 开释的思路,就是在存储空间和 IO 之间,尽可能寻找到一个均衡,比方在上传的时候检测这个 DFS 文件的无效比例,当文件无效比例低于多少的时候,咱们就会将这个原始文件中的 Segment 作为本次 Checkpoint 中新增的 SST 上传到近程。这样的话,原始的 DFS 的文件就能够间接被清理掉。

5. 小文件合并存储空间放大问题

在上线之后,咱们能够看到它的成果,理论存储量降落到了 Full State 的 1.4 倍左右,文件数也大大减少了。

6. 大工作 Union State restore 性能瓶颈

第二个问题是大工作的 Union State restore 的问题。Union State restore 的过程其实是须要每一个 TM 去 Union State,它实质的意思就是每一个 TM 在做 Checkpoint 的过程中,每一个 sub task 上传本人的这部分状态,然而在复原过程中,每一个 subtask 都能复原到原始的状态。当咱们以这种状态复原的时候,就能够看到它是一种 All-to-all 模式的复原。当咱们的并发比拟大的时候,比方咱们当初这种是基于 File StateHandle 的模式复原,它的 DFS 就会成为瓶颈。从咱们线上的工作来看,比方有两千多左右的并发,它整个复原时长可能会达到半个小时左右的长度,这个在业务上是无奈容忍的。

咱们也尝试另一种形式,比方去调低阈值,升高文件数。如果咱们让它间接写到 Job Manager 的 Matter Data 外面,这样就只有一个文件去复原,然而它的 Job Manager 也会成为瓶颈,它会频繁的在复原的过程中去 OOM。OOM 的起因也很简略,因为咱们放在了它的 ByteStreamStateHandle,那么在提交工作的时候,TDD 的大小会占用 Job Manager 的内存。

从咱们提交的理论状况来看,它在复原的过程中,这个 StreamStateHandle 可能会占到百万级别的大小,最终 JM 根本无法复原。

解决这个问题的思路,第一点是在 JobManager 侧,咱们只去创立一份原始的 Statehandle,就不必给每一个 Task 都复制一份,第二点是利用 Blob Server 去传输 TDD 中的大状态对象。

这个 Blob Server 在 Job Manager 外面自身就在用,所以咱们也是利用了这样的一个机制去传输大对象。因为 Blob Server 自身也是在做这样的事件。

第三点是在同一个 TaskManager 下面,咱们只去拉取一次 Union State 的状态,数据 Task 之间的初始化是共享的。通过以上的一系列的优化,最终它原来半小时的重启工作,当初能够没有压力地重启,而且 JM 内存也不再是它的瓶颈。

7. 工作 Rescale 问题

最初一个问题是工作 Rescale 的问题,社区也在继续优化这个问题。

咱们在很早的时候就曾经在尝试社区的 DeleteRange API 的优化。咱们发现在 DeleteRange API 的模式下,InitialDB 的抉择能够有更宽松的限度。

因为在晚期的版本中,它的 Rescale 的过程就是每个 Task 先去下载属于我的 Handle 数据,下载回来之后,须要将数据导入进去,而后再将不属于我的 DB 数据删除掉。原来实现这种删除是通过遍从来删除,所以它须要一个高比例的 DB 作为它的 base,否则它的删除代价也是很高的。然而当初在 DeleteRange 这种 API 的模式下,从右图能够看到,在咱们的测试中,它 Rescale 的耗时散布外面 DeleteRange 所占用的工夫能够忽略不计,根本都是毫秒级别。所以在 DeleteRange 模式下,咱们应该尽可能的抉择一个 DB 作为它的 baseDB。

第二点是,咱们发现在 Rescale 的过程中,它磁盘的开销可能会达到原始的两到三倍。因为咱们跑的是一个云化环境,对于磁盘有比拟高的限度,所以咱们对于每一个 Worker 都有磁盘监控,当它磁盘超过某个值的时候,就可能会触发 Kill 的机制,让它开释掉,否则稳定性就无奈失去保障。然而在 Rescale 的过程中,以上图为例,它在复原的过程中,四扩到八个场景中,每一个原始的 Handle 比方 1 到 10 局部的状态会被 1 到 5 和 5 到 10 这两局部都下载。这样的话,状态在复原的过程中它是 Double 的,并且可能因为 InitialDB 抉择的并不适合导致我在 Open 时创立一个全新的 DB 将原始的数据全副导入进去,这样就可能会达到两到三倍的存储开销。

要解决这个问题,第一点就是刚刚咱们说到的,DeleteRange InitialDB 的抉择。第二个点是咱们在复原的过程中会去做磁盘的肯定弹性,提供了磁盘肯定的弹性能力,不让它在 Rescale 的过程中因为复原的过程间接将一个 Worker 干掉。第三个是尝试应用 SST ingest API 去实现常量工夫的 Rescale,对于这个性能,社区也在做这样的尝试。在咱们外部的实现过程中,第一点最开始实现的也是相似于社区之前的计划,就是在复原的过程中,将原始的 DB 去遍历,而后通过 SST writer 的 API 去写出图里的测试后果。能够看到,它的次要耗时还是在于原始的 DB 遍历和 Put 的开销上,所以通过 SST writer 这样的形式,还是不能解决问题。所以咱们也在看社区现有的计划,以及咱们外部的 AntKV 同学致力去将整体的耗时尽可能的降下来。

三.将来瞻望

将来瞻望次要分为两方面。

一方面是冀望可能在现有的状态后端上,实现更快的 Failover 和 Rescale 的能力。第二块是基于咱们当初的 AntKV 的这套存储引擎的个性,去做出更多的算子级别的性能优化,包含 Join 场景的优化,以及在 Windows 上的场景也会去做这样的摸索。

另一方面是升高本地存储的依赖,因为支流的还是在迁往云化环境,所以对于本地存储的依赖也将会成为一个问题。

其次是局部计算下沉存储引擎,咱们当初也在摸索 Paimon 数据湖产品,它也是基于 LSM tree 的湖存储。从图能够看到,它所反对的是比拟灵便的去定义两个 SST 文件合并,它的 key 合并的 Merge engine,以及当初曾经反对的 Aggregation 的 Merge engine,在生产业务中比较简单的聚合场景就曾经能够通过这种模式玩转起来。

它的益处在于,因为有很多业务在 Flink 外面做聚合的时候,须要 Flink 状态去帮它存储全量的数据。然而咱们不举荐它将 State 作为一个长久化的存储。然而在数据湖外面,它齐全能够通过这种形式,因为在计算存储端的 Merge 过程中,就曾经被实现了。

另外一个益处在于,这种 Merge 的过程是大批量的去做的,它会比原始的通过 Flink State 的 Record 级别去 Get Put 的这种形式的性能会更好。然而它的代价也是会用肯定的时延去换整体的吞吐,对于那些能够承受这样的时延开销的业务场景,咱们曾经在尝试应用这种形式去接入这种业务,来满足他们的业务需要。

Flink Forward Asia 2023

本届 Flink Forward Asia 更多精彩内容,可微信扫描图片二维码观看全副议题的视频回放及 FFA 2023 峰会材料!

更多内容

流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
59 元试用 实时计算 Flink 版(3000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

退出移动版