乐趣区

关于java:RocketMQ核心原理

RocketMQ 外围原理

简介

​ RocketMQ 在阿里外部叫做 Metaq(最早名为 Metamorphosis,中文意思“变形记”,是作家卡夫卡的中篇小说代表作,可见是为了致敬 Kafka)。RocketMQ 是 Metaq 3.0 之后开源的版本。Metaq 在阿里巴巴团体外部、蚂蚁金服、菜鸟等各业务中被宽泛应用,接入了上万个利用零碎中。并安稳撑持了历年的双十一大促(万亿级的音讯),在性能、稳定性、可靠性等方面表现出色,在整个阿里技术体系和大中台策略中施展着无足轻重的作用。Metaq 最早源于 Kafka,晚期借鉴了 Kafka 很多优良的设计。然而因为 Kafka 是 Scale 语言编写而阿里系次要应用 Java,且无奈满足阿里的电商、金融业务场景,所以誓嘉(花名)团队用 Java 从新造轮子,并做了大量的革新和优化。在此之前,淘宝有一款消息中间件名为 Notify,目前曾经逐渐被 Metaq 所取代。第一代的 Notify 次要应用了推模型,解决了事务音讯;第二代的 MetaQ 次要应用了拉模型,解决了程序音讯和海量沉积的问题。相比起 Kafka 应用的 Scale 语言编写,RabbitMQ 应用 Erlang 语言编写,基于 Java 的 RocketMQ 开源后更容易被宽泛的钻研,以及其余大厂定制开发。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211021003058681.png” alt=”image-20211021003058681″ style=”zoom:25%;” />

应用场景

  • 利用解耦
  • 流量削峰
  • 数据散发

个性

  • 公布与订阅
    公布是指某个生产者向某个 topic 发送音讯。音讯的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的音讯。
  • 音讯程序
    一类音讯生产时能依照发送的程序来生产
  • 音讯过滤
    能够依据 Tag 进行音讯过滤,也反对自定义属性过滤。音讯过滤目前是在 Broker 端实现的。
  • 音讯可靠性
  • 至多一次
    每个音讯必须投递一次。Consumer 先 Pull 音讯到本地,生产实现后,才向服务器返回 ack,如果没有生产肯定不会 ack 音讯。
  • 回溯生产
    Consumer 曾经生产胜利的音讯,因为业务上需要须要从新生产。
  • 事务音讯
    利用本地事务和发送音讯操作能够被定义到全局事务中,要么同时胜利,要么同时失败。
  • 提早音讯
    是指音讯发送到 broker 后,不会立刻被生产,期待特定工夫投递给真正的 topic。
  • 音讯重试
    Consumer 生产音讯失败后,要触发重试机制令音讯再生产一次。
  • 音讯重投
    生产者在发送音讯时,同步音讯失败会重投;异步音讯有重试;oneway 没有任何保障。

架构

角色介绍

角色 阐明
Producer 音讯的发送者
Consumer 音讯发送者
Broker 暂存和传输音讯
NameServer 治理 Broker
Topic 辨别音讯的品种,一个发送者能够发送音讯给一个或者多个 Topic;一个音讯的接收者能够订阅一个或者多个 Topic 音讯
Message Queue 相当于是 Topic 的分区,用于并行发送和接管音讯

架构图

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211021003011407.png” alt=”image-20211021003011407″ style=”zoom:25%;” />

​ NameServer 是一个简直无状态节点,可集群部署,节点之间无任何信息同步。Broker 分为 Master 与 Slave,一个 Master 能够对应多个 Slave,然而一个 Slave 只能对应一个 Master。Master 与 Slave 的对应关系通过指定雷同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 示意 Master,非 0 示意 Slave。Master 也能够部署多个。每个 Broker 与 NameServer 集群中的所有节点建设长连贯,定时注册 Topic 信息到所有 NameServer。以后 RocketMQ 版本在部署架构上反对一 Master 多 Slave,但只有 BrokerId= 1 的从服务器才会参加音讯的读负载。
​ Producer 与 NameServer 集群中的其中一个节点(随机抉择)建设长连贯,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建设长连贯,且定时向 Master 发送心跳。Producer 齐全无状态,可集群部署。
​ Consumer 与 NameServer 集群中的其中一个节点(随机抉择)建设长连贯,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建设长连贯,且定时向 Master、Slave 发送心跳。Consumer 既能够从 Master 订阅音讯,也能够从 Slave 订阅音讯,消费者在向 Master 拉取音讯时,Master 服务器会依据拉取偏移量与最大偏移量的间隔(判断是否读老音讯,产生读 I /O),以及从服务器是否可读等因素倡议下一次是从 Master 还是 Slave 拉取。

