关于rocketmq:一张图进阶-RocketMQ-消息存储

41次阅读

共计 6552 个字符,预计需要花费 17 分钟才能阅读完成。

前言

三此君看了好几本书,看了很多遍源码整顿的 一张图进阶 RocketMQ 图片,对于 RocketMQ 你只须要记住这张图!感觉不错的话,记得点赞关注哦。

【重要】视频在 B 站同步更新,欢送围观,轻轻松松涨姿态。一张图进阶 RocketMQ- 音讯存储(视频版)

https://www.bilibili.com/vide…

本文是“一张图进阶 RocketMQ”第 5 篇,对 RocketMQ 不理解的同学能够先看看后面 4 期:

  1. 一张图进阶 RocketMQ- 整体架构
  2. 一张图进阶 RocketMQ – NameServer
  3. 一张图进阶 RocketMQ – 音讯发送
  4. 一张图进阶 RocketMQ – 通信机制

后面两期咱们次要分享了 RocketMQ 是如何将音讯发送进来的,当初音讯曾经被 Netty 送上路了,接力棒曾经交给了 Broker。如果咱们本人来实现 Broker 会怎么实现呢?首先必定得把音讯存起来吧,不然宕机了,音讯失落了,那就离大谱了。

可是音讯要以什么构造存储呢?二进制、JSON、PB?从性能上来看必定都是能够的,那 RocketMQ 到底是怎么搞的?

解决了存储构造问题,那音讯存到哪里呢?数据库,本地文件,还是对象存储服务器?从性能的角度必定也都是能够的。可是,哪家数据库能够反对单机十万级吞吐量?那我间接通通存到数据库得了,瞎折腾些啥。难道存在本地文件就能够了?咱们本人实现不能够,然而 RocketMQ 能够,那 RocketMQ 有什么黑科技呢?

所以咱们明天就来聊一聊 Broker 如何存储音讯,【首先明确咱们的指标】咱们须要先理解 RocketMQ 的存储构造,也就是音讯是如何组织的。理解了存储构造,咱们能力更好的了解存储流程,不然咱们不晓得为什么流程是这样的。最初咱们须要理解有哪些机制撑持 RocketMQ 单机十万级吞吐量。

存储架构

音讯在 Broker 上的存储构造如上图,所有相干文件放在 ROCKETMQ_HOME 下,有哪些文件呢?寄存音讯自身的 CommitLog,以及音讯的索引文件 ConsumeQueue 和 IndexFile:

  • CommitLog

从物理构造上来看,所有的音讯都存储在 CommitLog 外面,其实就是所有的音讯依照“音讯在 CommitLog 各字段示意图”所示,挨个按顺序存储到文件中。

单个 CommitLog 文件大小默认 1G,文件名长度为 20 位,右边补零,残余为起始偏移量。比方 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。音讯次要是程序写入日志文件,当文件满了,写入下一个文件。CommitLog 程序写,能够大大提高写入效率。

然而问题来了,音讯发送的时候咱们指定了 Topic,当初所有 Topic 都程序个写入到 CommitLog,存入的时候是劳碌了(程序写),然而获取音讯可就麻烦了。如果我要获取某个 Topic 的音讯,须要遍历 commitlog 文件,依据 topic 过滤音讯。CommitLog 这个渣男,只管本人爽。有什么方法能够进步音讯查问效率呢?

  • ConsumeQueue

咱们再回顾一下,音讯存入的时候是指定了 Topic,同时咱们也说了每个 Topic 会对应多个 ConsumeQueue(queueId 标识)。要害就在 ConsumeQueue 上,ConsumeQueue 是指定 Topic 音讯的索引文件,怎么了解呢?从“音讯在 ConsumeQueue 各字段示意图”可知,每个条目共 20 个字节,别离为 8 字节的 commitlog 物理偏移量、4 字节的音讯长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,能够像数组一样随机拜访每一个条目,每个 ConsumeQueue 文件大小约 5.72M。ConsumeQueue 文件能够看成是基于 topic 的 commitlog 索引文件。Consumer 即可依据 ConsumeQueue 来查找待生产的音讯。

因为 ConsumeQueue 里只存偏移量信息,所以尺寸是无限的,在理论状况中,大部分的 ConsumeQueue 可能被全副读入内存,所以这个两头构造的操作速度很快,能够认为是内存读取的速度。此外为了保障 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存储了 ConsumeQueues、Message Key、Tag 等所有信息,即便 ConsumeQueue 失落,也能够通过 CommitLog 完全恢复进去。
ConsumeQueue 文件夹的组织形式如下:topic/queue/file 三层组织构造,具体存储门路为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

  • IndexFile

IndexFile 是另一种可选索引文件,提供了一种能够通过 key 或工夫区间来查问音讯的办法。IndexFile 索引文件其底层实现为 hash 索引,相似于 Java 1.7 HashMap,计算 Key 的 hashcode,hashcode 取余失去 hash 槽,拉链法解决哈希抵触。Index 文件的存储地位是:$HOME \store\index${fileName},文件名 fileName 是以创立时的工夫戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 能够保留 2000W 个索引。

