乐趣区

关于消息队列:Javaer-进阶必看的-RocketMQ-就这篇了

每个时代,都不会亏待会学习的人。

大家好,我是 yes。

继上一篇 头条终面:写个消息中间件,我提到实现消息中间件的一些关键点,明天就和大家一起深刻生产级别消息中间件 – RocketMQ 的内核实现,来看看真正落地能撑持万亿级音讯容量、低提早的音讯队列到底是如何设计的。

这篇文章我会先介绍整体的架构设计,而后再深刻各外围模块的具体设计、外围流程的分析。

还会提及应用的一些留神点和最佳实际。

对于音讯队列的用途和一些概念不太分明的同学强烈建议先看音讯队列面试连环问,这篇文章介绍了音讯队列的应用场景、基本概念和常见面试题。

话不多说,上车。

RocketMQ 整体架构设计

整体的架构设计次要分为四大局部,别离是:Producer、Consumer、Broker、NameServer。

为了更贴合理论,我画的都是集群部署,像 Broker 我还画了主从。

  • Producer:就是音讯生产者,能够集群部署。它会先和 NameServer 集群中的随机一台建设长连贯,得悉以后要发送的 Topic 存在哪台 Broker Master 上,而后再与其建设长连贯,反对多种负载平衡模式发送音讯。
  • Consumer:音讯消费者,也能够集群部署。它也会先和 NameServer 集群中的随机一台建设长连贯,得悉以后要音讯的 Topic 存在哪台 Broker Master、Slave 上,而后它们建设长连贯,反对集群生产和播送生产音讯。
  • Broker:次要负责音讯的存储、查问生产,反对主从部署,一个 Master 能够对应多个 Slave,Master 反对读写,Slave 只反对读。Broker 会向集群中的每一台 NameServer 注册本人的路由信息。
  • NameServer:是一个很简略的 Topic 路由注册核心,反对 Broker 的动静注册和发现,保留 Topic 和 Borker 之间的关系。通常也是集群部署,然而 各 NameServer 之间不会相互通信,各 NameServer 都有残缺的路由信息,即无状态。

我再用一段话来概括它们之间的交互:

先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包,包含:IP、Port、TopicInfo,NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相干信息,代表下线。

这样每个 NameServer 就晓得集群所有 Broker 的相干信息,此时 Producer 上线从 NameServer 就能够得悉它要发送的某 Topic 音讯在哪个 Broker 上,和对应的 Broker(Master 角色的)建设长连贯,发送音讯。

Consumer 上线也能够从 NameServer 得悉它所要接管的 Topic 是哪个 Broker,和对应的 Master、Slave 建设连贯,接管音讯。

简略的工作流程如上所述,置信大家对整体数据流转曾经有点印象了,咱们再来看看每个局部的详细情况。

NameServer

它的特点就是轻量级,无状态。角色相似于 Zookeeper 的状况,从下面形容晓得其次要的两个性能就是:Broker 治理、路由信息管理。

总体而言比较简单,我再贴一些字段,让大家有更直观的印象晓得它存储了些什么。

Producer

Producer 无非就是音讯生产者,那首先它得晓得音讯要发往哪个 Broker,于是每 30s 会从某台 NameServer 获取 Topic 和 Broker 的映射关系存在本地内存中,如果发现新的 Broker 就会和其建设长连贯,每 30s 会发送心跳至 Broker 保护连贯。

并且会 轮询以后能够发送的 Broker 来发送音讯 ,达到负载平衡的目标,在 同步发送状况 下如果发送失败会默认重投两次(retryTimesWhenSendFailed = 2),并且不会抉择上次失败的 broker,会向其余 broker 投递。

异步发送 失败的状况下也会重试,默认也是两次(retryTimesWhenSendAsyncFailed = 2),然而仅在同一个 Broker 上重试。

Producer 启动流程

而后咱们再来看看 Producer 的启动流程看看都干了些啥。

大抵启动流程图中曾经表明的很清晰的,然而有些细节可能还不分明,比方重均衡啊,TBW102 啥玩意啊,有哪些定时工作啊,别急都会提到的。

有人可能会问这生产者为什么要启拉取服务、重均衡?

