关于java:消息队列扫盲RabbitMQ-入门

32次阅读

共计 14370 个字符,预计需要花费 36 分钟才能阅读完成。

音讯队列扫盲

音讯队列顾名思义就是寄存音讯的队列,队列我就不解释了,别通知我你连队列都不晓得似啥吧?

所以问题并不是音讯队列是什么,而是 音讯队列为什么会呈现?音讯队列能用来干什么?用它来干这些事会带来什么益处?音讯队列会带来副作用吗?

音讯队列为什么会呈现?

音讯队列算是作为后端程序员的一个必备技能吧,因为 分布式应用必然波及到各个系统之间的通信问题,这个时候音讯队列也应运而生了。能够说分布式的产生是音讯队列的根底,而分布式怕是一个很古老的概念了吧,所以音讯队列也是一个很古老的中间件了。

音讯队列能用来干什么?

异步

你可能会反驳我,利用之间的通信又不是只能由音讯队列解决,好好的通信为什么两头非要插一个音讯队列呢?我不能间接进行通信吗?

很好????,你又提出了一个概念,同步通信。就比方当初业界应用比拟多的 Dubbo 就是一个实用于各个系统之间同步通信的 RPC 框架。

我来举个???? 吧,比方咱们有一个购票零碎,需要是用户在购买完之后能接管到购买实现的短信。

咱们省略两头的网络通信工夫耗费,如果购票零碎解决须要 150ms,短信零碎解决须要 200ms,那么整个解决流程的工夫耗费就是 150ms + 200ms = 350ms。

当然,乍看没什么问题。可是认真一想你就感觉有点问题,我用户购票在购票零碎的时候其实就曾经实现了购买,而我当初通过同步调用非要让整个申请拉长工夫,而短息零碎这玩意又不是很有必要,它仅仅是一个辅助性能加强用户体验感而已。我当初整个调用流程就有点 头重脚轻 的感觉了,购票是一个不太耗时的流程,而我当初因为同步调用,非要期待发送短信这个比拟耗时的操作才返回后果。那我如果再加一个发送邮件呢?

这样整个零碎的调用链又变长了,整个工夫就变成了 550ms。

当咱们在学生时代须要在食堂排队的时候,咱们和食堂大妈就是一个同步的模型。

咱们须要通知食堂大妈:“姐姐,给我加个鸡腿,再加个酸辣土豆丝,帮我浇点汁下来,多打点饭哦”咦~ 为了多吃点,真恶心。

而后大妈帮咱们打饭配菜,咱们看着大妈那颤动的手和掉落的土豆丝不禁咽了咽口水。

最终咱们从大妈手中接过饭菜而后去寻找座位了 …

回忆一下,咱们在给大妈发送须要的信息之后咱们是 同步期待大妈给我配好饭菜 的,下面咱们只是加了鸡腿和土豆丝,万一我再加一个番茄牛腩,韭菜鸡蛋,这样是不是大妈打饭配菜的流程就会变长,咱们期待的工夫也会相应的变长。

那起初,咱们工作赚钱了有钱去饭店吃饭了,咱们通知服务员来一碗牛肉面加个荷包蛋 (传播一个音讯),而后咱们就能够在饭桌上安心的玩手机了 (干本人其余事件),等到咱们的牛肉面上了咱们就能够吃了。这其中咱们也就传播了一个音讯,而后咱们又转过头干其余事件了。这其中尽管做面的工夫没有变短,然而咱们只须要传播一个音讯就可以看其余事件了,这是一个 异步 的概念。

所以,为了解决这一个问题,聪慧的程序员在两头也加了个相似于服务员的中间件——音讯队列。这个时候咱们就能够把模型给革新了。

这样,咱们在将音讯存入音讯队列之后咱们就能够间接返回了(咱们通知服务员咱们要吃什么而后玩手机),所以整个耗时只是 150ms + 10ms = 160ms。

然而你须要留神的是,整个流程的时长是没变的,就像你仅仅通知服务员要吃什么是不会影响到做面的速度的。

解耦

回到最后同步调用的过程,咱们写个伪代码简略概括一下。

那么第二步,咱们又增加了一个发送邮件,咱们就得从新去批改代码,如果咱们又加一个需要:用户购买完还须要给他加积分,这个时候咱们是不是又得改代码?

如果你感觉还行,那么我这个时候不要发邮件这个服务了呢,我是不是又得改代码,又得重启利用?

这样改来改去是不是很麻烦,那么 此时咱们就用一个音讯队列在两头进行解耦。你须要留神的是,咱们前面的发送短信、发送邮件、增加积分等一些操作都依赖于下面的 result,这货色形象进去就是购票的处理结果呀,比方订单号,用户账号等等,也就是说咱们前面的一系列服务都是须要同样的音讯来进行解决。既然这样,咱们是不是能够通过 “播送音讯” 来实现。

