关于java:RocketMQ核心原理

RocketMQ外围原理

简介

​ RocketMQ在阿里外部叫做Metaq(最早名为Metamorphosis,中文意思“变形记”,是作家卡夫卡的中篇小说代表作,可见是为了致敬Kafka)。RocketMQ是Metaq 3.0之后开源的版本。Metaq在阿里巴巴团体外部、蚂蚁金服、菜鸟等各业务中被宽泛应用,接入了上万个利用零碎中。并安稳撑持了历年的双十一大促(万亿级的音讯),在性能、稳定性、可靠性等方面表现出色,在整个阿里技术体系和大中台策略中施展着无足轻重的作用。Metaq最早源于Kafka,晚期借鉴了Kafka很多优良的设计。然而因为Kafka是Scale语言编写而阿里系次要应用Java,且无奈满足阿里的电商、金融业务场景,所以誓嘉(花名)团队用Java从新造轮子,并做了大量的革新和优化。在此之前,淘宝有一款消息中间件名为Notify,目前曾经逐渐被Metaq所取代。第一代的Notify次要应用了推模型,解决了事务音讯;第二代的MetaQ次要应用了拉模型,解决了程序音讯和海量沉积的问题。相比起Kafka应用的Scale语言编写,RabbitMQ 应用Erlang语言编写,基于Java的RocketMQ开源后更容易被宽泛的钻研,以及其余大厂定制开发。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211021003058681.png” alt=”image-20211021003058681″ style=”zoom:25%;” />

应用场景

  • 利用解耦
  • 流量削峰
  • 数据散发

个性

  • 公布与订阅
    公布是指某个生产者向某个topic发送音讯。音讯的订阅是指某个消费者关注了某个topic中带有某些tag的音讯。
  • 音讯程序
    一类音讯生产时能依照发送的程序来生产
  • 音讯过滤
    能够依据Tag进行音讯过滤,也反对自定义属性过滤。音讯过滤目前是在Broker端实现的。
  • 音讯可靠性
  • 至多一次
    每个音讯必须投递一次。Consumer先Pull音讯到本地,生产实现后,才向服务器返回ack,如果没有生产肯定不会ack音讯。
  • 回溯生产
    Consumer曾经生产胜利的音讯,因为业务上需要须要从新生产。
  • 事务音讯
    利用本地事务和发送音讯操作能够被定义到全局事务中,要么同时胜利,要么同时失败。
  • 提早音讯
    是指音讯发送到broker后,不会立刻被生产,期待特定工夫投递给真正的topic。
  • 音讯重试
    Consumer生产音讯失败后,要触发重试机制令音讯再生产一次。
  • 音讯重投
    生产者在发送音讯时,同步音讯失败会重投;异步音讯有重试;oneway没有任何保障。

架构

角色介绍

角色 阐明
Producer 音讯的发送者
Consumer 音讯发送者
Broker 暂存和传输音讯
NameServer 治理Broker
Topic 辨别音讯的品种,一个发送者能够发送音讯给一个或者多个Topic;一个音讯的接收者能够订阅一个或者多个Topic音讯
Message Queue 相当于是Topic的分区,用于并行发送和接管音讯

架构图

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211021003011407.png” alt=”image-20211021003011407″ style=”zoom:25%;” />

​ NameServer是一个简直无状态节点,可集群部署,节点之间无任何信息同步。Broker分为Master与Slave,一个Master能够对应多个Slave,然而一个Slave只能对应一个Master。Master与Slave 的对应关系通过指定雷同的BrokerName,不同的BrokerId来定义,BrokerId为0示意Master,非0示意Slave。Master也能够部署多个。每个Broker与NameServer集群中的所有节点建设长连贯,定时注册Topic信息到所有NameServer。 以后RocketMQ版本在部署架构上反对一Master多Slave,但只有BrokerId=1的从服务器才会参加音讯的读负载。
​ Producer与NameServer集群中的其中一个节点(随机抉择)建设长连贯,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建设长连贯,且定时向Master发送心跳。Producer齐全无状态,可集群部署。
​ Consumer与NameServer集群中的其中一个节点(随机抉择)建设长连贯,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建设长连贯,且定时向Master、Slave发送心跳。Consumer既能够从Master订阅音讯,也能够从Slave订阅音讯,消费者在向Master拉取音讯时,Master服务器会依据拉取偏移量与最大偏移量的间隔(判断是否读老音讯,产生读I/O),以及从服务器是否可读等因素倡议下一次是从Master还是Slave拉取。

