关于java:查缺补漏巩固你的RocketMQ知识体系

44次阅读

共计 15471 个字符,预计需要花费 39 分钟才能阅读完成。

Windows 装置部署

下载

地址:[https://www.apache.org/dyn/cl…]

抉择‘Binary’进行下载

解压已下载工程

配置

新增零碎变量
ROCKETMQ_HOME -> F:RocketMQrocketmq-4.5.2

JAVA_HOME -> F:Java_JDKJDK1.8

Path 零碎变量新增:Maven/bin 目录

PS:RocketMQ 音讯存储在 C:UsersAdministratorstore store 目录中 文件占用较大,留神删除不必要的内容

启动

start mqnamesrv.cmd

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

Rocket 集成可视化监控插件

  1. 任意目录(拉取我的项目,轻易哪里都行)git clone https://github.com/apache/roc…
  2. 进入‘rocketmq-externalsrocketmq-consolesrcmainresources’文件夹,关上‘application.properties’进行配置
  3. 其实就是一个 SpringBoot 服务,确定好端口,别反复即可

    server.port=8100

    rocketmq.config.namesrvAddr=127.0.0.1:9876

  4. 进入‘rocketmq-externalsrocketmq-console’文件夹

    执行‘mvn clean package -Dmaven.test.skip=true’,编译生成 target

    java -jar rocketmq-console-ng-1.0.1.jar

  5. 依据配置地址拜访:http://127.0.0.1:8100

Rocket 可视化监控插件 减少 Topic | 主动减少 Topic(4.5.2 版本)

4.5.2 版本 反对主动创立 Topic

4.3.0 版本 必须通过监控程序配置 Topic,否则执行程序报错,没有此路由

SpringBoot 集成 RocketMQ

<!--RocketMQ-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

RocketMQ 基本概念

<br/>

概览

基于 RocketMQ 的分布式系统,个别能够分为四个集群:Name server、broker、producer、consumer

  1. name server

    • 提供轻量级的服务发现和路由服务;
    • 每个节点都寄存了全副的路由信息和对应的读写服务;
    • 存储反对程度扩大
  2. broker

    • 提供满足 TOPIC 和 QUEUE 机制的音讯存储服务;
    • 有推和拉两种模式;
    • 通过 2 或 3 拷贝实现高可用;
    • 提供上亿音讯的沉积能力;
    • 提供故障复原、统计性能和告警性能;
  3. producer

    • 反对分布式部署,通过负载平衡模块给 broker 发消息
    • 反对疾速失败
    • 低提早
  4. consumer

    1. 反对推和拉两种模式
    2. 反对集群生产和播送生产

<br/>

外围模块

<br/>

Name Server

提供 Broker 治理;Routing 治理(路由治理)

NameServer,很多时候称为命名发现服务,其在 RocketMQ 中起着直达承接的作用,是一个无状态的服务,多个 NameServer 之间不通信。任何 Producer,Consumer,Broker 与所有 NameServer 通信,向 NameServer 申请或者发送数据。而且都是单向的,Producer 和 Consumer 申请数据,Broker 发送数据。正是因为这种单向的通信,RocketMQ 程度扩容变得很容易

  • 提供轻量级的服务发现和路由服务;
  • 每个节点都寄存了全副的路由信息和对应的读写服务;
  • 存储反对程度扩大

总结:相比于 ZooKeeper 提供的分布式锁,公布和订阅,数据一致性,选举等,在 RocketMQ 是不实用的,因而重写了一套更加轻量级的发现服务,次要用以存储 Broker 相干信息以及以后 Broker 上的 topic 信息,路由信息等

Broker Server

提供 Remoting Module、客户端治理、存储服务、HA 服务(主从)、索引服务

  • 提供满足 TOPIC 和 QUEUE 机制的音讯存储服务;
  • 有推和拉两种模式;
  • 通过 2 或 3 拷贝实现高可用;
  • 提供上亿音讯的沉积能力;
  • 提供故障复原、统计性能和告警性能;

producer

  • 反对分布式部署,通过负载平衡模块给 broker 发消息
  • 反对疾速失败
  • 低提早

consumer

  • 反对推和拉两种模式
  • 反对集群生产和播送生产

<br/>

外围角色介绍

<br/>

生产者

生产者发送业务零碎产生的音讯给 broker, RocketMQ 提供了多种发送形式:同步的、异步的、单向的

<br/>

生产者组

具备雷同角色的生产者被分到一组, 如果原始的生产者在事务后解体,broker 会分割 同一生产者组中的不同生产者实例,持续提交或回滚事务

<br/>

消费者

一个消费者从 broker 拉取信息,并将信息返还给利用。为了咱们利用的正确性,提供了两种消费者类型:

拉式消费者:拉式消费者从 broker 拉取音讯,一旦一批音讯被拉取,用户利用零碎将发动生产过程。

推式消费者:推式消费者,从另一方面讲,囊括了音讯的拉取、生产过程,并放弃了外部的其余工作,留下了一个回调 接口给终端用户去实现,实现在音讯达到时要执行的内容。

<br/>

消费者组

具备雷同角色的消费者被组在一起,称为消费者组,它实现了负载平衡和容错的指标

一个生产组中的消费者实例必须有确定的雷同的订阅 topic

<br/>

Topic(主题)

Topic 是一个音讯的目录,在这个目录中,生产者传送音讯,消费者拉取音讯,能够多个消费者订阅同一个 topic,一个生产者也能够发送多个 topic

PS:RocketMQ 基于公布订阅模式,公布订阅的外围即 Topic 主题

<br/>

Message(音讯)

音讯是被传递的信息。一个音讯必须有一个 Topic,它能够了解为函件上的地址。一个音讯也能够有一个可选的 tag,和额定的 key-value 对。例如:你能够设置业务中的键到你的音讯中,在 broker 服务中查找音讯,以便在开发期间诊断问题

<br/>

音讯队列

Topic 被宰割成一个或多个音讯队列。队列分为 3 中角色:异步主、同步主、从。如果你不能容忍音讯失落,咱们倡议你部署同步主,并加一个从队列。如果你容忍失落,但你心愿队列总是可用,你能够部署异步主和从队列。如果你想最简略,你只须要一个异步主,不须要从队列。音讯保留磁盘的形式也有两种,举荐应用的是异步保留,同步保留是低廉的并会导致性能损失,如果你想要可靠性,咱们举荐你应用同步主 + 从的形式。

<br/>

Tag(标签)

标签,用另外一个词来说,就是子主题,为用户提供额定的灵活性。具备雷同 Topic 的音讯能够有不同的 tag。

<br/>

Broker(队列)

Broker 是 RocketMQ 的一个次要组件,它接管生产者发送的音讯,存储它们并筹备解决消费者的拉取申请。它也存储音讯相干的元数据,包含生产组,生产胜利的偏移量,主题、队列的信息。

<br/>

名称服务

名称服务次要提供路由信息。生产者 / 消费者客户端寻找 topic,并找到通信的队列列表。

<br/>

音讯的程序

DefaultMQPushConsumer 被应用,你就要决定生产音讯时,是程序生产还是同时生产

  • 程序生产

程序生产音讯的意思是 音讯将依照生产者发送到队列时的程序被生产掉。如果你被强制要求应用全局的程序,你要确保你的 topic 只有一个音讯队列。

如果指定程序生产,音讯被同时生产的数量就是订阅这个 topic 的生产组的数量。

  • 同时生产

当同时生产音讯时,音讯同时生产的最大数量取决于生产客户端指定的线程池的大小。

<br/>

最佳实际

Producer 最佳实际
  1. 一个利用尽可能用一个 Topic,音讯子类型用 tags 来标识,tags 能够由利用自在设置。只有发送音讯设置了 tags,生产方在订阅音讯时,才能够利用 tags 在 broker 做音讯过滤。
  2. 每个音讯在业务层面的惟一标识码,要设置到 keys 字段,不便未来定位音讯失落问题。因为是哈希索引,请务必保障 key 尽可能惟一,这样能够防止潜在的哈希抵触。

    音讯发送胜利或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

  3. 对于音讯不可失落利用,务必要有音讯重发机制。例如:音讯发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
  4. 某些利用如果不关注音讯是否发送胜利,请间接应用 sendOneWay 办法发送音讯。
Consumer 最佳实际
  1. 生产过程要做到幂等(即生产端去重)
  2. 尽量应用批量形式生产形式,能够很大水平上进步生产吞吐量。
  3. 优化每条音讯生产过程

MQ 外围问题

<br/>

1. 音讯队列适宜解决的问题

解决的外围问题次要是:异步、解耦、削峰

然而引入音讯队列也会有很多额定的问题,比方零碎复杂性会大大增加,同时须要解决反复下发,反复生产,生产程序,音讯失落,重试机制等等问题,因而不能滥用,适合的场景用适合的技术

<br/>

2. 音讯模型:主题和队列的区别

一、音讯队列的演进

1、初始阶段

最后的音讯队列,就是一个严格意义上的队列。队列是一种数据结构,先进先出,在音讯入队出队过程中,保障这些音讯严格有序。晚期的音讯队列就是依照“队列”的数据结构设计的

队列模型:

生产者(Producer)发消息就是入队操作,消费者(Consumer)收音讯就是出队也就是删除操作,服务端寄存音讯的容器天然就称为“队列”。

  • 如果有多个 生产者往同一个队列外面发送 音讯,这个队列中能够生产到的音讯,就是这些生产者生产的所有音讯的合集。音讯的程序就是这些生产者 发送音讯的天然程序
  • 如果有 多个消费者接管同一个队列 的音讯,这些消费者之间实际上是 竞争的关系 ,每个消费者只能收到队列中的一部分音讯,也就是说任何 一条音讯只能被其中的一个消费者收到

2、公布 – 订阅模型阶段

如果须要将 一份音讯数据分发给多个消费者 ,要求 每个消费者都能收到全量的音讯,例如,对于一份订单数据,风控系统、剖析零碎、领取零碎等都须要接管音讯。

这个时候,单个队列就满足不了需要,一个可行的解决形式是,为 每个消费者创立一个独自的队列,让生产者发送多份 。然而同样的一份音讯数据被复制到多个队列中会 浪费资源 ,更重要的是,生产者必须晓得有多少个消费者。为每个消费者独自发送一份音讯,这实际上 违反了音讯队列“解耦”这个设计初衷。

为了解决这个问题,演化出了另外一种音讯模型:公布 – 订阅模型(Publish-Subscribe Pattern)

音讯的发送方称为发布者(Publisher),音讯的接管方称为订阅者(Subscriber),服务端寄存音讯的容器称为主题(Topic)。

  • 发布者将音讯发送到主题中,订阅者在接管音讯之前须要先“订阅主题”。
  • 每份订阅中,订阅者都能够接管到主题的所有音讯。

3、总结:

  • 在很长的一段时间,队列模式和公布 – 订阅模式是并存的。
  • 有些音讯队列同时反对这两种音讯模型,比方 ActiveMQ。
  • 比照这两种模型,生产者就是发布者,消费者就是订阅者,队列就是主题,并没有实质的区别。它们最大的区别是:一份音讯数据能不能被生产屡次的问题
  • 实际上,在这种公布 – 订阅模型中,如果只有一个订阅者,那它和队列模型就根本是一样的了。也就是说,公布 – 订阅模型在性能层面上是能够兼容队列模型的。

二、RabbitMQ 的音讯模型

多数仍然保持 应用队列模型 的产品之一。

RabbitMQ 应用 Exchange 模块解决多个消费者的问题 。Exchange 位于生产者和队列之间,生产者并不关怀将音讯发送给哪个队列,而是 将音讯发送给 Exchange,由 Exchange 上 配置的策略 来决定将音讯投递到哪些队列中。

  • 同一份音讯如果须要被多个消费者来生产,须要 配置 Exchange 将音讯发送到多个队列 ,每个队列中都 寄存一份残缺的音讯数据,能够为一个消费者提供生产服务。

三、RocketMQ 的音讯模型

RocketMQ 应用的音讯模型是规范的 公布 – 订阅模型。在 RocketMQ 也有队列(Queue)这个概念。

音讯队列的生产机制:

简直所有的音讯队列产品都应用一种十分奢侈的“申请 – 确认”机制,确保音讯不会在传递过程中因为网络或服务器故障失落。

在生产端,生产者先将音讯发送给服务端,也就是 Broker,服务端在收到音讯并将音讯写入主题或者队列中后,会 给生产者发送确认的响应 。如果生产者没有收到服务端的确认或者收到失败的响应,则会 从新发送音讯

在生产端,消费者在收到音讯并实现本人的生产业务逻辑(比方,将数据保留到数据库中)后,也会 给服务端发送生产胜利的确认 ,服务端只有收到生产确认后,才认为一条音讯被胜利生产,否则它会给消费者 从新发送这条音讯,直到收到对应的生产胜利确认。

这个确认机制很好地 保障了消息传递过程中的可靠性 ,然而,引入这个机制在生产端带来了一个问题: 为了确保音讯的有序性,在某一条音讯被胜利生产之前,下一条音讯是不能被生产的 ,也就是说,每个主题在任意时刻, 至少只能有一个消费者实例在进行生产 ,那就 没法通过程度扩大消费者的数量来晋升生产端总体的生产性能

为了解决这个问题,RocketMQ 在主题上面减少了队列的概念:

  • 每个主题蕴含多个队列,通过多个队列来实现多实例并行生产和生产。须要留神的是,RocketMQ 只在队列上保障音讯的有序性,主题层面是无奈保障音讯的严格程序的。
  • 生产者会往所有队列发消息 ,但不是“同一条音讯每个队列都发一次”, 每条音讯只会往某个队列外面发送一次
  • 一个生产组,每个队列上只能串行生产,多个队列加一起就是并行生产了 ,并行度就是队列数量,队列数量越多并行度越大,所以程度扩大能够 晋升生产性能。
  • 每队列每生产组保护一个生产地位(offset),记录这个生产组在这个队列上生产到哪儿了。
  • 订阅者是通过生产组(Consumer Group)来体现的。每个生产组都生产主题中一份残缺的音讯,不同生产组之间生产进度彼此不受影响,也就是说,一条音讯被 Consumer Group1 生产过,也会再给 Consumer Group2 生产。
  • 生产组中蕴含多个消费者,同一个组内的消费者是竞争生产的关系,每个消费者负责生产组内的一部分音讯。如果一条音讯被消费者 Consumer1 生产了,那同组的其余消费者就不会再收到这条音讯。
  • 因为音讯须要被不同的组进行屡次生产,所以生产完的音讯并不会立刻被删除,这就须要 RocketMQ 为每个生产组在每个队列上保护一个生产地位(Consumer Offset),这个地位之前的音讯都被生产过,之后的音讯都没有被生产过,每胜利生产一条音讯,生产地位就加一。咱们在应用音讯队列的时候,丢音讯的起因大多是因为生产地位处理不当导致的

四、Kafka 的音讯模型

Kafka 的音讯模型和 RocketMQ 是齐全一样的,惟一的区别是,在 Kafka 中,队列这个概念的名称不一样,Kafka 中对应的名称是“分区(Partition)”,含意和性能是没有任何区别的。

五、总结

  • 罕用的音讯队列中,RabbitMQ 采纳的是队列模型,然而它一样能够实现公布 – 订阅的性能。RocketMQ 和 Kafka 采纳的是公布 – 订阅模型,并且二者的音讯模型是基本一致的。

<br/>

3. 音讯失落怎么办? 如何保障音讯的可靠性传输?

首先如何验证音讯是否失落?

  • 如果是 IT 基础设施比较完善的公司,个别都有分布式链路追踪零碎,应用相似的追踪零碎能够很不便地追踪每一条音讯。
  • 如果没有这样的追踪零碎,咱们能够利用音讯队列的有序性来验证是否有音讯失落

即保障音讯生产程序的状况下,依据音讯的序号,在生产段判断是否间断

解决方案:

音讯从生产到生产的过程中,能够划分三个阶段:

1、生产阶段

音讯队列通过最罕用的 申请确认机制,来保障音讯的牢靠传递:当你代码调用发消息办法时,音讯队列客户端会把音讯发送到 Broker,Broker 收到音讯后,会给客户端返回一个确认响应,表明音讯已收到。客户端收到响应后,实现了一次失常音讯的发送。

有些音讯队列在长时间没收到发送确认响应后,会主动重试,如果重试失败,就会以返回值或者异样的形式告知用户。在编写发送音讯的代码时,须要留神,正确处理返回值或者捕捉异样,就能够保障这个阶段的音讯不会失落。

同步发送时,只有留神捕捉异样即可。

异步发送时,则须要在回调办法里进行查看。这个中央须要特地留神,很多丢音讯的起因就是,咱们应用了异步发送,却没有在回调中查看发送后果。

2、存储阶段

在存储阶段失常状况下,只有 Broker 在失常运行,就不会呈现丢音讯的问题;然而如果 Broker 呈现故障,比方过程死掉或者服务器宕机,还是可能会失落音讯的。

如果对音讯的可靠性要求十分高,能够通过配置 Broker 参数来防止因为宕机丢音讯:

  • 对于单个节点的 Broker,须要配置 Broker 参数,在收到音讯后,将音讯写入磁盘后再给 Producer 返回确认响应,这样即便产生宕机,因为音讯曾经被写入磁盘,就不会失落音讯,复原后还能够持续生产。例如,在 RocketMQ 中,须要将刷盘形式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。
  • 对于 Broker 是由多个节点组成的集群,须要将 Broker 集群配置成:至多将音讯发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其余的 Broker 能够代替宕机的 Broker,也不会产生音讯失落。

3、音讯阶段

生产阶段采纳和生产阶段相似的 确认机制 来保障音讯的牢靠传递,客户端从 Broker 拉取音讯后,执行用户的生产业务逻辑,胜利后,才会给 Broker 发送生产确认响应。如果 Broker 没有收到生产确认响应,下次拉音讯的时候还会返回同一条音讯,确保音讯不会在网络传输过程中失落,也不会因为客户端在执行生产逻辑中出错导致失落。

在编写生产代码时须要留神的是:不要在收到音讯后就立刻发送生产确认,而是应该 在执行完所有生产业务逻辑之后,再发送生产确认

<br/>

4. 解决生产过程中的反复音讯

在消息传递过程中,如果呈现 传递失败 的状况,发送方会 执行重试 ,重试过程中就有可能 产生反复的音讯。如果没有对反复音讯进行解决,就可能导致系统的数据呈现谬误。

比方,一个生产订单音讯,统计下单金额的微服务,如果没有正确处理反复音讯,那就会呈现反复统计,导致统计后果谬误。

一、音讯反复的状况必然存在

在 MQTT 协定中,给出了三种传递音讯时可能提供的服务质量规范:

  • At most once:至少一次。最多会被送达一次,也就是说没有音讯可靠性保障,容许丢音讯。个别都是一些对音讯可靠性要求不高的监控场景应用,比方每分钟上报一次机房温度数据,能够承受数据大量失落。
  • At least once:至多一次。至多会被送达一次,也就是说 不容许丢音讯 ,然而 容许有大量反复音讯呈现
  • Exactly once:恰好一次。只会被送达一次,不容许失落也不容许反复,这个是最高等级。

这个服务质量规范不仅实用于 MQTT,对所有的音讯队列都是实用的。罕用的 绝大部分音讯队列提供的服务质量都是 At least once,包含 RocketMQ、RabbitMQ 和 Kafka。也就是说,音讯队列很难保障音讯不反复。

留神:Kafka 反对的“Exactly once”和咱们刚刚提到的消息传递的服务质量规范“Exactly once”是不一样的,它是 Kafka 提供的另外一个个性,Kafka 中反对的事务也和咱们通常意义了解的事务有肯定的差别。在 Kafka 中,事务和 Excactly once 次要是为了配合流计算应用的个性。

二、用幂等性解决反复音讯问题

幂等原本是一个数学上的概念,它的定义是:如果一个函数 f(x)满足:f(f(x)) = f(x),则函数 f(x)满足米幂等性。扩大到计算机领域,被用来形容一个操作、办法或者服务。

  • 一个幂等操作的特点是,其 任意屡次执行所产生的影响均与一次执行的影响雷同
  • 一个幂等办法,应用同样的参数,对它进行屡次调用和一次调用,对系统产生的影响是一样的。所以不必放心反复执行会对系统造成任何扭转。

举例:

1、在不思考并发的状况下,“将账户 X 的余额设置为 100 元”,执行一次后对系统的影响是,账户 X 的余额变成了 100 元。只有提供的参数 100 元不变,那即便再执行多少次,账户 X 的余额始终都是 100 元,不会变动,这个操作就是一个幂等的操作。

2、“将账户 X 的余额加 100 元”,这个操作它就不是幂等的,每执行一次,账户余额就会减少 100 元,执行屡次和执行一次对系统的影响(也就是账户的余额)是不一样的。

如果生产音讯的业务逻辑具备幂等性,那就不必放心音讯反复的问题,因为同一条音讯,生产一次和生产屡次对系统的影响是齐全一样的。生产屡次等于生产一次。从对系统的影响后果来说:At least once + 幂等生产 = Exactly once。

实现幂等操作最好的形式是,从业务逻辑设计上动手,将生产的业务逻辑设计成具备幂等性的操作

罕用的设计幂等操作的办法

(1)利用 数据库的惟一束缚 实现幂等

下面提到的那个不具备幂等个性的转账的例子:将账户 X 的余额加 100 元。在这个例子中,咱们能够通过革新业务逻辑,让它具备幂等性。

首先,咱们能够限定,对于每个转账单每个账户只能够执行一次变更操作,在分布式系统中,这个限度实现的办法十分多,最简略的是咱们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,而后给转账单 ID 和账户 ID 这两个字段联结起来创立一个惟一束缚,这样对于雷同的转账单 ID 和账户 ID,表里至少只能存在一条记录。

这样,咱们生产音讯的逻辑能够变为:“在转账流水表中 减少一条转账记录,而后再依据转账记录,异步操作更新用户余额即可 。”在转账流水表减少一条转账记录这个操作中,因为咱们在这个表中事后定义了“账户 ID 转账单 ID”的惟一束缚, 对于同一个转账单同一个账户只能插入一条记录,后续反复的插入操作都会失败,这样就实现了一个幂等的操作。

基于这个思路,不光是能够应用关系型数据库,只有是反对相似“INSERT IF NOT EXIST”语义的存储类零碎都能够用于实现幂等,比方,你能够用 Redis 的 SETNX 命令来代替数据库中的惟一束缚,来实现幂等生产。

(2)为更新的数据设置前置条件

给数据变更设置一个前置条件,如果满足条件就更新数据,否则回绝更新数据,在更新数据的时候,同时变更前置条件中须要判断的数据。这样,反复执行这个操作时,因为第一次更新数据的时候曾经变更了前置条件中须要判断的数据,不满足前置条件,则不会反复执行更新数据操作。

比方,“将账户 X 的余额减少 100 元”这个操作并不满足幂等性,咱们能够把这个操作加上一个前置条件,变为:“如果账户 X 以后的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到音讯队列中的应用时,能够在发消息时在音讯体中带上以后的余额,在生产的时候进行判断数据库中,以后余额是否与音讯中的余额相等,只有相等才执行变更操作。

然而,如果咱们要更新的数据不是数值,或者咱们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的办法是,给你的 数据减少一个版本号属性 ,每次更数据前, 比拟以后数据的版本号是否和音讯中的版本号统一 ,如果不统一就回绝更新数据, 更新数据的同时将版本号 +1,一样能够实现幂等更新。

(3)记录并查看操作

如果下面提到的两种实现幂等办法都不能实用于你的场景,还有一种通用性最强,适用范围最广的实现幂等性办法:记录并查看操作,也称为“Token 机制或者 GUID(全局惟一 ID)机制”,实现的思路特地简略:在执行数据更新操作之前,先检查一下是否执行过这个更新操作 。这种办法适用范围最广,然而实 现难度和复杂度也比拟高,个别不举荐应用

具体的实现办法是,在发送音讯时,给每条音讯 指定一个全局惟一的 ID,生产时,先依据这个 ID 查看这条音讯是否有被生产过,如果没有生产过,才更新数据,而后 将生产状态置为已生产

在分布式系统中,这个办法其实是十分难实现的。首先,给每个音讯指定一个全局惟一的 ID 就是一件不那么简略的事儿,办法有很多,但都不太好同时满足简略、高可用和高性能,或多或少都要有些就义。更加麻烦的是,在“查看生产状态,而后更新数据并且设置生产状态”中,三个操作必须作为一组操作保障原子性,能力真正实现幂等,否则就会呈现 Bug。

比如说,对于同一条音讯:“全局 ID 为 8,操作为:给 ID 为 666 账户减少 100 元”,有可能呈现这样的状况:

  • t0 时刻:Consumer A 收到条音讯,查看音讯执行状态,发现音讯未解决过,开始执行“账户减少 100 元”;
  • t1 时刻:Consumer B 收到条音讯,查看音讯执行状态,发现音讯未解决过,因为这个时刻,Consumer A 还未来得及更新音讯执行状态。

这样就会导致账户被谬误地减少了两次 100 元,这是一个在分布式系统中非常容易犯的谬误,肯定要引以为戒。对于这个问题,当然咱们能够用事务来实现,也能够用锁来实现,然而在分布式系统中,无论是分布式事务还是分布式锁都是比拟难解决问题。

<br/>

5. 利用事务音讯实现分布式事务

一、音讯事务

其实很多场景下,咱们“发消息”这个过程,目标往往是 告诉另外一个零碎或者模块去更新数据 ,音讯队列中的“事务”,次要解决 音讯生产者和音讯消费者的数据一致性问题

用户在电商 APP 上购物时,先把商品加到购物车里,而后几件商品一起下单,最初领取,实现购物流程。

这个过程中有一个须要用到音讯队列的步骤,订单零碎创立订单后,发消息给购物车零碎,将已下单的商品从购物车中删除。因为从购物车删除已下单商品这个步骤,并不是用户下单领取这个次要流程中必要的步骤,应用音讯队列来异步清理购物车是更加正当。

对于订单零碎,它创立订单的过程理论执行了 2 个步骤的操作:

  • 在订单库中插入一条订单数据,创立订单;
  • 发消息给音讯队列,音讯的内容就是刚刚创立的订单

对于购物车零碎:

  • 订阅相应的主题,接管订单创立的音讯,而后清理购物车,在购物车中删除订单的商品。

在分布式系统中,下面提到的步骤,任何一个都有可能失败,如果不做任何解决,那就有可能呈现订单数据与购物车数据不统一的状况,比方:

  • 创立了订单,没有清理购物车;
  • 订单没创立胜利,购物车外面的商品却被清掉了。

所以咱们须要解决的问题为:在上述任意步骤都有可能失败的状况下,还要保障订单库和购物车库这两个库的数据一致性。

二、分布式事务

分布式事务就是要在分布式系统中实现事务。在分布式系统中,在保障可用性和不重大就义性能的前提下,光是要 实现数据的一致性就曾经十分艰难了 ,显然实现严格的分布式事务是更加不可能实现的工作。所以目前大家所说的分布式事务,更多状况下,是在 分布式系统中事务的不残缺实现,在不同的利用场景中,有不同的实现,目标都是通过一些斗争来解决理论问题。

常见的分布式事务实现:

  • 2PC(Two-phase Commit,也叫二阶段提交)
  • TCC(Try-Confirm-Cancel)
  • 事务音讯

每一种实现都有其特定的应用场景,也有各自的问题,都不是完满的解决方案。

事务音讯实用的场景 次要是那些须要 异步更新数据 ,并且 对数据实时性要求不太高 的场景。比方在创立订单后,如果呈现短暂的几秒,购物车里的商品没有被及时状况,也不是齐全不可承受的,只有最终购物车的数据和订单数据保持一致就可。

三、音讯队列实现分布式事务

事务音讯须要音讯队列提供相应的性能能力实现,kafka 和 RocketMQ 都提供了事务相干性能。

对于订单零碎:

  • 首先,订单零碎在音讯队列上开启一个事务。
  • 而后订单零碎给音讯服务器发送一个“半音讯”,这个半音讯不是说音讯内容不残缺,它蕴含的内容就是残缺的音讯内容,半音讯和一般音讯的惟一区别是,在事务提交之前,对于消费者来说,这个音讯是不可见的。
  • 半音讯发送胜利后,订单零碎就能够执行本地事务了,在订单库中创立一条订单记录,并提交订单库的数据库事务。
  • 而后依据本地事务的执行后果决定提交或者回滚事务音讯。如果订单创立胜利,那就提交事务音讯,购物车零碎就能够生产到这条音讯持续后续的流程。如果订单创立失败,那就回滚事务音讯,购物车零碎就不会收到这条音讯。这样就根本实现了“要么都胜利,要么都失败”的一致性要求。

对于购物车零碎:

  • 对于购物车零碎收到订单创立胜利音讯清理购物车这个操作来说,失败的解决比较简单,只有胜利执行购物车清理后再提交生产确认即可 ,如果失败, 因为没有提交生产确认,音讯队列会主动重试

如果在第四步提交事务音讯时失败了怎么办?Kafka 和 RocketMQ 给出了 2 种不同的解决方案:

1、Kafka 的解决方案:

间接抛出异样,让用户自行处理。咱们能够在业务代码中重复重试提交,直到提交胜利,或者删除之前创立的订单进行弥补。

2、RocketMQ 的解决方案:

在 RocketMQ 中的事务实现中,减少了 事务反查的机制 来解决事务音讯提交失败的问题。如果 Producer 也就是订单零碎,在提交或者回滚事务音讯时产生网络异样,RocketMQ 的 Broker 没有收到提交或者回滚的申请,Broker 会定期去 Producer 上 反查这个事务对应的本地事务的状态 ,而后依据反查后果决定提交或者回滚这个事务。为了撑持这个事务反查机制,咱们的业务代码须要 实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是胜利还是失败。

综合下面讲的通用事务音讯的实现和 RocketMQ 的事务反查机制,应用 RocketMQ 事务音讯性能实现分布式事务的流程 如下图:

<br/>

6. 音讯队列中的程序问题

当咱们说程序时,咱们在说什么?

日常思维中,程序大部分状况会和工夫关联起来,即工夫的先后示意事件的程序关系。

比方事件 A 产生在下午 3 点一刻,而事件 B 产生在下午 4 点,那么咱们认为事件 A 产生在事件 B 之前,他们的程序关系为先 A 后 B。

下面的例子之所以成立是因为他们有雷同的参考系,即他们的工夫是对应的同一个物理时钟的工夫。如果 A 产生的工夫是北京工夫,而 B 依赖的工夫是东京工夫,那么先 A 后 B 的程序关系还成立吗?

如果没有一个相对的工夫参考,那么 A 和 B 之间还有程序吗,或者说怎么判定 A 和 B 的程序?

不言而喻的,如果 A、B 两个事件之间如果是有因果关系的,那么 A 肯定产生在 B 之前(前因后果,有因才有果)。相同,在没有一个相对的工夫的参考的状况下,若 A、B 之间没有因果关系,那么 A、B 之间就没有程序关系。

那么,咱们在说程序时,其实说的是:

  • 有相对工夫参考的状况下,事件的产生工夫的关系;
  • 和没有工夫参考下的,一种由因果关系推断进去的 happening before 的关系;

在分布式环境中探讨程序

当把程序放到分布式环境(多线程、多过程都能够认为是一个分布式的环境)中去探讨时:

  • 同一线程上的事件程序是确定的,能够认为他们有雷同的工夫作为参考
  • 不同线程间的程序只能通过因果关系去推断

(点示意事件,波浪线箭头示意事件间的音讯)

上图中,过程 P 中的事件程序为 p1->p2->p3->p4(工夫推断)。而因为 p1 给过程 Q 的 q2 发了音讯,那么 p1 肯定在 q2 之前(因果推断)。然而无奈确定 p1 和 q1 之间的程序关系。

举荐浏览《Time, Clocks, and the Ordering of Events in a Distributed System》,会透彻的剖析分布式系统中的程序问题。

消息中间件中的程序音讯

什么是程序音讯

有了上述的根底之后,咱们回到本篇文章的主题中,聊一聊消息中间件中的程序音讯。

程序音讯(FIFO 音讯)是 MQ 提供的一种严格依照程序进行公布和生产的音讯类型。程序音讯由两个局部组成:程序公布和程序生产。

程序音讯蕴含两种类型:

分区程序:一个 Partition 内所有的音讯依照先进先出的程序进行公布和生产

全局程序:一个 Topic 内所有的音讯依照先进先出的程序进行公布和生产

这是阿里云上对程序音讯的定义,把程序音讯拆分成了程序公布和程序生产。那么多线程中发送音讯算不算程序公布?

如上一部分介绍的,多线程中若没有因果关系则没有程序。那么用户在多线程中去发消息就意味着用户不关怀那些在不同线程中被发送的音讯的程序。即多线程发送的音讯,不同线程间的音讯不是程序公布的,同一线程的音讯是程序公布的。这是须要用户本人去保障的。

而对于程序生产,则须要保障哪些来自同一个发送线程的音讯在生产时是依照雷同的程序被解决的(为什么不说他们应该在一个线程中被生产呢?)。

全局程序其实是分区程序的一个特例,即便 Topic 只有一个分区(以下不在探讨全局程序,因为全局程序将面临性能的问题,而且绝大多数场景都不须要全局程序)。

如何保障程序

在 MQ 的模型中,程序须要由 3 个阶段去保障:

  1. 音讯被发送时放弃程序
  2. 音讯被存储时放弃和发送的程序统一
  3. 音讯被生产时放弃和存储的程序统一

发送时放弃程序意味着对于有程序要求的音讯,用户应该在同一个线程中采纳同步的形式发送。存储放弃和发送的程序统一则要求在同一线程中被发送进去的音讯 A 和 B,存储时在空间上 A 肯定在 B 之前。而生产放弃和存储统一则要求音讯 A、B 达到 Consumer 之后必须依照先 A 后 B 的程序被解决。

如下图所示:

对于两个订单的音讯的原始数据:a1、b1、b2、a2、a3、b3(相对工夫下产生的程序):

  • 在发送时,a 订单的音讯须要放弃 a1、a2、a3 的程序,b 订单的音讯也雷同,然而 a、b 订单之间的音讯没有程序关系,这意味着 a、b 订单的音讯能够在不同的线程中被发送进来
  • 在存储时,须要别离保障 a、b 订单的音讯的程序,然而 a、b 订单之间的音讯的程序能够不保障
    • a1、b1、b2、a2、a3、b3 是能够承受的
    • a1、a2、b1、b2、a3、b3 也是能够承受的
    • a1、a3、b1、b2、a2、b3 是不能承受的
  • 生产时保障程序的简略形式就是“什么都不做”,不对收到的音讯的程序进行调整,即只有一个分区的音讯只由一个线程解决即可;当然,如果 a、b 在一个分区中,在收到音讯后也能够将他们拆分到不同线程中解决,不过要衡量一下收益

开源 RocketMQ 中程序的实现

上图是 RocketMQ 程序音讯原理的介绍,将不同订单的音讯路由到不同的分区中。文档只是给出了 Producer 程序的解决,Consumer 生产时通过一个分区只能有一个线程生产的形式来保障音讯程序,具体实现如下。

Producer 端

Producer 端确保音讯程序惟一要做的事件就是将音讯路由到特定的分区,在 RocketMQ 中,通过 MessageQueueSelector 来实现分区的抉择。

  • List<MessageQueue> mqs:音讯要发送的 Topic 下所有的分区
  • Message msg:音讯对象
  • 额定的参数:用户能够传递本人的参数

比方如下实现就能够保障雷同的订单的音讯被路由到雷同的分区:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

Consumer 端

RocketMQ 生产端有两种类型:MQPullConsumer 和 MQPushConsumer。

MQPullConsumer 由用户控制线程,被动从服务端获取音讯,每次获取到的是一个 MessageQueue 中的音讯。PullResult 中的 List msgFoundList 天然和存储程序统一,用户须要再拿到这批音讯后本人保障生产的程序。

对于 PushConsumer,由用户注册 MessageListener 来生产音讯,在客户端中须要保障调用 MessageListener 时音讯的程序性。RocketMQ 中的实现如下:

  1. PullMessageService 单线程的从 Broker 获取音讯
  2. PullMessageService 将音讯增加到 ProcessQueue 中(ProcessMessage 是一个音讯的缓存),之后提交一个生产工作到 ConsumeMessageOrderService
  3. ConsumeMessageOrderService 多线程执行,每个线程在生产音讯时须要拿到 MessageQueue 的锁
  4. 拿到锁之后从 ProcessQueue 中获取音讯

保障生产程序的核心思想是:

  • 获取到音讯后增加到 ProcessQueue 中,单线程执行,所以 ProcessQueue 中的音讯是程序的
  • 提交的生产工作时提交的是“对某个 MQ 进行一次生产”,这次生产申请是从 ProcessQueue 中获取音讯生产,所以也是程序的(无论哪个线程获取到锁,都是依照 ProcessQueue 中音讯的程序进行生产)

程序和异样的关系

程序音讯须要 Producer 和 Consumer 都保障程序。Producer 须要保障音讯被路由到正确的分区,音讯须要保障每个分区的数据只有一个线程音讯,那么就会有一些缺点:

  • 发送程序音讯无奈利用集群的 Failover 个性,因为不能更换 MessageQueue 进行重试
  • 因为发送的路由策略导致的热点问题,可能某一些 MessageQueue 的数据量特地大
  • 生产的并行读依赖于分区数量
  • 生产失败时无奈跳过

不能更换 MessageQueue 重试就须要 MessageQueue 有本人的正本,通过 Raft、Paxos 之类的算法保障有可用的正本,或者通过其余高可用的存储设备来存储 MessageQueue。

热点问题如同没有什么好的解决办法,只能通过拆分 MessageQueue 和优化路由办法来尽量平衡的将音讯调配到不同的 MessageQueue。

生产并行度实践上不会有太大问题,因为 MessageQueue 的数量能够调整。

生产失败的无奈跳过是不可避免的,因为跳过可能导致后续的数据处理都是谬误的。不过能够提供一些策略,由用户依据谬误类型来决定是否跳过,并且提供重试队列之类的性能,在跳过之后用户能够在“其余”中央从新生产到这条音讯。

<br/>

鸣谢

感激极客工夫所属的《音讯队列高手课》链接

<br/>

最初

本篇是一篇大合集,两头必定参考了许多其他人的文章内容或图片,但因为工夫比拟长远,过后并没有一一记录,为此表示歉意,如果有作者发现了本人的文章或图片,能够私聊我,我会进行补充。

如果你发现写的还不错,能够搜寻公众号「是 Kerwin 啊」,一起提高!

也能够查看 Kerwin 的 GitHub 主页

正文完
 0