我下面所讲的“播送”并不是真正的播送,而是接下来的零碎作为消费者去 订阅 特定的主题。比方咱们这里的主题就能够叫做 订票 ,咱们购买零碎作为一个生产者去生产这条音讯放入音讯队列,而后消费者订阅了这个主题,会从音讯队列中拉取音讯并生产。就比方咱们刚刚画的那张图,你会发现,在生产者这边咱们只须要关注 生产音讯到指定主题中 ,而 消费者只须要关注从指定主题中拉取音讯 就行了。

如果没有音讯队列,每当一个新的业务接入,咱们都要在主零碎调用新接口、或者当咱们勾销某些业务,咱们也得在主零碎删除某些接口调用。有了音讯队列,咱们只须要关怀音讯是否送达了队列,至于谁心愿订阅,接下来收到音讯如何解决,是上游的事件,无疑极大地缩小了开发和联调的工作量。

削峰

咱们再次回到一开始咱们应用同步调用零碎的状况,并且思考一下,如果此时有大量用户申请购票整个零碎会变成什么样?

如果,此时有一万的申请进入购票零碎,咱们晓得运行咱们主业务的服务器配置个别会比拟好,所以这里咱们假如购票零碎能接受这一万的用户申请,那么也就意味着咱们同时也会呈现一万调用发短信服务的申请。而对于短信零碎来说并不是咱们的次要业务,所以咱们装备的硬件资源并不会太高,那么你感觉当初这个短信零碎能接受这一万的峰值么,且不说能不能接受,零碎会不会 间接解体 了?

短信业务又不是咱们的主业务,咱们能不能 折中解决 呢?如果咱们把购买实现的信息发送到音讯队列中,而短信零碎 尽本人所能地去音讯队列中取音讯和生产音讯,即便处理速度慢一点也无所谓,只有咱们的零碎没有解体就行了。

留得江山在,还怕没柴烧?你敢说每次发送验证码的时候是一发你就收到了的么?

音讯队列能带来什么益处?

其实下面我曾经说了。异步、解耦、削峰。 哪怕你下面的都没看懂也千万要记住这六个字,因为他不仅是音讯队列的精髓,更是编程和架构的精髓。

音讯队列会带来副作用吗?

没有哪一门技术是“银弹”,音讯队列也有它的副作用。

比方,原本好好的两个零碎之间的调用,我两头加了个音讯队列,如果音讯队列挂了怎么办呢?是不是 升高了零碎的可用性

那这样是不是要保障 HA(高可用)?是不是要搞集群?那么我 整个零碎的复杂度是不是回升了

抛开下面的问题不讲,万一我发送方发送失败了,而后执行重试,这样就可能产生反复的音讯。

或者我生产端解决失败了,申请重发,这样也会产生反复的音讯。

对于一些微服务来说,生产反复音讯会带来更大的麻烦,比方减少积分,这个时候我加了屡次是不是对其余用户不偏心?

那么,又 如何解决反复生产音讯的问题 呢?

如果咱们此时的音讯须要保障严格的程序性怎么办呢?比方生产者生产了一系列的有序音讯(对一个 id 为 1 的记录进行删除减少批改),然而咱们晓得在公布订阅模型中,对于主题是无程序的,那么这个时候就会导致对于消费者生产音讯的时候没有依照生产者的发送程序生产,比方这个时候咱们生产的程序为批改删除减少,如果该记录波及到金额的话是不是会出大事件?

那么,又 如何解决音讯的程序生产问题 呢?

就拿咱们下面所讲的分布式系统来说,用户购票实现之后是不是须要减少账户积分?在同一个零碎中咱们个别会应用事务来进行解决,如果用 Spring 的话咱们在下面伪代码中退出 @Transactional 注解就好了。然而在不同零碎中如何保障事务呢?总不能这个零碎我扣钱胜利了你那积分零碎积分没加吧?或者说我这扣钱明明失败了,你那积分零碎给我加了积分。

那么,又如何 解决分布式事务问题 呢?

咱们刚刚说了,音讯队列能够进行削峰操作,那如果我的消费者如果生产很慢或者生产者生产音讯很快,这样是不是会将音讯沉积在音讯队列中?

那么,又如何 解决音讯沉积的问题 呢?

可用性升高,复杂度回升,又带来一系列的反复生产,程序生产,分布式事务,音讯沉积的问题,这音讯队列还怎么用啊?????

别急,方法总是有的。

RocketMQ 是什么?