外围个性以及原理

音讯发送

​ 不同的业务场景须要生产者采纳不同的写入策略。比方同步发送、异步发送、Oneway发送、提早发送、发送事务音讯等。

音讯发送后返回状态有如下四种:

  • FLUSH_DISK_TIMEOUT
  • FLUSH_SLAVE_TIMEOUT
  • SLAVE_NOT_AVAILABLE
  • SEND_OK

不同状态在不同的刷盘策略和同步策略的配置下含意是不同的

FLUSH_DISK_TIMEOUT :示意没有在规定工夫内实现刷盘(须要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个谬误)

FLUSH_SLAVE_TIMEOUT:示意在主备形式下,并且Broker被设置成SYNC_MASTER形式,没有在设定工夫内实现主从同步。

SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT相似,示意在主备形式下,并且Broker被设置成SYNC_MASTER,然而没有找到被配置成Slave的Broker。

SEND_OK:示意发送胜利,发送胜利的具体含意,比方音讯是否曾经被存储到磁盘?音讯是否被同步到了Slave上?音讯在Slave上是否被写入磁盘?须要联合所配置的刷盘策略、主从策略来定。

​ 这个状态还能够简略了解为,没有产生下面列出的三个问题状态就是SEND_OK。

Oneway发送

​ 在一些对速度要求高,然而可靠性要求不高的场景下,比方日志收集类利用,能够采纳Oneway形式发送。Oneway形式只发送申请不期待应答,即将数据写入客户端的Socket缓冲区就返回,不期待对方返回后果。这种形式发送音讯能够缩短到奥妙级别。或者能够采纳多个Producer并发发送的形式,这种形式下无需太多关注写入性能问题。RocketMQ引入了一个并发窗口,在窗口内音讯能够并发地写入DirectMem中,而后异步地将间断一段无空洞的数据刷入文件系统当中。同时程序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘状况下都能放弃较高的写入性能。在Linux操作系统层级进行调优,举荐应用EXT4文件系统,IO调度算法应用deadline算法。

音讯生产

次要能够总结为以下几点

  1. 音讯生产形式(Pull和Push)
  2. 音讯生产的模式(播送模式和集群模式)
  3. 流量管制(可联合sentinel来实现,前面补充)
  4. 并发线程数设置
  5. 音讯的过滤(Tag、Key) TagA||TagB||TagC * null

音讯存储

存储形式选型

​ Apache下开源的另外一款MQ—ActiveMQ(默认采纳的KahaDB做音讯存储)可选用JDBC的形式来做音讯长久化,通过简略的xml配置信息即可实现JDBC音讯存储。因为,一般关系型数据库(如

Mysql)在单表数据量达到千万级别的状况下,其IO读写性能往往会呈现瓶颈。在可靠性方面,该种计划十分依赖DB,如果一旦DB呈现故障,则MQ的音讯就无奈落盘存储会导致线上故障。

​ 目前业界较为罕用的几款产品(RocketMQ/Kafka/RabbitMQ)均采纳的是音讯刷盘至所部署虚拟机/物理机的文件系统来做长久化(刷盘个别能够分为异步刷盘和同步刷盘两种模式)。音讯刷盘为消

息存储提供了一种高效率、高可靠性和高性能的数据长久化形式。除非部署MQ机器自身或是本地磁盘挂了,否则个别是不会呈现无奈长久化的故障问题。

个别状况下,文件系统> 关系型数据库

存储构造

​ RocketMQ音讯的存储是由ConsumeQueue和CommitLog配合实现 的,音讯真正的物理存储文件是CommitLog,ConsumeQueue是音讯的逻辑队列,相似数据库的索引文件,存储的是