所以,RocketMQ 音讯存储架构次要有 CommitLog,ConsumeQueue,IndexFile 形成。咱们发送一条音讯,会先格式化成“音讯在 CommitLog 各字段示意图”中的样子,程序写入 CommitLog 中,而后 Broker 会依照”音讯在 ConsumeQueue 各字段示意图“所示构建一条索引记录,存入该音讯所属 Topic 的 ConsumeQueue 索引文件中。如果有 IndexFile,还会构建 IndexFile。

当初咱们曾经晓得了 RocketMQ 音讯的存储构造,接下来咱们的就要理解 RocketMQ 是如何构建 CommitLog、ConsumeQueue 和 IndexFile,以及 RocketMQ 如何保障性能,撑持单机十万级吞吐量的?这是本文的次要指标,肯定要抓住次要指标,不要走丢咯。

启动流程

理解了 RocketMQ 音讯在磁盘中是怎么存储的,咱们就能够来看看具体的存储流程了。首先,还是先来看看 Broker 的启动流程。初始化过程都是这个鸟样,只看初始化过程齐全不知所云,然而不看初始化过程,间接看具体执行流程也是摸不着头脑,一堆组件不晓得从哪里来的,所以咱们还是先耐着性子大抵看看。但这并不是咱们关注的重点,留神几个关键点即可。

  • 初始化启动环境。部署好 RocketMQ 后,执行 /bin/mqbroker 脚本,次要用于设置 RocketMQ 目录环境变量,例如 ROCKETMQ_HOME。而后调用 ./bin/runbroker.sh 进入 RocketMQ 的启动入口,次要设置了 JVM 启动参数,比方 JAVA_HOME、Xms、Xmx。执行 main 函数。
  • 初始化 BrokerController。该初始化次要蕴含 RocketMQ 启动命令行参数解析、NettyRemotingServer 初始化、Broker 各个模块配置参数解析、Broker 各个模块初始化、过程关机 Hook 初始化等过程。
  • 启动 RocketMQ 的各个组件。然而这些组件并不是每一个都是外围组件,局部组件会在前面的流程中应用,这里混个眼生,如果前面流程没有提及的大家能够暂且跳过,咱们的指标是把握 RocketMQ 的核心内容,而不是每个细节。

    • MessageStore:存储层服务,比方 CommitLog、ConsumeQueue 存储管理,音讯刷盘,索引构建 等。
    • RemotingServer:一般通道申请解决服务。个别的申请都是在这里被解决的。
    • FastRemotingServer:VIP 通道申请解决服务。如果一般通道比较忙,那么能够应用 VIP 通道,个别作为客户端降级应用。
    • BrokerOuterAPI:Broker 拜访对外接口的封装对象。
    • PullRequestHoldService:Pull 长轮询服务。
    • ClientHousekeepingService:清理心跳超时的生产者、消费者、过滤服务器。
    • FilterServerManager:过滤服务器治理。

    存储流程

    在后面 RocketMQ 存储构造中咱们理解了 RocketMQ 将所有音讯程序写入 CommitLog,而后构建 ConsumeQueue/IndexFile 索引文件,所以这个小结咱们次要的指标就是看看这些文件是如何构建的。

  • Broker 启动流程中很要害的一点是启动了 NettyRemotingServer,在 RocketMQ 通信机制(视频)中咱们介绍过 Broker(NettyRemotingServer) 初始化会监听端口期待客户端连贯,当客户端发送申请的时,NettyRemotingServer WorkerGroup 解决可读事件,调用 NettyServerHandler.channelRead0() 解决数据。
    接着调用链到 processRequestCommand 办法,这个办法次要是依据申请中的 RequestCode,从本地缓存 processorTable 中获取相应的 Processor 来执行后续逻辑。处理器是什么?处理器的缓存从哪里来?

    Processor 就是用来解决特定申请的执行者,例如,生产者存入音讯应用 SendMessageProcessor,查问音讯应用 QueryMessageProcessor,拉取音讯应用 PullMessageProcessor。在 Broker 启动流程中有一步是注册 Processor,以 RequestCode 为 Key,Processor 为值,增加到 processorTable 缓存中。接着 RocketMQ 音讯发送(视频)流程来看,当生产者的申请达到 Broker,Broker 获取的 Processor 应为 SendMessageProcessor。封装一个 Runable 对象,run 办法内调用 SendMessageProcessor.processRequest,提交到线程池,持续前面的解决。

  • SendMessageProcessor.processRequest 调用 sendMessage 办法,次要蕴含音讯的校验及重试逻辑解决,而后调用存储模块 DefaultMessageStore 存储音讯。
    音讯校验:校验 Broker 是否配置可写,校验 Topic 名字是否为默认值,获取或创立 topicConfig,判断 queueId 是否超过限度。
    重试音讯解决:消费者生产失败后会将音讯发回给 Broker,这里咱们暂且认为就是生产者发送的申请,先看上面的流程。
  • DefaultMessageStore.putMessage 只是做了很多的校验,简略看看即可。包含:如果以后 Broker 进行工作则回绝音讯写入、Broker 为 SLAVE 角色则回绝音讯写入、以后 RocketMQ 不反对写入则回绝音讯写入、主题长度超过 256 个字符则回绝音讯写入、音讯属性长度超过 65536 个字符则回绝音讯写入、PageCache 忙则报错。而后调用 CommitLog.putMessage 存入音讯。
  • 看到这里应该略微相熟一些了,终于到咱们期待已久的 CommitLog 出场了。次要是提早音讯解决,而后获取能够写入的 CommitLog 进行写入。
    提早音讯解决:如果音讯的提早级别大于 0,将音讯的原主题名称与原音讯队列 ID 存入音讯属性中,用提早音讯主题 SCHEDULE_TOPIC、音讯队列 ID 更新原先音讯的主题与队列,这是并发音讯生产重试要害的一步。但不是这个本节的次要指标,后文会进一步剖析。
    关键点在如何获取能够写入的 CommitLog。存储构造大节外面有提到每个 CommitLog 默认大小 1G,写完一个文件,以偏移量命名创立下一个文件。每个 1G 大小 CommitLog 的在代码层面对应的是 MappedFile,而多个 MappedFiled 组成 MappedFileQueue。逻辑上的 CommitLog 通过持有 MappedFileQueue 治理多个 MappedFile。所以,获取能够写入的 CommitLog 也就是获取 MappedFileQueue 最初一个 MappedFile,为什么是最初一个,因为后面的曾经写完了呀。来看看 RocketMQ 逻辑与物理存储的对应关系应该可能更直观的了解。
  • 获取到最初一个 MappedFile 后,调用 MappedFile.appendMessage 将音讯追加到该文件中。可是只管是程序写入,然而连小学生都晓得写磁盘还是很慢,难道想这样撑持 RocketMQ 单机十万吞吐量?too young too simple!从逻辑存储构造和物理存储构造的映射关系来看,MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件读写的通道),通过 fileChannel 能够拜访物理 CommitLog 文件,然而 RocketMQ 并没有间接应用 fileChannel,而是映射到一个 MappedByteBuffer,咱们的目标就是把音讯写入这个 ByteBuffer 中,进而写入 MappedFile 对应的 CommitLog 文件。为什么须要这样做,还有哪些细节,会在”文件内存映射“小结中为大家解答。
  • 持续看流程,失去 MappedFile 对应的 ByteBuffer,咱们须要将音讯序列化,写入 ByteBuffer 中。

    1. 构建音讯 id, createMessageId
    2. 获取该音讯在音讯队列的偏移量,CommitLog 中保留了以后所有音讯队列的以后待写入偏移量。
    3. 判断是否是事务音讯:这里次要解决 Prepared 类型和 Rollback 类型的音讯,设置音讯 queueOffset 为 0
    4. 计算音讯总大小,calMsgLength。
    5. 判断文件的残余空间,是否足够写入当条音讯,如果不能够,则将文件开端写入残余空间大小 + 固定魔数;而后返回一个 END_OF_FILE 的后果
    6. 如果空间足够,这将这条音讯写入之前失去的 MappedFile 的 ByteBuffer 中。
    7. 将各字段依照”音讯在 CommitLog 各字段示意图“存入 Bytebuffer,而后返回 PUT_OK 后果

    总结

    以上就是明天 RocketMQ 音讯存储的次要内容,音讯只是写入到 CommitLog 对应的 ByteBuffer 中,下一期就是咱们重要的零拷贝行将退场。咱们简略总结一下明天的内容:

  • 要了解音讯的存储流程须要先晓得音讯的存储构造:在物理上音讯挨个程序写入 CommitLog,为了晋升音讯查问效率须要构建音讯的索引文件 ConsumeQueue/IndexFile;
  • Broker 启动时进行参数解析,并初始化了 NettyRemotingServer,启动存储服务用于音讯存储及索引构建等;
  • Broker 收到音讯存储申请,通过层层校验,获取 CommitLog 对应的 MappedFile,将音讯写入 MappedFile 对应的内存映射 ByteBuffer;

以上就是明天全副的内容,如果感觉本期的内容对你有用的话记得点赞、关注、转发、珍藏,这将是对我最大的反对。

如果你须要 RocketMQ 相干的所有材料,能够评论区留言,或者关注公众号:三此君。回复:mq,即可。

音讯曾经写入 ByteBuffer,写入 ByteBuffer 就能够了吗?那收到音讯间接抛弃岂不是更好。音讯要落在磁盘上才不会失落,所以下一期咱们要分享的就是音讯的刷盘及索引构建,PageCache 及零拷贝也将闪亮退场。感激观看,下期不见不散。

参考文献

  • RocketMQ 官网文档
  • RocketMQ 源码
  • 丁威, 周继锋. RocketMQ 技术底细:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.
  • 李伟. RocketMQ 分布式消息中间件:外围原理与最佳实际. 电子工业出版社, 2020-08.
  • 杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.
正文完
 0