本文作者:刘涛,阿里云智能技术专家。
01 Compaction Topic 介绍
一般来说,音讯队列提供的数据过期机制有如下几种,比方有基于工夫的过期机制——数据保留多长时间后即进行清理,也有基于数据总量的过期机制——数据分区数据量达到肯定值后进行清理。
而 Compaction Topic 是一种基于 key 的数据过期机制,即对于雷同 key 的数据只保留最新值。
该个性的利用场景次要为保护状态信息,或者在须要用到 KV 构造时,能够通过 Compaction Topic 将 key-value 信息间接保留到 MQ,从而解除对外部数据库的依赖。比方保护生产位点,能够将生产组加分区作为 key,将生产位点做 offset,以音讯模式发送到 MQ,压缩之后,生产时获取最新 offset 信息即可。另外,像 connect 里的 source 信息比方 Binlog 解析位点或其余 source 解决的位点信息均可存到 Compaction Topic。同时 Compaction Topic 也反对 存储 RSQLDB 与 RStreams 的 checkpoint 信息。
02 须要解决的问题
Compaction 过程中,须要解决如下几个问题:
第一,数据写入过程中,数据如何从生产者发送到 broker 并且最终落盘,数据主备之间的 HA 如何保障?
第二,整个 compaction 的流程包含哪几个步骤?如果数据量太大,如何优化?
第三,数据生产时如何索引音讯?如果找不到音讯指定的 offset 音讯,如何解决?
第四,如果有机器故障,如何复原老数据?
03 方案设计与实现
第一,数据如何写入。
首先写入到 CommitLog,次要为复用 CommitLog 自身的 HA 能力。而后通过 reput 线程将 CommitLog 音讯依照 Topic 加 partition 的维度拆分到不同文件里,按分区整顿音讯,同时生成索引。这样最终音讯就按 Topic 加 partition 的粒度做了规整。
在 compaction 过程中,为什么不在原先的 commitLog 上做规整,而是再额定按分区做规整?起因如下:
- 所有数据都会写到 CommitLog,因而单个 Topic 的数据不间断。如果要遍历单个 topic 的所有数据,可能须要跳着读,这样就会导致大量冷读,对磁盘 IO 影响比拟大。
- CommitLog 数据有主动过期机制,会将老数据删除,因而不能将数据间接写到 CommitLog,而 CompactionLog 里的老数据为按 key 过期,不肯定会删除。
- compact 以分区为维度进行。如果多个分区同时做 compact,效率较低。因为很多分区的 key 同时在一个构造里,会导致同一个分区可能 compact 的数据比拟少,并且 compact 之后也须要从新写一份么,因而,索性就在 compact 之前将音讯通过 reput service 从新归整一遍。
Compact 流程如下:
第一步,确定须要做 compaction 的数据文件列表。个别大于两个文件,须要排除以后正在写的文件。
第二步,将上一步筛选出的文件做遍历,失去 key 到 offset 的映射关系。
第三步,依据映射关系将须要保留的数据从新写到新文件。
第四步,用新文件替换老文件,将老文件删除。
第二步的构建 OffsetMap 次要目标在于能够晓得哪文件须要被保留、哪文件须要被删除,以及文件的前后关系,这样就能够确定写入的布局,确定布局之后,就能够依照 append 的形式将须要保留的数据写到新文件。
此处记录的并非 key 到 value 的信息,而是 key 到 Offset 的信息。因为 value 的数据 body 可能较长,比拟占空间,而 offset 是固定长度,且通过 offset 信息也能够明确音讯的先后顺序。另外,key 的长度也不固定,间接在 map 存储原始 key 并不适合。因而咱们将 MD5 作为新 key,如果 MD5 雷同 key 认为也雷同。
做 compaction 时会遍历所有音讯,将雷同 key 且 offset 小于 OffsetMap 的值删除。最终通过原始数据与 map 构造失去压缩之后的数据文件。
上图为目录构造展现。写入时下面为数据文件,上面为索引,要 compact 的是标红两个文件。压缩后的文件存储于子目录,须要将老文件先标记为删除,将子目录文件与 CQ 同时移到老的根目录。留神,文件与 CQ 文件名一一对应,能够一起删除。
随着数据量越来越大,构建的 OffsetMap 也会越来越大,导致无奈包容。
因而不能应用全量构建形式,不能将所有要 compact 的文件的 OffsetMap 一次性构建,须要将全量构建改为增量构建,构建逻辑也会有小的变动。
第一轮构建:如上图,先构建下面局部的 OffsetMap,而后遍历文件,如果 offset 小于 OffsetMap 中对应 key 的 offset 则删除,如果等于则保留。而上面局部的音讯的 offset 必定大于 OffsetMap 内的 offset,因而也须要保留。
第二轮构建:从上一次完结的点开始构建。如果上一轮中的某个 key 在新一轮中不存在,则保留上一轮的值;如果存在,则仍然依照小于删除、大于保留的准则进行构建。
将一轮构建变为两轮构建后,OffsetMap 的大小显著升高,构建的数据量也显著升高。
原先的索引为 CommitLog Position、Message Size 和 Tag Hush,而当初咱们复用了 bcq 构造。因为 Compact 之后数据不间断,无奈依照先前的形式间接查找数据所在物理地位。因为 queueOffset 仍然为枯燥增排列,因而能够通过二分查找形式将索引找出。
二分查找须要 queueoffset 信息,索引构造也会发生变化,而 bcq 带有 queueoffse 信息,因而能够复用 bcq 的构造。
Queueoffset 在 compact 前后放弃不变。如果 queueoffset 不存在,则获取第一个大于 queueoffset 的音讯,而后从头开始将所有全量数据发送给客户端。
机器故障导致音讯失落时,须要做备机的重建。因为 CommitLog 只能复原最新数据,而 CompactionLog 须要老数据。之前的 HA 形式下,数据文件可能在 compact 过程中被被删除,因而也不能基于复制文件的形式做主备间同步。
因而,咱们实现了基于 message 的复制。即模仿生产申请从 master 上拉取音讯。拉取位点个别从 0 开始,大于等于 commitLog 最小 offset 时完结。拉取完结之后,再做一次 force compaction 将 CommitLog 数据与复原时的数据做一次 compaction,以保障保留的数据是被压缩之后的数据。后续流程不变。
04 应用阐明
生产者侧应用现有生产者接口,因为要按分区做 compact,因而须要将雷同 key 路由到雷同的 MessageQueue,须要本人实现相干算法。
消费者侧应用现有消费者接口,生产到音讯后,存入本地类 Map 构造中再进行应用。咱们的场景大多为从头开始拉数据,因而须要在开始时将生产位点重置到 0。拉取完当前,将音讯 key 与 value 传入本地 kv 构造,应用时间接从该构造拿取即可。