@[toc]
从八月份开始,断断续续连载了不少 RabbitMQ 教程,最近抽空整顿了一下,将来可能会有一个视频教程,小伙伴们敬请期待。
1. 常见消息中间件大 PK
说到消息中间件,预计大伙多多少少都能讲进去一些,ActiveMQ、RabbitMQ、RocketMQ、Kafka 等等各种以及 JMS、AMQP 等各种协定,然而这些消息中间件各自都有什么特点,咱们在开发中又该抉择哪种呢?明天松哥就来和小伙伴们梳理一下。
1.1 几种协定
先来说说消息中间件中常见的几个协定。
1.1.1 JMS
1.1.1.1 JMS 介绍
先来说说 JMS。
JMS 全称 Java Message Service,相似于 JDBC,不同于 JDBC,JMS 是 JavaEE 的音讯服务接口,JMS 次要有两个版本:
- 1.1
- 2.0。
两者相比,后者次要是简化了收发音讯的代码。
思考到消息中间件是一个十分罕用的工具,所以 JavaEE 为此制订了专门的标准 JMS。
不过和 JDBC 一样,JMS 作为标准,他只是一套接口,并不蕴含具体的实现,如果咱们要应用 JMS,那么个别还须要对应的实现,这就像应用 JDBC 须要对应的驱动一样。
1.1.1.2 JMS 模型
JMS 音讯服务反对两种音讯模型:
- 点对点或队列模型
- 公布/订阅模型
在点对点或队列模型下,一个生产者向一个特定的队列公布音讯,一个消费者从该队列中读取音讯。这里,生产者晓得消费者的队列,并间接将音讯发送到对应的队列。这是一种点对点的音讯模型,这种模式被概括为:
- 只有一个消费者将取得音讯。
- 生产者不须要在消费者生产该音讯期间处于运行状态,消费者也同样不须要在音讯发送时处于运行状态,即音讯的生产者和消费者是齐全解耦的。
- 每一个胜利解决的音讯都由音讯消费者签收。
发布者/订阅者模型反对向一个特定的音讯主题公布音讯,消费者则能够定义本人感兴趣的主题,这是一种点对面的音讯模型,这种模式能够被概括为:
- 多个消费者能够生产音讯。
- 在发布者和订阅者之间存在工夫依赖性,发布者须要创立一个订阅(subscription),以便客户可能订阅;订阅者必须放弃在线状态以接管音讯;当然,如果订阅者创立了长久的订阅,那么在订阅者未连贯时,音讯生产者公布的音讯将会在订阅者从新连贯时从新公布。
1.1.1.3 JMS 实现
开源的反对 JMS 的消息中间件有:
- Kafka
- Apache ActiveMQ
- JBoss 社区的 HornetQ
- Joram
- Coridan 的 MantaRay
- OpenJMS
一些商用的反对 JMS 的消息中间件有:
- WebLogic Server JMS
- EMS
- GigaSpaces
- iBus
- IONA JMS
- IQManager(2005 年 8 月被Sun Microsystems并购)
- JMS+
- Nirvana
- SonicMQ
- WebSphere MQ
这里有不少是松哥考古开掘进去的,其实对于咱们日常开发接触较多的,可能就是 Kafka 和 ActiveMQ。
1.1.2 AMQP
1.1.2.1 AMQP 简介
另一个和消息中间件无关的协定就是 AMQP 了。
Message Queue 的需要由来已久,80 年代最早在金融交易中,高盛等公司采纳 Teknekron 公司的产品,过后的 Message Queue 软件叫做:the information bus(TIB)。 TIB 被电信和通信公司采纳,路透社收买了 Teknekron 公司。之后,IBM 开发了 MQSeries,微软开发了 Microsoft Message Queue(MSMQ)。这些商业 MQ 供应商的问题是厂商锁定,价格昂扬。2001 年,Java Message Service 试图解决锁定和交互性的问题,但对利用来说反而更加麻烦了。
于是 2004 年,摩根大通和 iMatrix 开始着手 Advanced Message Queuing Protocol (AMQP)凋谢规范的开发。2006 年,AMQP 标准公布。2007 年,Rabbit 技术公司基于 AMQP 规范开发的 RabbitMQ 1.0 公布。
目前 RabbitMQ 的最新版本为 3.5.7,基于 AMQP 0-9-1。
在 AMQP 协定中,音讯收发波及到如下一些概念:
- Broker: 接管和散发音讯的利用,咱们日常所用的 RabbitMQ 就是一个 Message Broker。
- Virtual host: 出于多租户和平安因素设计的,把 AMQP 的根本组件划分到一个虚构的分组中,相似于网络中的 namespace 概念。当多个不同的用户应用同一个 RabbitMQ 提供的服务时,能够划分出多个 vhost,每个用户在本人的 vhost 中创立
exchange/queue
等,这个松哥之前写过专门的文章,传送门:[RabbitMQ 中的 VirtualHost 该如何了解]()。 - Connection: publisher/consumer 和 broker 之间的 TCP 连贯,断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非呈现网络故障或 broker 服务呈现问题。
- Channel: 如果每一次拜访 RabbitMQ 都建设一个 Connection,在音讯量大的时候建设 TCP Connection 的开销将是微小的,效率也较低。Channel 是在 Connection 外部建设的逻辑连贯,如果应用程序反对多线程,通常每个 Thread 创立独自的 Channel 进行通信,AMQP method 蕴含了 Channel id 帮忙客户端和 Message Broker 辨认 Channel,所以 Channel 之间是齐全隔离的。Channel 作为轻量级的 Connection 极大缩小了操作系统建设 TCP Connection 的开销,对于 Channel,松哥在RabbitMQ 治理页面该如何应用一文中也做过具体介绍。
- Exchange: Message 达到 Broker 的第一站,依据散发规定,匹配查问表中的 routing key,散发音讯到 queue 中去。罕用的类型有:direct (点对点), topic(公布订阅) 以及 fanout (播送)。
- Queue: 音讯最终被送到这里期待 Consumer 取走,一个 Message 能够被同时拷贝到多个 queue 中。
- Binding: Exchange 和 Queue 之间的虚构连贯,binding 中能够蕴含 routing key,Binding 信息被保留到 Exchange 中的查问表中,作为 Message 的散发根据。
1.1.2.2 AMQP 实现
来看看实现了 AMQP 协定的一些具体的消息中间件产品都有哪些。
- Apache Qpid
- Apache ActiveMQ
- RabbitMQ
可能有小伙伴奇怪咋还有 ActiveMQ?其实 ActiveMQ 不仅反对 JMS,也反对 AMQP,这个松哥前面细说。
另外还有大家熟知的阿里出品的 RocketMQ,这个是自定义了一套协定,社区也提供了 JMS,然而不太成熟,前面松哥细说。
1.1.3 MQTT
做物联网开发的小伙伴应该会常常接触这个协定,MQTT(Message Queuing Telemetry Transport,音讯队列遥测传输)是 IBM 开发的一个即时通讯协定,目前看来算是物联网开发中比拟重要的协定之一了,该协定反对所有平台,简直能够把所有联网物品和内部连接起来,被用来当做传感器和 Actuator(比方通过 Twitter 让屋宇联网)的通信协议,它的长处是格局简洁、占用带宽小、反对挪动端通信、反对 PUSH、实用于嵌入式零碎。
1.1.4 XMPP
XMPP(可扩大音讯解决现场协定,Extensible Messaging and Presence Protocol)是一个基于 XML 的协定,多用于即时消息(IM)以及在线现场探测,实用于服务器之间的准即时操作。外围是基于 XML 流传输,这个协定可能最终容许因特网用户向因特网上的其余任何人发送即时消息,即便其操作系统和浏览器不同。 它的长处是通用公开、兼容性强、可扩大、安全性高,毛病是 XML 编码格局占用带宽大。
1.1.5 JMS Vs AMQP
对于咱们 Java 工程师而言,大家日常接触较多的应该是 JMS 和 AMQP 协定,既然 JMS 和 AMQP 都是协定,那么两者有什么区别呢?来看上面一张图:
这张图说的很分明了,我就不啰嗦了。
1.2. 重要产品
1.2.1 ActiveMQ
ActiveMQ 是 Apache 下的一个子项目,应用齐全反对 JMS1.1 和 J2EE1.4 标准的 JMS Provider 实现,大量代码就能够高效地实现高级利用场景,并且反对可插拔的传输协定,如:in-VM
, TCP
, SSL
, NIO
, UDP
, multicast
, JGroups and JXTA transports
。
ActiveMQ 反对罕用的多种语言客户端如 C++、Java、.Net,、Python、 Php、 Ruby 等。
当初的 ActiveMQ 分为两个版本:
- ActiveMQ Classic
- ActiveMQ Artemis
这里的 ActiveMQ Classic 就是原来的 ActiveMQ,而 ActiveMQ Artemis 是在 RedHat 捐献的 HornetQ 服务器代码的根底上开发的,两者代码齐全不同,后者反对 JMS2.0,应用基于 Netty 的异步 IO,大大晋升了性能,更为神奇的是,后者不仅反对 JMS 协定,还反对 AMQP 协定、STOMP 以及 MQTT,能够说后者的玩法相当丰盛。
因而大家在应用时,倡议间接抉择 ActiveMQ Artemis。
1.2.2 RabbitMQ
RabbitMQ 算是 AMQP 体系下最为重要的产品了,它基于 Erlang 语言开发实现,预计很多人被 RabbitMQ 的装置折磨过,松哥倡议装置 RabbitMQ 间接用 Docker,省心省力(公号后盾回复 docker 有教程)。
RabbitMQ 反对 AMQP、XMPP、SMTP、STOMP 等多种协定,功能强大,实用于企业级开发。
来看一张 RabbitMQ 的结构图:
对于 RabbitMQ,松哥最近发了十来篇教程了,这里就不再啰嗦了。
1.2.3 RocketMQ
RocketMQ 是阿里开源的一款分布式消息中间件,原名 Metaq,从 3.0 版本开始改名为 RocketMQ,是阿里参照 Kafka 设计思维应用 Java 语言实现的一套 MQ。RocketMQ 将阿里外部多款 MQ 产品(Notify、Metaq)进行整合,只保护外围性能,去除了所有其余运行时依赖,保障外围性能最简化,在此基础上配合阿里上述其余开源产品实现不同场景下 MQ 的架构,目前次要用于订单交易系统。
RocketMQ 具备以下特点:
- 保障严格的音讯程序。
- 提供针对音讯的过滤性能。
- 提供丰盛的音讯拉取模式。
- 高效的订阅者程度扩大能力。
- 实时的音讯订阅机制。
- 亿级音讯沉积能力
对于 Java 工程师而言,这也是一种常常会用到的 MQ。
1.2.4 Kafka
Kafka 是 Apache 下的一个开源流解决平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式公布订阅音讯零碎,它能够解决消费者在网站中的所有动作(网页浏览,搜寻和其余用户的口头)流数据。Kafka 的目标是通过 Hadoop 的并行加载机制来对立线上和离线的音讯解决,也是为了通过集群来提供实时的音讯。
Kafka 具备以下个性:
- 疾速长久化:通过磁盘程序读写与零拷贝机制,能够在O(1)的零碎开销下进行音讯长久化。
- 高吞吐:在一台一般的服务器上既能够达到 10W/s 的吞吐速率。
- 高沉积:反对 topic 下消费者较长时间离线,音讯沉积量大。
- 齐全的分布式系统:Broker、Producer、Consumer 都原生主动反对分布式,通过 Zookeeper 能够主动实现更加简单的负载平衡。
- 反对 Hadoop 数据并行加载。
大数据开发中大家可能会常常接触 Kafka,Java 开发中也会接触,然而相对来说可能接触的少一些。
1.2.5 ZeroMQ
ZeroMQ 号称最快的音讯队列零碎,它专门为高吞吐量/低提早的场景开发,在金融界的利用中常常应用,偏重于实时数据通信场景。ZeroMQ 不是独自的服务,而是一个嵌入式库,它封装了网络通信、音讯队列、线程调度等性能,向下层提供简洁的 API,应用程序通过加载库文件,调用 API 函数来实现高性能网络通信。
ZeroMQ 的个性:
- 无锁的队列模型:对于跨线程间的交互(用户端和 session)之间的数据交换通道 pipe,采纳无锁的队列算法 CAS,在 pipe 的两端注册有异步事件,在读或者写音讯到 pipe 时,会主动触发读写事件。
- 批量解决的算法:对于批量的音讯,进行了适应性的优化,能够批量的接管和发送音讯。
- 多核下的线程绑定,毋庸 CPU 切换:区别于传统的多线程并发模式,信号量或者临界区,ZeroMQ 充分利用多核的劣势,每个核绑定运行一个工作者线程,防止多线程之间的 CPU 切换开销。
1.2.6 其余
另外还有如 Redis 也能做音讯队列,松哥之前也发过文章和大家介绍用 Redis 做一般音讯队列和提早音讯队列,这里也就不啰嗦了。
1.3. 比拟
最初,咱们再来通过一张图来比拟下各个消息中间件。
小伙伴们在公众号后盾回复 mqpkmq,能够获取这个 Excel 表格链接。
2. RabbitMQ 治理页面
RabbitMQ 的 web 治理页面置信很多小伙伴都用过,轻易点一下预计也都晓得啥意思,不过本着精益求精的思维,松哥还是想和大家捋一捋这个治理页面的各个细节。
2.1 概览
首先,这个 Web 治理页面大略就像下图这样:
首先一共有六个选项卡:
- Overview:这里能够概览 RabbitMQ 的整体状况,如果是集群,也能够查看集群中各个节点的状况。包含 RabbitMQ 的端口映射信息等,都能够在这个选项卡中查看。
- Connections:这个选项卡中是连贯上 RabbitMQ 的生产者和消费者的状况。
- Channels:这里展现的是“通道”信息,对于“通道”和“连贯”的关系,松哥在后文再和大家具体介绍。
- Exchange:这里展现所有的交换机信息。
- Queue:这里展现所有的队列信息。
- Admin:这里展现所有的用户信息。
右上角是页面刷新的工夫,默认是 5 秒刷新一次,展现的是所有的 Virtual host。
这是整个治理页面的一个大抵状况,接下来咱们来一一介绍。
2.2 Overview
Overview 中分了如下一些功能模块:
别离是:
Totals:
Totals 外面有 筹备生产的音讯数、待确认的音讯数、音讯总数以及音讯的各种解决速率(发送速率、确认速率、写入硬盘速率等等)。
Nodes:
Nodes 其实就是撑持 RabbitMQ 运行的一些机器,相当于集群的节点。
点击每个节点,能够查看节点的详细信息。
Churn statistics:
这个不好翻译,里边展现的是 Connection、Channel 以及 Queue 的创立/敞开速率。
Ports and contexts:
这个里边展现了端口的映射信息以及 Web 的上下文信息。
- 5672 是 RabbitMQ 通信端口。
- 15672 是 Web 治理页面端口。
- 25672 是集群通信端口。
Export definitions && Import definitions:
最初面这两个能够导入导出以后实例的一些配置信息:
2.3 Connections
这里次要展现的是以后连贯上 RabbitMQ 的信息,无论是音讯生产者还是音讯消费者,只有连贯上来了这里都会显示进去。
留神协定中的 AMQP 0-9-1 指的是 AMQP 协定的版本号。
其余属性含意如下:
- User name:以后连贯应用的用户名。
- State:以后连贯的状态,running 示意运行中;idle 示意闲暇。
- SSL/TLS:示意是否应用 ssl 进行连贯。
- Channels:以后连贯创立的通道总数。
- From client:每秒收回的数据包。
- To client:每秒收到的数据包。
点击连贯名称能够查看每一个连贯的详情。
在详情中能够查看每一个连贯的通道数以及其余详细信息,也能够强制敞开一个连贯。
2.4 Channels
这个中央展现的是通道的信息:
那么什么是通道呢?
一个连贯(IP)能够有多个通道,如上图,一共是两个连贯,然而一共有 12 个通道。
一个连贯能够有多个通道,这个多个通道通过多线程实现,个别状况下,咱们在通道中创立队列、交换机等。
生产者的通道个别会立马敞开;消费者是始终监听的,通道简直是会始终存在。
下面各项参数含意别离如下:
- Channel:通道名称。
- User name:该通道登录应用的用户名。
- Model:通道确认模式,C 示意 confirm;T 示意事务。
- State:通道以后的状态,running 示意运行中;idle 示意闲暇。
- Unconfirmed:待确认的音讯总数。
- Prefetch:Prefetch 示意每个消费者最大的能接受的未确认音讯数目,简略来说就是用来指定一个消费者一次能够从 RabbitMQ 中获取多少条音讯并缓存在消费者中,一旦消费者的缓冲区满了,RabbitMQ 将会进行投递新的音讯到该消费者中直到它收回有音讯被 ack 了。总的来说,消费者负责一直解决音讯,一直 ack,而后只有 unAcked 数少于 prefetch * consumer 数目,RabbitMQ 就一直将音讯投递过来。
- Unacker:待 ack 的音讯总数。
- publish:音讯生产者发送音讯的速率。
- confirm:音讯生产者确认音讯的速率。
- unroutable (drop):示意未被接管,且曾经删除了的音讯。
- deliver/get:音讯消费者获取音讯的速率。
- ack:音讯消费者 ack 音讯的速率。
2.5 Exchange
这个中央展现交换机信息:
这里会展现交换机的各种信息。
Type 示意交换机的类型。
Features 有两个取值 D 和 I。
D 示意交换机长久化,将交换机的属性在服务器外部保留,当 MQ 的服务器发生意外或敞开之后,重启 RabbitMQ 时不须要从新手动或执行代码去建设交换机,替换机会主动建设,相当于始终存在。
I 示意这个交换机不能够被音讯生产者用来推送音讯,仅用来进行交换机和交换机之间的绑定。
Message rate in 示意音讯进入的速率。
Message rate out 示意音讯进来的速率。
点击下方的 Add a new exchange 能够创立一个新的交换机。
2.6 Queue
这个选项卡就是用来展现音讯队列的:
各项含意如下:
- Name:示意音讯队列名称。
- Type:示意音讯队列的类型,除了上图的 classic,另外还有一种音讯类型是 Quorum。两个区别如下图:
- Features:示意音讯队列的个性,D 示意音讯队列长久化。
- State:示意以后队列的状态,running 示意运行中;idle 示意闲暇。
- Ready:示意待生产的音讯总数。
- Unacked:示意待应答的音讯总数。
- Total:示意音讯总数 Ready+Unacked。
- incoming:示意音讯进入的速率。
- deliver/get:示意获取音讯的速率。
- ack:示意音讯应答的速率。
点击下方的 Add a new queue 能够增加一个新的音讯队列。
点击每一个音讯队列的名称,能够进入到音讯队列中。进入到音讯队列后,能够实现对音讯队列的进一步操作,例如:
- 将音讯队列和某一个交换机进行绑定。
- 发送音讯。
- 获取一条音讯。
- 挪动一条音讯(须要插件的反对)。
- 删除音讯队列。
- 清空音讯队列中的音讯。
- ...
如下图:
2.7 Admin
这里是做一些用户治理操作,如下图:
各项属性含意如下:
- Name:示意用户名称。
- Tags:示意角色标签,只能选取一个。
- Can access virtual hosts:示意容许进入的虚拟主机。
- Has password:示意这个用户是否设置了明码。
常见的两个操作时治理用户和虚拟主机。
点击下方的 Add a user 能够增加一个新的用户,增加用户的时候须要给用户设置 Tags,其实就是用户角色,如下:
- none:
不能拜访 management plugin - management:
用户能够通过 AMQP 做的任何事
列出本人能够通过 AMQP 登入的 virtual hosts
查看本人的 virtual hosts 中的 queues, exchanges 和 bindings
查看和敞开本人的 channels 和 connections
查看无关本人的 virtual hosts 的“全局”的统计信息,蕴含其余用户在这些 virtual hosts 中的流动 - policymaker:
management 能够做的任何事
查看、创立和删除本人的 virtual hosts 所属的 policies 和 parameters - monitoring:
management 能够做的任何事
列出所有 virtual hosts,包含他们不能登录的 virtual hosts
查看其余用户的 connections 和 channels
查看节点级别的数据如 clustering 和 memory 应用状况
查看真正的对于所有 virtual hosts 的全局的统计信息 - administrator:
policymaker 和 monitoring 能够做的任何事
创立和删除 virtual hosts
查看、创立和删除 users
查看创立和删除 permissions
敞开其余用户的 connections - impersonator(模仿者)
模仿者,无奈登录治理控制台。
另外,这里也能够进行虚拟主机 virtual host 的操作,前面大节会和大家介绍虚拟主机。
3. RabbitMQ 七种音讯收发形式
本大节来和小伙伴们分享一下 RabbitMQ 的七种消息传递模式。一起来看看。
大部分状况下,咱们可能都是在 Spring Boot 或者 Spring Cloud 环境下应用 RabbitMQ,因而本文我也次要从这两个方面来和大家分享 RabbitMQ 的用法。
3.1 RabbitMQ 架构简介
一图胜千言,如下:
这张图中波及到如下一些概念:
- 生产者(Publisher):公布音讯到 RabbitMQ 中的交换机(Exchange)上。
- 交换机(Exchange):和生产者建设连贯并接管生产者的音讯。
- 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的音讯。
- 队列(Queue):Exchange 将音讯散发到指定的 Queue,Queue 和消费者进行交互。
- 路由(Routes):交换机转发音讯到队列的规定。
3.2 筹备工作
大家晓得,RabbitMQ 是 AMQP 营垒里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因而首先创立 Spring Boot 我的项目并增加该依赖,如下:
我的项目创立胜利后,在 application.properties 中配置 RabbitMQ 的根本连贯信息,如下:
spring.rabbitmq.host=localhostspring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.port=5672
接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的音讯生产者提交的音讯都会交由 Exchange 进行再调配,Exchange 会依据不同的策略将音讯散发到不同的 Queue 中。
RabbitMQ 官网介绍了如下几种音讯散发的模式:
这里给出了七种,其中第七种是音讯确认,音讯确认这块松哥之前发过相干的文章,传送门:
- 四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?
- RabbitMQ 高可用之如何确保音讯胜利生产
所以这里我次要和大家介绍前六种音讯收发形式。
3.3 音讯收发
3.3.1 Hello World
咦?这个咋没有交换机?这个其实是默认的交换机,咱们须要提供一个生产者一个队列以及一个消费者。音讯流传图如下:
来看看代码实现:
先来看看队列的定义:
@Configurationpublic class HelloWorldConfig { public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue"; @Bean Queue queue1() { return new Queue(HELLO_WORLD_QUEUE_NAME); }}
再来看看音讯消费者的定义:
@Componentpublic class HelloWorldConsumer { @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(String msg) { System.out.println("msg = " + msg); }}
音讯发送:
@SpringBootTestclass RabbitmqdemoApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); }}
这个时候应用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将音讯队列绑定到一个 DirectExchange 上,当一条音讯达到 DirectExchange 时会被转发到与该条音讯 routing key
雷同的 Queue 上,例如音讯队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的音讯会被该音讯队列接管。
3.3.2 Work queues
这种状况是这样的:
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:
一个队列对应了多个消费者,默认状况下,由队列对音讯进行平均分配,音讯会被分到不同的消费者手中。消费者能够配置各自的并发能力,进而进步音讯的生产能力,也能够配置手动 ack,来决定是否要生产某一条音讯。
先来看并发能力的配置,如下:
@Componentpublic class HelloWorldConsumer { @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(String msg) { System.out.println("receive = " + msg); } @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10") public void receive2(String msg) { System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName()); }}
能够看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去生产音讯。
启动我的项目,在 RabbitMQ 后盾也能够看到一共有 11 个消费者。
此时,如果生产者发送 10 条音讯,就会一下都被生产掉。
音讯发送形式如下:
@SpringBootTestclass RabbitmqdemoApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); } }}
音讯生产日志如下:
能够看到,音讯都被第一个消费者生产了。然而小伙伴们须要留神,事件并不总是这样(多试几次就能够看到差别),音讯也有可能被第一个消费者生产(只是因为第二个消费者有十个线程一起开动,所以第二个消费者生产的音讯占比更大)。
当然音讯消费者也能够开启手动 ack,这样能够自行决定是否生产 RabbitMQ 发来的音讯,配置手动 ack 的形式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
生产代码如下:
@Componentpublic class HelloWorldConsumer { @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(Message message,Channel channel) throws IOException { System.out.println("receive="+message.getPayload()); channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true); } @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10") public void receive2(Message message, Channel channel) throws IOException { System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName()); channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true); }}
此时第二个消费者回绝了所有音讯,第一个消费者生产了所有音讯。
这就是 Work queues 这种状况。
3.3.3 Publish/Subscribe
再来看公布订阅模式,这种状况是这样:
一个生产者,多个消费者,每一个消费者都有本人的一个队列,生产者没有将音讯间接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的音讯通过交换机,达到队列,实现一个音讯被多个消费者获取的目标。须要留神的是,如果将音讯发送到一个没有队列绑定的 Exchange下面,那么该音讯将会失落,这是因为在 RabbitMQ 中 Exchange 不具备存储音讯的能力,只有队列具备存储音讯的能力,如下图:
这种状况下,咱们有四种交换机可供选择,别离是:
- Direct
- Fanout
- Topic
- Header
我别离来给大家举一个简略例子看下。
3.3.3.1 Direct
DirectExchange 的路由策略是将音讯队列绑定到一个 DirectExchange 上,当一条音讯达到 DirectExchange 时会被转发到与该条音讯 routing key 雷同的 Queue 上,例如音讯队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的音讯会被该音讯队列接管。DirectExchange 的配置如下:
@Configurationpublic class RabbitDirectConfig { public final static String DIRECTNAME = "javaboy-direct"; @Bean Queue queue() { return new Queue("hello-queue"); } @BeanDirectExchange directExchange() { return new DirectExchange(DIRECTNAME, true, false); } @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()).with("direct"); }}
- 首先提供一个音讯队列Queue,而后创立一个DirectExchange对象,三个参数别离是名字,重启后是否仍然无效以及长期未用时是否删除。
- 创立一个Binding对象将Exchange和Queue绑定在一起。
- DirectExchange和Binding两个Bean的配置能够省略掉,即如果应用DirectExchange,能够只配置一个Queue的实例即可。
再来看看消费者:
@Componentpublic class DirectReceiver { @RabbitListener(queues = "hello-queue") public void handler1(String msg) { System.out.println("DirectReceiver:" + msg); }}
通过 @RabbitListener 注解指定一个办法是一个音讯生产办法,办法参数就是所接管到的音讯。而后在单元测试类中注入一个 RabbitTemplate 对象来进行音讯发送,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void directTest() { rabbitTemplate.convertAndSend("hello-queue", "hello direct!"); }}
最终执行后果如下:
3.3.3.2 Fanout
FanoutExchange 的数据交换策略是把所有达到 FanoutExchange 的音讯转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置形式如下:
@Configurationpublic class RabbitFanoutConfig { public final static String FANOUTNAME = "sang-fanout"; @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUTNAME, true, false); } @Bean Queue queueOne() { return new Queue("queue-one"); } @Bean Queue queueTwo() { return new Queue("queue-two"); } @Bean Binding bindingOne() { return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); }}
在这里首先创立 FanoutExchange,参数含意与创立 DirectExchange 参数含意统一,而后创立两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创立两个消费者,如下:
@Componentpublic class FanoutReceiver { @RabbitListener(queues = "queue-one") public void handler1(String message) { System.out.println("FanoutReceiver:handler1:" + message); } @RabbitListener(queues = "queue-two") public void handler2(String message) { System.out.println("FanoutReceiver:handler2:" + message); }}
两个消费者别离生产两个音讯队列中的音讯,而后在单元测试中发送音讯,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void fanoutTest() { rabbitTemplate .convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, "hello fanout!"); }}
留神这里发送音讯时不须要 routingkey
,指定 exchange
即可,routingkey
能够间接传一个 null
。
最终执行日志如下:
3.3.3.3 Topic
TopicExchange 是比较复杂然而也比拟灵便的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当音讯达到 TopicExchange 后,TopicExchange 依据音讯的 routingkey 将音讯路由到一个或者多个 Queue 上。TopicExchange 配置如下:
@Configurationpublic class RabbitTopicConfig { public final static String TOPICNAME = "sang-topic"; @Bean TopicExchange topicExchange() { return new TopicExchange(TOPICNAME, true, false); } @Bean Queue xiaomi() { return new Queue("xiaomi"); } @Bean Queue huawei() { return new Queue("huawei"); } @Bean Queue phone() { return new Queue("phone"); } @Bean Binding xiaomiBinding() { return BindingBuilder.bind(xiaomi()).to(topicExchange()) .with("xiaomi.#"); } @Bean Binding huaweiBinding() { return BindingBuilder.bind(huawei()).to(topicExchange()) .with("huawei.#"); } @Bean Binding phoneBinding() { return BindingBuilder.bind(phone()).to(topicExchange()) .with("#.phone.#"); }}
- 首先创立 TopicExchange,参数和后面的统一。而后创立三个 Queue,第一个 Queue 用来存储和 “xiaomi” 无关的音讯,第二个 Queue 用来存储和 “huawei” 无关的音讯,第三个 Queue 用来存储和 “phone” 无关的音讯。
- 将三个 Queue 别离绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 示意音讯的 routingkey 但凡以 “xiaomi” 结尾的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 示意音讯的 routingkey 但凡以 “huawei” 结尾的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则示意音讯的 routingkey 中但凡蕴含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。
接下来针对三个 Queue 创立三个消费者,如下:
@Componentpublic class TopicReceiver { @RabbitListener(queues = "phone") public void handler1(String message) { System.out.println("PhoneReceiver:" + message); } @RabbitListener(queues = "xiaomi") public void handler2(String message) { System.out.println("XiaoMiReceiver:"+message); } @RabbitListener(queues = "huawei") public void handler3(String message) { System.out.println("HuaWeiReceiver:"+message); }}
而后在单元测试中进行音讯的发送,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void topicTest() { rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻.."); }}
依据 RabbitTopicConfig 中的配置,第一条音讯将被路由到名称为 “xiaomi” 的 Queue 上,第二条音讯将被路由到名为 “huawei” 的 Queue 上,第三条音讯将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,第四条音讯将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,最初一条音讯则将被路由到名为 “phone” 的 Queue 上。
3.3.3.4 Header
HeadersExchange 是一种应用较少的路由策略,HeadersExchange 会依据音讯的 Header 将音讯路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:
@Configurationpublic class RabbitHeaderConfig { public final static String HEADERNAME = "javaboy-header"; @Bean HeadersExchange headersExchange() { return new HeadersExchange(HEADERNAME, true, false); } @Bean Queue queueName() { return new Queue("name-queue"); } @Bean Queue queueAge() { return new Queue("age-queue"); } @Bean Binding bindingName() { Map<String, Object> map = new HashMap<>(); map.put("name", "sang"); return BindingBuilder.bind(queueName()) .to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge() { return BindingBuilder.bind(queueAge()) .to(headersExchange()).where("age").exists(); }}
这里的配置大部分和后面介绍的一样,差异次要体现的 Binding 的配置上,第一个 bindingName 办法中,whereAny 示意音讯的 Header 中只有有一个 Header 匹配上 map 中的 key/value,就把该音讯路由到名为 “name-queue” 的 Queue 上,这里也能够应用 whereAll 办法,示意音讯的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则示意只有音讯的 Header 中蕴含 age,不论 age 的值是多少,都将音讯路由到名为 “age-queue” 的 Queue 上。
接下来创立两个音讯消费者:
@Componentpublic class HeaderReceiver { @RabbitListener(queues = "name-queue") public void handler1(byte[] msg) { System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length)); } @RabbitListener(queues = "age-queue") public void handler2(byte[] msg) { System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length)); }}
留神这里的参数用 byte 数组接管。而后在单元测试中创立音讯的发送办法,这里音讯的发送也和 routingkey 无关,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void headerTest() { Message nameMsg = MessageBuilder .withBody("hello header! name-queue".getBytes()) .setHeader("name", "sang").build(); Message ageMsg = MessageBuilder .withBody("hello header! age-queue".getBytes()) .setHeader("age", "99").build(); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg); }}
这里创立两条音讯,两条音讯具备不同的 header,不同 header 的音讯将被发到不同的 Queue 中去。
最终执行成果如下:
3.3.4 Routing
这种状况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者在创立 Exchange 后,依据 RoutingKey 去绑定相应的队列,并且在发送音讯时,指定音讯的具体 RoutingKey 即可。
如下图:
这个就是依照 routing key 来路由音讯,我这里就不再举例子了,大家能够参考 3.3.1 小结。
3.3.5 Topics
这种状况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者创立 Topic 的 Exchange 并且绑定到队列中,这次绑定能够通过 *
和 #
关键字,对指定 RoutingKey
内容,编写时留神格局 xxx.xxx.xxx
去编写。
如下图:
这个我也就不举例啦,后面 3.3.3 大节曾经举过例子了,不再赘述。
3.3.6 RPC
RPC 这种音讯收发模式,松哥前两天刚刚写了文章和大家介绍,这里就不多说了,传送门:
- SpringBoot+RabbitMQ 实现 RPC 调用
3.3.7 Publisher Confirms
这种发送确认松哥之前有写过相干文章,传送门:
- 四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?
- RabbitMQ 高可用之如何确保音讯胜利生产
4. RabbitMQ 实现 RPC
说到 RPC(Remote Procedure Call Protocol 近程过程调用协定),小伙伴们脑海里蹦出的预计都是 RESTful API、Dubbo、WebService、Java RMI、CORBA 等。
其实,RabbitMQ 也给咱们提供了 RPC 性能,并且应用起来很简略。
松哥通过一个简略的案例来和大家分享一下 Spring Boot+RabbitMQ 如何实现一个简略的 RPC 调用。
留神
对于 RabbitMQ 实现 RPC 调用,有的小伙伴可能会有一些误会,心想这还不简略?搞两个音讯队列 queue_1 和 queue_2,首先客户端发送音讯到 queue_1 上,服务端监听 queue_1 上的音讯,收到之后进行解决;解决实现后,服务端发送音讯到 queue_2 队列上,而后客户端监听 queue_2 队列上的音讯,这样就晓得服务端的处理结果了。
这种形式不是不能够,就是有点麻烦!RabbitMQ 中提供了现成的计划能够间接应用,十分不便。接下来咱们就一起来学习下。
4.1 架构
先来看一个简略的架构图:
这张图把问题说的很明确了:
- 首先 Client 发送一条音讯,和一般的音讯相比,这条音讯多了两个要害内容:一个是 correlation_id,这个示意这条音讯的惟一 id,还有一个内容是 reply_to,这个示意音讯回复队列的名字。
- Server 从音讯发送队列获取音讯并解决相应的业务逻辑,解决实现后,将处理结果发送到 reply_to 指定的回调队列中。
- Client 从回调队列中读取音讯,就能够晓得音讯的执行状况是什么样子了。
这种状况其实非常适合解决异步调用。
4.2 实际
接下来咱们通过一个具体的例子来看看这个怎么玩。
4.2.1 客户端开发
首先咱们来创立一个 Spring Boot 工程名为 producer,作为音讯生产者,创立时候增加 web 和 rabbitmq 依赖,如下图:
我的项目创立胜利之后,首先在 application.properties 中配置 RabbitMQ 的根本信息,如下:
spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.publisher-confirm-type=correlatedspring.rabbitmq.publisher-returns=true
这个配置后面四行都好了解,我就不赘述,前面两行:首先是配置音讯确认形式,咱们通过 correlated 来确认,只有开启了这个配置,未来的音讯中才会带 correlation_id,只有通过 correlation_id 咱们能力将发送的音讯和返回值之间关联起来。最初一行配置则是开启发送失败退回。
接下来咱们来提供一个配置类,如下:
/** * @author 江南一点雨 * @微信公众号 江南一点雨 * @网站 http://www.itboyhub.com * @国内站 http://www.javaboy.org * @微信 a_java_boy * @GitHub https://github.com/lenve * @Gitee https://gitee.com/lenve */@Configurationpublic class RabbitConfig { public static final String RPC_QUEUE1 = "queue_1"; public static final String RPC_QUEUE2 = "queue_2"; public static final String RPC_EXCHANGE = "rpc_exchange"; /** * 设置音讯发送RPC队列 */ @Bean Queue msgQueue() { return new Queue(RPC_QUEUE1); } /** * 设置返回队列 */ @Bean Queue replyQueue() { return new Queue(RPC_QUEUE2); } /** * 设置交换机 */ @Bean TopicExchange exchange() { return new TopicExchange(RPC_EXCHANGE); } /** * 申请队列和交换器绑定 */ @Bean Binding msgBinding() { return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1); } /** * 返回队列和交换器绑定 */ @Bean Binding replyBinding() { return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2); } /** * 应用 RabbitTemplate发送和接管音讯 * 并设置回调队列地址 */ @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setReplyAddress(RPC_QUEUE2); template.setReplyTimeout(6000); return template; } /** * 给返回队列设置监听器 */ @Bean SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(RPC_QUEUE2); container.setMessageListener(rabbitTemplate(connectionFactory)); return container; }}
这个配置类中咱们别离配置了音讯发送队列 msgQueue 和音讯返回队列 replyQueue,而后将这两个队列和音讯交换机进行绑定。这个都是 RabbitMQ 的惯例操作,没啥好说的。
在 Spring Boot 中咱们负责音讯发送的工具是 RabbitTemplate,默认状况下,零碎主动提供了该工具,然而这里咱们须要对该工具从新进行定制,次要是增加音讯发送的返回队列,最初咱们还须要给返回队列设置一个监听器。
好啦,接下来咱们就能够开始具体的音讯发送了:
/** * @author 江南一点雨 * @微信公众号 江南一点雨 * @网站 http://www.itboyhub.com * @国内站 http://www.javaboy.org * @微信 a_java_boy * @GitHub https://github.com/lenve * @Gitee https://gitee.com/lenve */@RestControllerpublic class RpcClientController { private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class); @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { // 创立音讯对象 Message newMessage = MessageBuilder.withBody(message.getBytes()).build(); logger.info("client send:{}", newMessage); //客户端发送音讯 Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage); String response = ""; if (result != null) { // 获取已发送的音讯的 correlationId String correlationId = newMessage.getMessageProperties().getCorrelationId(); logger.info("correlationId:{}", correlationId); // 获取响应头信息 HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders(); // 获取 server 返回的音讯 id String msgId = (String) headers.get("spring_returned_message_correlation"); if (msgId.equals(correlationId)) { response = new String(result.getBody()); logger.info("client receive:{}", response); } } return response; }}
这块的代码其实也都是一些惯例代码,我挑几个要害的节点说下:
- 音讯发送调用 sendAndReceive 办法,该办法自带返回值,返回值就是服务端返回的音讯。
- 服务端返回的音讯中,头信息中蕴含了 spring_returned_message_correlation 字段,这个就是音讯发送时候的 correlation_id,通过音讯发送时候的 correlation_id 以及返回音讯头中的 spring_returned_message_correlation 字段值,咱们就能够将返回的音讯内容和发送的音讯绑定到一起,确认出这个返回的内容就是针对这个发送的音讯的。
这就是整个客户端的开发,其实最最外围的就是 sendAndReceive 办法的调用。调用尽管简略,然而筹备工作还是要做足够。例如如果咱们没有在 application.properties 中配置 correlated,发送的音讯中就没有 correlation_id,这样就无奈将返回的音讯内容和发送的音讯内容关联起来。
4.2.2 服务端开发
再来看看服务端的开发。
首先创立一个名为 consumer 的 Spring Boot 我的项目,创立我的项目增加的依赖和客户端开发创立的依赖是统一的,不再赘述。
而后配置 application.properties 配置文件,该文件的配置也和客户端中的配置统一,不再赘述。
接下来提供一个 RabbitMQ 的配置类,这个配置类就比较简单,单纯的配置一下音讯队列并将之和音讯交换机绑定起来,如下:
/** * @author 江南一点雨 * @微信公众号 江南一点雨 * @网站 http://www.itboyhub.com * @国内站 http://www.javaboy.org * @微信 a_java_boy * @GitHub https://github.com/lenve * @Gitee https://gitee.com/lenve */@Configurationpublic class RabbitConfig { public static final String RPC_QUEUE1 = "queue_1"; public static final String RPC_QUEUE2 = "queue_2"; public static final String RPC_EXCHANGE = "rpc_exchange"; /** * 配置音讯发送队列 */ @Bean Queue msgQueue() { return new Queue(RPC_QUEUE1); } /** * 设置返回队列 */ @Bean Queue replyQueue() { return new Queue(RPC_QUEUE2); } /** * 设置交换机 */ @Bean TopicExchange exchange() { return new TopicExchange(RPC_EXCHANGE); } /** * 申请队列和交换器绑定 */ @Bean Binding msgBinding() { return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1); } /** * 返回队列和交换器绑定 */ @Bean Binding replyBinding() { return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2); }}
最初咱们再来看下音讯的生产:
@Componentpublic class RpcServerController { private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class); @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = RabbitConfig.RPC_QUEUE1) public void process(Message msg) { logger.info("server receive : {}",msg.toString()); Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build(); CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId()); rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData); }}
这里的逻辑就比较简单了:
- 服务端首先收到音讯并打印进去。
- 服务端提取出原音讯中的 correlation_id。
- 服务端调用 sendAndReceive 办法,将音讯发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数。
服务端的音讯收回后,客户端将收到服务端返回的后果。
OK,功败垂成。
4.2.3 测试
接下来咱们进行一个简略测试。
首先启动 RabbitMQ。
接下来别离启动 producer 和 consumer,而后在 postman 中调用 producer 的接口进行测试,如下:
能够看到,曾经收到了服务端的返回信息。
来看看 producer 的运行日志:
能够看到,音讯发送进来后,同时也收到了 consumer 返回的信息。
能够看到,consumer 也收到了客户端发来的音讯。
5. RabbitMQ 音讯有效期
RabbitMQ 中的音讯长期未被生产会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑难,松哥来和大家捋一捋这个问题。
5.1 默认状况
首先咱们来看看默认状况。
默认状况下,音讯是不会过期的,也就是咱们素日里在音讯发送时,如果不设置任何音讯过期的相干参数,那么音讯是不会过期的,即便音讯没被生产掉,也会始终存储在队列中。
这种状况具体代码就不必我再演示了吧,松哥之前的文章但凡波及到 RabbitMQ 的,基本上都是这样的。
5.2 TTL
TTL(Time-To-Live),音讯存活的工夫,即音讯的有效期。如果咱们心愿音讯可能有一个存活工夫,那么咱们能够通过设置 TTL 来实现这一需要。如果音讯的存活工夫超过了 TTL 并且还没有被音讯,此时音讯就会变成死信
,对于死信
以及死信队列
,松哥前面再和大家介绍。
TTL 的设置有两种不同的形式:
- 在申明队列的时候,咱们能够在队列属性中设置音讯的有效期,这样所有进入该队列的音讯都会有一个雷同的有效期。
- 在发送音讯的时候设置音讯的有效期,这样不同的音讯就具备不同的有效期。
那如果两个都设置了呢?
以工夫短的为准。
当咱们设置了音讯有效期后,音讯过期了就会被从队列中删除了(进入到死信队列,后文一样,不再标注),然而两种形式对应的删除机会有一些差别:
- 对于第一种形式,当音讯队列设置过期工夫的时候,那么音讯过期了就会被删除,因为音讯进入 RabbitMQ 后是存在一个音讯队列中,队列的头部是最早要过期的音讯,所以 RabbitMQ 只须要一个定时工作,从头部开始扫描是否有过期音讯,有的话就间接删除。
- 对于第二种形式,当音讯过期后并不会立马被删除,而是当音讯要投递给消费者的时候才会去删除,因为第二种形式,每条音讯的过期工夫都不一样,想要晓得哪条音讯过期,必须要遍历队列中的所有音讯能力实现,当音讯比拟多时这样就比拟消耗性能,因而对于第二种形式,当音讯要投递给消费者的时候才去删除。
介绍完 TTL 之后,接下来咱们来看看具体用法。
接下来所有代码松哥都以 Spring Boot 中封装的 AMPQ 为例来解说。
5.2.1 单条音讯过期
咱们先来看单条音讯的过期工夫。
首先创立一个 Spring Boot 我的项目,引入 Web 和 RabbitMQ 依赖,如下:
而后在 application.properties 中配置一下 RabbitMQ 的连贯信息,如下:
spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/
接下来略微配置一下音讯队列:
@Configurationpublic class QueueConfig { public static final String JAVABOY_QUEUE_DEMO = "javaboy_queue_demo"; public static final String JAVABOY_EXCHANGE_DEMO = "javaboy_exchange_demo"; public static final String HELLO_ROUTING_KEY = "hello_routing_key"; @Bean Queue queue() { return new Queue(JAVABOY_QUEUE_DEMO, true, false, false); } @Bean DirectExchange directExchange() { return new DirectExchange(JAVABOY_EXCHANGE_DEMO, true, false); } @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()) .with(HELLO_ROUTING_KEY); }}
这个配置类次要干了三件事:配置音讯队列、配置交换机以及将两者绑定在一起。
- 首先配置一个音讯队列,new 一个 Queue:第一个参数是音讯队列的名字;第二个参数示意音讯是否长久化;第三个参数示意音讯队列是否排他,个别咱们都是设置为 false,即不排他;第四个参数示意如果该队列没有任何订阅的消费者的话,该队列会被主动删除,个别实用于长期队列。
- 配置一个 DirectExchange 交换机。
- 将交换机和队列绑定到一起。
这段配置应该很简略,没啥好解释的,有一个排他性,松哥这里略微多说两句:
对于排他性,如果设置为 true,则该音讯队列只有创立它的 Connection 能力拜访,其余的 Connection 都不能拜访该音讯队列,如果试图在不同的连贯中从新申明或者拜访排他性队列,那么零碎会报一个资源被锁定的谬误。另一方面,对于排他性队列而言,当连贯断掉的时候,该音讯队列也会主动删除(无论该队列是否被申明为持久性队列都会被删除)。
接下来提供一个音讯发送接口,如下:
@RestControllerpublic class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { Message message = MessageBuilder.withBody("hello javaboy".getBytes()) .setExpiration("10000") .build(); rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message); }}
在创立 Message 对象的时候咱们能够设置音讯的过期工夫,这里设置音讯的过期工夫为 10 秒。
这就能够啦!
接下来咱们启动我的项目,进行音讯发送测试。当音讯发送胜利之后,因为没有消费者,所以这条音讯并不会被生产。关上 RabbitMQ 治理页面,点击到 Queues 选项卡,10s 之后,咱们会发现音讯曾经不见了:
很简略吧!
单条音讯设置过期工夫,就是在音讯发送的时候设置一下音讯有效期即可。
5.2.2 队列音讯过期
给队列设置音讯过期工夫,形式如下:
@BeanQueue queue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 10000); return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);}
设置实现后,咱们批改音讯的发送逻辑,如下:
@RestControllerpublic class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { Message message = MessageBuilder.withBody("hello javaboy".getBytes()) .build(); rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message); }}
能够看到,音讯失常发送即可,不必设置音讯过期工夫。
OK,启动我的项目,发送一条音讯进行测试。查看 RabbitMQ 治理页面,如下:
能够看到,音讯队列的 Features 属性为 D 和 TTL,D 示意音讯队列中音讯长久化,TTL 则示意音讯会过期。
10s 之后刷新页面,发现音讯数量曾经复原为 0。
这就是给音讯队列设置音讯过期工夫,一旦设置了,所有进入到该队列的音讯都有一个过期工夫了。
5.2.3 非凡状况
还有一种非凡状况,就是将音讯的过期工夫 TTL 设置为 0,这示意如果音讯不能立马生产则会被立刻丢掉,这个个性能够局部代替 RabbitMQ3.0 以前反对的 immediate 参数,之所以所局部代替,是因为 immediate 参数在投递失败会有 basic.return 办法将音讯体返回(这个性能能够利用死信队列来实现)。
具体代码松哥就不演示了,这个应该比拟容易。
5.3 死信队列
有小伙伴不禁要问,被删除的音讯去哪了?真的被删除了吗?非也非也!这就波及到死信队列了,接下来咱们来看看死信队列。
5.3.1 死信交换机
死信交换机,Dead-Letter-Exchange 即 DLX。
死信交换机用来接管死信音讯(Dead Message)的,那什么是死信音讯呢?个别音讯变成死信音讯有如下几种状况:
- 音讯被回绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
- 音讯过期
- 队列达到最大长度
当音讯在一个队列中变成了死信音讯后,此时就会被发送到 DLX,绑定 DLX 的音讯队列则称为死信队列。
DLX 实质上也是一个普普通通的交换机,咱们能够为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会主动的将这个死信公布到 DLX 下来,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
5.3.2 死信队列
这个好了解,绑定了死信交换机的队列就是死信队列。
5.3.3 实际
咱们来看一个简略的例子。
首先咱们来创立一个死信交换机,接着创立一个死信队列,再将死信交换机和死信队列绑定到一起:
public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";public static final String DLX_QUEUE_NAME = "dlx_queue_name";public static final String DLX_ROUTING_KEY = "dlx_routing_key";/** * 配置死信交换机 * * @return */@BeanDirectExchange dlxDirectExchange() { return new DirectExchange(DLX_EXCHANGE_NAME, true, false);}/** * 配置死信队列 * @return */@BeanQueue dlxQueue() { return new Queue(DLX_QUEUE_NAME);}/** * 绑定死信队列和死信交换机 * @return */@BeanBinding dlxBinding() { return BindingBuilder.bind(dlxQueue()) .to(dlxDirectExchange()) .with(DLX_ROUTING_KEY);}
这其实跟一般的交换机,一般的音讯队列没啥两样。
接下来为音讯队列配置死信交换机,如下:
@BeanQueue queue() { Map<String, Object> args = new HashMap<>(); //设置音讯过期工夫 args.put("x-message-ttl", 0); //设置死信交换机 args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); //设置死信 routing_key args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);}
就两个参数:
- x-dead-letter-exchange:配置死信交换机。
- x-dead-letter-routing-key:配置死信
routing_key
。
这就配置好了。
未来发送到这个音讯队列上的音讯,如果产生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的音讯队列上。
死信音讯队列的生产和一般音讯队列的生产并无二致:
@RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)public void dlxHandle(String msg) { System.out.println("dlx msg = " + msg);}
很容易吧~
6. RabbitMQ 实现提早队列
定时工作各种各样,常见的定时工作例如日志备份,咱们可能在每天凌晨 3 点去备份,这种固定工夫的定时工作咱们个别采纳 cron 表达式就能轻松的实现,还有一些比拟非凡的定时工作,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时工作就不太好用 cron 去形容,因为开始工夫不确定,咱们开发中有的时候也会遇到相似的需要,例如:
- 在电商我的项目中,当咱们下单之后,个别须要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异样解决逻辑中,被勾销,那么进入到异样解决逻辑中,就能够当成是一个提早队列。
- 我买了一个智能砂锅,能够用来煮粥,下班前把素材都放到锅里,而后设置几点几分开始煮粥,这样上班后就能够喝到香喷喷的粥了,那么这个煮粥的指令也能够看成是一个提早工作,放到一个提早队列中,工夫到了再执行。
- 公司的会议预约零碎,在会议预约胜利后,会在会议开始前半小时告诉所有预约该会议的用户。
- 平安工单超过 24 小时未解决,则主动拉企业微信群揭示相干责任人。
- 用户下单外卖当前,间隔超时工夫还有 10 分钟时揭示外卖小哥行将超时。
- ...
很多场景下咱们都须要提早队列。
本文以 RabbitMQ 为例来和大家聊一聊提早队列的玩法。
整体上来说,在 RabbitMQ 上实现定时工作有两种形式:
- 利用 RabbitMQ 自带的音讯过期和私信队列机制,实现定时工作。
- 应用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时工作,这种计划较简略。
两种用法咱们别离来看。
6.1 用插件
6.1.1 装置插件
首先咱们须要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源我的项目,咱们间接下载即可:
- https://github.com/rabbitmq/r...
抉择适宜本人的版本,我这里抉择最新的 3.9.0 版。
下载实现后在命令行执行如下命令将下载文件拷贝到 Docker 容器中去:
docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez some-rabbit:/plugins
这里第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的地位。
接下来再执行如下命令进入到 RabbitMQ 容器中:
docker exec -it some-rabbit /bin/bash
进入到容器之后,执行如下命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启用胜利之后,还能够通过如下命令查看所有装置的插件,看看是否有咱们刚刚装置过的插件,如下:
rabbitmq-plugins list
命令的残缺执行过程如下图:
OK,配置实现之后,接下来咱们执行 exit
命令退出 RabbitMQ 容器。而后开始编码。
6.1.2 音讯收发
接下来开始音讯收发。
首先咱们创立一个 Spring Boot 我的项目,引入 Web 和 RabbitMQ 依赖,如下:
我的项目创立胜利后,在 application.properties 中配置 RabbitMQ 的根本信息,如下:
spring.rabbitmq.host=localhostspring.rabbitmq.password=guestspring.rabbitmq.username=guestspring.rabbitmq.virtual-host=/
接下来提供一个 RabbitMQ 的配置类:
@Configurationpublic class RabbitConfig { public static final String QUEUE_NAME = "javaboy_delay_queue"; public static final String EXCHANGE_NAME = "javaboy_delay_exchange"; public static final String EXCHANGE_TYPE = "x-delayed-message"; @Bean Queue queue() { return new Queue(QUEUE_NAME, true, false, false); } @Bean CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args); } @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(customExchange()).with(QUEUE_NAME).noargs(); }}
这里次要是交换机的定义有所不同,小伙伴们须要留神。
这里咱们应用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创立 CustomExchange 时有五个参数,含意别离如下:
- 交换机名称。
- 交换机类型,这个中央是固定的。
- 交换机是否长久化。
- 如果没有队列绑定到交换机,交换机是否删除。
- 其余参数。
最初一个 args 参数中,指定了交换机音讯散发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,未来交换机散发音讯就按哪种形式来。
接下来咱们再创立一个音讯消费者:
@Componentpublic class MsgReceiver { private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class); @RabbitListener(queues = RabbitConfig.QUEUE_NAME) public void handleMsg(String msg) { logger.info("handleMsg,{}",msg); }}
打印一下音讯内容即可。
接下来再写一个单元测试办法来发送音讯:
@SpringBootTestclass MqDelayedMsgDemoApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() throws UnsupportedEncodingException { Message msg = MessageBuilder.withBody(("hello 江南一点雨"+new Date()).getBytes("UTF-8")).setHeader("x-delay", 3000).build(); rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg); }}
在音讯头中设置音讯的延迟时间。
好啦,接下来启动 Spring Boot 我的项目,而后运行单元测试办法发送音讯,最终的控制台打印日志如下:
从日志中能够看到音讯提早曾经实现了。
6.2 DLX 实现提早队列
6.2.1 提早队列实现思路
提早队列实现的思路也很简略,就是上篇文章咱们所说的 DLX(死信交换机)+TTL(音讯超时工夫)。
咱们能够把死信队列就当成提早队列。
具体来说是这样:
如果一条音讯须要提早 30 分钟执行,咱们就设置这条音讯的有效期为 30 分钟,同时为这条音讯配置死信交换机和死信 routing_key
,并且不为这个音讯队列设置消费者,那么 30 分钟后,这条音讯因为没有被消费者生产而进入死信队列,此时咱们有一个消费者就在“蹲点”这个死信队列,音讯一进入死信队列,就立马被生产了。
这就是提早队列的实现思路,是不是很简略?
6.2.2 案例
接下来松哥通过一个简略的案例,来和大家演示一下提早队列的具体实现。
首先筹备好一个启动的 RabbitMQ。
而后咱们创立一个 Spring Boot 我的项目,引入 RabbitMQ 依赖:
而后在 application.properties 中配置一下 RabbitMQ 的根本连贯信息:
spring.rabbitmq.host=localhostspring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.port=5672
接下来咱们来配置两个音讯队列:一个一般队列,一个死信队列:
@Configurationpublic class QueueConfig { public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name"; public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name"; public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key"; public static final String DLX_QUEUE_NAME = "dlx_queue_name"; public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name"; public static final String DLX_ROUTING_KEY = "dlx_routing_key"; /** * 死信队列 * @return */ @Bean Queue dlxQueue() { return new Queue(DLX_QUEUE_NAME, true, false, false); } /** * 死信交换机 * @return */ @Bean DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE_NAME, true, false); } /** * 绑定死信队列和死信交换机 * @return */ @Bean Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()) .with(DLX_ROUTING_KEY); } /** * 一般音讯队列 * @return */ @Bean Queue javaboyQueue() { Map<String, Object> args = new HashMap<>(); //设置音讯过期工夫 args.put("x-message-ttl", 1000*10); //设置死信交换机 args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); //设置死信 routing_key args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(JAVABOY_QUEUE_NAME, true, false, false, args); } /** * 一般交换机 * @return */ @Bean DirectExchange javaboyExchange() { return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false); } /** * 绑定一般队列和与之对应的交换机 * @return */ @Bean Binding javaboyBinding() { return BindingBuilder.bind(javaboyQueue()) .to(javaboyExchange()) .with(JAVABOY_ROUTING_KEY); }}
这段配置代码尽管略长,不过原理其实简略。
- 配置能够分为两组,第一组配置死信队列,第二组配置一般队列。每一组都由音讯队列、音讯交换机以及 Binding 三者组成。
- 配置音讯队列时,为音讯队列指定死信队列,不相熟的小伙伴能够翻一下上篇文章,传送门:RabbitMQ 中的音讯会过期吗?。
- 配置队列中的音讯过期工夫时,默认的工夫单位时毫秒。
接下来咱们为死信队列配置一个消费者,如下:
@Componentpublic class DlxConsumer { private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class); @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME) public void handle(String msg) { logger.info(msg); }}
收到音讯后就将之打印进去。
这就完事了。
启动我的项目。
最初咱们在单元测试中发送一条音讯:
@SpringBootTestclass DelayQueueApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { System.out.println(new Date()); rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME, QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!"); }}
这个就没啥好说的了,就是一般的音讯发送,10 秒之后这条音讯会在死信队列的消费者中被打印进去。
好啦,这就是咱们用 RabbitMQ 做提早队列的两种思路~感兴趣的小伙伴能够试试哦~
7. RabbitMQ 发送可靠性
微服务能够设计成音讯驱动的微服务,响应式零碎也能够基于消息中间件来做,从这个角度来说,在互联网利用开发中,消息中间件真的是太重要了。
以 RabbitMQ 为例,松哥来和大家聊一聊音讯两头音讯发送可靠性的问题。
留神,以下内容我次要和大家探讨如何确保音讯生产者将音讯发送胜利,并不波及音讯生产的问题。
7.1 RabbitMQ 音讯发送机制
大家晓得,RabbitMQ 中的音讯发送引入了 Exchange(交换机)的概念,音讯的发送首先达到交换机上,而后再依据既定的路由规定,由交换机将音讯路由到不同的 Queue(队列)中,再由不同的消费者去生产。
大抵的流程就是这样,所以要确保音讯发送的可靠性,次要从两方面去确认:
- 音讯胜利达到 Exchange
- 音讯胜利达到 Queue
如果能确认这两步,那么咱们就能够认为音讯发送胜利了。
如果这两步中任一步骤呈现问题,那么音讯就没有胜利送达,此时咱们可能要通过重试等形式去从新发送音讯,多次重试之后,如果音讯还是不能到达,则可能就须要人工染指了。
通过下面的剖析,咱们能够确认,要确保音讯胜利发送,咱们只须要做好三件事就能够了:
- 确认音讯达到 Exchange。
- 确认音讯达到 Queue。
- 开启定时工作,定时投递那些发送失败的音讯。
7.2 RabbitMQ 的致力
下面提出的三个步骤,第三步须要咱们本人实现,前两步 RabbitMQ 则有现成的解决方案。
如何确保音讯胜利达到 RabbitMQ?RabbitMQ 给出了两种计划:
- 开启事务机制
- 发送方确认机制
这是两种不同的计划,不能够同时开启,只能抉择其中之一,如果两者同时开启,则会报如下谬误:
咱们别离来看。以下所有案例都在 Spring Boot 中开展,文末能够下载相干源码。
7.2.1 开启事务机制
Spring Boot 中开启 RabbitMQ 事务机制的形式如下:
首先须要先提供一个事务管理器,如下:
@BeanRabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory);}
接下来,在音讯生产者下面做两件事:增加事务注解并设置通信信道为事务模式:
@Servicepublic class MsgService { @Autowired RabbitTemplate rabbitTemplate; @Transactional public void send() { rabbitTemplate.setChannelTransacted(true); rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes()); int i = 1 / 0; }}
这里留神两点:
- 发送音讯的办法上增加
@Transactional
注解标记事务。 - 调用 setChannelTransacted 办法设置为 true 开启事务模式。
这就 OK 了。
在下面的案例中,咱们在结尾来了个 1/0 ,这在运行时必然抛出异样,咱们能够尝试运行该办法,发现音讯并未发送胜利。
当咱们开启事务模式之后,RabbitMQ 生产者发送音讯会多出四个步骤:
- 客户端发出请求,将信道设置为事务模式。
- 服务端给出回复,批准将信道设置为事务模式。
- 客户端发送音讯。
- 客户端提交事务。
- 服务端给出响应,确认事务提交。
下面的步骤,除了第三步是原本就有的,其余几个步骤都是平白无故多进去的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。咱们能够想想,什么我的项目会用到消息中间件?一般来说都是一些高并发的我的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保音讯发送胜利,这种形式,性能要远远高于事务模式,一起来看下。
7.2.2 发送方确认机制
7.2.2.1 单条音讯解决
首先咱们移除刚刚对于事务的代码,而后在 application.properties 中配置开启音讯发送方确认机制,如下:
spring.rabbitmq.publisher-confirm-type=correlatedspring.rabbitmq.publisher-returns=true
第一行是配置音讯达到交换器的确认回调,第二行则是配置音讯达到队列的回调。
第一行属性的配置有三个取值:
- none:示意禁用公布确认模式,默认即此。
- correlated:示意胜利公布音讯到交换器后会触发的回调办法。
- simple:相似 correlated,并且反对
waitForConfirms()
和waitForConfirmsOrDie()
办法的调用。
接下来咱们要开启两个监听,具体配置如下:
@Configurationpublic class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name"; public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name"; private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Autowired RabbitTemplate rabbitTemplate; @Bean Queue queue() { return new Queue(JAVABOY_QUEUE_NAME); } @Bean DirectExchange directExchange() { return new DirectExchange(JAVABOY_EXCHANGE_NAME); } @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()) .with(JAVABOY_QUEUE_NAME); } @PostConstruct public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("{}:音讯胜利达到交换器",correlationData.getId()); }else{ logger.error("{}:音讯发送失败", correlationData.getId()); } } @Override public void returnedMessage(ReturnedMessage returned) { logger.error("{}:音讯未胜利路由到队列",returned.getMessage().getMessageProperties().getMessageId()); }}
对于这个配置类,我说如下几点:
- 定义配置类,实现
RabbitTemplate.ConfirmCallback
和RabbitTemplate.ReturnsCallback
两个接口,这两个接口,前者的回调用来确定音讯达到交换器,后者则会在音讯路由到队列失败时被调用。 - 定义 initRabbitTemplate 办法并增加 @PostConstruct 注解,在该办法中为 rabbitTemplate 别离配置这两个 Callback。
这就能够了。
接下来咱们对音讯发送进行测试。
首先咱们尝试将音讯发送到一个不存在的交换机中,像上面这样:
rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
留神第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下谬误:
接下来咱们给定一个实在存在的交换器,然而给一个不存在的队列,像上面这样:
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
留神此时第二个参数是一个字符串,不是变量。
能够看到,音讯尽管胜利达到交换器了,然而没有胜利路由到队列(因为队列不存在)。
这是一条音讯的发送,咱们再来看看音讯的批量发送。
7.2.2.2 音讯批量解决
如果是音讯批量解决,那么发送胜利的回调监听是一样的,这里不再赘述。
这就是 publisher-confirm 模式。
相比于事务,这种模式下的音讯吞吐量会失去极大的晋升。
7.3 失败重试
失败重试分两种状况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,然而音讯发送失败了。
两种重试咱们别离来看。
7.3.1 自带重试机制
后面所说的事务机制和发送方确认机制,都是发送方确认音讯发送胜利的方法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,然而这个重试机制就和 MQ 自身没有关系了,这是利用 Spring 中的 retry 机制来实现的,具体配置如下:
spring.rabbitmq.template.retry.enabled=truespring.rabbitmq.template.retry.initial-interval=1000msspring.rabbitmq.template.retry.max-attempts=10spring.rabbitmq.template.retry.max-interval=10000msspring.rabbitmq.template.retry.multiplier=2
从上往下配置含意顺次是:
- 开启重试机制。
- 重试起始间隔时间。
- 最大重试次数。
- 最大重试间隔时间。
- 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
配置实现后,再次启动 Spring Boot 我的项目,而后关掉 MQ,此时尝试发送音讯,就会发送失败,进而导致主动重试。
7.3.2 业务重试
业务重试次要是针对音讯没有达到交换器的状况。
如果音讯没有胜利达到交换器,依据咱们第二大节的解说,此时就会触发音讯发送失败回调,在这个回调中,咱们就能够做文章了!
整体思路是这样:
- 首先创立一张表,用来记录发送到中间件上的音讯,像上面这样:
每次发送音讯的时候,就往数据库中增加一条记录。这里的字段都很好了解,有三个我额定说下:
- status:示意音讯的状态,有三个取值,0,1,2 别离示意音讯发送中、音讯发送胜利以及音讯发送失败。
- tryTime:示意音讯的第一次重试工夫(音讯收回去之后,在 tryTime 这个工夫点还未显示发送胜利,此时就能够开始重试了)。
- count:示意音讯重试次数。
其余字段都很好了解,我就不一一啰嗦了。
- 在音讯发送的时候,咱们就往该表中保留一条音讯发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
- 在 confirm 回调办法中,如果收到音讯发送胜利的回调,就将该条音讯的 status 设置为1(在音讯发送时为音讯设置 msgId,在音讯发送胜利回调时,通过 msgId 来惟一锁定该条音讯)。
- 另外开启一个定时工作,定时工作每隔 10s 就去数据库中捞一次音讯,专门去捞那些 status 为 0 并且曾经过了 tryTime 工夫记录,把这些音讯拎进去后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则批改该条音讯的 status 为 2,示意这条音讯发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则从新去发送音讯,并且为其 count 的值+1。
大抵的思路就是下面这样,松哥这里就不给出代码了,松哥的 vhr 里边邮件发送就是这样的思路来解决的,残缺代码大家能够参考 vhr 我的项目(https://github.com/lenve/vhr)。
当然这种思路有两个弊病:
- 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候咱们并不需要 MQ 有很高的 Qos,所以这个利用时要看具体情况。
- 依照下面的思路,可能会呈现同一条音讯反复发送的状况,不过这都不是事,咱们在音讯生产时,解决好幂等性问题就行了。
当然,大家也要留神,音讯是否要确保 100% 发送胜利,也要看具体情况。
好啦,这就是对于音讯生产者的一些常见问题以及对应的解决方案,下一大节松哥和大家探讨如果保障音讯生产胜利并解决幂等性问题。
8. RabbitMQ 生产可靠性
上一大节松哥和大家聊了 MQ 高可用之如何确保音讯胜利发送,各种配置齐上阵,最终确保了音讯的胜利发送,甚至在一些极其状况下还可能产生同一条音讯反复发送的状况,不论怎么样,音讯总算发送进来了,如果小伙伴们还没看过上篇文章,倡议先看看,再来学习本文:
- 四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?
明天咱们就来聊一聊音讯生产的问题,看看如何确保音讯生产胜利,并且确保幂等性。
8.1 两种生产思路
RabbitMQ 的音讯生产,整体上来说有两种不同的思路:
- 推(push):MQ 被动将音讯推送给消费者,这种形式须要消费者设置一个缓冲区去缓存音讯,对于消费者而言,内存中总是有一堆须要解决的音讯,所以这种形式的效率比拟高,这也是目前大多数利用采纳的生产形式。
- 拉(pull):消费者被动从 MQ 拉取音讯,这种形式效率并不是很高,不过有的时候如果服务端须要批量拉取音讯,倒是能够采纳这种形式。
两种形式我都举个例子看下。
先来看推(push):
这种形式大家比拟常见,就是通过 @RabbitListener
注解去标记消费者,如下:
@Componentpublic class ConsumerDemo { @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) public void handle(String msg) { System.out.println("msg = " + msg); }}
当监听的队列中有音讯时,就会触发该办法。
再来看拉(pull):
@Testpublic void test01() throws UnsupportedEncodingException { Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME); System.out.println("o = " + new String(((byte[]) o),"UTF-8"));}
调用 receiveAndConvert 办法,办法参数为队列名称,办法执行实现后,会从 MQ 上拉取一条音讯下来,如果该办法返回值为 null,示意该队列上没有音讯了。receiveAndConvert 办法有一个重载办法,能够在重载办法中传入一个期待超时工夫,例如 3 秒。此时,假如队列中没有音讯了,则 receiveAndConvert 办法会阻塞 3 秒,3 秒内如果队列中有了新音讯就返回,3 秒后如果队列中还是没有新音讯,就返回 null,这个期待超时工夫要是不设置的话,默认为 0。
这是音讯两种不同的生产模式。
如果须要从音讯队列中继续取得音讯,就能够应用推模式;如果只是单纯的生产一条音讯,则应用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅音讯,这会重大影响 RabbitMQ 的性能。
8.2 确保生产胜利两种思路
在上篇文章中,咱们想尽办法确保音讯可能发送胜利,对于音讯生产胜利,其实官网提供了相干的机制,咱们一起来看下。
为了保障音讯可能牢靠的达到音讯消费者,RabbitMQ 中提供了音讯生产确认机制。当消费者去生产音讯的时候,能够通过指定 autoAck 参数来示意音讯生产的确认形式。
- 当 autoAck 为 false 的时候,此时即便消费者曾经收到音讯了,RabbitMQ 也不会立马将音讯移除,而是期待消费者显式的回复确认信号后,才会将音讯打上删除标记,而后再删除。
- 当 autoAck 为 true 的时候,此时音讯消费者就会主动把发送进来的音讯设置为确认,而后将音讯移除(从内存或者磁盘中),即便这些音讯并没有达到消费者。
咱们来看一张图:
如上图所示,在 RabbitMQ 的 web 治理页面:
- Ready 示意待生产的音讯数量。
- Unacked 示意曾经发送给消费者然而还没收到消费者 ack 的音讯数量。
这是咱们能够从 UI 层面察看音讯的生产状况确认状况。
当咱们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,生产分成了两个局部:
- 待生产的音讯
- 曾经投递给消费者,然而还没有被消费者确认的音讯
换句话说,当设置 autoAck 为 false 的时候,消费者就变得十分从容了,它将有足够的工夫去解决这条音讯,当音讯失常解决实现后,再手动 ack,此时 RabbitMQ 才会认为这条音讯生产胜利了。如果 RabbitMQ 始终没有收到客户端的反馈,并且此时客户端也曾经断开连接了,那么 RabbitMQ 就会将刚刚的音讯从新放回队列中,期待下一次被生产。
综上所述,确保音讯被胜利生产,无非就是手动 Ack 或者主动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致音讯被反复生产,所以一般来说咱们还须要在解决音讯时,解决幂等性问题。
8.3 音讯回绝
当客户端收到音讯时,能够抉择生产这条音讯,也能够抉择回绝这条音讯。咱们来看下回绝的形式:
@Componentpublic class ConsumerDemo { @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) public void handle(Channel channel, Message message) { //获取音讯编号 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //回绝音讯 channel.basicReject(deliveryTag, true); } catch (IOException e) { e.printStackTrace(); } }}
消费者收到音讯之后,能够抉择回绝生产该条音讯,回绝的步骤分两步:
- 获取音讯编号 deliveryTag。
- 调用 basicReject 办法回绝音讯。
调用 basicReject 办法时,第二个参数是 requeue,即是否从新入队。如果第二个参数为 true,则这条被回绝的音讯会从新进入到音讯队列中,期待下一次被生产;如果第二个参数为 false,则这条被回绝的音讯就会被丢掉,不会有新的消费者去生产它了。
须要留神的是,basicReject 办法一次只能回绝一条音讯。
8.4 音讯确认
音讯确认分为主动确认和手动确认,咱们别离来看。
8.4.1 主动确认
先来看看主动确认,在 Spring Boot 中,默认状况下,音讯生产就是主动确认的。
咱们来看如下一个音讯生产办法:
@Componentpublic class ConsumerDemo { @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) public void handle2(String msg) { System.out.println("msg = " + msg); int i = 1 / 0; }}
通过 @Componet 注解将以后类注入到 Spring 容器中,而后通过 @RabbitListener 注解来标记一个音讯生产办法,默认状况下,音讯生产办法自带事务,即如果该办法在执行过程中抛出异样,那么被生产的音讯会从新回到队列中期待下一次被生产,如果该办法失常执行完没有抛出异样,则这条音讯就算是被生产了。
8.4.2 手动确认
手动确认我又把它分为两种:推模式手动确认与拉模式手动确认。
8.4.2.1 推模式手动确认
要开启手动确认,须要咱们首先敞开主动确认,敞开形式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
这个配置示意将音讯的确认模式改为手动确认。
接下来咱们来看下消费者中的代码:
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)public void handle3(Message message,Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //音讯生产的代码写到这里 String s = new String(message.getBody()); System.out.println("s = " + s); //生产实现后,手动 ack channel.basicAck(deliveryTag, false); } catch (Exception e) { //手动 nack try { channel.basicNack(deliveryTag, false, true); } catch (IOException ex) { ex.printStackTrace(); } }}
将消费者要做的事件放到一个 try..catch
代码块中。
如果音讯失常生产胜利,则执行 basicAck
实现确认。
如果音讯生产失败,则执行 basicNack
办法,通知 RabbitMQ 音讯生产失败。
这里波及到两个办法:
- basicAck:这个是手动确认音讯曾经胜利生产,该办法有两个参数:第一个参数示意音讯的 id;第二个参数 multiple 如果为 false,示意仅确认以后音讯生产胜利,如果为 true,则示意以后音讯之前所有未被以后消费者确认的音讯都生产胜利。
- basicNack:这个是通知 RabbitMQ 以后音讯未被胜利生产,该办法有三个参数:第一个参数示意音讯的 id;第二个参数 multiple 如果为 false,示意仅回绝以后音讯的生产,如果为 true,则示意回绝以后音讯之前所有未被以后消费者确认的音讯;第三个参数 requeue 含意和后面所说的一样,被回绝的音讯是否从新入队。
当 basicNack 中最初一个参数设置为 false 的时候,还波及到一个死信队列的问题,这个松哥当前再专门写文章和大家细聊。
8.4.2.2 拉模式手动确认
拉模式手动 ack 比拟麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的办法,所以咱们得用原生的方法,如下:
public void receive2() { Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); long deliveryTag = 0L; try { GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false); deliveryTag = getResponse.getEnvelope().getDeliveryTag(); System.out.println("o = " + new String((getResponse.getBody()), "UTF-8")); channel.basicAck(deliveryTag, false); } catch (IOException e) { try { channel.basicNack(deliveryTag, false, true); } catch (IOException ex) { ex.printStackTrace(); } }}
这里波及到的 basicAck 和 basicNack 办法跟后面的一样,我就不再赘述。
8.5 幂等性问题
最初咱们再来说说音讯的幂等性问题。
大家构想上面一个场景:
消费者在生产完一条音讯后,向 RabbitMQ 发送一个 ack 确认,此时因为网络断开或者其余起因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条音讯删除,当从新建设起连贯后,消费者还是会再次收到该条音讯,这就造成了音讯的反复生产。同时,因为相似的起因,音讯在发送的时候,同一条音讯也可能会发送两次(参见四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?)。种种原因导致咱们在生产音讯时,肯定要解决好幂等性问题。
幂等性问题的解决倒也不难,基本上都是从业务上来解决,我来大略说说思路。
采纳 Redis,在消费者生产音讯之前,现将音讯的 id 放到 Redis 中,存储形式如下:
- id-0(正在执行业务)
- id-1(执行业务胜利)
如果 ack 失败,在 RabbitMQ 将音讯交给其余的消费者时,先执行 setnx,如果 key 曾经存在(阐明之前有人生产过该音讯),获取他的值,如果是 0,以后消费者就什么都不做,如果是 1,间接 ack。
极其状况:第一个消费者在执行业务时,呈现了死锁,在 setnx 的根底上,再给 key 设置一个生存工夫。生产者,发送音讯时,指定 messageId。
当然这只是一个简略思路供大家参考。
松哥在 vhr 我的项目中也解决了音讯幂等性问题,感兴趣的小伙伴能够查看 vhr 源码(https://github.com/lenve/vhr),代码在 mailserver 中。
9. 了解 VirtualHost
当咱们第一次装置好一个 RabbitMQ 之后,咱们可能都会通过 Web 页面去治理这个 RabbitMQ,默认状况下,咱们第一次应用的默认用户是 guest。
登录胜利后,在 admin 选项卡能够查看所有用户:
能够看到,每个用户都有一个 Can access virtual hosts
属性,这个属性是啥意思呢?
明天松哥来和大家略微捋一捋。
9.1 多租户
RabbitMQ 中有一个概念叫做多租户,怎么了解呢?
咱们装置一个 RabbitMQ 服务器,每一个 RabbitMQ 服务器都能创立出许多虚构的音讯服务器,这些虚构的音讯服务器就是咱们所说的虚拟主机(virtual host),个别简称为 vhost。
实质上,每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有本人的音讯队列、音讯交换机以及相应的绑定关系等等,并且领有本人独立的权限,不同的 vhost 中的队列和交换机不能相互绑定,这样技能保障运行平安又能防止命名抵触。
咱们并不需要特地的去对待 vhost,他就跟一般的物理 RabbitMQ 一样,不同的 vhost 可能提供逻辑上的拆散,确保不同的利用音讯队列可能平安独立运行。
要我来说,咱们该怎么对待 vhost 和 RabbitMQ 的关系呢?RabbitMQ 相当于一个 Excel 文件,而 vhost 则是 Excel 文件中的一个个 sheet,咱们所有的操作都是在某一个 sheet 上进行操作。
实质上来说,vhost 算是 AMQP 协定中的概念。
9.2 命令行创立 vhost
先来看看如何通过命令行创立 vhost。
因为松哥这里的 RabbitMQ 是用 docker 装置的,所以咱们首先进入到 docker 容器中:
docker exec -it some-rabbit /bin/bash
而后执行如下命令创立一个名为 /myvh
的 vhost:
rabbitmqctl add_vhost myvh
最终执行后果如下:
而后通过如下命令能够查看已有的 vhost:
rabbitmqctl list_vhosts
当然这个命令也能够增加两个选项 name 和 tracing,name 示意 vhost 的名称,tracing 则示意是否应用了 tracing 性能(tracing 能够帮忙追踪 RabbitMQ 中音讯的流入流出状况),如下图:
能够通过如下命令删除一个 vhost:
rabbitmqctl delete_vhost myvh
当删除一个 vhost 的时候,与这个 vhost 相干的音讯队列、交换机以及绑定关系等,通通都会被删除。
给一个用户设置 vhost:
rabbitmqctl set_permissions -p myvh guest ".*" ".*" ".*"
后面参数都好说,最初面三个 ".*"
含意别离如下:
- 用户在所有资源上都领有可配置权限(创立/删除音讯队列、创立/删除交换机等)。
- 用户在所有资源上都领有写权限(发消息)。
- 用户在所有资源上都领有读权限(音讯生产,清空队列等)。
禁止一个用户拜访某个 vhost:
rabbitmqctl clear_permissions -p myvh guest
9.3 治理页面创立 vhost
当然咱们也能够在网页端治理 vhost:
在 admin 选项卡中,点击左边的 Virtual Hosts,如下:
而后点击下边的 Add a new virtual host ,能够增加一个新的 vhost:
进入到某一个 vhost 之后,能够批改其权限以及删除一个 vhost,如下图:
9.4 用户治理
因为 vhost 通常跟用户一起呈现,所以这里我也顺便说下 user 的相干操作。
增加一个用户名为 javaboy,明码为 123 的用户,形式如下:
rabbitmqctl add_user javaboy 123
通过如下命令能够批改用户明码(将 javaboy 的明码改为 123456):
rabbitmqctl change_password javaboy 123456
通过如下命令能够验证用户明码:
rabbitmqctl authenticate_user javaboy 123456
验证胜利和验证失败的状况别离如下:
通过如下命令能够查看以后的所有用户:
第一列是用户名,第二列是用户角色。
对于用户角色,我在上篇文章中曾经聊过了,这里就不再赘述。传送门:RabbitMQ 治理页面该如何应用。
给用户设置角色的命令如下(给 javaboy 设置 administrator 角色):
rabbitmqctl set_user_tags javaboy administrator
最初,删除一个用户的命令如下:
rabbitmqctl delete_user javaboy
10. REST API
对于 RabbitMQ 的治理,咱们能够通过网页来进行,在松哥后面的文章中也和小伙伴们做了相干的介绍了:
- RabbitMQ 治理页面该如何应用
不过呢,如果咱们装置了 rabbitmq_management 插件,即装置了 RabbitMQ 中的 Web 治理客户端,那么咱们就能够通过 REST API 来进行 RabbitMQ 的治理。
可能有小伙伴会问,这有什么用?
如果咱们的我的项目应用了如 Granglia 或者 Graphite 之类的图形工具,咱们想抓取以后 RabbitMQ 上音讯生产/累积的状况,就能够应用应用 REST API 去查问这些信息并将查问后果传输到新的图形工具上,同时,因为 REST API 就是 HTTP 申请,所以反对的客户端也是多样化,只有能发送 HTTP 申请,就能用,是不是特地不便?
10.1 REST API
可能有小伙伴还不懂什么是 REST API,这里就先简略科普下:
REST(Representational State Transfer)是一种 Web 软件架构格调,它是一种格调,而不是规范,匹配或兼容这种架构格调的的网络服务称为 REST 服务。
REST 服务简洁并且有档次,它通常基于 HTTP、URI、XML 以及 HTML 这些现有的宽泛风行的协定和规范。在 REST 中,资源是由 URI 来指定,对资源的增删改查操作能够通过 HTTP 协定提供的 GET、POST、PUT、DELETE 等办法实现。
应用 REST 能够更高效的利用缓存来进步响应速度,同时 REST 中的通信会话状态由客户端来保护,这能够让不同的服务器解决一系列申请中的不同申请,进而进步服务器的扩展性。
在前后端拆散我的项目中,一个设计良好的 Web 软件架构必然要满足 REST 格调。
10.2 开启 Web 治理页面
再来说说如何开启 Web 治理页面,整体上来说,咱们有两种形式开启 Web 治理页面:
- 装置 RabbitMQ 的时候,间接抉择
rabbitmq:3-management
镜像,装置命令如下:
docker run -d --rm --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
这样装置好的 RabbitMQ 就能够间接应用 Web 治理页面了。
- 装置的时候就抉择失常的一般镜像
rabbitmq:3
,装置命令如下:
docker run -d --hostname my-rabbit --name some-rabbit2 -p 5673:5672 -p 25672:15672 rabbitmq:3
这个装置好之后,须要咱们进入到容器中,而后手动开启 Web 治理插件,命令如下:
docker exec -it some-rabbit2 /bin/bashrabbitmq-plugins enable rabbitmq_management
第一条命令是进入到容器中,第二条命令开启 Web 治理插件,执行后果如下:
通过以上两种形式任意一种把 Web 治理页面关上,而后咱们就能够应用 REST API 了。
10.3 实际
接下来咱们就来体验几个常见的 REST API 操作。
咱们能够通过 CURL 工具来发送申请,也能够通过 POSTMAN 来发送申请,两者皆可,抉择本人喜爱的即可。松哥这里两种形式都和大家演示一下。
10.3.1 查看队列统计数据
例如咱们想查看虚拟主机 myvh 下 hello-queue 队列的数据统计,咱们能够通过如下形式来查看:
curl -i -u javaboy:123 http://localhost:15672/api/queues/myvh/hello-queue
-i
示意显示响应头信息。
最终执行后果如下:
能够看到,返回的信息有响应头,也有 JSON,不过返回的 JSON 没有格式化,看起来有点好受,如果返回的数据只有 JSON 而不蕴含响应头,那么咱们能够应用 python 来实现数据的格式化,如下:
能够看到,此时返回的数据就格式化了。
当然咱们也能够应用 POSTMAN 来发送这个申请,形式如下:
留神抉择认证形式为 Basic Auth,同时设置正确的用户名明码。
POSTMAN 申请还是不便很多。
10.3.2 创立队列
在 /myvh 虚拟主机下创立一个名为 javaboy-queue 的队列,应用 CURL 申请形式如下:
curl -i -u javaboy:123 -XPUT -H "Content-Type:application/json" -d '{"auto_delete":false,"durable":true}' http://localhost:15672/api/queues/myvh/javaboy-queue
留神申请形式是 PUT 申请,申请参数是 JSON 模式,JSON 里边有两个货色,一个 auto_delete
是说如果该队列没有任何消费者订阅的话,该队列是否会被主动删除(如果是一些长期队列,则该属性能够设置为 true);另外一个 durable
则是说队列是否长久化(长久化的队列,在 RabbitMQ 重启之后,队列仍然存在),如果大家用 Java 代码创立过队列,这两个参数很好了解,因为咱们用 Java 代码创立队列的时候这两个参数也会常常用到。
当然,咱们也能够用 POSTMAN 来发送申请:
返回 201 Created
示意队列创立胜利。
不过要留神在 Authorization 选项卡中设置用户名/明码:
10.3.3 查看以后连贯信息
咱们能够通过如下申请查看以后连贯信息:
申请如下:
curl -i -u javaboy:123 http://localhost:15672/api/connections
POSTMAN 查看形式如下:
10.3.4 查看以后用户信息
curl -i -u javaboy:123 http://localhost:15672/api/users
POSTMAN 查看信息如下:
10.3.5 创立一个用户
创立一个名为 zhangsan,明码是 123 ,角色是 administrator 的用户。
CURL:
curl -i -u javaboy:123 -H "{Content-Type:application/json}" -d '{"password":"123","tags":"administrator"}' -XPUT http://localhost:15672/api/users/zhangsan
POSTMAN:
10.3.6 为新用户设置 vhost
将名为 zhangsan 的用户设置到名为 myvh 的 vhost 下:
curl -i -u javaboy:123 -H "{Content-Type:application/json}" -d '{"configure":".*","write":".*","read":".*"}' -XPUT http://localhost:15672/api/permissions/myvh/zhangsan
参数是具体的权限信息:
POSTMAN 申请形式如下:
好啦,松哥这里轻易给大家举几个例子,其余 API 的用法,小伙伴们能够关上 RabbitMQ 的治理页面,点击下方的 HTTP API 按钮,里边有一个残缺的文档:
11. 常见操作命令
后面咱们介绍了一些 REST API,在不便发送 HTTP 申请的中央调用这些 REST API,还是十分不便的。然而,在一些不不便发送 HTTP 申请的中央,这些 REST API 用着并不太不便,那么明天松哥就给大家再来介绍 RabbitMQ 的另一种玩法---rabbitmqadmin。
11.1 rabbitmqadmin
咱们本人平时做练习,个别都会开启 RabbitMQ 的 Web 治理页面,然而在生产环境下,常常是没有 Web 治理页面的,只能通过 CLI 命令去治理 MQ。
其实呀,Web 治理页面尽管敌对,然而很多时候没有 CLI 快捷,而且通过 CLI 命令行的操作,咱们能够做更多的定制,例如将要害信息查出来后提供给集中的监控零碎以触发报警。
间接操作 CLI 命令行有点麻烦,RabbitMQ 提供了 CLI 管理工具 rabbitmqadmin ,其实就是基于 RabbitMQ 的 HTTP API,用 Python 写的一个脚本。因为 REST API 手动写申请还是挺麻烦的,这些脚本刚好替咱们简化了这个操作,让这个事件变得更加简略了。
应用 rabbitmqadmin 要先会装置它。
如果咱们创立 RabbitMQ 容器的时候应用的是 rabbitmq:3-management
镜像,那么默认状况下,rabbitmqadmin 就是装置好的。
否则可能须要咱们本人装置 rabbitmqadmin,装置形式很简略,
首先确认你的设施上装置了 Python,这是最根本的,因为 rabbitmqadmin 这个工具就是 Python 脚本。
而后开启 RabbitMQ 的 Web 治理页面,而后输出如下地址(我的治理页面度那口映射为 25672):
http://localhost:25672/cli/index.html
在关上的页面中就能够看到 rabbitmqadmin 的下载链接。将 rabbitmqadmin 下载下来后,而后赋予其可执行权限即可:
chmod +x rabbitmqadmin
下载后的 rabbitmqadmin 咱们能够间接用记事本关上,里边其实就是一堆 Python 脚本。
这套流程操作下来还是挺麻烦的,所以,我倡议大家间接应用 rabbitmq:3-management
镜像,一步到位。
11.2 rabbitmqadmin 的性能
- 列出 exchanges, queues, bindings, vhosts, users, permissions, connections and channels。
- 创立和删除 exchanges, queues, bindings, vhosts, users and permissions。
- 公布和获取音讯,以及音讯详情。
- 敞开连贯和清空队列。
- 导入导出配置。
接下来松哥就这些性能逐个和小伙伴们进行介绍。
11.3 列出各种信息
查看所有交换机:
rabbitmqadmin list exchanges
查看所有队列:
rabbitmqadmin list queues
查看所有 Binding:
rabbitmqadmin list bindings
查看所有虚拟主机:
rabbitmqadmin list vhosts
查看所有用户信息:
rabbitmqadmin list users
查看所有权限信息:
rabbitmqadmin list permissions
查看所有连贯信息:
rabbitmqadmin list connections
查看所有通道信息:
rabbitmqadmin list channels
11.4 一个残缺的例子
接下来咱们用 rabbitmqadmin 来写一个残缺的音讯收发例子看看。
首先创立一个名为 javaboy-exchange 的交换机:
rabbitmqadmin declare exchange name=javaboy-exchange durable=true auto_delete=false type=direct
这里各种参数都好了解,我就不多说了。
接下来创立一个名为 javaboy-queue 的队列:
rabbitmqadmin declare queue name=javaboy-queue durable=true auto_delete=false
接下来再创立一个 Binding,将交换机和音讯队列绑定起来:
rabbitmqadmin declare binding source=javaboy-exchange destination=javaboy-queue routing_key=javaboy-routing
这里波及到到三个概念:
- source:源,其实就是指交换机。
- destination:指标,其实就是指音讯队列。
- routing_key:这个就是路由的 key。
接下来公布一条音讯:
rabbitmqadmin publish routing_key=javaboy-queue payload="hello javaboy"
这里参数都很简略,没啥好说的。
查看队列中的音讯(只查看,不生产,看完之后音讯还在):
rabbitmqadmin get queue=javaboy-queue
清空一个队列中的音讯:
rabbitmqadmin purge queue name=javaboy-queue
11.5 命令一览
表格字体有点小,大家在公众号【江南一点雨】后盾回复 rabbitmqadmin 获取 Excel 文档链接。
12. RabbitMQ 权限零碎
不论咱们是通过网页还是通过命令行工具创立用户对象,刚创立好的用户对象都是没法间接应用的,须要咱们首先把这个用户置于某一个 vhost 之下,而后再赋予其权限,有了权限,这个用户才能够失常应用。
那么明天咱们就来理解一下 RabbitMQ 中的权限零碎,看下这个权限零碎是什么样子的。
12.1 RabbitMQ 权限零碎介绍
RabbitMQ 是从 1.6 这个版本开始实现了一套 ACL 格调的权限零碎,可能有小伙伴还不晓得什么是 ACL 格调的权限零碎,能够看看松哥之前发的这两篇文章:
- Spring Security 中如何细化权限粒度?
- 一个案例演示 Spring Security 中粒度超细的权限管制!
在这套 ACL 格调的权限管理系统中,容许十分多细粒度的权限管制,能够为不同用户别离设置读、写以及配置等权限。
这里波及到三种不同的权限:
- 读:和音讯生产无关的所有操作,包含革除整个队列的音讯。
- 写:公布音讯。
- 配置:音讯队列、交换机等的创立和删除。
这是 RabbitMQ 权限零碎的一个简略介绍。
12.2 操作和权限的对应关系
接下来,下图展现了操作和权限的对应关系:
公众号后盾回复 rabbitmq_permission
能够获取这张图的 Excel 表格。
执行什么命令,须要什么权限,这张图形容的一清二楚了。
12.3 权限操作命令
RabbitMQ 中权限操作命令格局如下:
rabbitmqctl set_permissions [-p vhosts] {user} {conf} {write} {read}
这里有几个参数:
- [-p vhost]:授予用户拜访权限的 vhost 名称,如果不写默认为
/
。 - user:用户名。
- conf:用户在哪些资源上领有可配置权限(反对正则表达式)。
- write:用户在哪些资源上领有写权限(反对正则表达式)。
- read:用户在哪些资源上领有读权限(反对正则表达式)。
至于可配置权限能干嘛,写权限能干嘛,读权限能干嘛,大家能够参考第二大节,这里不再赘述。
松哥来举一个简略的例子。
假如咱们有一个名为 zhangsan
的用户,咱们心愿该用户在 myvh 虚拟主机下具备所有权限,那么咱们的操作命令如下:
rabbitmqctl set_permissions -p myvh zhangsan ".*" ".*" ".*"
执行后果如下:
接下来执行如下命令能够验证受权是否胜利:
rabbitmqctl -p myvh list_permissions
能够看到,张三的权限曾经赋值到位。
在下面的受权命令中,咱们用的都是 ".*"
,松哥再额定说下这个通配符:
".*"
:这个示意匹配所有的交换机和队列。"javaboy-.*"
:这个示意匹配名字以javaboy-
结尾的交换机和队列。""
:这个示意不匹配任何队列与交换机(如果想撤销用户的权限能够应用这个)。
咱们能够应用如下命令来移除某一个用户在某一个 vhost 上的权限,例如移除 zhangsan 在 myvh 上的所有权限,如下:
rabbitmqctl clear_permissions -p myvh zhangsan
执行实现后,咱们能够通过 rabbitmqctl -p myvh list_permissions
命令来查看执行后果是否失效,最终执行成果如下:
如果一个用户在多个 vhost 上都有对应的权限,依照下面的 rabbitmqctl -p myvh list_permissions
命令只能查看一个 vhost 上的权限,此时咱们能够通过如下命令来查看 lisi
在所有 vhost 上的权限:
rabbitmqctl list_user_permissions lisi
12.4 Web 治理页面操作
当然,如果你不想敲命令,也能够通过 Web 治理端去操作权限。
在 Admin 选项卡,点击用户名称,就能够给用户设置权限了,如下:
能够设置权限,也能够革除权限。
当然,在网页上还有一个 Topic Permissions,这是 RabbitMQ3.7 开始的一个新性能,能够针对某一个 topic exchange
设置权限,次要针对 STOMP 或者 MQTT 协定,咱们日常 Java 开发用上这个配置的机会很少。如果用户不设置的话,相应的 topic exchange
也总是有权限的。
13. RabbitMQ 集群搭建
单个的 RabbitMQ 必定无奈实现高可用,要想高可用,还得上集群。
明天松哥就来和大家聊一聊 RabbitMQ 集群的搭建。
13.1 两种模式
说到集群,小伙伴们可能第一个问题是,如果我有一个 RabbitMQ 集群,那么是不是我的音讯集群中的每一个实例都保留一份呢?
这其实就波及到 RabbitMQ 集群的两种模式:
- 一般集群
- 镜像集群
13.1.1 一般集群
一般集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行音讯通信。
此时咱们创立的队列 Queue,它的元数据(次要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,然而队列中的音讯只会存在于一个 RabbitMQ 实例上,而不会同步到其余队列。
当咱们生产音讯的时候,如果连贯到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的地位,而后拜访 Queue 所在的实例,拉取数据过去发送给消费者。
这种集群能够进步 RabbitMQ 的音讯吞吐能力,然而无奈保障高可用,因为一旦一个 RabbitMQ 实例挂了,音讯就没法拜访了,如果音讯队列做了长久化,那么等 RabbitMQ 实例复原后,就能够持续拜访了;如果音讯队列没做长久化,那么音讯就丢了。
大抵的流程图如下图:
13.1.2 镜像集群
它和一般集群最大的区别在于 Queue 数据和原数据不再是独自存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(正本数据)。每次写入音讯的时候都会主动把数据同步到多台实例下来,这样一旦其中一台机器产生故障,其余机器还有一份正本数据能够持续提供服务,也就实现了高可用。
大抵流程图如下图:
13.1.3 节点类型
RabbitMQ 中的节点类型有两种:
- RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,益处是能够使得交换机和队列申明等操作速度更快。
- Disk node:将元数据存储在磁盘中,单节点零碎只容许磁盘类型的节点,避免重启 RabbitMQ 的时候,失落零碎的配置信息
RabbitMQ 要求在集群中至多有一个磁盘节点,所有其余节点能够是内存节点,当节点退出或者来到集群时,必须要将该变更告诉到至多一个磁盘节点。如果集群中惟一的一个磁盘节点解体的话,集群依然能够放弃运行,然而无奈进行其余操作(增删改查),直到节点复原。为了确保集群信息的可靠性,或者在不确定应用磁盘节点还是内存节点的时候,倡议间接用磁盘节点。
13.2 搭建一般集群
13.2.1 准备常识
大抵的构造理解了,接下来咱们就把集群给搭建起来。先从一般集群开始,咱们就应用 docker 来搭建。
搭建之前,有两个准备常识须要大家理解:
- 搭建集群时,节点中的 Erlang Cookie 值要统一,默认状况下,文件在 /var/lib/rabbitmq/.erlang.cookie,咱们在用 docker 创立 RabbitMQ 容器时,能够为之设置相应的 Cookie 值。
- RabbitMQ 是通过主机名来连贯服务,必须保障各个主机名之间能够 ping 通。能够通过编辑 /etc/hosts 来手工增加主机名和 IP 对应关系。如果主机名 ping 不通,RabbitMQ 服务启动会失败(如果咱们是在不同的服务器上搭建 RabbitMQ 集群,大家须要留神这一点,接下来的 2.2 小结,咱们将通过 Docker 的容器连贯 link 来实现容器之间的拜访,略有不同)。
13.2.2 开始搭建
执行如下命令创立三个 RabbitMQ 容器:
docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-managementdocker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-managementdocker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
运行后果如下:
三个节点当初就启动好了,留神在 mq02 和 mq03 中,别离应用了 --link
参数来实现容器连贯,对于这个参数,如果大家不懂,能够在公众号江南一点雨后盾回复 docker,由松哥写的 docker 入门教程,里边有讲这个。这里我就不啰嗦了。另外还须要留神,mq03 容器中要既可能连贯 mq01 也可能连贯 mq02。
接下来进入到 mq02 容器中,首先查看一下 hosts 文件,能够看到咱们配置的容器连贯曾经失效了:
未来在 mq02 容器中,就能够通过 mylink01 或者 rabbit01 拜访到 mq01 容器了。
接下来咱们开始集群的配置。
别离执行如下命令将 mq02 容器退出集群中:
rabbitmqctl stop_apprabbitmqctl join_cluster rabbit@rabbit01rabbitmqctl start_app
接下来输出如下命令咱们能够查看集群的状态:
rabbitmqctl cluster_status
能够看到,集群中曾经有两个节点了。
接下来通过雷同的形式将 mq03 也退出到集群中:
rabbitmqctl stop_apprabbitmqctl join_cluster rabbit@rabbit01rabbitmqctl start_app
接下来,咱们能够查看集群信息:
能够看到,此时集群中曾经有三个节点了。
其实,这个时候,咱们也能够通过网页来查看集群信息,在三个 RabbitMQ 实例的 Web 端首页,都能够看到如下内容:
13.2.3 代码测试
接下来咱们来简略测试一下这个集群。
咱们创立一个名为 mq_cluster_demo 的父工程,而后在其中创立两个子工程。
第一个子工程名为 provider,是一个音讯生产者,创立时引入 Web 和 RabbitMQ 依赖,如下:
而后配置 applicaiton.properties,内容如下(留神集群配置):
spring.rabbitmq.addresses=localhost:5671,localhost:5672,localhost:5673spring.rabbitmq.username=guestspring.rabbitmq.password=guest
接下来提供一个简略的队列,如下:
@Configurationpublic class RabbitConfig { public static final String MY_QUEUE_NAME = "my_queue_name"; public static final String MY_EXCHANGE_NAME = "my_exchange_name"; public static final String MY_ROUTING_KEY = "my_queue_name"; @Bean Queue queue() { return new Queue(MY_QUEUE_NAME, true, false, false); } @Bean DirectExchange directExchange() { return new DirectExchange(MY_EXCHANGE_NAME, true, false); } @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()) .with(MY_ROUTING_KEY); }}
这个没啥好说的,都是根本内容,接下来咱们在单元测试中进行音讯发送测试:
@SpringBootTestclass ProviderApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend(null, RabbitConfig.MY_QUEUE_NAME, "hello 江南一点雨"); }}
这条音讯发送胜利之后,在 RabbitMQ 的 Web 治理端,咱们会看到三个 RabbitMQ 实例上都会显示有一条音讯,然而实际上音讯自身只存在于一个 RabbitMQ 实例。
接下来咱们再创立一个音讯消费者,音讯消费者的依赖以及配置和音讯生产者都是截然不同,我就不反复了,音讯消费者中减少一个音讯接收器:
@Componentpublic class MsgReceiver { @RabbitListener(queues = RabbitConfig.MY_QUEUE_NAME) public void handleMsg(String msg) { System.out.println("msg = " + msg); }}
当音讯消费者启动胜利后,这个办法中只收到一条音讯,进一步验证了咱们搭建的 RabbitMQ 集群是没问题的。
13.2.4 反向测试
接下来松哥再举两个反例,以证实音讯并没有同步到其余 RabbitMQ 实例。
确保三个 RabbitMQ 实例都是启动状态,敞开掉 Consumer,而后通过 provider 发送一条音讯,发送胜利之后,敞开 mq01 实例,而后启动 Consumer 实例,此时 Consumer 实例并不会生产音讯,反而会报错说 mq01 实例连贯不上,这个例子就能够阐明音讯在 mq01 上,并没有同步到另外两个 MQ 上。相同,如果 provider 发送音讯胜利之后,咱们没有敞开 mq01 实例而是敞开了 mq02 实例,那么你就会发现音讯的生产不受影响。
13.3 搭建镜像集群
所谓的镜像集群模式并不需要额定搭建,只须要咱们将队列配置为镜像队列即可。
这个配置能够通过网页配置,也能够通过命令行配置,咱们别离来看。
13.3.1 网页配置镜像队列
先来看看网页上如何配置镜像队列。
点击 Admin 选项卡,而后点击左边的 Policies,再点击 Add/update a policy
,如下图:
接下来增加一个策略,如下图:
各参数含意如下:
- Name: policy 的名称。
- Pattern: queue 的匹配模式(正则表达式)。
Definition:镜像定义,次要有三个参数:ha-mode, ha-params, ha-sync-mode。
- ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 示意在集群中所有的节点上进行镜像(默认即此);exactly 示意在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 示意在指定的节点上进行镜像,节点名称通过 ha-params 指定。
- ha-params:ha-mode 模式须要用到的参数。
- ha-sync-mode:进行队列中音讯的同步形式,有效值为 automatic 和 manual。
- priority 为可选参数,示意 policy 的优先级。
配置实现后,点击上面的 add/update policy
按钮,实现策略的增加,如下:
增加实现后,咱们能够进行一个简略的测试。
首先确认三个 RabbitMQ 都启动了,而后用下面的 provider 向音讯队列发送一条音讯。
发完之后敞开 mq01 实例。
接下来启动 consumer,此时发现 consumer 能够实现音讯的生产(留神和后面的反向测试辨别),这就阐明镜像队列曾经搭建胜利了。
13.3.2 命令行配置镜像队列
命令行的配置格局如下:
rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
举一个简略的配置案例:
rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'