指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。如下图所示
<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211024163116397.png” alt=”image-20211024163116397″ style=”zoom:25%;” />

CommitLog

​ commitLog文件的存储地址:$HOME\store\commitlog\${fileName},每个文件的大小默认1G =102410241024,commitLog的文件名fileName,名字长度为20位,右边补零,残余为起始偏移量;比方00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648音讯存储的时候会程序写入文件,当文件满了,写入下一个文件。MappedFileQueue能够看作是${ROCKET_HOME}/store/commitlog文件夹,而MappedFile则对应该文件夹下一个个的文件。

文件的存储构造

程序编号 字段简称(单位字节) 大小 含意
1 msgSize 4 代表这个音讯的大小
2 MAGICCODE 4 MAGICCODE = daa320a7
3 BODY CRC 4 音讯体BODY CRC 当broker重启recover时会校验
4 queueId 4
5 flag 4
6 QUEUEOFFSET 8 这个值是个自增值不是真正的consume queue的偏移量,能够代表这个consumeQueue队列或者tranStateTable队列中音讯的个数,若是非事务音讯或者commit事务音讯,能够通过这个值查找到consumeQueue中数据,QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事务,则能够通过该值从tranStateTable中查找数据
7 PHYSICALOFFSET 8 代表音讯在commitLog中的物理起始地址偏移量
8 SYSFLAG 4 指明音讯是事物事物状态等音讯特色,二进制为四个字节从右往左数:
当4个字节均为0(值为0)时示意非事务音讯;
当第1个字节为1(值为1)时示意示意音讯是压缩的(Compressed);
当第2个字节为1(值为2)示意多音讯(MultiTags);
当第3个字节为1(值为4)时示意prepared音讯;
当第4个字节为1(值为8)时示意commit音讯;
当第3/4个字节均为1时(值为12)时示意rollback音讯;
当第3/4个字节均为0时示意非事务音讯
9 BORNTIMESTAMP 8 音讯产生端(producer)的工夫戳
10 BORNHOST 8 音讯产生端(producer)地址(address:port)
11 STORETIMESTAMP 8 音讯在broker存储工夫
12 STOREHOSTADDRESS 8 音讯存储到broker的地址(address:port)
13 RECONSUMETIMES 8 音讯被某个订阅组从新生产了几次(订阅组之间独立计数),因为重试音讯发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了,胜利生产一次记录为0;
14 PreparedTransaction Offset 8 示意是prepared状态的事物音讯
15 messagebodyLength 4 音讯体大小值
16 messagebody bodyLength 音讯体内容
17 topicLength 1 topic名称内容大小
18 topic topicLength topic的内容值
19 propertiesLength 2 属性值大小
20 properties propertiesLength propertiesLength大小的属性数据

ConsumeQueue

​ 每个ConsumeQueue都有一个id,id 的值为0到TopicConfig配置的队列数量。比方某个Topic的生产队列数量为4,那么四个ConsumeQueue的id就别离为0、1、2、3。
ConsumeQueue是不负责存储音讯的,只是负责记录它所属Topic的音讯在CommitLog中的偏移量,这样当消费者从Broker拉取音讯的时候,就能够疾速依据偏移量定位到音讯。
ConsumeQueue自身同样是利用MappedFileQueue进行记录偏移量信息的,可见MappedFileQueue的设计如许美好,它没有与音讯进行耦合,而是设计成一个通用的存储性能。
ConsumeQueue更新音讯偏移量的整体过程大略如下图所示,其中波及了几个概念。

  • ReputMessageService
  • CommitLogDispatcherBuildConsumeQueue

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211025134432978.png” alt=”image-20211025134432978″ style=”zoom: 67%;” />

大体上的存储分层设计能够如下:

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026131430151.png” alt=”image-20211026131430151″ style=”zoom:67%;” />

总结以上存储构造:

长处:
a、ConsumeQueue音讯逻辑队列较为轻量级;
b、对磁盘的拜访串行化,防止磁盘竟争,不会因为队列减少导致IOWAIT增高;
毛病:
a、对于CommitLog来说写入音讯尽管是程序写,然而读却变成了齐全的随机读;
b、Consumer端订阅生产一条音讯,须要先读ConsumeQueue,再读Commit Log,肯定水平上减少了开销;