外围个性以及原理

音讯发送

​ 不同的业务场景须要生产者采纳不同的写入策略。比方同步发送、异步发送、Oneway 发送、提早发送、发送事务音讯等。

音讯发送后返回状态有如下四种:

  • FLUSH_DISK_TIMEOUT
  • FLUSH_SLAVE_TIMEOUT
  • SLAVE_NOT_AVAILABLE
  • SEND_OK

不同状态在不同的刷盘策略和同步策略的配置下含意是不同的

FLUSH_DISK_TIMEOUT:示意没有在规定工夫内实现刷盘(须要 Broker 的刷盘策略被设置成 SYNC_FLUSH 才会报这个谬误)

FLUSH_SLAVE_TIMEOUT:示意在主备形式下,并且 Broker 被设置成 SYNC_MASTER 形式,没有在设定工夫内实现主从同步。

SLAVE_NOT_AVAILABLE:这个状态产生的场景和 FLUSH_SLAVE_TIMEOUT 相似,示意在主备形式下,并且 Broker 被设置成 SYNC_MASTER,然而没有找到被配置成 Slave 的 Broker。

SEND_OK:示意发送胜利,发送胜利的具体含意,比方音讯是否曾经被存储到磁盘?音讯是否被同步到了 Slave 上?音讯在 Slave 上是否被写入磁盘?须要联合所配置的刷盘策略、主从策略来定。

​ 这个状态还能够简略了解为,没有产生下面列出的三个问题状态就是 SEND_OK。

Oneway 发送

​ 在一些对速度要求高,然而可靠性要求不高的场景下,比方日志收集类利用,能够采纳 Oneway 形式发送。Oneway 形式只发送申请不期待应答,即 将数据写入客户端的 Socket 缓冲区就返回,不期待对方返回后果。这种形式发送音讯能够缩短到奥妙级别。或者能够采纳多个 Producer 并发发送的形式,这种形式下无需太多关注写入性能问题。RocketMQ 引入了一个并发窗口,在窗口内音讯能够并发地写入 DirectMem 中,而后异步地将 间断一段无空洞的数据 刷入文件系统当中。同时 程序写 CommitLog 可让 RocketMQ 无论在 HDD 还是 SSD 磁盘状况下都能 放弃较高的写入性能。在 Linux 操作系统层级进行调优,举荐应用 EXT4 文件系统,IO 调度算法应用 deadline 算法。

音讯生产

次要能够总结为以下几点

  1. 音讯生产形式(Pull 和 Push)
  2. 音讯生产的模式(播送模式和集群模式)
  3. 流量管制(可联合 sentinel 来实现,前面补充)
  4. 并发线程数设置
  5. 音讯的过滤(Tag、Key)TagA||TagB||TagC * null

音讯存储

存储形式选型

​ Apache 下开源的另外一款 MQ—ActiveMQ(默认采纳的 KahaDB 做音讯存储)可选用 JDBC 的形式来做音讯长久化,通过简略的 xml 配置信息即可实现 JDBC 音讯存储。因为,一般关系型数据库(如

Mysql)在单表数据量达到千万级别的状况下,其 IO 读写性能往往会呈现瓶颈。在可靠性方面,该种计划十分依赖 DB,如果一旦 DB 呈现故障,则 MQ 的音讯就无奈落盘存储会导致线上故障。

​ 目前业界较为罕用的几款产品(RocketMQ/Kafka/RabbitMQ)均采纳的是音讯刷盘至所部署虚拟机 / 物理机的文件系统来做长久化(刷盘个别能够分为异步刷盘和同步刷盘两种模式)。音讯刷盘为消

息存储提供了一种高效率、高可靠性和高性能的数据长久化形式。除非部署 MQ 机器自身或是本地磁盘挂了,否则个别是不会呈现无奈长久化的故障问题。

个别状况下,文件系统 > 关系型数据库

存储构造

​ RocketMQ 音讯的存储是由 ConsumeQueue 和 CommitLog 配合实现 的,音讯真正的物理存储文件是 CommitLog,ConsumeQueue 是音讯的逻辑队列,相似数据库的索引文件,存储的是