哇,你个混蛋!下面给我抛出那么多问题,你当初又讲 RocketMQ,还让不让人活了?!

别急别急,话说你当初分明 MQ 的结构吗,我还没讲呢,咱们先搞明确 MQ 的外部结构,再来看看如何解决下面的一系列问题吧,不过你最好带着问题去浏览和理解喔。

RocketMQ 是一个 队列模型 的消息中间件,具备 高性能、高牢靠、高实时、分布式 的特点。它是一个采纳 Java 语言开发的分布式的音讯零碎,由阿里巴巴团队开发,在 2016 年底奉献给 Apache,成为了 Apache 的一个顶级我的项目。在阿里外部,RocketMQ 很好地服务了团体大大小小上千个利用,在每年的双十一当天,更有不堪设想的万亿级音讯通过 RocketMQ 流转。

废话不多说,想要理解 RocketMQ 历史的同学能够本人去搜查材料。听完下面的介绍,你只有晓得 RocketMQ 很快、很牛、而且经验过双十一的实际就行了!

队列模型和主题模型

在谈 RocketMQ 的技术架构之前,咱们先来理解一下两个名词概念——队列模型 主题模型

首先我问一个问题,音讯队列为什么要叫音讯队列?

你可能感觉很弱智,这玩意不就是寄存音讯的队列嘛?不叫音讯队列叫什么?

确实,晚期的消息中间件是通过 队列 这一模型来实现的,可能是历史起因,咱们都习惯把消息中间件成为音讯队列。

然而,现在例如 RocketMQKafka 这些优良的消息中间件不仅仅是通过一个 队列 来实现音讯存储的。

队列模型

就像咱们了解队列一样,消息中间件的队列模型就真的只是一个队列。。。我画一张图给大家了解。

在一开始我跟你提到了一个 “播送” 的概念,也就是说如果咱们此时咱们须要将一个音讯发送给多个消费者(比方此时我须要将信息发送给短信零碎和邮件系统),这个时候单个队列即不能满足需要了。

当然你能够让 Producer 生产音讯放入多个队列中,而后每个队列去对应每一个消费者。问题是能够解决,创立多个队列并且复制多份音讯是会很影响资源和性能的。而且,这样子就会导致生产者须要晓得具体消费者个数而后去复制对应数量的音讯队列,这就违反咱们消息中间件的 解耦 这一准则。

主题模型

那么有没有好的办法去解决这一个问题呢?有,那就是 主题模型 或者能够称为 公布订阅模型

感兴趣的同学能够去理解一下设计模式外面的观察者模式并且手动实现一下,我置信你会有所播种的。

在主题模型中,音讯的生产者称为 发布者 (Publisher),音讯的消费者称为 订阅者 (Subscriber),寄存音讯的容器称为 主题(Topic)

其中,发布者将音讯发送到指定主题中,订阅者须要 提前订阅主题 能力承受特定主题的音讯。

RocketMQ 中的音讯模型

RockerMQ 中的音讯模型就是依照 主题模型 所实现的。你可能会好奇这个 主题 到底是怎么实现的呢?你下面也没有讲到呀!

其实对于主题模型的实现来说每个消息中间件的底层设计都是不一样的,就比方 Kafka 中的 分区 RocketMQ 中的 队列 RabbitMQ 中的 Exchange。咱们能够了解为 主题模型 / 公布订阅模型 就是一个规范,那些中间件只不过照着这个规范去实现而已。

所以,RocketMQ 中的 主题模型 到底是如何实现的呢?首先我画一张图,大家尝试着去了解一下。

咱们能够看到在整个图中有 Producer GroupTopicConsumer Group 三个角色,我来别离介绍一下他们。

  • Producer Group 生产者组:代表某一类的生产者,比方咱们有多个秒杀零碎作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们个别生产雷同的音讯。
  • Consumer Group 消费者组:代表某一类的消费者,比方咱们有多个短信零碎作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们个别生产雷同的音讯。
  • Topic 主题:代表一类音讯,比方订单音讯,物流音讯等等。

你能够看到图中生产者组中的生产者会向主题发送音讯,而 主题中存在多个队列,生产者每次生产音讯之后是指定主题中的某个队列发送音讯的。

每个主题中都有多个队列 (这里还不波及到 Broker),集群生产模式下,一个消费者集群多台机器独特生产一个 topic 的多个队列, 一个队列只会被一个消费者生产 。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者持续生产。就像上图中 Consumer1Consumer2 别离对应着两个队列,而 Consuer3 是没有队列对应的,所以一般来讲要管制 消费者组中的消费者个数和主题中队列个数雷同

当然也能够消费者个数小于队列个数,只不过不太倡议。如下图。