因为 Producer 和 Consumer 都须要用 MQClientInstance,而同一个 clientId 是共用一个 MQClientInstance 的,clientId 是通过本机 IP 和 instanceName(默认值 default)拼起来的,所以多个 Producer、Consumer 理论用的是一个 MQClientInstance。

至于有哪些定时工作,请看下图:

Producer 发消息流程

咱们再来看看发消息的流程,大抵也不是很简单,无非就是找到要发送音讯的 Topic 在哪个 Broker 上,而后发送音讯。

当初就晓得 TBW102 是啥用的,就是承受主动创立主题的 Broker 启动会把这个默认主题注销到 NameServer,这样当 Producer 发送新 Topic 的音讯时候就得悉哪个 Broker 能够主动创立主题,而后发往那个 Broker。

而 Broker 承受到这个音讯的时候发现没找到对应的主题,然而它承受创立新主题,这样就会创立对应的 Topic 路由信息。

主动创立主题的弊病

主动创立主题那么有可能该主题的音讯都只会发往一台 Broker,起不到负载平衡的作用。

因为创立新 Topic 的申请达到 Broker 之后,Broker 创立对应的路由信息,然而心跳是每 30s 发送一次,所以说 NameServer 最长须要 30s 能力得悉这个新 Topic 的路由信息。

假如此时发送方还在间断疾速的发送音讯 ,那 NameServer 上其实还没有对于这个 Topic 的路由信息,所以 有机会 让别的容许主动创立的 Broker 也创立对应的 Topic 路由信息,这样集群里的 Broker 就能承受这个 Topic 的信息,达到负载平衡的目标,但也有个别 Broker 可能,没收到。

如果发送方这一次发了之后 30s 内一个都不发,之前的那个 Broker 随着心跳把这个路由信息更新到 NameServer 了,那么之后发送该 Topic 音讯的 Producer 从 NameServer 只能得悉该 Topic 音讯只能发往之前的那台 Broker,这就不平衡了,如果这个新主题音讯很多,那台 Broker 负载就很高了。

所以不倡议线上开启容许主动创立主题,即 autoCreateTopicEnable 参数。

发送音讯故障提早机制

有一个参数是 sendLatencyFaultEnable,默认不开启。这个参数的作用是对于之前发送超时的 Broker 进行一段时间的退却。

发送音讯会记录此时发送音讯的工夫,如果超过肯定工夫,那么此 Broker 就在一段时间内不容许发送。

比方发送工夫超过 15000ms 则在 600000 ms 内无奈向该 Broker 发送音讯。

这个机制其实很要害,发送超时大概率表明此 Broker 负载高,所以先避让一会儿,让它缓一缓,这也是实现音讯发送高可用的要害。

小结一下

Producer 每 30s 会向 NameSrv 拉取路由信息更新本地路由表,有新的 Broker 就和其建设长连贯,每隔 30s 发送心跳给 Broker。

不要在生产环境开启 autoCreateTopicEnable。

Producer 会通过重试和提早机制晋升音讯发送的高可用。

Broker

Broker 就比较复杂一些了,然而十分重要。大抵分为以下五大模块,咱们来看一下官网的图。

  • Remoting 近程模块,解决客户申请。
  • Client Manager 治理客户端,保护订阅的主题。
  • Store Service 提供音讯存储查问服务。
  • HA Serivce,主从同步高可用。
  • Index Serivce,通过指定 key 建设索引,便于查问。

有几个模块没啥可说的就不剖析了,先看看存储的。

Broker 的存储

RocketMQ 存储用的是本地文件存储系统,效率高也牢靠。

次要波及到三种类型的文件,别离是 CommitLog、ConsumeQueue、IndexFile。

CommitLog

RocketMQ 的所有主题的音讯都存在 CommitLog 中,单个 CommitLog 默认 1G,并且文件名以起始偏移量命名,固定 20 位,有余则后面补 0,比方 00000000000000000000 代表了第一个文件,第二个文件名就是 00000000001073741824,表明起始偏移量为 1073741824,以这样的形式命名用偏移量就能找到对应的文件。

所有音讯都是程序写入的,超过文件大小则开启下一个文件。

ConsumeQueue

