Topic
Topic是一类音讯的汇合,是一种逻辑上的分区。为什么说是逻辑分区呢?因为最终数据是存储到Broker上的,而且为了满足高可用,采纳了分布式的存储。
这和Kafka中的实现一模一样,Kafka的Topic也是一种逻辑概念,每个Topic的数据会分成很多份,而后存储在不同的Broker上,这个「份」叫Partition。而在RocketMQ中,Topic的数据也会分布式的存储,这个「份」叫MessageQueue。
其散布能够用下图来示意。
这样一来,如果某个Broker所在的机器意外宕机,而且刚好MessageQueue中的数据还没有长久化到磁盘,那么该Topic下的这部分音讯就会齐全失落。此时如果有备份的话,MQ就能够持续对外提供服务。
为什么还会呈现没有长久化到磁盘的状况呢?当初的OS当中,程序写入数据到文件之后,并不会立马写入到磁盘,因为磁盘I/O是十分耗时的操作,在计算机来看是十分慢的一种操作。所以写入文件的数据会先写入到OS本人的缓存中去,而后择机异步的将Buffer中的数据刷入磁盘。
通过多正本冗余的机制,使得RocketMQ具备了高可用的个性。除此之外,分布式存储可能应答前期业务大量的数据存储。如果不应用分布式进行存储,那么随着前期业务倒退,音讯量越来越大,单机是无论如何也满足不了RocketMQ音讯的存储需要的。如果不做解决,那么一台机器的磁盘总有被塞满的时候,此时的零碎就不具备可伸缩的个性,也无奈满足业务的应用要求了。
然而这里的可伸缩,和微服务中的服务可伸缩还不太一样。因为在微服务中,各个服务是无状态的。而Broker是有状态的,每个Broker上存储的数据都不太一样,因为Producer在发送音讯的时候会通过指定的算法,从Message Queue列表中选出一个MessageQueue发送音讯。
如果不是很了解这个横向扩大,那么能够把它当成Redis的Cluster,通过一致性哈希,抉择到Redis Cluster中的具体某个节点,而后将数据写入Redis Master中去。如果此时想要扩容很不便,只须要往Redis Cluster中新增Master节点就好了。
所以,数据分布式的存储实质上是一种数据分片的机制。在此基础上,通过冗余多正本,达成了高可用。
Broker
Broker能够了解为咱们微服务中的一个服务的某个实例,因为微服务中咱们的服务一般来说都会多实例部署,而RocketMQ也同理,多实例部署能够帮忙零碎扛住更多的流量,也从某种方面进步了零碎的健壮性。
在RocketMQ4.5之前,它应用主从架构,每一个Master Broker都有一个本人的Slave Broker。
那RocketMQ的主从Broker是如何进行数据同步的呢?
Broker启动的时候,会启动一个定时工作,定期的从Master Broker同步全量的数据。
这块能够先不必纠结,前面咱们会通过源码来验证这个主从同步逻辑。
下面提到了Broker会部署很多个实例,那么既然多实例部署,那必然会存在一个问题,客户端是如何得悉本人是连贯的哪个服务器?如何得悉对应的Broker的IP地址和端口?如果某个Broker忽然挂了怎么办?
NameServer
这就须要NameServer了,NameServer是什么?
这里先拿Spring Cloud举例子——Spring Cloud中服务启动的时候会将本人注册到Eureka注册核心上。当服务实例启动的时候,会从Eureka拉取全量的注册表,并且之后定期的从Eureka增量同步,并且每隔30秒发送心跳到Eureka去续约。如果Eureka检测到某个服务超过了90秒没有发送心跳,那么就会该服务宕机,就会将其从注册表中移除。
RocketMQ中,NameServer充当的也是相似的角色。两者从性能上也有肯定的区别。
Broker在启动的时候会向NameServer注册本人,并且每隔30秒向NameServerv发送心跳。如果某个Broker超过了120秒没有发送心跳,那么就会认为该Broker宕机,就会将其从保护的信息中移除。这块前面也会从源码层面验证。
当然NameServer不仅仅是存储了各个Broker的IP地址和端口,还存储了对应的Topic的路由数据。什么是路由数据呢?那就是某个Topic下的哪个Message Queue在哪台Broker上。
Producer
总体流程
接下来,咱们来看看Producer发送一条音讯到Broker的时候会做什么事件,整体的流程如下。
查看音讯合法性
整体来看,其实是个很简略的操作,跟咱们平时写代码是一样的,来申请了先校验申请是否非法。Producer启动这里会去校验以后Topic数据的合法性。
Topic名称中是否蕴含了非法字符
Topic名称长度是否超过了最大的长度限度,由常量TOPIC_MAX_LENGTH来决定,其默认值为127
以后音讯体是否是NULL或者是空音讯
以后音讯体是否超过了最大限度,由常量maxMessageSize决定,值为1024 1024 4,也就是4M。
都是些很惯例的操作,和咱们平时写的checker都差不多。
获取Topic的详情
当通过了音讯的合法性校验之后,就须要持续往下走。此时的关注点就应该从音讯是否非法转移到我要发消息给谁。
此时就须要通过以后音讯所属的Topic拿到Topic的具体数据。
获取Topic的办法源码在下面曾经给进去了,首先会从内存中保护的一份Map中获取数据。顺带一提,这里的Map是ConcurrentHashMap,是线程平安的,和Golang中的Sync.Map相似。
当然,首次发送的话,这个Map必定是空的,此时会调用NameServer的接口,通过Topic去获取详情的Topic数据,此时会在下面的办法中将其退出到Map中去,这样一来下次再往该Topic发送音讯就可能间接从内存中获取。这里就是简略的实现的缓存机制 。
从办法名称来看,是通过Topic获取路由数据。实际上该办法,通过调用NameServer提供的API,更新了两局部数据,别离是:
- Topic路由信息
- Topic下的Broker相干信息
而这两局部数据都来源于同一个构造体TopicRouteData。其构造如下。
通过源码能够看到,就蕴含了该Topic下所有Broker下的Message Queue相干的数据、所有Broker的地址信息。
发送的具体Queue
此时咱们获取到了须要发送到的Broker详情,包含地址和MessageQueue,那么此时问题的关注点又该从「音讯发送给谁」转移到「音讯具体发送到哪儿」。
什么叫发送到哪儿?
开篇提到过一个Topic下会被分为很多个MessageQueue,「发送到哪儿」指的就是具体发送到哪一个Message Queue中去。
Message Queue抉择机制
外围的抉择逻辑
还是先给出流程图
外围逻辑,用大白话讲就是将一个随机数和Message Queue的容量取模。这个随机数存储在Thread Local中,首次计算的时候,会间接随机一个数。
尔后,都间接从ThreadLocal中取出该值,并且+1返回,拿到了MessageQueue的数量和随机数两个要害的参数之后,就会执行最终的计算逻辑。
接下来,咱们来看看抉择Message Queue的办法SelectOneMessageQueue都做了什么操作吧。
能够看到,主逻辑被变量sendLatencyFaultEnable分为了两局部。
容错机制下的抉择逻辑
该变量表意为发送提早故障。实质上是一种容错的策略,在原有的MessageQueue抉择根底上,再过滤掉不可用的Broker,对之前失败的Broker,按肯定的工夫做退却。
能够看到,如果调用Broker信息产生了异样,那么就会调用updateFault这个办法,来更新Broker的Aviable状况。留神这个参数isolation的值为true。接下来咱们从源码级别来验证下面说的退却3000ms的事实。
能够看到,isolation值是true,则duration通过三元运算符计算出来后果为30000,也就是30秒。所以咱们能够得出结论,如果发送音讯抛出了异样,那么间接会将该Broker设置为30秒内不可用。
而如果只是发送提早较高,则会依据如下的map,依据提早的具体工夫,来判断该设置多少工夫的不可用。
例如,如果上次申请的latency超过550ms,就退却3000ms;超过1000,就退却60000;
失常状况下的抉择逻辑
而失常状况下,如果以后发送故障提早没有启用,则会走惯例逻辑,同样的会去for循环计算,循环中取到了MessageQueue之后会去判断是否和上次抉择的MessageQueue属于同一个Broker,如果是同一个Broker,则会从新抉择,直到抉择到不属于同一个Broker的MessageQueue,或者直到循环完结。这也是为了将音讯平均的散发存储,避免数据歪斜。
发送音讯
选到了具体的Message Queue之后就会开始执行发送音讯的逻辑,就会调用底层Netty的接口给发送进来,这块临时没啥可看的。
Broker的启动流程
主从同步
在下面提到过,RocketMQ有本人的主从同步,然而有两个不同的版本,版本的分水岭是在4.5版本。这两个版本区别是什么呢?
- 4.5之前:有点相似于Redis中,咱们手动的将某台机器通过命令slave of 变成另一台Redis的Slave节点,这样一来就变成了一个较为原始的一主一从的架构。为什么说原始呢?因为如果此时Master节点宕机,咱们须要人肉的去做故障转移。RocketMQ的主从架构也是这种状况。
- 4.5之后:引入了Dleger,能够实现一主多从,并且实现主动的故障转移。这就跟Redis后续推出了Sentinel是一样的。Dleger也是相似的作用。
下图是Broker启动代码中的源码。
能够看到判断了是否开启了Dleger,默认是不开启的。所以就会执行其中的逻辑。
刚好咱们就看到了,外面有Rocket主从同步数据的相干代码。
如果以后Broker节点的角色是Slave,则会启动一个周期性的定时工作,定期(也就是10秒)去Master Broker同步全量的数据。同步的数据包含:
- Topic的相干配置
- Cosumer的生产偏移量
- 提早音讯的Offset
- 订阅组的相干数据和配置
注册Broker
实现了被动同步定时工作的启动之后,就会去调用registerBrokerAll去注册Broker。可能这里会有点疑难,我这里是Broker启动,只有以后一个Broker实例,那这个All是什么意思呢?
All是指所有的NameServer,Broker启动的时候会将本人注册到每一个NameServer下来。为什么不只注册到一个NameServer就完事了呢?这样一来还能够提高效率。归根结底还是高可用的问题。
如果Broker只注册到了一台NameServer上,万一这台NameServer挂了呢?这个Broker对所有客户端就都不可见了。实际上Broker还在失常的运行。
进到registerBrokerAll中去。
能够看到,这里会判断是否须要进行注册。通过下面的截图能够看到,此时forceRegister的值为true,而是否要注册,决定权就交给了needRegister
为什么须要判断是否须要注册呢?因为Broker一旦注册到了NameServer之后,因为Producer不停的在写入数据,Consumer也在不停的生产数据,Broker也可能因为故障导致某些Topic下的Message Queue等要害的路由信息产生变动。
这样一来,NameServer中的数据和Broker中的数据就会不统一。
如何判断是否须要注册
大抵的思路是,Broker会从每一个NameServer中获取到以后Broker的数据,并和以后Broker节点中的数据做比照。凡是有一台NameServer数据和以后Broker不统一,都会进行注册操作。
接下来,咱们从源码层面验证这个逻辑。要害的逻辑我在图中也标注了进去。
能够看到, 就是通过比照Broker中的数据版本和NameServer中的数据版本来实现的。这个版本,注册的时候会写到注册的数据中存入NameServer中。
这里因为是有多个,所以RocketMQ用线程池来实现了多线程操作,并且用CountDownLatch来期待所有的返回后果。经典的用空间换工夫,Golang外面也有相似的操作,那就是sync.waitGroup。
对于任何一个数据不匹配,都会进行从新注册的事实,咱们也从源码层面来验证一下。
能够看到,如果任何一台NameServer的数据产生了Change,都会break,返回true。
这里的后果列表应用的是CopyOnWriteList来实现的。
因为这里是多线程去执行的判断逻辑,而失常的列表不是线程平安的。CopyOnWriteArrayList之所以是线程平安的,这归功于COW(Copy On Write),读申请时共用同一个List,波及到写申请时,会复制出一个List,并在写入数据的时候退出独占锁。比起间接对所有操作加锁,读写锁的模式拆散了读、写申请,使其互不影响,只对写申请加锁,升高了加锁的次数、缩小了加锁的耗费,晋升了整体操作的并发。
执行注册逻辑
这块就是构建数据,而后多线程并发的去发送申请,用CopyOnWriteArrayList来保留后果。不过,下面咱们提到过,Broker注册的时候,会把数据版本发送到NameServer并且存储起来,这块咱们能够看看发送到NameServer的数据结构。
能够看到,Topic的数据分为了两局部,一部分是外围的逻辑,另一部分是DataVersion,也就是咱们刚刚始终提到的数据版本。
Broker如何存储数据
刚刚在聊Producer最初提到的是,发送音讯到Broker就完了。不晓得大家有没有想过Broker是如何存储音讯的?
Commit log
先给出流程图
而后给出论断,Producer发送的音讯是存储在一种叫commit log的文件中的,Producer端每次写入的音讯是不等长的,当该CommitLog文件写入满1G,就会新建另一个新的CommitLog,持续写入。此次采取的是程序写入。
那么问题来了,Consumer来生产的时候,Broker是如何疾速找到对应的音讯的呢?咱们首先排除遍历文件查找的办法, 因为RocketMQ是以高吞吐、高性能著称的,必定不可能采取这种对于很慢的操作。那RocketMQ是如何做的呢?
答案是ConsumerQueue
ConsumerQueue
ConsumerQueue是什么?是文件。引入的目标是什么呢?进步生产的性能。
Broker在收到一条音讯的时候,写入Commit Log的同时,还会将以后这条音讯在commit log中的offset、音讯的size和对应的Tag的Hash写入到consumer queue文件中去。
每个MessageQueue都会有对应的ConsumerQueue文件存储在磁盘上,每个ConsumerQueue文件蕴含了30W条音讯,每条音讯的size大小为20字节,蕴含了8字节CommitLog的Offset、4字节的音讯长度、8字节的Tag的哈希值。这样一来,每个ConsumerQueue的文件大小就约为5.72M。
当该ConsumerQueue文件写满了之后,就会再新建一个ConsumerQueue文件,持续写入。
所以,ConsumerQueue文件能够看成是CommitLog文件的索引。
负载平衡
什么意思呢?假如咱们总共有6个MessageQueue,而后此时散布在了3台Broker上,每个Broker上蕴含了两个queue。此时Consumer有3台,咱们能够大抵的认为每个Consumer负责2个MessageQueue的生产。然而这里有一个准则,那就是一个MessageQueue只能被一台Consumer生产,而一台Consumer能够生产多个MessageQueue。
为什么?情理很简略,RocketMQ反对的程序生产,是指的分区程序性,也就是在单个MessageQueue中,音讯是具备程序性的,而如果多台Consumer去生产同一个MessageQueue,就很难去保障程序生产了。
因为有很多个Consumer在生产多个MessageQueue,所以为了不呈现数据歪斜,也为了资源的正当调配利用,在Producer发送音讯的时候,须要尽可能的将音讯平均的分发给多个MessageQueue。
同时,下面那种一个Consumer生产了2个MessageQueue的状况,万一这台Consumer挂了呢?这两个MessageQueue不就没人生产了?
以上两种状况别离是Producer端的负载平衡、Consumer端的负载平衡。
Producer端负载平衡
对于Producer端下面的负载平衡,下面的流程图曾经给了进去,并且给出了源码的验证。首先是容错策略,会去避开一段时间有问题的Broker,并且加上如果抉择了上次的Broker,就会从新进行抉择。
Consumer端负载平衡
首先Consumer端的负责平衡能够由两个对象触发:
- Broker
- Consumer本身
Consumer也会向所有的Broker发送心跳,将音讯的生产组名称、订阅关系汇合、音讯的通信模式和客户端的ID等等。Broker收到了Consumer的心跳之后,会将其存在Broker保护的一个Manager中,名字叫ConsumerManager。当Broker监听到了Consumer数量产生了变动,就会告诉Consumer进行Rebalance。
然而如果Broker告诉Consumer进行Rebalance的音讯丢了呢?这也就是为什么须要第Consumer本身进行触发的起因。Consumer会在启动的时候启动定时工作,周期性的执行rebalance操作。
默认是20秒执行一次。具体的代码如下。
具体流程
首先,Consumer的Rebalance会获取到本地缓存的Topic的全副数据,而后向Broker发动申请,拉取该Topic和ConsumerGroup下的所有的消费者信息。此处的Broker数据起源就是Consumer之前的心跳发送过来的数据。而后会对Topic中MessageQueue和消费者ID进行排序,而后用音讯队列默认调配算法来进行调配,这里的默认调配策略是平均分配。
首先会平均的依照相似分页的思维,将MessageQueue调配给Consumer,如果调配的不平均,则会顺次的将剩下的MessageQueue依照排序的程序,从上往下的调配。所以在这里Consumer 1被调配到了4个MessageQueue,而Consumer 2被调配到了3个MessageQueue。
Rebalance完了之后,会将后果和Consumer缓存的数据做比照,移除不在ReBalance后果中的MessageQueue,将本来没有的MessageQueue给新增到缓存中。
触发机会
- Consumer启动时 启动之后会立马进行Rebalance
- Consumer运行中 运行中会监听Broker发送过去的Rebalance音讯,以及Consumer本身的定时工作触发的Rebalance
- Consumer进行运行 进行时没有间接的调用Rebalance,而是会告诉Broker本人下线了,而后Broker会告诉其余的Consumer进行Rebalance。
换一个角度来剖析,其实就是两个方面,一个是队列信息产生了变动,另一种是消费者产生了变动。
源码验证
而后给出外围的代码验证,获取数据的逻辑如下
验证了咱们刚刚说的获取了本地的Topic数据缓存,和从Broker端拉取所有的ConsumerID。
接下来是验证刚说的排序逻辑。
接下来是看判断后果是否产生了变动的源码。
能够看到,Consumer告诉Broker策略,其本质上就是发送心跳,将更新后的数据通过心跳发送给所有的Broker。
Consumer更多的细节
可能对于Consumer,咱们应用的更多一点。例如咱们晓得咱们能够设置集群生产和播送音讯,别离对应RocketMQ中的CLUSTERING和BROADCASTING。
再比方咱们晓得,咱们能够设置程序生产和并发生产等等,接下来就让咱们用源码来看看这些性能在RocketMQ中是怎么实现的。
生产模型
在Consumer中,默认都是采纳集群生产,这块在Consumer的代码中也有体现。
而生产模式的不同,会影响到治理offset的具体实现。
能够看到,当生产模型是播送模式时,Offset的长久化治理会应用实现LocalFileOffsetStorage
当生产模式是集群生产时,则会应用RemoteBrokerOffsetStore。
具体起因是什么呢?首先咱们得晓得播送模式和集群模式的区别在哪儿:
- 播送模式下,一条音讯会被ConsumerGroup中的每一台机器所生产
- 集群模式下,一条音讯只会被ConsumerGroup中的一台机器生产
所以在播送模式下,每个ConsumerGroup的生产进度都不一样,所以须要由Consumer本身来治理Offset。而集群模式下,同个ConsumerGroup下的生产进度其实是一样的,所以能够交由Broker对立治理。
生产模式
生产模式则分为程序生产和并发生产,别离对应实现MessageListenerOrderly和MessageListenerConcurrently两种形式。
不同的生产形式会采取不同的底层实现,配置实现之后就会调用start。
拉取音讯
接下来咱们来看一个跟咱们最最相干的问题,那就是咱们平时生产的音讯到底是怎么样从Broker发到的Consumer。在凑近启动Rebalance的中央,Consumer也开启了一个定时拉取音讯的线程。
这个线程做了什么事呢?它会不停的从一个保护在内存中的Queue中获取一个在写入的时候就构建好的PullRequest对象,调用具体实现去不停的拉取音讯了。
解决生产后果
在这里是否开启AutoCommit,所做的解决差不了很多,大家也都晓得,惟一区别就在于是否主动的提交Offset。对于解决胜利的逻辑也差不多,咱们平时业务逻辑中可能也并不关怀生产胜利的音讯。咱们更多关注的是如果生产失败了,RocketMQ是怎么解决的?
这是在AutoCommit下,如果生产失败了的解决逻辑。会记录一个失败的TPS,而后这里有一个十分要害的逻辑,那就是checkReconsumeTimes。
如果以后音讯的重试次数,如果大于了最大的重试生产次数,就会把生产发回给Broker。那最大重试次数是如何定义的。
如果值为-1,那么最大次数就是MAX_VALUE,也就是2147483647。这里有点奇怪啊,依照咱们平时的认知,难道不是重试16次吗?而后就看到了很骚的一句正文。
-1 means 16 times,这代码的确有点,一言难尽。
而后,如果超过了最大的次数限度,就会将该音讯调用Prodcuer的默认实现,将其发送到死信队列中。当然,死信队列也不是什么非凡的存在,就是一个独自的Topic而已。
通过getRetryTopic来获取的,默认是给以后的ConsumerGroup名称加上一个前缀。
好了以上就是本篇博客的全部内容了,如果你感觉这篇文章对你有帮忙,还麻烦点个赞,关个注,分个享,留个言。欢送微信搜寻关注【SH的全栈笔记】,查看更多相干文章