每个生产组在每个队列上保护一个生产地位,为什么呢?

因为咱们刚刚画的仅仅是一个消费者组,咱们晓得在公布订阅模式中个别会波及到多个消费者组,而每个消费者组在每个队列中的生产地位都是不同的。如果此时有多个消费者组,那么音讯被一个消费者组生产完之后是不会删除的 (因为其它消费者组也须要呀),它仅仅是为每个消费者组保护一个 生产位移(offset),每次消费者组生产完会返回一个胜利的响应,而后队列再把保护的生产位移加一,这样就不会呈现刚刚生产过的音讯再一次被生产了。

可能你还有一个问题,为什么一个主题中须要保护多个队列

答案是 进步并发能力 。确实,每个主题中只存在一个队列也是可行的。你想一下,如果每个主题中只存在一个队列,这个队列中也保护着每个消费者组的生产地位,这样也能够做到 公布订阅模式。如下图。

然而,这样我生产者是不是只能向一个队列发送音讯?又因为须要保护生产地位所以一个队列只能对应一个消费者组中的消费者,这样是不是其余的 Consumer 就没有用武之地了?从这两个角度来讲,并发度一下子就小了很多。

所以总结来说,RocketMQ 通过 应用在一个 Topic 中配置多个队列并且每个队列保护每个消费者组的生产地位 实现了 主题模式 / 公布订阅模式

RocketMQ 的架构图

讲完了音讯模型,咱们了解起 RocketMQ 的技术架构起来就容易多了。

RocketMQ 技术架构中有四大角色 NameServerBrokerProducerConsumer。我来向大家别离解释一下这四个角色是干啥的。

  • Broker:次要负责音讯的存储、投递和查问以及服务高可用保障。说白了就是音讯队列服务器嘛,生产者生产音讯到 Broker,消费者从 Broker 拉取音讯并生产。

    这里,我还得遍及一下对于 BrokerTopic 和 队列的关系。下面我解说了 Topic 和队列的关系——一个 Topic 中存在多个队列,那么这个 Topic 和队列寄存在哪呢?

    一个 Topic 散布在多个 Broker上,一个 Broker 能够配置多个 Topic,它们是多对多的关系

    如果某个 Topic 音讯量很大,应该给它多配置几个队列 (上文中提到了进步并发能力),并且 尽量多散布在不同 Broker 上,以加重某个 Broker 的压力

    Topic 音讯量都比拟平均的状况下,如果某个 broker 上的队列越多,则该 broker 压力越大。

    所以说咱们须要配置多个 Broker。

  • NameServer:不晓得你们有没有接触过 ZooKeeperSpring Cloud 中的 Eureka,它其实也是一个 注册核心 ,次要提供两个性能:Broker 治理 路由信息管理。说白了就是 Broker 会将本人的信息注册到 NameServer 中,此时 NameServer 就寄存了很多 Broker 的信息(Broker 的路由表),消费者和生产者就从 NameServer 中获取路由表而后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查问相干的 Broker 的信息)。
  • Producer:音讯公布的角色,反对分布式集群形式部署。说白了就是生产者。
  • Consumer:音讯生产的角色,反对分布式集群形式部署。反对以 push 推,pull 拉两种模式对音讯进行生产。同时也反对集群形式和播送形式的生产,它提供实时音讯订阅机制。说白了就是消费者。

听完了下面的解释你可能会感觉,这玩意好简略。不就是这样的么?

嗯?你可能会发现一个问题,这老家伙 NameServer 干啥用的,这不多余吗?间接 ProducerConsumerBroker 间接进行生产音讯,生产音讯不就好了么?

然而,咱们上文提到过 Broker 是须要保障高可用的,如果整个零碎仅仅靠着一个 Broker 来维持的话,那么这个 Broker 的压力会不会很大?所以咱们须要应用多个 Broker 来保障 负载平衡

如果说,咱们的消费者和生产者间接和多个 Broker 相连,那么当 Broker 批改的时候必定会株连着每个生产者和消费者,这样就会产生耦合问题,而 NameServer 注册核心就是用来解决这个问题的。

如果还不是很了解的话,能够去看我介绍 Spring Cloud 的那篇文章,其中介绍了 Eureka 注册核心。

当然,RocketMQ 中的技术架构必定不止后面那么简略,因为下面图中的四个角色都是须要做集群的。我给出一张官网的架构图,大家尝试了解一下。

其实和咱们最开始画的那张乞丐版的架构图也没什么区别,次要是一些细节上的差异。听我细细道来????。

