作者:张森泽
随着 RocketMQ 5.1.0 的正式公布,多级存储作为 RocketMQ 一个新的独立模块达到了 Technical Preview 里程碑:容许用户将音讯从本地磁盘卸载到其余更便宜的存储介质,能够用较低的老本缩短音讯保留工夫。本文具体介绍 RocketMQ 多级存储设计与实现。
设计总览
RocketMQ 多级存储旨在 不影响热数据读写的前提下 将数据卸载到其余存储介质中,实用于两种场景:
- 冷热数据拆散:RocketMQ 早先产生的音讯会缓存在 page cache 中,咱们称之为 热数据 ;当缓存超过了内存的容量就会有热数据被换出成为 冷数据。如果有少许消费者尝试生产冷数据就会从硬盘中从新加载冷数据到 page cache,这会导致读写 IO 竞争并挤压 page cache 的空间。而将冷数据的读取链路切换为多级存储就能够防止这个问题;
- 缩短音讯保留工夫:将音讯卸载到更大更便宜的存储介质中,能够用较低的老本实现更长的音讯保留工夫。同时多级存储反对为 topic 指定不同的音讯保留工夫,能够依据业务须要灵便配置音讯 TTL。
RocketMQ 多级存储比照 Kafka 和 Pulsar 的实现最大的不同是咱们应用准实时的形式上传音讯,而不是等一个 CommitLog 写满后再上传,次要基于以下几点思考:
- 均摊老本:RocketMQ 多级存储须要将全局 CommitLog 转换为 topic 维度并从新构建音讯索引,一次性解决整个 CommitLog 文件会带来性能毛刺;
- 对小规格实例更敌对:小规格实例往往配置较小的内存,这意味着热数据会更快换出成为冷数据,期待 CommitLog 写满再上传自身就有冷读危险。采取准实时上传的形式既能躲避音讯上传时的冷读危险,又能尽快使得冷数据能够从多级存储读取。
Quick Start
多级存储在设计上心愿升高用户心智累赘:用户无需变更客户端就能实现无感切换冷热数据读写链路,通过简略的批改服务端配置即可具备多级存储的能力,只需以下两步:
- 批改 Broker 配置,指定应用 org.apache.rocketmq.tieredstore.TieredMessageStore 作为 messageStorePlugIn
- 配置你想应用的贮存介质,以卸载音讯到其余硬盘为例:配置 tieredBackendServiceProvider 为 org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment,同时指定新贮存的文件门路:tieredStoreFilepath
可选项:反对批改 tieredMetadataServiceProvider 切换元数据存储的实现,默认是基于 json 的文件存储
更多应用阐明和配置项能够在 GitHub 上查看多级存储的 README [ 1]**
技术架构
architecture
- 接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher 接入层实现 MessageStore 中的局部读写接口,并为他们减少了异步语意。TieredDispatcher 和 TieredMessageFetcher 别离实现了多级存储的上传 / 下载逻辑,相比于底层接口这里做了较多的性能优化:包含应用独立的线程池,防止慢 IO 阻塞拜访热数据;应用预读缓存优化性能等。
- 容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue 容器层实现了和 DefaultMessageStore 相似的逻辑文件形象,同样将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都通过 FileQueue 持有底层物理文件的援用。有所不同的是多级存储的 CommitLog 改为 queue 维度。
-
驱动层:TieredFileSegment
驱动层负责保护逻辑文件到物理文件的映射,通过实现 TieredStoreProvider 对接底层文件系统读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的实现,能够将数据转移到其余硬盘或通过 fuse 挂载的对象存储上。
音讯上传
RocketMQ 多级存储的音讯上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每当有音讯发送到 Broker 会调用 TieredDispatcher 进行音讯散发,TieredDispatcher 将该音讯写入到 upload buffer 后立刻返回胜利。整个 dispatch 流程中不会有任何阻塞逻辑,确保不会影响本地 ConsumeQueue 的构建。
TieredDispatcherTieredDispatcher 写入 upload buffer 的内容仅为音讯的援用,不会将音讯的 body 读入内存。因为多级贮存以 queue 维度构建 CommitLog,此时须要从新生成 commitLog offset 字段。
upload buffer触发 upload buffer 上传时读取到每条音讯的 commitLog offset 字段时采纳拼接的形式将新的 offset 嵌入到原音讯中。
上传进度管制
每个队列都会有两个要害位点管制上传进度:
- dispatch offset:曾经写入缓存然而未上传的音讯位点
- commit offset:已上传的音讯位点
upload progress类比消费者,dispatch offset 相当于拉取音讯的位点,commit offset 相当于确认生产的位点。commit offset 到 dispatch offset 之间的局部相当于已拉取未生产的音讯。
音讯读取
TieredMessageStore 实现了 MessageStore 中的音讯读取相干接口,通过申请中的逻辑位点(queue offset)判断是否从多级存储中读取音讯,依据配置(tieredStorageLevel)有四种策略:
- DISABLE:禁止从多级存储中读取音讯;
- NOT\_IN\_DISK:不在 DefaultMessageStore 中的音讯从多级存储中读取;
- NOT\_IN\_MEM:不在 page cache 中的音讯即冷数据从多级存储读取;
- FORCE:强制所有音讯从多级存储中读取,目前仅供测试应用。
<!—->
/**
* Asynchronous get message
* @see #getMessage(String, String, int, long, int, MessageFilter) getMessage
*
* @param group Consumer group that launches this query.
* @param topic Topic to query.
* @param queueId Queue ID to query.
* @param offset Logical offset to start from.
* @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter);
须要从多级存储中读取的音讯会交由 TieredMessageFetcher 解决:首先校验参数是否非法,而后依照逻辑位点(queue offset)发动拉取申请。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取音讯。
// TieredMessageFetcher#getMessageAsync similar with
TieredMessageStore#getMessageAsync
public CompletableFuture<GetMessageResult>
getMessageAsync(String group, String topic, int queueId,
long queueOffset, int maxMsgNums, final MessageFilter messageFilter)
TieredFileSegment 保护每个贮存在文件系统中的物理文件位点,并通过为不同存储介质实现的接口从中读取所需的数据。
/**
* Get data from backend file system
*
* @param position the index from where the file will be read
* @param length the data size will be read
* @return data to be read
*/
CompletableFuture<ByteBuffer> read0(long position, int length);
预读缓存
TieredMessageFetcher 读取音讯时会预读一部分音讯供下次应用,这些音讯暂存在预读缓存中。
protected final Cache<MessageCacheKey /* topic, queue id and queue offset */,
SelectMappedBufferResultWrapper /* message data */> readAheadCache;
预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的音讯量相似拥塞窗口采纳加法增、乘法减的机制管制:
- 加法增:从最小窗口开始,每次减少等同于客户端 batchSize 的音讯量。
- 乘法减:当缓存的音讯超过了缓存过期工夫仍未被全副拉取,在清理缓存的同时会将下次预读音讯量减半。
预读缓存反对在读取音讯量较大时候片并发申请,以获得更大带宽和更小的提早。
某个 topic 音讯的预读缓存由生产这个 topic 的所有 group 共享,缓存生效策略为:
- 所有订阅这个 topic 的 group 都拜访了缓存
- 达到缓存过期工夫
故障复原
上文中咱们介绍上传进度由 commit offset 和 dispatch offset 管制。多级存储会为每个 topic、queue、fileSegment 创立元数据并长久化这两种位点。当 Broker 重启后会从元数据中复原,持续从 commit offset 开始上传音讯,之前缓存的音讯会从新上传并不会失落。
开发计划
面向云原生的存储系统要最大化利用云上存储的价值,而对象存储正是云计算红利的体现。RocketMQ 多级存储心愿一方面利用对象存储低成本的劣势缩短音讯存储工夫、拓展数据的价值;另一方面利用其共享存储的个性在多正本架构中兼得老本和数据可靠性,以及将来向 Serverless 架构演进。
tag 过滤
多级存储拉取音讯时没有计算音讯的 tag 是否匹配,tag 过滤交给客户端解决。这样会带来额定的网络开销,打算后续在服务端减少 tag 过滤能力。
播送生产以及多个生产进度不同的消费者
预读缓存生效须要所有订阅这个 topic 的 group 都拜访了缓存,这在多个 group 生产进度不统一的状况下很难触发,导致无用的音讯在缓存中沉积。须要计算出每个 group 的生产 qps 来估算某个 group 是否在缓存生效前用上缓存的音讯。如果缓存的音讯预期在生效前都不会被再次拜访,那么它应该被立刻过期。相应的对于播送生产,音讯的过期策略应被优化为所有 Client 都读取这条音讯后才生效。
和高可用架构的交融
目前次要面临以下三个问题:
- 元数据同步:如何牢靠的在多个节点间同步元数据,slave 降职时如何校准和补全缺失的元数据;
- 禁止上传超过 confirm offset 的音讯:为了防止音讯回退,上传的最大 offset 不能超过 confirm offset;
- slave 降职时疾速启动多级存储:只有 master 节点具备写权限,在 slave 节点降职后须要疾速拉起多级存储断点续传。
相干链接:
[1] README
https://github.com/apache/rocketmq/blob/develop/tieredstore/README.md
点击此处查看音讯队列 RocketMQ 产品详情