关于后端:RocketMQ基础概念剖析源码解析

25次阅读

共计 9627 个字符,预计需要花费 25 分钟才能阅读完成。

Topic

Topic 是一类音讯的汇合,是一种逻辑上的分区。为什么说是逻辑分区呢?因为最终数据是存储到 Broker 上的,而且为了满足 高可用 ,采纳了 分布式 的存储。

这和 Kafka 中的实现一模一样,Kafka 的 Topic 也是一种逻辑概念,每个 Topic 的数据会分成很多份,而后存储在不同的 Broker 上,这个「份」叫Partition。而在 RocketMQ 中,Topic 的数据也会分布式的存储,这个「份」叫MessageQueue

其散布能够用下图来示意。

这样一来,如果某个 Broker 所在的机器意外宕机,而且刚好 MessageQueue 中的数据还没有长久化到磁盘,那么该 Topic 下的这部分音讯就会齐全失落。此时如果有备份的话,MQ 就能够持续对外提供服务。

为什么还会呈现没有长久化到磁盘的状况呢?当初的 OS 当中,程序写入数据到文件之后,并不会立马写入到磁盘,因为磁盘 I / O 是十分耗时的操作,在计算机来看是十分慢的一种操作。所以写入文件的数据会先写入到 OS 本人的缓存中去,而后择机异步的将 Buffer 中的数据刷入磁盘。

通过 多正本冗余 的机制,使得 RocketMQ 具备了 高可用 的个性。除此之外,分布式存储可能应答前期业务大量的数据存储。如果不应用分布式进行存储,那么随着前期业务倒退,音讯量越来越大,单机是无论如何也满足不了 RocketMQ 音讯的存储需要的。如果不做解决,那么一台机器的磁盘总有被塞满的时候,此时的零碎就不具备 可伸缩 的个性,也无奈满足业务的应用要求了。

然而这里的可伸缩,和微服务中的服务可伸缩还不太一样。因为在微服务中,各个服务是无状态的。而 Broker 是有状态的,每个 Broker 上存储的数据都不太一样,因为 Producer 在发送音讯的时候会通过指定的算法,从 Message Queue 列表中选出一个 MessageQueue 发送音讯。

如果不是很了解这个横向扩大,那么能够把它当成 Redis 的 Cluster,通过 一致性哈希,抉择到 Redis Cluster 中的具体某个节点,而后将数据写入 Redis Master 中去。如果此时想要扩容很不便,只须要往 Redis Cluster 中新增 Master 节点就好了。

所以,数据分布式的存储实质上是一种 数据分片 的机制。在此基础上,通过冗余多正本,达成了高可用。

Broker

Broker 能够了解为咱们微服务中的一个服务的某个实例,因为微服务中咱们的服务一般来说都会多实例部署,而 RocketMQ 也同理,多实例部署能够帮忙零碎扛住更多的流量,也从某种方面进步了零碎的 健壮性

在 RocketMQ4.5之前,它应用主从架构,每一个 Master Broker 都有一个本人的 Slave Broker。

那 RocketMQ 的主从 Broker 是如何进行数据同步的呢?

Broker 启动的时候,会启动一个定时工作,定期的从 Master Broker 同步全量的数据。

这块能够先不必纠结,前面咱们会通过源码来验证这个主从同步逻辑。

下面提到了 Broker 会部署很多个实例,那么既然多实例部署,那必然会存在一个问题,客户端是如何得悉本人是连贯的哪个服务器?如何得悉对应的 Broker 的 IP 地址和端口?如果某个 Broker 忽然挂了怎么办?

NameServer

这就须要 NameServer 了,NameServer 是什么?

这里先拿 Spring Cloud 举例子——Spring Cloud 中服务启动的时候会将本人注册到 Eureka 注册核心上。当服务实例启动的时候,会从 Eureka 拉取全量的注册表,并且之后定期的从 Eureka 增量同步,并且每隔 30 秒发送心跳到 Eureka 去续约。如果 Eureka 检测到某个服务超过了 90 秒没有发送心跳,那么就会该服务宕机,就会将其从注册表中移除。