第一、咱们的 Broker 做了集群并且还进行了主从部署 ,因为音讯散布在各个 Broker 上,一旦某个 Broker 宕机,则该Broker 上的音讯读写都会受到影响。所以 Rocketmq 提供了 master/slave 的构造,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,slave 提供生产服务,然而不能写入音讯 (前面我还会提到哦)。

第二、为了保障 HA,咱们的 NameServer 也做了集群部署,然而请留神它是 去中心化 的。也就意味着它没有主节点,你能够很显著地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过 单个 Broker 和所有 NameServer 放弃长连贯,并且在每隔 30 秒 Broker 会向所有 Nameserver 发送心跳,心跳蕴含了本身的 Topic 配置信息,这个步骤就对应这下面的 Routing Info

第三、在生产者须要向 Broker 发送音讯的时候,须要先从 NameServer 获取对于 Broker 的路由信息 ,而后通过 轮询 的办法去向每个队列中生产数据以达到 负载平衡 的成果。

第四、消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 申请来获取音讯数据。Consumer 能够以两种模式启动—— 播送(Broadcast)和集群(Cluster)。播送模式下,一条音讯会发送给 同一个生产组中的所有消费者,集群模式下音讯只会发送给一个消费者。

如何解决 程序生产、反复生产

其实,这些货色都是我在介绍音讯队列带来的一些副作用的时候提到的,也就是说,这些问题不仅仅挂钩于 RocketMQ,而是应该每个消息中间件都须要去解决的。

在下面我介绍 RocketMQ 的技术架构的时候我曾经向你展现了 它是如何保障高可用的,这里不波及运维方面的搭建,如果你感兴趣能够本人去官网上照着例子搭建属于你本人的 RocketMQ 集群。

其实 Kafka 的架构根本和 RocketMQ 相似,只是它注册核心应用了 Zookeeper、它的 分区 就相当于 RocketMQ 中的 队列。还有一些小细节不同会在前面提到。

程序生产

在下面的技术架构介绍中,咱们曾经晓得了 RocketMQ 在主题上是无序的、它只有在队列层面才是保障有序 的。

这又扯到两个概念——一般程序 严格程序

所谓一般程序是指 消费者通过 同一个生产队列收到的音讯是有程序的 ,不同音讯队列收到的音讯则可能是无程序的。一般程序音讯在 Broker 重启状况下不会保障音讯程序性 (短暂工夫)。

所谓严格程序是指 消费者收到的 所有音讯 均是有程序的。严格程序音讯 即便在异常情况下也会保障音讯的程序性

然而,严格程序看起来虽好,实现它可会付出微小的代价。如果你应用严格程序模式,Broker 集群中只有有一台机器不可用,则整个集群都不可用。你还用啥?当初次要场景也就在 binlog 同步。

一般而言,咱们的 MQ 都是能容忍短暂的乱序,所以举荐应用一般程序模式。

那么,咱们当初应用了 一般程序模式 ,咱们从下面学习晓得了在 Producer 生产音讯的时候会进行轮询(取决你的负载平衡策略) 来向同一主题的不同音讯队列发送音讯。那么如果此时我有几个音讯别离是同一个订单的创立、领取、发货,在轮询的策略下这 三个音讯会被发送到不同队列,因为在不同的队列此时就无奈应用 RocketMQ 带来的队列有序个性来保障音讯有序性了。

那么,怎么解决呢?

其实很简略,咱们须要解决的仅仅是将同一语义下的音讯放入同一个队列(比方这里是同一个订单),那咱们就能够应用 Hash 取模法 来保障同一个订单在同一个队列中就行了。

反复生产

emmm,就两个字—— 幂等 。在编程中一个 幂等 操作的特点是其任意屡次执行所产生的影响均与一次执行的影响雷同。比如说,这个时候咱们有一个订单的解决积分的零碎,每当来一个音讯的时候它就负责为创立这个订单的用户的积分加上相应的数值。可是有一次,音讯队列发送给订单零碎 FrancisQ 的订单信息,其要求是给 FrancisQ 的积分加上 500。然而积分零碎在收到 FrancisQ 的订单信息处理实现之后返回给音讯队列解决胜利的信息的时候呈现了网络稳定(当然还有很多种状况,比方 Broker 意外重启等等),这条回应没有发送胜利。

那么,音讯队列没收到积分零碎的回应会不会尝试重发这个音讯?问题就来了,我再发这个音讯,万一它又给 FrancisQ 的账户加上 500 积分怎么办呢?

所以咱们须要给咱们的消费者实现 幂等,也就是对同一个音讯的处理结果,执行多少次都不变。

那么如何给业务实现幂等呢?这个还是须要联合具体的业务的。你能够应用 写入 Redis 来保障,因为 Rediskeyvalue 就是人造反对幂等的。当然还有应用 数据库插入法,基于数据库的惟一键来保障反复数据不会被插入多条。