RocketMQ存储关键技术

Mmap

(1)Mmap内存映射技术的特点

   Mmap内存映射和一般规范IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是能够间接将用户过程公有地址空间中的一块区域与文件对象建设映射关系,这样程序就如同能够间接从内存中实现对文件读/写操作一样。只有当缺页中断产生时,间接将文件从磁盘拷贝至用户态的过程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小个别须要限度在1.5~2G以下),采纳Mmap的形式其读/写的效率和性能都十分高。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211025193659669.png” alt=”image-20211025193659669″ />

(2)JDK NIO的MappedByteBuffer

   从JDK的源码来看,MappedByteBuffer继承自ByteBuffer,其外部保护了一个逻辑地址变量—address。在建设映射关系时,MappedByteBuffer利用了JDK NIO的FileChannel类提供的map()办法把文件对象映射到虚拟内存。认真看源码中map()办法的实现,能够发现最终其通过调用native办法map0()实现文件对象的映射工作,同时应用Util.newMappedByteBuffer()办法初始化MappedByteBuffer实例,但最终返回的是DirectByteBuffer的实例。在Java程序中应用MappedByteBuffer的get()办法来获取内存数据是最终通过DirectByteBuffer.get()办法实现(底层通过unsafe.getByte()办法,以“地址 + 偏移量”的形式获取指定映射至内存中的数据)。

(3)应用Mmap的限度

  • Mmap映射的内存空间开释的问题
    因为映射的内存空间自身就不属于JVM的堆内存区(Java Heap),因而其不受JVM GC的管制,卸载这部分内存空间须要通过零碎调用 unmap()办法来实现。然而unmap()办法是FileChannelImpl类里实现的公有办法,无奈间接显示调用。RocketMQ中的做法是,通过Java反射的形式调用“sun.misc”包下的Cleaner类的clean()办法来开释映射占用的内存空间;
  • MappedByteBuffer内存映射大小限度
    因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限度,但其大小也受到OS虚拟内存大小的限度。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,所以RocketMQ默认设置单个CommitLog日志数据文件为1G
  • 应用MappedByteBuffe的其余问题
    会存在内存占用率较高和文件敞开不确定性的问题
PageCache

​ 文件个别寄存在硬盘(机械硬盘或固态硬盘)中,CPU 并不能间接拜访硬盘中的数据,而是须要先将硬盘中的数据读入到内存中,而后能力被 CPU 拜访。为了晋升对文件的读写效率,Linux 内核会以页大小(4KB)为单位,将文件划分为多数据块。当用户对文件中的某个数据块进行读写操作时,内核首先会申请一个内存页(页缓存)与文件中的数据块进行绑定。所以PageCache是OS对文件的缓存,用于减速对文件的读写。一般来说,程序对文件进行程序读写的速度简直靠近于内存的读写访问,这里的次要起因就是在于OS应用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。下图借用了网上一张图片说明下流程。

(1)对于数据文件的读取,如果一次读取文件时呈现未命中PageCache的状况,OS从物理磁盘上拜访读取文件的同时,会程序对其余相邻块的数据文件进行预读取

     程序读入紧随其后的少数几个页面。这样只有下次访问的文件曾经被加载至PageCache时,读取操作的速度根本等于拜访内存。

(2)对于数据文件的写入,OS会先写入至Cache内,随后通过异步的形式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。
对于文件的程序读写操作来说,读和写的区域都在OS的PageCache内,此时读写性能靠近于内存。RocketMQ的大抵做法是,将数据文件映射到OS的虚拟内存中(通过JDK NIO的MappedByteBuffer),写音讯的时候首先写入PageCache,并通过异步刷盘的形式将音讯批量的做长久化(同时也反对同步刷盘);订阅生产音讯时(对CommitLog操作是随机读取),因为PageCache的局部性热点原理且整体状况下还是从旧到新的有序读,因而大部分状况下音讯还是能够间接从Page Cache中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。