RocketMQ 中,NameServer 充当的也是相似的角色。两者从性能上也有肯定的 区别

Broker 在启动的时候会向 NameServer 注册本人,并且每隔 30 秒 向 NameServerv 发送心跳。如果某个 Broker 超过了 120 秒 没有发送心跳,那么就会认为该 Broker 宕机,就会将其从保护的信息中移除。这块前面也会从源码层面验证。

当然 NameServer 不仅仅是存储了各个 Broker 的 IP 地址和端口,还存储了对应的 Topic 的路由数据。什么是路由数据呢?那就是某个 Topic 下的哪个 Message Queue 在哪台 Broker 上。

Producer

总体流程

接下来,咱们来看看 Producer 发送一条音讯到 Broker 的时候会做什么事件,整体的流程如下。

查看音讯合法性

整体来看,其实是个很简略的操作,跟咱们平时写代码是一样的,来申请了先校验申请是否非法。Producer 启动这里会去校验以后 Topic 数据的合法性。

  • Topic 名称中是否蕴含了非法字符

  • Topic 名称长度是否超过了最大的长度限度,由常量 TOPIC_MAX_LENGTH 来决定,其默认值为 127

  • 以后音讯体是否是 NULL 或者是空音讯

  • 以后音讯体是否超过了最大限度,由常量 maxMessageSize 决定,值为 1024 1024 4,也就是 4M。

都是些很惯例的操作,和咱们平时写的 checker 都差不多。

获取 Topic 的详情

当通过了音讯的合法性校验之后,就须要持续往下走。此时的关注点就应该从 音讯是否非法 转移到 我要发消息给谁

此时就须要通过以后音讯所属的 Topic 拿到 Topic 的具体数据。

获取 Topic 的办法 源码在下面曾经给进去了,首先会从内存中保护的一份 Map 中获取数据。顺带一提,这里的 Map 是 ConcurrentHashMap,是线程平安的,和 Golang 中的 Sync.Map 相似。

当然,首次发送的话,这个 Map 必定是空的,此时会调用 NameServer 的接口,通过 Topic 去获取详情的 Topic 数据,此时会在下面的办法中将其退出到 Map 中去,这样一来下次再往该 Topic 发送音讯就可能间接从内存中获取。这里就是简略的实现的 缓存机制

从办法名称来看,是通过 Topic 获取路由数据。实际上该办法,通过调用 NameServer 提供的 API,更新了两局部数据,别离是:

  • Topic 路由信息
  • Topic 下的 Broker 相干信息

而这两局部数据都来源于同一个构造体 TopicRouteData。其构造如下。

通过源码能够看到,就蕴含了该 Topic 下所有 Broker 下的 Message Queue 相干的数据、所有 Broker 的地址信息。

发送的具体 Queue

此时咱们获取到了须要发送到的 Broker 详情,包含地址和 MessageQueue,那么此时问题的关注点又该从「音讯发送给谁 」转移到「 音讯具体发送到哪儿」。

什么叫发送到哪儿?

开篇提到过一个 Topic 下会被分为很多个 MessageQueue,「发送到哪儿」指的就是具体发送到哪一个 Message Queue 中去。

Message Queue 抉择机制

外围的抉择逻辑

还是先给出流程图

外围逻辑,用大白话讲就是将一个 随机数 Message Queue 的容量 取模。这个随机数存储在 Thread Local 中,首次计算的时候,会间接随机一个数。

尔后,都间接从 ThreadLocal 中取出该值,并且 + 1 返回,拿到了 MessageQueue 的数量和随机数两个要害的参数之后,就会执行最终的计算逻辑。

接下来,咱们来看看抉择 Message Queue 的办法 SelectOneMessageQueue 都做了什么操作吧。

能够看到,主逻辑被变量 sendLatencyFaultEnable 分为了两局部。

容错机制下的抉择逻辑