指向物理存储的地址。每 个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。如下图所示
<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211024163116397.png” alt=”image-20211024163116397″ style=”zoom:25%;” />

CommitLog

​ commitLog 文件的存储地址:$HOME\store\commitlog\${fileName},每个文件的大小默认 1G =102410241024,commitLog 的文件名 fileName,名字长度为 20 位,右边补零,残余为起始偏移量;比方 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当这个文件满了,第二个文件名字为 00000000001073741824,起始偏移量为 1073741824,以此类推,第三个文件名字为 00000000002147483648,起始偏移量为 2147483648 音讯存储的时候会程序写入文件,当文件满了,写入下一个文件。MappedFileQueue 能够看作是 ${ROCKET_HOME}/store/commitlog 文件夹,而 MappedFile 则对应该文件夹下一个个的文件。

文件的存储构造

程序编号 字段简称(单位字节) 大小 含意
1 msgSize 4 代表这个音讯的大小
2 MAGICCODE 4 MAGICCODE = daa320a7
3 BODY CRC 4 音讯体 BODY CRC 当 broker 重启 recover 时会校验
4 queueId 4
5 flag 4
6 QUEUEOFFSET 8 这个值是个自增值不是真正的 consume queue 的偏移量,能够代表这个 consumeQueue 队列或者 tranStateTable 队列中音讯的个数,若是非事务音讯或者 commit 事务音讯,能够通过这个值查找到 consumeQueue 中数据,QUEUEOFFSET * 20 才是偏移地址;若是 PREPARED 或者 Rollback 事务,则能够通过该值从 tranStateTable 中查找数据
7 PHYSICALOFFSET 8 代表音讯在 commitLog 中的物理起始地址偏移量
8 SYSFLAG 4 指明音讯是事物事物状态等音讯特色,二进制为四个字节从右往左数:
当 4 个字节均为 0(值为 0)时示意非事务音讯;
当第 1 个字节为 1(值为 1)时示意示意音讯是压缩的(Compressed);
当第 2 个字节为 1(值为 2)示意多音讯(MultiTags);
当第 3 个字节为 1(值为 4)时示意 prepared 音讯;
当第 4 个字节为 1(值为 8)时示意 commit 音讯;
当第 3 / 4 个字节均为 1 时(值为 12)时示意 rollback 音讯;
当第 3 / 4 个字节均为 0 时示意非事务音讯
9 BORNTIMESTAMP 8 音讯产生端 (producer) 的工夫戳
10 BORNHOST 8 音讯产生端 (producer) 地址(address:port)
11 STORETIMESTAMP 8 音讯在 broker 存储工夫
12 STOREHOSTADDRESS 8 音讯存储到 broker 的地址(address:port)
13 RECONSUMETIMES 8 音讯被某个订阅组从新生产了几次(订阅组之间独立计数),因为重试音讯发送到了 topic 名字为 %retry%groupName 的队列 queueId= 0 的队列中去了,胜利生产一次记录为 0;
14 PreparedTransaction Offset 8 示意是 prepared 状态的事物音讯
15 messagebodyLength 4 音讯体大小值
16 messagebody bodyLength 音讯体内容
17 topicLength 1 topic 名称内容大小
18 topic topicLength topic 的内容值
19 propertiesLength 2 属性值大小
20 properties propertiesLength propertiesLength 大小的属性数据

ConsumeQueue

​ 每个 ConsumeQueue 都有一个 id,id 的值为 0 到 TopicConfig 配置的队列数量。比方某个 Topic 的生产队列数量为 4,那么四个 ConsumeQueue 的 id 就别离为 0、1、2、3。
ConsumeQueue 是不负责存储音讯的,只是负责记录它所属 Topic 的音讯在 CommitLog 中的偏移量,这样当消费者从 Broker 拉取音讯的时候,就能够疾速依据偏移量定位到音讯。
ConsumeQueue 自身同样是利用 MappedFileQueue 进行记录偏移量信息的,可见 MappedFileQueue 的设计如许美好,它没有与音讯进行耦合,而是设计成一个通用的存储性能。
ConsumeQueue 更新音讯偏移量的整体过程大略如下图所示,其中波及了几个概念。

  • ReputMessageService
  • CommitLogDispatcherBuildConsumeQueue

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211025134432978.png” alt=”image-20211025134432978″ style=”zoom: 67%;” />

大体上的存储分层设计能够如下:

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026131430151.png” alt=”image-20211026131430151″ style=”zoom:67%;” />