PageCache机制也不是齐全无毛病的,当遇到OS进行脏页回写,内存回收,内存swap等状况时,就会引起较大的音讯读写提早。对于这些状况,RocketMQ采纳了多种优化技术,比方内存预调配,文件预热,mlock零碎调用等,来保障在最大可能地施展PageCache机制长处的同时,尽可能地缩小其毛病带来的音讯读写提早。

存储优化

(1)事后调配MappedFile

​ 在音讯写入过程中,调用CommitLog的putMessage()办法,CommitLog会先从MappedFileQueue队列中获取一个 MappedFile,如果没有就新建一个。这里,MappedFile的创立过程是将构建好的一个AllocateRequest申请,具体做法是,将下一个文件的门路、下下个文件的门路、文件大小为参数封装为AllocateRequest对象增加至队列中。后盾运行的AllocateMappedFileService服务线程(在Broker启动时,该线程就会创立并运行),会不停地运行,只有申请队列里存在申请,就会去执行MappedFile映射文件的创立和预调配工作。

​ 调配的时候有两种策略,一种是应用Mmap的形式来构建MappedFile实例,另外一种是从TransientStorePool堆外内存池中获取相应的DirectByteBuffer来构建MappedFile,具体采纳哪种策略,也与刷盘的形式无关。并且,在创立调配完下个MappedFile后,还会将下下个MappedFile事后创立并保留至申请队列中期待下次获取时间接返回。RocketMQ中预调配MappedFile的设计十分奇妙,下次获取时候间接返回就能够不必期待MappedFile创立调配所产生的时间延迟。

(2)mlock零碎调用

​ 其能够将过程应用的局部或者全副的地址空间锁定在物理内存中,避免其被替换到swap空间。对于RocketMQ这种的高吞吐量的分布式音讯队列来说,谋求的是音讯读写低提早,那么必定心愿尽可能地多应用物理内存,进步数据读写访问的操作效率。
(3)文件预热
​ 预热的目标次要有两点;第一点,因为仅分配内存并进行mlock零碎调用后并不会为程序齐全锁定这些内存,因为其中的分页可能是写时复制的。因而,就有必要对每个内存页面中写入一个假的值。其中,RocketMQ是在创立并调配MappedFile的过程中,事后写入一些随机值至Mmap映射出的内存空间里。第二,调用Mmap进行内存映射后,OS只是建设虚拟内存地址至物理地址的映射表,而理论并没有加载任何文件至内存中。程序要拜访数据时OS会查看该局部的分页是否曾经在内存中,如果不在,则收回一次缺页中断。RocketMQ在做Mmap内存映射的同时进行madvise零碎调用,目标是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的成果。

零拷贝

在这里,我想提一下这个零拷贝的概念。在 Java 程序员的世界,罕用的零拷贝有 mmap 和 sendFile

MMAP

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026134319452.png” alt=”image-20211026134319452″ style=”zoom: 33%;” />
sendFile

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026134339231.png” alt=”image-20211026134339231″ style=”zoom: 33%;” />

  • 尽管叫零拷贝,实际上sendfile有2次数据拷贝的。
    第1次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协定引擎)。
    如果网卡反对 SG-DMA(The Scatter-GatherDirect Memory Access)技术,就无需从PageCache拷贝至 Socket 缓冲区
  • 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有产生过拷贝。
    只是在内存和I/O设施之间传输。很多时候咱们认为sendfile才是零拷贝,mmap严格来说不算。
  • Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、FileChannel.map()等。
  • Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操作系统零拷贝个性。

音讯过滤

​ RocketMQ分布式音讯队列的音讯过滤形式有别于其它MQ中间件,是在Consumer端订阅音讯时再做音讯过滤的。RocketMQ这么做是在于其Producer端写入音讯和Consumer端订阅音讯采纳拆散存储的机制来实现的,Consumer端订阅音讯是须要通过ConsumeQueue这个音讯生产的逻辑队列拿到一个索引,而后再从CommitLog外面读取真正的音讯实体内容,所以说到底也是还绕不开其存储构造。其ConsumeQueue的存储构造如下,能够看到其中有8个字节存储的Message Tag的哈希值,基于Tag的音讯过滤正式基于这个字段值的。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026135007668.png” alt=”image-20211026135007668″ style=”zoom:50%;” />