该变量表意为发送提早故障。实质上是一种容错的策略,在原有的 MessageQueue 抉择根底上,再过滤掉不可用的 Broker,对之前失败的 Broker,按肯定的工夫做退却。

能够看到,如果调用 Broker 信息产生了异样,那么就会调用 updateFault 这个办法,来更新 Broker 的 Aviable 状况。留神这个参数 isolation 的值为 true。接下来咱们从源码级别来验证下面说的退却 3000ms 的事实。

能够看到,isolation 值是 true,则 duration 通过三元运算符计算出来后果为 30000,也就是 30 秒。所以咱们能够得出结论,如果发送音讯抛出了异样,那么间接会将该 Broker 设置为 30 秒内不可用。

而如果只是发送提早较高,则会依据如下的 map,依据提早的具体工夫,来判断该设置多少工夫的不可用。

例如,如果上次申请的 latency 超过 550ms,就退却 3000ms;超过 1000,就退却 60000;

失常状况下的抉择逻辑

而失常状况下,如果以后发送故障提早没有启用,则会走惯例逻辑,同样的会去 for 循环计算,循环中取到了 MessageQueue 之后会去判断是否和上次抉择的 MessageQueue 属于同一个 Broker,如果是同一个 Broker,则会从新抉择,直到抉择到不属于同一个 Broker 的 MessageQueue,或者直到循环完结。这也是为了将音讯平均的散发存储,避免 数据歪斜

发送音讯

选到了具体的 Message Queue 之后就会开始执行发送音讯的逻辑,就会调用底层 Netty 的接口给发送进来,这块临时没啥可看的。

Broker 的启动流程

主从同步

在下面提到过,RocketMQ 有本人的主从同步,然而有两个不同的版本,版本的分水岭是在 4.5 版本。这两个版本区别是什么呢?

  • 4.5 之前:有点相似于 Redis 中,咱们手动的将某台机器通过命令 slave of 变成另一台 Redis 的 Slave 节点,这样一来就变成了一个较为原始的一主一从的架构。为什么说原始呢?因为如果此时 Master 节点宕机,咱们须要人肉的去做故障转移。RocketMQ 的主从架构也是这种状况。
  • 4.5 之后:引入了 Dleger,能够实现一主多从,并且实现主动的故障转移。这就跟 Redis 后续推出了 Sentinel 是一样的。Dleger 也是相似的作用。

下图是 Broker 启动代码中的源码。

能够看到判断了是否开启了 Dleger,默认是不开启的。所以就会执行其中的逻辑。

刚好咱们就看到了,外面有 Rocket 主从同步数据 的相干代码。

如果以后 Broker 节点的角色是 Slave,则会启动一个周期性的定时工作,定期(也就是 10 秒)去 Master Broker 同步全量的数据。同步的数据包含:

  • Topic 的相干配置
  • Cosumer 的生产偏移量
  • 提早音讯的 Offset
  • 订阅组的相干数据和配置

注册 Broker

实现了被动同步定时工作的启动之后,就会去调用 registerBrokerAll 去注册 Broker。可能这里会有点疑难,我这里是 Broker 启动,只有以后一个 Broker 实例,那这个 All 是什么意思呢?

All 是指所有的 NameServer,Broker 启动的时候会将本人注册到每一个 NameServer 下来。为什么不只注册到一个 NameServer 就完事了呢?这样一来还能够提高效率。归根结底还是高可用的问题。

如果 Broker 只注册到了一台 NameServer 上,万一这台 NameServer 挂了呢?这个 Broker 对所有客户端就都不可见了。实际上 Broker 还在失常的运行。

进到 registerBrokerAll 中去。

能够看到,这里会判断是否须要进行注册。通过下面的截图能够看到,此时 forceRegister 的值为 true,而是否要注册,决定权就交给了 needRegister

为什么须要判断是否须要注册呢?因为 Broker 一旦注册到了 NameServer 之后,因为 Producer 不停的在写入数据,Consumer 也在不停的生产数据,Broker 也可能因为故障导致某些 Topic 下的 Message Queue 等要害的路由信息产生变动。