总结以上存储构造:

长处:
a、ConsumeQueue 音讯逻辑队列较为轻量级;
b、对磁盘的拜访串行化,防止磁盘竟争,不会因为队列减少导致 IOWAIT 增高;
毛病:
a、对于 CommitLog 来说写入音讯尽管是程序写,然而读却变成了齐全的随机读;
b、Consumer 端订阅生产一条音讯,须要先读 ConsumeQueue,再读 Commit Log,肯定水平上减少了开销;

RocketMQ 存储关键技术

Mmap

(1)Mmap 内存映射技术的特点

   Mmap 内存映射和一般规范 IO 操作的本质区别在于它并不需要将文件中的数据先拷贝至 OS 的内核 IO 缓冲区,而是能够间接将用户过程公有地址空间中的一块区域与文件对象建设映射关系,这样程序就如同能够间接从内存中实现对文件读 / 写操作一样。只有当缺页中断产生时,间接将文件从磁盘拷贝至用户态的过程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小个别须要限度在 1.5~2G 以下),采纳 Mmap 的形式其读 / 写的效率和性能都十分高。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211025193659669.png” alt=”image-20211025193659669″ />

(2)JDK NIO 的 MappedByteBuffer

   从 JDK 的源码来看,MappedByteBuffer 继承自 ByteBuffer,其外部保护了一个逻辑地址变量—address。在建设映射关系时,MappedByteBuffer 利用了 JDK NIO 的 FileChannel 类提供的 map()办法把文件对象映射到虚拟内存。认真看源码中 map()办法的实现,能够发现最终其通过调用 native 办法 map0()实现文件对象的映射工作,同时应用 Util.newMappedByteBuffer()办法初始化 MappedByteBuffer 实例,但最终返回的是 DirectByteBuffer 的实例。在 Java 程序中应用 MappedByteBuffer 的 get()办法来获取内存数据是最终通过 DirectByteBuffer.get()办法实现(底层通过 unsafe.getByte()办法,以“地址 + 偏移量”的形式获取指定映射至内存中的数据)。

(3)应用 Mmap 的限度

  • Mmap 映射的内存空间开释的问题
    因为映射的内存空间自身就不属于 JVM 的堆内存区(Java Heap),因而其不受 JVM GC 的管制,卸载这部分内存空间须要通过零碎调用 unmap()办法来实现。然而 unmap()办法是 FileChannelImpl 类里实现的公有办法,无奈间接显示调用。RocketMQ 中的做法是 ,通过 Java 反射的形式调用“sun.misc”包下的 Cleaner 类的 clean() 办法来开释映射占用的内存空间;
  • MappedByteBuffer 内存映射大小限度
    因为其占用的是虚拟内存(非 JVM 的堆内存),大小不受 JVM 的 -Xmx 参数限度,但其大小也受到 OS 虚拟内存大小的限度。一般来说,一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,所以 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G
  • 应用 MappedByteBuffe 的其余问题
    会存在内存占用率较高和文件敞开不确定性的问题
PageCache

​ 文件个别寄存在硬盘(机械硬盘或固态硬盘)中,CPU 并不能间接拜访硬盘中的数据,而是须要先将硬盘中的数据读入到内存中,而后能力被 CPU 拜访。为了晋升对文件的读写效率,Linux 内核会以页大小(4KB)为单位,将文件划分为多数据块。当用户对文件中的某个数据块进行读写操作时,内核首先会申请一个内存页(页缓存)与文件中的数据块进行绑定。所以 PageCache 是 OS 对文件的缓存,用于减速对文件的读写。一般来说,程序对文件进行程序读写的速度简直靠近于内存的读写访问,这里的次要起因就是在于 OS 应用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。下图借用了网上一张图片说明下流程。

(1)对于数据文件的读取,如果一次读取文件时呈现未命中 PageCache 的状况,OS 从物理磁盘上拜访读取文件的同时,会程序对其余相邻块的数据文件进行预读取

     程序读入紧随其后的少数几个页面。这样只有下次访问的文件曾经被加载至 PageCache 时,读取操作的速度根本等于拜访内存。