同步/异步复制

如果一个Broker组有Master和Slave,音讯须要从Master复制到Slave 上,有同步和异步两种复制形式。

同步复制

​ 同步复制形式是等Master和Slave均写 胜利后才反馈给客户端写胜利状态。在同步复制形式下,如果Master出故障,Slave上有全副的备份数据,容易复原。

然而同步复制会增大数据写入提早,升高零碎吞吐量。

异步复制

​ 异步复制形式是只有Master写胜利 即可反馈给客户端写胜利状态。在异步复制形式下,零碎领有较低的提早和较高的吞吐量,然而如果Master出了故障,有些数据因为没有被写入Slave,有可能会失落。

高可用机制

音讯发送高可用

​ 在创立Topic的时候,把Topic的多个Message Queue创立在多个Broker组上(雷同Broker名称,不同brokerId的机器组成一个Broker组),这样既能够在性能方面具备扩展性,也能够升高主节点故障

对整体上带来的影响,而且当一个Broker组的Master不可用后,其余组的Master依然可用,Producer依然能够发送音讯的。

RocketMQ目前还不反对把Slave主动转成Master,如果机器资源有余,须要把Slave转成Master。

  1. 手动进行Slave角色的Broker。
  2. 更改配置文件。
  3. 用新的配置文件启动Broker。

音讯生产高可用

​ 在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者忙碌的时候,Consumer会被主动切换到从Slave 读。

有了主动切换Consumer这种机制,当一个Master角色的机器呈现故障后,Consumer依然能够从Slave读取音讯,不影响Consumer程序。这就达到了生产端的高可用性。

刷盘机制

​ RocketMQ 的所有音讯都是长久化的,先写入零碎 PageCache,而后刷盘,能够保障内存与磁盘都有一份数据, 拜访时,间接从内存读取。音讯在通过Producer写入RocketMQ的时候,有两种写磁

盘形式,分布式同步刷盘和异步刷盘。

同步刷盘

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026163909119.png” alt=”image-20211026163909119″ style=”zoom:50%;” />

同步刷盘与异步刷盘的惟一区别是异步刷盘写完 PageCache间接返回,而同步刷盘须要期待刷盘实现才返回, 同步刷盘流程如下:

(1) 写入 PageCache后,线程期待,告诉刷盘线程刷盘。

(2) 刷盘线程刷盘后,唤醒前端期待线程,可能是一批线程。

(3) 前端期待线程向用户返回胜利

异步刷盘

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026164006559.png” alt=”image-20211026164006559″ style=”zoom:50%;” />

在有 RAID 卡,SAS 15000 转磁盘测试程序写文件,速度能够达到 300M 每秒左右,而线上的网卡个别都为千兆网卡,写磁盘速度显著快于数据网络入口速度,那么是否能够做到写完内存就向用户返

回,由后盾线程刷盘呢?

(1)因为磁盘速度大于网卡速度,那么刷盘的进度必定能够跟上音讯的写入速度。

(2)万一因为此时零碎压力过大,可能沉积音讯,除了写入 IO,还有读取 IO,万一呈现磁盘读取落后状况, 会不会导致系统内存溢出,答案是否定的,起因如下:

写入音讯到 PageCache时,如果内存不足,则尝试抛弃洁净的 PAGE,腾出内存供新音讯应用,策略是LRU 形式。如果洁净页有余,此时写入 PageCache会被阻塞,零碎尝试刷盘局部数据,大概每次尝试 32个 PAGE , 来找出更多洁净 PAGE。综上,内存溢出的状况不会呈现。

负载平衡

​ RocketMQ中的负载平衡都在Client端实现,次要能够分为Producer端发消息时的负载平衡和Consumer端订阅音讯时负载平衡。

Producer的负载平衡

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026165329840.png” alt=”image-20211026165329840″ style=”zoom: 50%;” />