ConsumeQueue 音讯生产队列,能够认为是 CommitLog 中音讯的索引,因为 CommitLog 是糅合了所有主题的音讯,所以通过索引能力更加高效的查找音讯。

ConsumeQueue 存储的条目是固定大小,只会存储 8 字节的 commitlog 物理偏移量,4 字节的音讯长度和 8 字节 Tag 的哈希值,固定 20 字节。

在理论存储中,ConsumeQueue 对应的是一个 Topic 下的某个 Queue,每个文件约 5.72M,由 30w 条数据组成。

消费者是先从 ConsumeQueue 来失去音讯实在的物理地址,而后再去 CommitLog 获取音讯。

IndexFile

IndexFile 就是索引文件,是额定提供查找音讯的伎俩,不影响主流程。

通过 Key 或者工夫区间来查问对应的音讯,文件名以创立工夫戳命名,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 存储 2000W 个索引。

咱们再来看看以上三种文件的内容是如何生成的:

音讯到了先存储到 Commitlog,而后会有一个 ReputMessageService 线程靠近实时地将音讯转发给音讯生产队列文件与索引文件,也就是说是异步生成的。

音讯刷盘机制

RocketMQ 提供音讯同步刷盘和异步刷盘两个抉择,对于刷盘咱们都晓得效率比拟低,单纯存入内存中的话效率是最高的,然而可靠性不高,影响音讯可靠性的状况大抵有以下几种:

  1. Broker 被暴力敞开,比方 kill -9
  2. Broker 挂了
  3. 操作系统挂了
  4. 机器断电
  5. 机器坏了,开不了机
  6. 磁盘坏了

如果都是 1-4 的状况,同步刷盘必定没问题,异步的话就有可能失落局部音讯,5 和 6 就得依附正本机制了,如果同步双写必定是稳的,然而性能太差,如果异步则有可能失落局部音讯。

所以须要看场景来应用同步、异步刷盘和正本双写机制。

页缓存与内存映射

Commitlog 是混合存储的,所以所有音讯的写入就是程序写入,对文件的程序写入和内存的写入速度基本上没什么差异。

并且 RocketMQ 的文件都利用了内存映射即 Mmap,将程序虚构页面间接映射到页缓存上,无需有内核态再往用户态的拷贝,来看一下我之前文章画的图。

页缓存其实就是操作系统对文件的缓存,用来减速文件的读写,也就是说对文件的写入先写到页缓存中,操作系统会不定期刷盘(工夫不可控),对文件的读会先加载到页缓存中,并且依据局部性原理还会预读邻近块的内容。

其实也是因为应用内存映射机制,所以 RocketMQ 的文件存储都应用定长构造来存储,不便一次将整个文件映射至内存中。

文件预调配和文件预热

而内存映射也只是做了映射,只有当真正读取页面的时候产生缺页中断,才会将数据真正加载到内存中,所以 RocketMQ 做了一些优化,避免运行时的性能抖动。

文件预调配

CommitLog 的大小默认是 1G,当超过大小限度的时候须要筹备新的文件,而 RocketMQ 就起了一个后盾线程 AllocateMappedFileService,一直的解决 AllocateRequest,AllocateRequest 其实就是预调配的申请,会提前准备好下一个文件的调配,避免在音讯写入的过程中调配文件,产生抖动。

文件预热

有一个 warmMappedFile 办法,它会把以后映射的文件,每一页遍历多去,写入一个 0 字节,而后再调用 mlock 和 madvise(MADV_WILLNEED)。

mlock:能够将过程应用的局部或者全副的地址空间锁定在物理内存中,避免其被替换到 swap 空间。

madvise:给操作系统倡议,说这文件在不久的未来要拜访的,因而,提前读几页可能是个好主见。

小结一下

CommitLog 采纳混合型存储,也就是所有 Topic 都存在一起,程序追加写入,文件名用起始偏移量命名。

音讯先写入 CommitLog 再通过后盾线程散发到 ConsumerQueue 和 IndexFile 中。

消费者先读取 ConsumerQueue 失去真正音讯的物理地址,而后拜访 CommitLog 失去真正的音讯。

利用了 mmap 机制缩小一次拷贝,利用文件预调配和文件预热进步性能。