不过最次要的还是须要 依据特定场景应用特定的解决方案,你要晓得你的音讯生产是否是齐全不可反复生产还是能够忍耐反复生产的,而后再抉择强校验和弱校验的形式。毕竟在 CS 畛域还是很少有技术银弹的说法。

而在整个互联网畛域,幂等不仅仅实用于音讯队列的反复生产问题,这些实现幂等的办法,也同样实用于,在其余场景中来解决反复申请或者反复调用的问题 。比方将 HTTP 服务设计成幂等的, 解决前端或者 APP 反复提交表单数据的问题 ,也能够将一个微服务设计成幂等的,解决 RPC 框架主动重试导致的 反复调用问题

分布式事务

如何解释分布式事务呢?事务大家都晓得吧?要么都执行要么都不执行。在同一个零碎中咱们能够轻松地实现事务,然而在分布式架构中,咱们有很多服务是部署在不同零碎之间的,而不同服务之间又须要进行调用。比方此时我下订单而后减少积分,如果保障不了分布式事务的话,就会呈现 A 零碎下了订单,然而 B 零碎减少积分失败或者 A 零碎没有下订单,B 零碎却减少了积分。前者对用户不敌对,后者对运营商不利,这是咱们都不违心见到的。

那么,如何去解决这个问题呢?

现在比拟常见的分布式事务实现有 2PC、TCC 和事务音讯 (half 半音讯机制)。每一种实现都有其特定的应用场景,然而也有各自的问题, 都不是完满的解决方案

RocketMQ 中应用的是 事务音讯加上事务反查机制 来解决分布式事务问题的。我画了张图,大家能够对照着图进行了解。

在第一步发送的 half 音讯,它的意思是 在事务提交之前,对于消费者来说,这个音讯是不可见的

那么,如何做到写入音讯然而对用户不可见呢?RocketMQ 事务音讯的做法是:如果音讯是 half 音讯,将备份原音讯的主题与音讯生产队列,而后 扭转主题 为 RMQ_SYS_TRANS_HALF_TOPIC。因为生产组未订阅该主题,故生产端无奈生产 half 类型的音讯, 而后 RocketMQ 会开启一个定时工作,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取音讯进行生产,依据生产者组获取一个服务提供者发送回查事务状态申请,依据事务状态来决定是提交或回滚音讯。

你能够试想一下,如果没有从第 5 步开始的 事务反查机制,如果呈现网路稳定第 4 步没有发送胜利,这样就会产生 MQ 不晓得是不是须要给消费者生产的问题,他就像一个无头苍蝇一样。在 RocketMQ 中就是应用的上述的事务反查来解决的,而在 Kafka 中通常是间接抛出一个异样让用户来自行解决。

你还须要留神的是,在 MQ Server 指向零碎 B 的操作曾经和零碎 A 不相干了,也就是说在音讯队列中的分布式事务是——本地事务和存储音讯到音讯队列才是同一个事务 。这样也就产生了事务的 最终一致性 ,因为整个过程是异步的, 每个零碎只有保障它本人那一部分的事务就行了

音讯沉积问题

在下面咱们提到了音讯队列一个很重要的性能——削峰。那么如果这个峰值太大了导致音讯沉积在队列中怎么办呢?

其实这个问题能够将它狭义化,因为产生音讯沉积的本源其实就只有两个——生产者生产太快或者消费者生产太慢。

咱们能够从多个角度去思考解决这个问题,当流量到峰值的时候是因为生产者生产太快,咱们能够应用一些 限流降级 的办法,当然你也能够减少多个消费者实例去程度扩大减少生产能力来匹配生产的激增。如果消费者生产过慢的话,咱们能够先查看 是否是消费者呈现了大量的生产谬误,或者打印一下日志查看是否是哪一个线程卡死,呈现了锁资源不开释等等的问题。

当然,最疾速解决音讯沉积问题的办法还是减少消费者实例,不过 同时你还须要减少每个主题的队列数量

别忘了在 RocketMQ 中,一个队列只会被一个消费者生产,如果你仅仅是减少消费者实例就会呈现我一开始给你画架构图的那种状况。

回溯生产

回溯生产是指 Consumer 曾经生产胜利的音讯,因为业务上需要须要从新生产,在 RocketMQ 中,Broker 在向Consumer 投递胜利音讯后, 音讯依然须要保留。并且从新生产个别是依照工夫维度,例如因为 Consumer 系统故障,复原后须要从新生产 1 小时前的数据,那么 Broker 要提供一种机制,能够依照工夫维度来回退生产进度。RocketMQ 反对依照工夫回溯生产,工夫维度准确到毫秒。