如图所示,5 个队列能够部署在一台机器上,也能够别离部署在 5 台不同的机器上,发送音讯通过轮询队列的形式 发送,每个队列接管均匀的音讯量。

通过减少机器,能够程度扩大队列容量。 另外也能够自定义形式抉择发往哪个队列。

Consumer的负载平衡

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026165510101.png” alt=”image-20211026165510101″ style=”zoom:50%;” />

如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 生产 3 个队列,第二consumer 生产 2 个队列。 这样即可达到均匀生产的目标,能够程度扩大 Consumer 来进步生产能

力。然而 Consumer 数量要小于等于队列数 量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能生产音讯 。

音讯重试

程序音讯的重试

​ 对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 会主动一直进行音讯重试(每次间隔时间为 1 秒),这时,利用会呈现音讯生产被阻塞的状况。因而,在应用程序音讯时,务必保障应

用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。

无序音讯的重试

​ 对于无序音讯(一般、定时、延时、事务音讯),当消费者生产音讯失败时,您能够通过设置返回状态达到音讯重试的后果。

无序音讯的重试只针对集群生产形式失效;播送形式不提供失败重试个性,即生产失败后,失败音讯不再重试,持续生产新的音讯。

重试次数

音讯队列 RocketMQ 默认容许每条音讯最多重试 16 次,每次重试的间隔时间如下:

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211026234851716.png” alt=”image-20211026234851716″ style=”zoom: 33%;” />

死信队列

​ RocketMQ 中,这种失常状况下无奈被生产的音讯称为死信音讯(Dead-Letter Message),存储死信音讯的非凡队列称为死信队列(Dead-Letter Queue)。能够在控制台Topic列表中看到“DLQ”相干的

Topic,默认命名是:

%RETRY%生产组名称(重试Topic)

%DLQ%生产组名称(死信Topic)

死信队列也能够被订阅和生产,并且也会过期

个性

死信音讯具备以下个性

  • 不会再被消费者失常生产。
  • 有效期与失常音讯雷同,均为 3 天,3 天后会被主动删除。因而,请在死信音讯产生后的 3天内及时处理。

死信队列具备以下个性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信音讯,音讯队列 RocketMQ 不会为其创立相应的死信队列。
  • 一个死信队列蕴含了对应 Group ID 产生的所有死信音讯,不管该音讯属于哪个 Topic。

提早音讯

​ 定时音讯(提早队列)是指音讯发送到broker后,不会立刻被生产,期待特定工夫投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m

8m 9m 10m 20m 30m 1h 2h”,18个level。能够配置自定义messageDelayLevel。留神,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:

msg.setDelayLevel(level)。level有以下三种状况:

  • level == 0,音讯为非提早音讯
  • 1<=level<=maxLevel,音讯提早特定工夫,例如level==1,提早1s
  • level > maxLevel,则level== maxLevel,例如level==20,提早2h

定时音讯会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并依据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存雷同提早的音讯,保障具备雷同发送提早

的音讯可能程序生产。broker会调度地生产SCHEDULE_TOPIC_XXXX,将音讯写入实在的topic。须要留神的是,定时音讯会在第一次写入和调度写入实在topic时都会计数,因而发送数量、tps都

会变高。

程序音讯

​ 程序音讯是指音讯的生产程序和产生程序雷同,在有些业务逻辑下,必须保障程序。比方订单的生成、付款、发货,这3个音讯必须按程序解决才行。

程序音讯分为全局程序音讯和局部程序音讯:

  • 全局程序音讯指某个Topic下的所有音讯都要保障程序;
  • 局部程序音讯只有保障每一组音讯被程序生产即可,比方订单音讯,只有保障同一个订单ID的音讯能按程序生产即可。

​ 在少数的业务场景中实际上只须要部分有序就能够了。RocketMQ在默认状况下不保障程序,比方创立一个Topic,默认八个写队列,八个读队列。这时

候一条音讯可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer也可能启动多个线程并行处理,所以音讯被哪个Consumer生产,被生产的程序和写入的顺

序是否统一是不确定的。要保障全局程序音讯,须要先把Topic的读写队列数设置为一,而后Producer和Consumer的并发设置也要是一。简略来说,为了保障整个Topic的全局音讯有序,只能打消所有的并发