(2)对于数据文件的写入,OS 会先写入至 Cache 内,随后通过异步的形式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。
对于文件的程序读写操作来说,读和写的区域都在 OS 的 PageCache 内,此时读写性能靠近于内存。RocketMQ 的大抵做法是,将数据文件映射到 OS 的虚拟内存中(通过 JDK NIO 的 MappedByteBuffer),写音讯的时候首先写入 PageCache,并通过异步刷盘的形式将音讯批量的做长久化(同时也反对同步刷盘);订阅生产音讯时(对 CommitLog 操作是随机读取),因为 PageCache 的局部性热点原理且整体状况下还是从旧到新的有序读,因而大部分状况下音讯还是能够间接从 Page Cache 中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。

PageCache 机制也不是齐全无毛病的,当遇到 OS 进行脏页回写,内存回收,内存 swap 等状况时,就会引起较大的音讯读写提早。对于这些状况,RocketMQ 采纳了多种优化技术,比方内存预调配,文件预热,mlock 零碎调用等,来保障在最大可能地施展 PageCache 机制长处的同时,尽可能地缩小其毛病带来的音讯读写提早。

存储优化

(1)事后调配 MappedFile

​ 在音讯写入过程中,调用 CommitLog 的 putMessage()办法,CommitLog 会先从 MappedFileQueue 队列中获取一个 MappedFile,如果没有就新建一个。这里,MappedFile 的创立过程是将构建好的一个 AllocateRequest 申请,具体做法是,将下一个文件的门路、下下个文件的门路、文件大小为参数封装为 AllocateRequest 对象增加至队列中。后盾运行的 AllocateMappedFileService 服务线程(在 Broker 启动时,该线程就会创立并运行),会不停地运行,只有申请队列里存在申请,就会去执行 MappedFile 映射文件的创立和预调配工作。

​ 调配的时候有两种策略,一种是应用 Mmap 的形式来构建 MappedFile 实例,另外一种是从 TransientStorePool 堆外内存池中获取相应的 DirectByteBuffer 来构建 MappedFile,具体采纳哪种策略,也与刷盘的形式无关。并且,在创立调配完下个 MappedFile 后,还会将下下个 MappedFile 事后创立并保留至申请队列中期待下次获取时间接返回。RocketMQ 中预调配 MappedFile 的设计十分奇妙,下次获取时候间接返回就能够不必期待 MappedFile 创立调配所产生的时间延迟。

(2)mlock 零碎调用

​ 其能够将过程应用的局部或者全副的地址空间锁定在物理内存中,避免其被替换到 swap 空间。对于 RocketMQ 这种的高吞吐量的分布式音讯队列来说,谋求的是音讯读写低提早,那么必定心愿尽可能地多应用物理内存,进步数据读写访问的操作效率。
(3)文件预热
​ 预热的目标次要有两点;第一点,因为仅分配内存并进行 mlock 零碎调用后并不会为程序齐全锁定这些内存,因为其中的分页可能是写时复制的。因而,就有必要对每个内存页面中写入一个假的值。其中,RocketMQ 是在创立并调配 MappedFile 的过程中,事后写入一些随机值至 Mmap 映射出的内存空间里。第二,调用 Mmap 进行内存映射后,OS 只是建设虚拟内存地址至物理地址的映射表,而理论并没有加载任何文件至内存中。程序要拜访数据时 OS 会查看该局部的分页是否曾经在内存中,如果不在,则收回一次缺页中断。RocketMQ 在做 Mmap 内存映射的同时进行 madvise 零碎调用,目标是使 OS 做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的成果。

零拷贝

在这里,我想提一下这个零拷贝的概念。在 Java 程序员的世界,罕用的零拷贝有 mmap 和 sendFile

MMAP

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026134319452.png” alt=”image-20211026134319452″ style=”zoom: 33%;” />
sendFile

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026134339231.png” alt=”image-20211026134339231″ style=”zoom: 33%;” />

  • 尽管叫零拷贝,实际上 sendfile 有 2 次数据拷贝的。
    第 1 次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协定引擎)。
    如果网卡反对 SG-DMA(The Scatter-GatherDirect Memory Access)技术,就无需从 PageCache 拷贝至 Socket 缓冲区
  • 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有产生过拷贝。
    只是在内存和 I / O 设施之间传输。很多时候咱们认为 sendfile 才是零拷贝,mmap 严格来说不算。
  • Linux 中的 API 为 sendfile、mmap,Java 中的 API 为 FileChanel.transferTo()、FileChannel.map()等。
  • Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx 等高性能中间件中,都有大量利用操作系统零拷贝个性。

音讯过滤