这是官网文档的解释,我间接照搬过去就当科普了????????????。

RocketMQ 的刷盘机制

下面我讲了那么多的 RocketMQ 的架构和设计原理,你有没有好奇

Topic 中的 队列是以什么样的模式存在的?

队列中的音讯又是如何进行存储长久化的呢?

我在上文中提到的 同步刷盘 异步刷盘 又是什么呢?它们会给长久化带来什么样的影响呢?

上面我将给你们一一解释。

同步刷盘和异步刷盘

如上图所示,在同步刷盘中须要期待一个刷盘胜利的 ACK,同步刷盘对 MQ 音讯可靠性来说是一种不错的保障,然而 性能上会有较大影响,个别地实用于金融等特定业务场景。

而异步刷盘往往是开启一个线程去异步地执行刷盘操作。音讯刷盘采纳后盾异步线程提交的形式进行,升高了读写提早,进步了 MQ 的性能和吞吐量,个别实用于如发验证码等对于音讯保障要求不太高的业务场景。

个别地,异步刷盘只有在 Broker 意外宕机的时候会失落局部数据,你能够设置 Broker 的参数 FlushDiskType 来调整你的刷盘策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。

同步复制和异步复制

下面的同步刷盘和异步刷盘是在单个结点层面的,而同步复制和异步复制次要是指的 Borker 主从模式下,主节点返回音讯给客户端的时候是否须要同步从节点。

  • 同步复制:也叫“同步双写”,也就是说,只有音讯同步双写到主从结点上时才返回写入胜利
  • 异步复制:音讯写入主节点之后就间接返回写入胜利

然而,很多事件是没有完满的计划的,就比方咱们进行音讯写入的节点越多就更能保障音讯的可靠性,然而随之的性能也会降落,所以须要程序员依据特定业务场景去抉择适应的主从复制计划。

那么,异步复制会不会也像异步刷盘那样影响音讯的可靠性呢?

答案是不会的,因为两者就是不同的概念,对于音讯可靠性是通过不同的刷盘策略保障的,而像异步同步复制策略仅仅是影响到了 可用性 。为什么呢?其次要起因RocketMQ 是不反对主动主从切换的,当主节点挂掉之后,生产者就不能再给这个主节点生产音讯了

比方这个时候采纳异步复制的形式,在主节点还未发送完须要同步的音讯的时候主节点挂掉了,这个时候从节点就少了一部分音讯。然而此时生产者无奈再给主节点生产音讯了,消费者能够主动切换到从节点进行生产(仅仅是生产),所以在主节点挂掉的工夫只会产生主从结点短暂的音讯不统一的状况,升高了可用性,而当主节点重启之后,从节点那局部未来得及复制的音讯还会持续复制。

在单主从架构中,如果一个主节点挂掉了,那么也就意味着整个零碎不能再生产了。那么这个可用性的问题是否解决呢?一个主从不行那就多个主从的呗,别忘了在咱们最后的架构图中,每个 Topic 是散布在不同 Broker 中的。

然而这种复制形式同样也会带来一个问题,那就是无奈保障 严格程序。在上文中咱们提到了如何保障的音讯程序性是通过将一个语义的音讯发送在同一个队列中,应用 Topic 下的队列来保障程序性的。如果此时咱们主节点 A 负责的是订单 A 的一系列语义音讯,而后它挂了,这样其余节点是无奈代替主节点 A 的,如果咱们任意节点都能够存入任何音讯,那就没有程序性可言了。

而在 RocketMQ 中采纳了 Dledger 解决这个问题。他要求在写入音讯的时候,要求 至多音讯复制到半数以上的节点之后,才给客⼾端返回写⼊胜利,并且它是⽀持通过选举来动静切换主节点的。这里我就不开展阐明了,读者能够本人去理解。

也不是说 Dledger 是个完满的计划,至多在 Dledger 选举过程中是无奈提供服务的,而且他必须要应用三个节点或以上,如果少数节点同时挂掉他也是无奈保障可用性的,而且要求音讯复制板书以上节点的效率和间接异步复制还是有肯定的差距的。

存储机制

还记得下面咱们一开始的三个问题吗?到这里第三个问题曾经解决了。