解决,各局部都设置成单线程解决。

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/image-20211027001546111.png” alt=”image-20211027001546111″ style=”zoom:33%;” />

局部音讯有序

要保障局部音讯有序,须要发送端和生产端配合解决。在发送端,要做到把同一业务ID的音讯发送到同一个Message Queue;在生产过程中,要做到从同一个Message Queue读取的音讯不被并发处

理,这样能力达到局部有序。生产端通过应用MessageListenerOrderly类来解决单Message Queue的音讯被并发解决的问题。

Consumer应用MessageListenerOrderly的时候,上面四个Consumer的设置仍旧能够应用:

  • setConsumeThreadMin
  • setConsumeThreadMax
  • setPullBatchSize
  • setConsumeMessageBatchMaxSize

前两个参数设置Consumer的线程数;PullBatchSize指的是一次从Broker的一个Message Queue获取音讯的最大数量,默认值是32;ConsumeMessageBatchMaxSize指的是这个Consumer的Executor(也就

是调用MessageListener解决的中央)一次传入的音讯数(List<MessageExt>msgs这个链表的最大长度),默认值是1。上述四个参数能够应用,阐明MessageListenerOrderly并不是简略地禁止并发解决。

在MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,生产每个音讯前,须要先取得这个音讯对应的Consumer Queue所对应的锁,这样保障了同一时间,同一个Consumer Queue的音讯不

被并发生产,但不同Consumer Queue的音讯能够并发解决。

事务音讯

RocketMQ提供了事务音讯的性能,采纳2PC(两段式协定)+弥补机制(事务回查)的分布式事务性能,通过音讯队列 RocketMQ 版事务音讯能达到分布式事务的最终统一。

  • 半事务音讯: 暂不能投递的音讯,发送方曾经胜利地将音讯发送到了音讯队列 RocketMQ 版服务端,然而服务端未收到生产者对该音讯的二次确认,此时该音讯被标记成“暂不能投递”状态,处于该种状态下的音讯即半事务音讯。
  • 音讯回查: 因为网络闪断、生产者利用重启等起因,导致某条事务音讯的二次确认失落,音讯队列 RocketMQ 版服务端通过扫描发现某条音讯长期处于“半事务音讯”时,须要被动向音讯生产者询问该音讯的最终状态(Commit 或是 Rollback),该询问过程即音讯回查。

事务音讯发送步骤如下:

  • 发送方将半事务音讯发送至音讯队列 RocketMQ 版服务端。
  • 音讯队列 RocketMQ 版服务端将音讯长久化胜利之后,向发送方返回 Ack 确认音讯曾经发送胜利,此时音讯为半事务音讯。
  • 发送方开始执行本地事务逻辑。
  • 发送方依据本地事务执行后果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务音讯标记为可投递,订阅方最终将收到该音讯;服务端收到 Rollback 状态则删除半事务音讯,订阅方将不会承受该音讯。

事务音讯回查步骤如下:

  • 在断网或者是利用重启的非凡状况下,上述步骤 4 提交的二次确认最终未达到服务端,通过固定工夫后服务端将对该音讯发动音讯回查。
  • 发送方收到音讯回查后,须要查看对应音讯的本地事务执行的最终后果。
  • 发送方依据查看失去的本地事务的最终状态再次提交二次确认,服务端仍依照步骤 4 对半事务音讯进行操作。

简而言之:

  • 发送流程:发送half message(半音讯),执行本地事务,发送事务执行后果
  • 定时工作回查流程:MQ定时工作扫描半音讯,回查本地事务,发送事务执行后果

外围流程

启动流程

负载平衡

<img src=”https://slagsea-picture.oss-cn-beijing.aliyuncs.com/img/RocketMQ%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1.png” alt=”RocketMQ负载平衡” style=”zoom:75%;” />

向broker发送心跳

写在最初

写文章实属不易,关注我的博客和公众号,就是我创作的能源~

本文由博客群发一文多发等经营工具平台 OpenWrite 公布

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理