​ RocketMQ 分布式音讯队列的音讯过滤形式有别于其它 MQ 中间件,是在 Consumer 端订阅音讯时再做音讯过滤的。RocketMQ 这么做是在于其 Producer 端写入音讯和 Consumer 端订阅音讯采纳 拆散存储的机制 来实现的,Consumer 端订阅音讯是须要通过 ConsumeQueue 这个音讯生产的逻辑队列拿到一个索引,而后再从 CommitLog 外面读取真正的音讯实体内容,所以说到底也是还绕不开其存储构造。其 ConsumeQueue 的存储构造如下,能够看到其中有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的音讯过滤正式基于这个字段值的。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026135007668.png” alt=”image-20211026135007668″ style=”zoom:50%;” />

同步 / 异步复制

如果一个 Broker 组有 Master 和 Slave,音讯须要从 Master 复制到 Slave 上,有同步和异步两种复制形式。

同步复制

​ 同步复制形式是等 Master 和 Slave 均写 胜利后才反馈给客户端写胜利状态。在同步复制形式下,如果 Master 出故障,Slave 上有全副的备份数据,容易复原。

然而同步复制会增大数据写入提早,升高零碎吞吐量。

异步复制

​ 异步复制形式是只有 Master 写胜利 即可反馈给客户端写胜利状态。在异步复制形式下,零碎领有较低的提早和较高的吞吐量,然而如果 Master 出了故障,有些数据因为没有被写入 Slave,有可能会失落。

高可用机制

音讯发送高可用