然而,在 Topic 中的 队列是以什么样的模式存在的?队列中的音讯又是如何进行存储长久化的呢? 还未解决,其实这里波及到了 RocketMQ 是如何设计它的存储构造了。我首先想大家介绍 RocketMQ 音讯存储架构中的三大角色——CommitLogConsumeQueueIndexFile

  • CommitLog音讯主体以及元数据的存储主体 ,存储 Producer 端写入的音讯主体内容, 音讯内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,右边补零,残余为起始偏移量,比方 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。音讯次要是 程序写入日志文件,当文件满了,写入下一个文件。
  • ConsumeQueue:音讯生产队列,引入的目标次要是进步音讯生产的性能 (咱们再后面也讲了),因为RocketMQ 是基于主题 Topic 的订阅模式,音讯生产是针对主题进行的,如果要遍历 commitlog 文件中依据 Topic 检索音讯是十分低效的。Consumer 即可依据 ConsumeQueue 来查找待生产的音讯。其中,ConsumeQueue(逻辑生产队列) 作为生产音讯的索引 ,保留了指定 Topic 下的队列音讯在 CommitLog 中的 起始物理偏移量 offset ,音讯大小 size 和音讯 TagHashCode 值。consumequeue 文件能够看成是基于 topiccommitlog 索引文件 ,故 consumequeue 文件夹的组织形式如下:topic/queue/file 三层组织构造,具体存储门路为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,别离为 8 字节的 commitlog 物理偏移量、4 字节的音讯长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,能够像数组一样随机拜访每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
  • IndexFileIndexFile(索引文件)提供了一种能够通过 key 或工夫区间来查问音讯的办法。这里只做科普不做具体介绍。

总结来说,整个音讯存储的构造,最次要的就是 CommitLoqConsumeQueue。而 ConsumeQueue 你能够大略了解为 Topic 中的队列。

RocketMQ 采纳的是 混合型的存储构造,即为 Broker 单个实例下所有的队列共用一个日志数据文件来存储音讯。有意思的是在同样高并发的 Kafka 中会为每个 Topic 调配一个存储文件。这就有点相似于咱们有一大堆书须要装上书架,RockeMQ 是不分书的品种间接成批的塞下来的,而 Kafka 是将书本放入指定的分类区域的。

RocketMQ 为什么要这么做呢?起因是 进步数据的写入效率 ,不分 Topic 意味着咱们有更大的几率获取 成批 的音讯进行数据写入,但也会带来一个麻烦就是读取音讯的时候须要遍历整个大文件,这是十分耗时的。

所以,在 RocketMQ 中又应用了 ConsumeQueue 作为每个队列的索引文件来 晋升读取音讯的效率。咱们能够间接依据队列的音讯序号,计算出索引的全局地位(索引序号 * 索引固定⻓度 20),而后间接读取这条索引,再依据索引中记录的音讯的全局地位,找到音讯。

讲到这里,你可能对 RockeMQ 的存储架构还有些含糊,没事,咱们联合着图来了解一下。

emmm,是不是有一点简单????,看英文图片和英文文档的时候就不要怂,硬着头皮往下看就行。

如果下面没看懂的读者肯定要认真看上面的流程剖析!

首先,在最下面的那一块就是我刚刚讲的你当初能够间接 ConsumerQueue 了解为 Queue

在图中最右边阐明了 红色方块 代表被写入的音讯,虚线方块代表期待被写入的。右边的生产者发送音讯会指定 TopicQueueId 和具体音讯内容,而在 Broker 中管你是哪门子音讯,他间接 全副顺序存储到了 CommitLog 。而依据生产者指定的 TopicQueueId 将这条音讯自身在 CommitLog 的偏移(offset),音讯自身大小,和 tag 的 hash 值存入对应的 ConsumeQueue 索引文件中。而在每个队列中都保留了 ConsumeOffset 即每个消费者组的生产地位(我在架构那里提到了,忘了的同学能够回去看一下),而消费者拉取音讯进行生产的时候只须要依据 ConsumeOffset 获取下一个未被生产的音讯就行了。

上述就是我对于整个音讯存储架构的大略了解(这里不波及到一些细节探讨,比方稠密索引等等问题),心愿对你有帮忙。

因为有一个知识点因为写嗨了忘讲了,想想在哪里加也不好,所以我留给大家去思考一下吧。

为什么 CommitLog 文件要设计成固定大小的长度呢?揭示:内存映射机制

总结

总算把这篇博客写完了。我讲的你们还记得吗?????

这篇文章中我次要想大家介绍了

  1. 音讯队列呈现的起因
  2. 音讯队列的作用(异步,解耦,削峰)
  3. 音讯队列带来的一系列问题(音讯沉积、反复生产、程序生产、分布式事务等等)
  4. 音讯队列的两种音讯模型——队列和主题模式
  5. 剖析了 RocketMQ 的技术架构(NameServerBrokerProducerComsumer)
  6. 联合 RocketMQ 答复了音讯队列副作用的解决方案
  7. 介绍了 RocketMQ 的存储机制和刷盘策略。

等等。。。

正文完
 0