这样一来,NameServer 中的数据和 Broker 中的数据就会 不统一

如何判断是否须要注册

大抵的思路是,Broker 会从每一个 NameServer 中获取到以后 Broker 的数据,并和以后 Broker 节点中的数据做比照。凡是有一台 NameServer 数据和以后 Broker 不统一,都会进行注册操作。

接下来,咱们从源码层面验证这个逻辑。要害的逻辑我在图中也标注了进去。

能够看到,就是通过比照 Broker 中的数据版本和 NameServer 中的数据版本来实现的。这个版本,注册的时候会写到注册的数据中存入 NameServer 中。

这里因为是有多个,所以 RocketMQ 用线程池来实现了多线程操作,并且用 CountDownLatch 来期待所有的返回后果。经典的用 空间换工夫,Golang 外面也有相似的操作,那就是 sync.waitGroup。

对于任何一个数据不匹配,都会进行从新注册的事实,咱们也从源码层面来验证一下。

能够看到,如果任何一台 NameServer 的数据产生了 Change,都会 break,返回 true。

这里的后果列表应用的是 CopyOnWriteList 来实现的。

因为这里是多线程去执行的判断逻辑,而失常的列表不是线程平安的。CopyOnWriteArrayList 之所以是线程平安的,这归功于 COW(Copy On Write),读申请 时共用同一个 List,波及到 写申请 时,会复制出一个 List,并在写入数据的时候退出独占锁。比起间接对所有操作加锁,读写锁的模式拆散了读、写申请,使其互不影响,只对写申请加锁,升高了加锁的次数、缩小了加锁的耗费,晋升了整体操作的并发。

执行注册逻辑

这块就是构建数据,而后多线程并发的去发送申请,用 CopyOnWriteArrayList 来保留后果。不过,下面咱们提到过,Broker 注册的时候,会把数据版本发送到 NameServer 并且存储起来,这块咱们能够看看发送到 NameServer 的数据结构。

能够看到,Topic 的数据分为了两局部,一部分是外围的逻辑,另一部分是 DataVersion,也就是咱们刚刚始终提到的数据版本。

Broker 如何存储数据

刚刚在聊 Producer 最初提到的是,发送音讯到 Broker 就完了。不晓得大家有没有想过 Broker 是如何存储音讯的?

Commit log

先给出流程图

而后给出论断,Producer 发送的音讯是存储在一种叫 commit log 的文件中的,Producer 端每次写入的音讯是不等长的,当该 CommitLog 文件写入满 1G,就会新建另一个新的 CommitLog,持续写入。此次采取的是程序写入。

那么问题来了,Consumer 来生产的时候,Broker 是如何疾速找到对应的音讯的呢?咱们首先排除遍历文件查找的办法,因为 RocketMQ 是以 高吞吐 高性能 著称的,必定不可能采取这种对于很慢的操作。那 RocketMQ 是如何做的呢?

答案是 ConsumerQueue

ConsumerQueue

ConsumerQueue 是什么?是 文件 。引入的目标是什么呢?进步 生产的性能

Broker 在收到一条音讯的时候,写入 Commit Log 的同时,还会将以后这条音讯在 commit log 中的 offset 音讯的 size和对应的 Tag 的 Hash 写入到 consumer queue 文件中去。

每个 MessageQueue 都会有对应的 ConsumerQueue 文件存储在磁盘上,每个 ConsumerQueue 文件蕴含了 30W 条音讯,每条音讯的 size 大小为 20 字节,蕴含了 8 字节 CommitLog 的 Offset、4 字节的音讯长度、8 字节的 Tag 的哈希值。这样一来,每个 ConsumerQueue 的文件大小就约为 5.72M。

当该 ConsumerQueue 文件写满了之后,就会再新建一个 ConsumerQueue 文件,持续写入。

所以,ConsumerQueue 文件能够看成是 CommitLog 文件的 索引