​ 在创立 Topic 的时候,把 Topic 的多个 Message Queue 创立在多个 Broker 组上(雷同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样既能够在性能方面具备扩展性,也能够升高主节点故障

对整体上带来的影响,而且当一个 Broker 组的 Master 不可用后,其余组的 Master 依然可用,Producer 依然能够发送音讯的。

RocketMQ 目前还不反对把 Slave 主动转成 Master,如果机器资源有余,须要把 Slave 转成 Master。

  1. 手动进行 Slave 角色的 Broker。
  2. 更改配置文件。
  3. 用新的配置文件启动 Broker。

音讯生产高可用

​ 在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者忙碌的时候,Consumer 会被 主动切换 到从 Slave 读。

有了主动切换 Consumer 这种机制,当一个 Master 角色的机器呈现故障后,Consumer 依然能够从 Slave 读取音讯,不影响 Consumer 程序。这就达到了生产端的高可用性。

刷盘机制

​ RocketMQ 的所有音讯都是长久化的,先写入零碎 PageCache,而后刷盘,能够保障内存与磁盘都有一份数据,拜访时,间接从内存读取。音讯在通过 Producer 写入 RocketMQ 的时候,有两种写磁

盘形式,分布式同步刷盘和异步刷盘。

同步刷盘

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026163909119.png” alt=”image-20211026163909119″ style=”zoom:50%;” />

同步刷盘与异步刷盘的惟一区别是异步刷盘写完 PageCache 间接返回,而同步刷盘须要期待刷盘实现才返回,同步刷盘流程如下:

(1) 写入 PageCache 后,线程期待,告诉刷盘线程刷盘。

(2) 刷盘线程刷盘后,唤醒前端期待线程,可能是一批线程。

(3) 前端期待线程向用户返回胜利

异步刷盘

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026164006559.png” alt=”image-20211026164006559″ style=”zoom:50%;” />

在有 RAID 卡,SAS 15000 转磁盘测试程序写文件,速度能够达到 300M 每秒左右,而线上的网卡个别都为千兆网卡,写磁盘速度显著快于数据网络入口速度,那么是否能够做到写完内存就向用户返

回,由后盾线程刷盘呢?

(1)因为磁盘速度大于网卡速度,那么刷盘的进度必定能够跟上音讯的写入速度。

(2)万一因为此时零碎压力过大,可能沉积音讯,除了写入 IO,还有读取 IO,万一呈现磁盘读取落后状况,会不会导致系统内存溢出,答案是否定的,起因如下:

写入音讯到 PageCache 时,如果内存不足,则尝试抛弃洁净的 PAGE,腾出内存供新音讯应用,策略是 LRU 形式。如果洁净页有余,此时写入 PageCache 会被阻塞,零碎尝试刷盘局部数据,大概每次尝试 32 个 PAGE , 来找出更多洁净 PAGE。综上,内存溢出的状况不会呈现。

负载平衡

​ RocketMQ 中的负载平衡都在 Client 端实现,次要能够分为 Producer 端发消息时的负载平衡和 Consumer 端订阅音讯时负载平衡。

Producer 的负载平衡

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026165329840.png” alt=”image-20211026165329840″ style=”zoom: 50%;” />

如图所示,5 个队列能够部署在一台机器上,也能够别离部署在 5 台不同的机器上,发送音讯通过轮询队列的形式 发送,每个队列接管均匀的音讯量。

通过减少机器,能够程度扩大队列容量。另外也能够自定义形式抉择发往哪个队列。

Consumer 的负载平衡

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026165510101.png” alt=”image-20211026165510101″ style=”zoom:50%;” />

如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 生产 3 个队列,第二 consumer 生产 2 个队列。这样即可达到均匀生产的目标,能够程度扩大 Consumer 来进步生产能

力。然而 Consumer 数量要小于等于队列数 量,如果 Consumer 超过队列数量,那么多余的 Consumer 将不能生产音讯。

音讯重试

程序音讯的重试

​ 对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 会主动一直进行音讯重试(每次间隔时间为 1 秒),这时,利用会呈现音讯生产被阻塞的状况。因而,在应用程序音讯时,务必保障应

用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。

无序音讯的重试

​ 对于无序音讯(一般、定时、延时、事务音讯),当消费者生产音讯失败时,您能够通过设置返回状态达到音讯重试的后果。

无序音讯的重试 只针对集群生产形式失效;播送形式不提供失败重试个性,即生产失败后,失败音讯不再重试,持续生产新的音讯。

重试次数

音讯队列 RocketMQ 默认容许每条音讯最多重试 16 次,每次重试的间隔时间如下:

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026234851716.png” alt=”image-20211026234851716″ style=”zoom: 33%;” />

死信队列

​ RocketMQ 中,这种失常状况下无奈被生产的音讯称为死信音讯(Dead-Letter Message),存储死信音讯的非凡队列称为死信队列(Dead-Letter Queue)。能够在控制台 Topic 列表中看到“DLQ”相干的

Topic,默认命名是:

%RETRY% 生产组名称(重试 Topic)

%DLQ% 生产组名称(死信 Topic)

死信队列也能够被订阅和生产,并且也会过期

个性

死信音讯具备以下个性

  • 不会再被消费者失常生产。
  • 有效期与失常音讯雷同,均为 3 天,3 天后会被主动删除。因而,请在死信音讯产生后的 3 天内及时处理。

死信队列具备以下个性:

  • 一个死信队列对应一个 Group ID,而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信音讯,音讯队列 RocketMQ 不会为其创立相应的死信队列。
  • 一个死信队列蕴含了对应 Group ID 产生的所有死信音讯,不管该音讯属于哪个 Topic。

提早音讯

​ 定时音讯(提早队列)是指音讯发送到 broker 后,不会立刻被生产,期待特定工夫投递给真正的 topic。broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m

8m 9m 10m 20m 30m 1h 2h”,18 个 level。能够配置自定义 messageDelayLevel。留神,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:

msg.setDelayLevel(level)。level 有以下三种状况:

  • level == 0,音讯为非提早音讯
  • 1<=level<=maxLevel,音讯提早特定工夫,例如 level==1,提早 1s
  • level > maxLevel,则 level== maxLevel,例如 level==20,提早 2h

定时音讯会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并依据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存雷同提早的音讯,保障具备雷同发送提早

的音讯可能程序生产。broker 会调度地生产 SCHEDULE_TOPIC_XXXX,将音讯写入实在的 topic。须要留神的是,定时音讯会在第一次写入和调度写入实在 topic 时都会计数,因而发送数量、tps 都

会变高。

程序音讯

​ 程序音讯是指音讯的生产程序和产生程序雷同,在有些业务逻辑下,必须保障程序。比方订单的生成、付款、发货,这 3 个音讯必须按程序解决才行。

程序音讯分为全局程序音讯和局部程序音讯:

  • 全局程序音讯指某个 Topic 下的所有音讯都要保障程序;
  • 局部程序音讯只有保障每一组音讯被程序生产即可,比方订单音讯,只有保障同一个订单 ID 的音讯能按程序生产即可。

​ 在少数的业务场景中实际上只须要部分有序就能够了。RocketMQ 在默认状况下不保障程序,比方创立一个 Topic,默认八个写队列,八个读队列。这时

候一条音讯可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer,每个 Consumer 也可能启动多个线程并行处理,所以音讯被哪个 Consumer 生产,被生产的程序和写入的顺

序是否统一是不确定的。要保障全局程序音讯,须要先把 Topic 的读写队列数设置为一,而后 Producer 和 Consumer 的并发设置也要是一。简略来说,为了保障整个 Topic 的全局音讯有序,只能打消所有的并发

解决,各局部都设置成单线程解决。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211027001546111.png” alt=”image-20211027001546111″ style=”zoom:33%;” />

局部音讯有序

要保障局部音讯有序,须要发送端和生产端配合解决。在发送端,要做到把同一业务 ID 的音讯发送到同一个 Message Queue;在生产过程中,要做到从同一个 Message Queue 读取的音讯不被并发处

理,这样能力达到局部有序。生产端通过应用 MessageListenerOrderly 类来解决单 Message Queue 的音讯被并发解决的问题。

Consumer 应用 MessageListenerOrderly 的时候,上面四个 Consumer 的设置仍旧能够应用:

  • setConsumeThreadMin
  • setConsumeThreadMax
  • setPullBatchSize
  • setConsumeMessageBatchMaxSize

前两个参数设置 Consumer 的线程数;PullBatchSize 指的是一次从 Broker 的一个 Message Queue 获取音讯的最大数量,默认值是 32;ConsumeMessageBatchMaxSize 指的是这个 Consumer 的 Executor(也就

是调用 MessageListener 解决的中央)一次传入的音讯数(List<MessageExt>msgs 这个链表的最大长度),默认值是 1。上述四个参数能够应用,阐明 MessageListenerOrderly 并不是简略地禁止并发解决。

在 MessageListenerOrderly 的实现中,为每个 Consumer Queue 加个锁,生产每个音讯前,须要先取得这个音讯对应的 Consumer Queue 所对应的锁,这样保障了同一时间,同一个 Consumer Queue 的音讯不

被并发生产,但不同 Consumer Queue 的音讯能够并发解决。

事务音讯

RocketMQ 提供了事务音讯的性能,采纳 2PC(两段式协定)+ 弥补机制(事务回查)的分布式事务性能,通过音讯队列 RocketMQ 版事务音讯能达到分布式事务的最终统一。

  • 半事务音讯:暂不能投递的音讯,发送方曾经胜利地将音讯发送到了音讯队列 RocketMQ 版服务端,然而服务端未收到生产者对该音讯的二次确认,此时该音讯被标记成“暂不能投递”状态,处于该种状态下的音讯即半事务音讯。
  • 音讯回查:因为网络闪断、生产者利用重启等起因,导致某条事务音讯的二次确认失落,音讯队列 RocketMQ 版服务端通过扫描发现某条音讯长期处于“半事务音讯”时,须要被动向音讯生产者询问该音讯的最终状态(Commit 或是 Rollback),该询问过程即音讯回查。

事务音讯发送步骤如下:

  • 发送方将半事务音讯发送至音讯队列 RocketMQ 版服务端。
  • 音讯队列 RocketMQ 版服务端将音讯长久化胜利之后,向发送方返回 Ack 确认音讯曾经发送胜利,此时音讯为半事务音讯。
  • 发送方开始执行本地事务逻辑。
  • 发送方依据本地事务执行后果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务音讯标记为可投递,订阅方最终将收到该音讯;服务端收到 Rollback 状态则删除半事务音讯,订阅方将不会承受该音讯。

事务音讯回查步骤如下:

  • 在断网或者是利用重启的非凡状况下,上述步骤 4 提交的二次确认最终未达到服务端,通过固定工夫后服务端将对该音讯发动音讯回查。
  • 发送方收到音讯回查后,须要查看对应音讯的本地事务执行的最终后果。
  • 发送方依据查看失去的本地事务的最终状态再次提交二次确认,服务端仍依照步骤 4 对半事务音讯进行操作。

简而言之:

  • 发送流程:发送 half message(半音讯),执行本地事务,发送事务执行后果
  • 定时工作回查流程:MQ 定时工作扫描半音讯,回查本地事务,发送事务执行后果

外围流程

启动流程

负载平衡

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/RocketMQ%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1.png” alt=”RocketMQ 负载平衡 ” style=”zoom:75%;” />

向 broker 发送心跳

写在最初

写文章实属不易,关注我的博客和公众号,就是我创作的能源~

本文由博客群发一文多发等经营工具平台 OpenWrite 公布

退出移动版