关于java:RocketMQ-Compaction-Topic的设计与实现

43次阅读

共计 2836 个字符,预计需要花费 8 分钟才能阅读完成。

本文作者:刘涛,阿里云智能技术专家。

01 Compaction Topic 介绍

一般来说,音讯队列提供的数据过期机制有如下几种,比方有基于工夫的过期机制——数据保留多长时间后即进行清理,也有基于数据总量的过期机制——数据分区数据量达到肯定值后进行清理。

而 Compaction Topic 是一种基于 key 的数据过期机制,即对于雷同 key 的数据只保留最新值。

该个性的利用场景次要为保护状态信息,或者在须要用到 KV 构造时,能够通过 Compaction Topic 将 key-value 信息间接保留到 MQ,从而解除对外部数据库的依赖。比方保护生产位点,能够将生产组加分区作为 key,将生产位点做 offset,以音讯模式发送到 MQ,压缩之后,生产时获取最新 offset 信息即可。另外,像 connect 里的 source 信息比方 Binlog 解析位点或其余 source 解决的位点信息均可存到 Compaction Topic。同时 Compaction Topic 也反对 存储 RSQLDB 与 RStreams 的 checkpoint 信息。

02 须要解决的问题

Compaction 过程中,须要解决如下几个问题:

第一,数据写入过程中,数据如何从生产者发送到 broker 并且最终落盘,数据主备之间的 HA 如何保障?

第二,整个 compaction 的流程包含哪几个步骤?如果数据量太大,如何优化?

第三,数据生产时如何索引音讯?如果找不到音讯指定的 offset 音讯,如何解决?

第四,如果有机器故障,如何复原老数据?

03 方案设计与实现

第一,数据如何写入。

首先写入到 CommitLog,次要为复用 CommitLog 自身的 HA 能力。而后通过 reput 线程将 CommitLog 音讯依照 Topic 加 partition 的维度拆分到不同文件里,按分区整顿音讯,同时生成索引。这样最终音讯就按 Topic 加 partition 的粒度做了规整。

在 compaction 过程中,为什么不在原先的 commitLog 上做规整,而是再额定按分区做规整?起因如下:

  1. 所有数据都会写到 CommitLog,因而单个 Topic 的数据不间断。如果要遍历单个 topic 的所有数据,可能须要跳着读,这样就会导致大量冷读,对磁盘 IO 影响比拟大。
  2. CommitLog 数据有主动过期机制,会将老数据删除,因而不能将数据间接写到 CommitLog,而 CompactionLog 里的老数据为按 key 过期,不肯定会删除。
  3. compact 以分区为维度进行。如果多个分区同时做 compact,效率较低。因为很多分区的 key 同时在一个构造里,会导致同一个分区可能 compact 的数据比拟少,并且 compact 之后也须要从新写一份么,因而,索性就在 compact 之前将音讯通过 reput service 从新归整一遍。

Compact 流程如下:

第一步,确定须要做 compaction 的数据文件列表。个别大于两个文件,须要排除以后正在写的文件。

第二步,将上一步筛选出的文件做遍历,失去 key 到 offset 的映射关系。

第三步,依据映射关系将须要保留的数据从新写到新文件。

第四步,用新文件替换老文件,将老文件删除。

第二步的构建 OffsetMap 次要目标在于能够晓得哪文件须要被保留、哪文件须要被删除,以及文件的前后关系,这样就能够确定写入的布局,确定布局之后,就能够依照 append 的形式将须要保留的数据写到新文件。

此处记录的并非 key 到 value 的信息,而是 key 到 Offset 的信息。因为 value 的数据 body 可能较长,比拟占空间,而 offset 是固定长度,且通过 offset 信息也能够明确音讯的先后顺序。另外,key 的长度也不固定,间接在 map 存储原始 key 并不适合。因而咱们将 MD5 作为新 key,如果 MD5 雷同 key 认为也雷同。

做 compaction 时会遍历所有音讯,将雷同 key 且 offset 小于 OffsetMap 的值删除。最终通过原始数据与 map 构造失去压缩之后的数据文件。

上图为目录构造展现。写入时下面为数据文件,上面为索引,要 compact 的是标红两个文件。压缩后的文件存储于子目录,须要将老文件先标记为删除,将子目录文件与 CQ 同时移到老的根目录。留神,文件与 CQ 文件名一一对应,能够一起删除。

随着数据量越来越大,构建的 OffsetMap 也会越来越大,导致无奈包容。

因而不能应用全量构建形式,不能将所有要 compact 的文件的 OffsetMap 一次性构建,须要将全量构建改为增量构建,构建逻辑也会有小的变动。

第一轮构建:如上图,先构建下面局部的 OffsetMap,而后遍历文件,如果 offset 小于 OffsetMap 中对应 key 的 offset 则删除,如果等于则保留。而上面局部的音讯的 offset 必定大于 OffsetMap 内的 offset,因而也须要保留。

第二轮构建:从上一次完结的点开始构建。如果上一轮中的某个 key 在新一轮中不存在,则保留上一轮的值;如果存在,则仍然依照小于删除、大于保留的准则进行构建。

将一轮构建变为两轮构建后,OffsetMap 的大小显著升高,构建的数据量也显著升高。

原先的索引为 CommitLog Position、Message Size 和 Tag Hush,而当初咱们复用了 bcq 构造。因为 Compact 之后数据不间断,无奈依照先前的形式间接查找数据所在物理地位。因为 queueOffset 仍然为枯燥增排列,因而能够通过二分查找形式将索引找出。

二分查找须要 queueoffset 信息,索引构造也会发生变化,而 bcq 带有 queueoffse 信息,因而能够复用 bcq 的构造。

Queueoffset 在 compact 前后放弃不变。如果 queueoffset 不存在,则获取第一个大于 queueoffset 的音讯,而后从头开始将所有全量数据发送给客户端。

机器故障导致音讯失落时,须要做备机的重建。因为 CommitLog 只能复原最新数据,而 CompactionLog 须要老数据。之前的 HA 形式下,数据文件可能在 compact 过程中被被删除,因而也不能基于复制文件的形式做主备间同步。

因而,咱们实现了基于 message 的复制。即模仿生产申请从 master 上拉取音讯。拉取位点个别从 0 开始,大于等于 commitLog 最小 offset 时完结。拉取完结之后,再做一次 force compaction 将 CommitLog 数据与复原时的数据做一次 compaction,以保障保留的数据是被压缩之后的数据。后续流程不变。

04 应用阐明

生产者侧应用现有生产者接口,因为要按分区做 compact,因而须要将雷同 key 路由到雷同的 MessageQueue,须要本人实现相干算法。

消费者侧应用现有消费者接口,生产到音讯后,存入本地类 Map 构造中再进行应用。咱们的场景大多为从头开始拉数据,因而须要在开始时将生产位点重置到 0。拉取完当前,将音讯 key 与 value 传入本地 kv 构造,应用时间接从该构造拿取即可。

正文完
 0