提供同步和异步刷盘,依据场景抉择适合的机制。

Broker 的 HA

从 Broker 会和主 Broker 建设长连贯,而后获取主 Broker commitlog 最大偏移量,开始向主 Broker 拉取音讯,主 Broker 会返回肯定数量的音讯,循环进行,达到主从数据同步。

消费者生产音讯会先申请主 Broker,如果主 Broker 感觉当初压力有点大,则会返回从 Broker 拉取音讯的倡议,而后消费者就去从服务器拉取音讯。

Consumer

生产有两种模式,别离是播送模式和集群模式。

播送模式:一个分组下的每个消费者都会生产残缺的 Topic 音讯。

集群模式:一个分组下的消费者瓜分生产 Topic 音讯。

个别咱们用的都是集群模式。

而消费者生产音讯又分为推和拉模式,具体看我这篇文章音讯队列推拉模式,别离从源码级别剖析了 RokcetMQ 和 Kafka 的音讯推拉,以及推拉模式的优缺点。

Consumer 端的负载平衡机制

Consumer 会定期的获取 Topic 下的队列数,而后再去查找订阅了该 Topic 的同一生产组的所有消费者信息,默认的调配策略是相似分页排序调配。

将队列排好序,而后消费者排好序,比方队列有 9 个,消费者有 3 个,那消费者 -1 生产队列 0、1、2 的音讯,消费者 -2 生产队列 3、4、5,以此类推。

所以如果负载太大,那么就加队列,加消费者,通过负载平衡机制就能够感知到重均衡,平均负载。

Consumer 音讯生产的重试

难免会遇到音讯生产失败的状况,所以须要提供生产失败的重试,而个别的生产失败要么就是音讯构造有误,要么就是一些临时无奈解决的状态,所以立刻重试不太适合。

RocketMQ 会给 每个生产组 都设置一个重试队列,Topic 是 %RETRY%+consumerGroup,并且设定了很多重试级别来提早重试的工夫。

为了利用 RocketMQ 的延时队列性能,重试的音讯会先保留在 Topic 名称为“SCHEDULE_TOPIC_XXXX”的提早队列,在音讯的扩大字段外面会存储原来所属的 Topic 信息。

delay 一段时间后再复原到重试队列中,而后 Consumer 就会生产这个重试队列主题,失去之前的音讯。

如果超过肯定的重试次数都生产失败,则会移入到死信队列,即 Topic %DLQ%" + ConsumerGroup 中,存储死信队列即认为生产胜利,因为切实没辙了,临时放过。

而后咱们能够通过人工来解决死信队列的这些音讯。

音讯的全局程序和部分程序

全局程序就是打消所有并发,一个 Topic 一个队列,Producer 和 Consuemr 的并发都为一。

部分程序其实就是指某个队列程序,多队列之间还是能并行的。

能够通过 MessageQueueSelector 指定 Producer 某个业务只发这一个队列,而后 Comsuer 通过 MessageListenerOrderly 承受音讯,其实就是加锁生产。

在 Broker 会有一个 mqLockTable,程序音讯在创立拉取音讯工作的时候须要在 Broker 锁定该音讯队列,之后加锁胜利的能力生产。

而严格的程序音讯其实很难,假如当初都好好的,如果有个 Broker 宕机了,而后产生了重均衡,队列对应的消费者实例就变了,就会有可能会呈现乱序的状况,如果要放弃严格程序,那此时就只能让整个集群不可用了。

一些留神点

1、订阅音讯是以 ConsumerGroup 为单位存储的,所以 ConsumerGroup 中的每个 Consumer 须要有雷同的订阅。

因为订阅音讯是随着心跳上传的,如果一个 ConsumerGroup 中 Consumer 订阅信息不一样,那么就会呈现相互笼罩的状况。

比方消费者 A 订阅 Topic a,消费者 B 订阅 Topic b,此时消费者 A 去 Broker 拿音讯,而后 B 的心跳包收回了,Broker 更新了,而后接到 A 的申请,一脸懵逼,没这订阅关系啊。

2、RocketMQ 主从读写拆散

从只能读,不能写,并且只有以后客户端读的 offset 和 以后 Broker 已承受的最大 offset 超过限度的物理内存大小时候才会去从读,所以 失常状况下从分担不了流量