负载平衡

什么意思呢?假如咱们总共有 6 个 MessageQueue,而后此时散布在了 3 台 Broker 上,每个 Broker 上蕴含了两个 queue。此时 Consumer 有 3 台,咱们能够大抵的认为每个 Consumer 负责 2 个 MessageQueue 的生产。然而这里有一个准则,那就是一个 MessageQueue 只能被一台 Consumer 生产,而一台 Consumer 能够生产多个 MessageQueue。

为什么?情理很简略,RocketMQ 反对的程序生产,是指的分区程序性,也就是在单个 MessageQueue 中,音讯是具备程序性的,而如果多台 Consumer 去生产同一个 MessageQueue,就很难去保障程序生产了。

因为有很多个 Consumer 在生产多个 MessageQueue,所以为了不呈现 数据歪斜 ,也为了 资源的正当调配 利用,在 Producer 发送音讯的时候,须要尽可能的将音讯平均的分发给多个 MessageQueue。

同时,下面那种一个 Consumer 生产了 2 个 MessageQueue 的状况,万一这台 Consumer 挂了呢?这两个 MessageQueue 不就没人生产了?

以上两种状况别离是Producer 端的负载平衡Consumer 端的负载平衡

Producer 端负载平衡

对于 Producer 端下面的负载平衡,下面的流程图曾经给了进去,并且给出了源码的验证。首先是容错策略,会去避开一段时间有问题的 Broker,并且加上如果抉择了上次的 Broker,就会从新进行抉择。

Consumer 端负载平衡

首先 Consumer 端的负责平衡能够由两个对象触发:

  • Broker
  • Consumer 本身

Consumer 也会向所有的 Broker 发送心跳,将音讯的 生产组名称 订阅关系汇合 音讯的通信模式 客户端的 ID等等。Broker 收到了 Consumer 的心跳之后,会将其存在 Broker 保护的一个 Manager 中,名字叫 ConsumerManager。当 Broker 监听到了 Consumer 数量产生了变动,就会告诉 Consumer 进行 Rebalance。

然而如果 Broker 告诉 Consumer 进行 Rebalance 的音讯丢了呢?这也就是为什么须要第 Consumer 本身进行触发的起因。Consumer 会在启动的时候启动定时工作,周期性的执行 rebalance 操作。

默认是 20 秒执行一次。具体的代码如下。

具体流程

首先,Consumer 的 Rebalance 会获取到本地缓存的 Topic 的全副数据,而后向 Broker 发动申请,拉取该 Topic 和 ConsumerGroup 下的所有的消费者信息。此处的 Broker 数据起源就是 Consumer 之前的心跳发送过来的数据。而后会对 Topic 中 MessageQueue 和消费者 ID 进行排序,而后用音讯队列默认调配算法来进行调配,这里的默认调配策略是 平均分配

首先会平均的依照相似分页的思维,将 MessageQueue 调配给 Consumer,如果调配的不平均,则会顺次的将剩下的 MessageQueue 依照排序的程序,从上往下的调配。所以在这里 Consumer 1 被调配到了 4 个 MessageQueue,而 Consumer 2 被调配到了 3 个 MessageQueue。

Rebalance 完了之后,会将后果和 Consumer 缓存的数据做比照,移除不在 ReBalance 后果中的 MessageQueue,将本来没有的 MessageQueue 给新增到缓存中。

触发机会

  • Consumer 启动时 启动之后会立马进行 Rebalance
  • Consumer 运行中 运行中会监听 Broker 发送过去的 Rebalance 音讯,以及 Consumer 本身的定时工作触发的 Rebalance
  • Consumer 进行运行 进行时没有间接的调用 Rebalance,而是会告诉 Broker 本人下线了,而后 Broker 会告诉其余的 Consumer 进行 Rebalance。

换一个角度来剖析,其实就是两个方面,一个是 队列信息产生了变动 ,另一种是 消费者产生了变动

源码验证

而后给出外围的代码验证,获取数据的逻辑如下

验证了咱们刚刚说的获取了本地的 Topic 数据缓存,和从 Broker 端拉取所有的 ConsumerID。

接下来是验证刚说的排序逻辑。

接下来是看判断后果是否产生了变动的源码。

能够看到,Consumer 告诉 Broker 策略,其本质上就是发送心跳,将更新后的数据通过心跳发送给所有的 Broker。

Consumer 更多的细节

可能对于 Consumer,咱们应用的更多一点。例如咱们晓得咱们能够设置集群生产和播送音讯,别离对应 RocketMQ 中的 CLUSTERINGBROADCASTING

再比方咱们晓得,咱们能够设置程序生产和并发生产等等,接下来就让咱们用源码来看看这些性能在 RocketMQ 中是怎么实现的。

生产模型

在 Consumer 中,默认都是采纳集群生产,这块在 Consumer 的代码中也有体现。

而生产模式的不同,会影响到治理 offset 的具体实现。

能够看到,当生产模型是播送模式时,Offset 的长久化治理会应用实现 LocalFileOffsetStorage

当生产模式是集群生产时,则会应用 RemoteBrokerOffsetStore。

具体起因是什么呢?首先咱们得晓得播送模式和集群模式的区别在哪儿:

  • 播送模式 下,一条音讯会被 ConsumerGroup 中的每一台机器所生产
  • 集群模式 下,一条音讯只会被 ConsumerGroup 中的一台机器生产

所以在播送模式下,每个 ConsumerGroup 的生产进度都不一样,所以须要由 Consumer 本身来治理 Offset。而集群模式下,同个 ConsumerGroup 下的生产进度其实是一样的,所以能够交由 Broker 对立治理。

生产模式

生产模式则分为程序生产和并发生产,别离对应实现 MessageListenerOrderly 和 MessageListenerConcurrently 两种形式。

不同的生产形式会采取不同的底层实现,配置实现之后就会调用 start。

拉取音讯

接下来咱们来看一个跟咱们最最相干的问题,那就是咱们平时生产的音讯到底是怎么样从 Broker 发到的 Consumer。在凑近启动 Rebalance 的中央,Consumer 也开启了一个定时拉取音讯的线程。

这个线程做了什么事呢?它会不停的从一个保护在内存中的 Queue 中获取一个在写入的时候就构建好的 PullRequest 对象,调用具体实现去不停的拉取音讯了。

解决生产后果

在这里是否开启 AutoCommit,所做的解决差不了很多,大家也都晓得,惟一区别就在于是否主动的提交 Offset。对于解决胜利的逻辑也差不多,咱们平时业务逻辑中可能也并不关怀生产胜利的音讯。咱们更多关注的是如果生产失败了,RocketMQ 是怎么解决的?

这是在 AutoCommit 下,如果生产失败了的解决逻辑。会记录一个失败的 TPS,而后这里有一个 十分要害的逻辑,那就是 checkReconsumeTimes。

如果以后音讯的重试次数,如果大于了最大的 重试生产次数,就会把生产发回给 Broker。那最大重试次数是如何定义的。

如果值为 -1,那么最大次数就是 MAX_VALUE,也就是 2147483647。这里有点奇怪啊,依照咱们平时的认知,难道不是重试 16 次吗?而后就看到了很骚的一句正文。

-1 means 16 times,这代码的确有点,一言难尽。

而后,如果超过了最大的次数限度,就会将该音讯调用 Prodcuer 的默认实现,将其发送到 死信队列 中。当然,死信队列也不是什么非凡的存在,就是一个独自的 Topic 而已。

通过 getRetryTopic 来获取的,默认是给以后的 ConsumerGroup 名称加上一个前缀。

好了以上就是本篇博客的全部内容了,如果你感觉这篇文章对你有帮忙,还麻烦 点个赞 关个注 分个享 留个言

欢送微信搜寻关注【SH 的全栈笔记】,查看更多相干文章

正文完
 0