3、单单加机器晋升不了生产速度,队列的数量也须要跟上。

4、之前提到的,不要容许主动创立主题

RocketMQ 的最佳实际

这些最佳实际局部参考自官网。

Tags 的应用

倡议一个利用一个 Topic,利用 tages 来标记不同业务,因为 tages 设置比拟灵便,且一个利用一个 Topic 很清晰,能直观的分别。

Keys 的应用

如果有音讯业务上的惟一标识,请填写到 keys 字段中,不便日后的定位查找。

进步 Consumer 的生产能力

1、进步生产并行度:减少队列数和消费者数量,进步单个消费者的并行生产线程,参数 consumeThreadMax。

2、批处理生产,设置 consumeMessageBatchMaxSize 参数,这样一次能拿到多条音讯,而后比方一个 update 语句之前要执行十次,当初一次就执行完。

3、跳过非核心的音讯,当负载很重的时候,为了保住那些外围的音讯,设置那些非核心的音讯,例如此时音讯沉积 1W 条了之后,就间接返回生产胜利,跳过非核心音讯。

NameServer 的寻址

请应用 HTTP 动态服务器寻址(默认),这样 NameServer 就能动静发现。

JVM 选项

以下抄自官网:

如果不关怀 RocketMQ Broker 的启动工夫,通过“预触摸”Java 堆以确保在 JVM 初始化期间每个页面都将被调配。

那些不关怀启动工夫的人能够启用它:​ -XX:+AlwaysPreTouch
禁用偏置锁定可能会缩小 JVM 暂停,​ -XX:-UseBiasedLocking
至于垃圾回收,倡议应用带 JDK 1.8 的 G1 收集器。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30

另外不要把 -XX:MaxGCPauseMillis 的值设置太小,否则 JVM 将应用一个小的年老代来实现这个指标,这将导致十分频繁的 minor GC,所以倡议应用 rolling GC 日志文件:

-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m

Linux 内核参数

以下抄自官网:

  • vm.extra_free_kbytes,通知 VM 在后盾回收(kswapd)启动的阈值与间接回收(通过调配过程)的阈值之间保留额定的可用内存。RocketMQ 应用此参数来防止内存调配中的长提早。(与具体内核版本相干)
  • vm.min_free_kbytes,如果将其设置为低于 1024KB,将会奇妙的将零碎毁坏,并且零碎在高负载下容易呈现死锁。
  • vm.max_map_count,限度一个过程可能具备的最大内存映射区域数。RocketMQ 将应用 mmap 加载 CommitLog 和 ConsumeQueue,因而倡议将为此参数设置较大的值。(agressiveness –> aggressiveness)
  • vm.swappiness,定义内核替换内存页面的踊跃水平。较高的值会减少攻击性,较低的值会缩小交换量。倡议将值设置为 10 来防止替换提早。
  • File descriptor limits,RocketMQ 须要为文件(CommitLog 和 ConsumeQueue)和网络连接关上文件描述符。咱们倡议设置文件描述符的值为 655350。
  • Disk scheduler,RocketMQ 倡议应用 I / O 截止工夫调度器,它试图为申请提供有保障的提早。

最初

其实还有很多没讲,比方流量管制、音讯的过滤、定时音讯的实现,包含底层通信 1+N+M1+M2 的 Reactor 多线程设计等等。

次要是内容太多了,而且也不太影响主流程,所以还是剥离进去之后写吧,大抵的一些实现还是讲了的。

包含元信息的交互、音讯的发送、存储、生产等等。

对于事务音讯的那一块我之前文章也剖析过了,所以这个就不再贴了。

能够看到要实现一个生产级别的音讯队列还是有很多很多货色须要思考的,不过大抵的架构和波及到的模块差不多就这些了。

至于具体的细节深刻,还是得靠大家自行钻研了,我就起个抛砖引玉的作用。

最初集体能力无限,如果哪里有纰漏请放松分割抨击我!


微信搜一搜「yes 的练级攻略」,回复 123,20W 字算法刷题攻略等你支付

我是 yes,从一点点到亿点点,咱们下篇见

退出移动版