Spring Boot(十三)RabbitMQ安装与集成

32次阅读

共计 13643 个字符,预计需要花费 35 分钟才能阅读完成。

一、前言
RabbitMQ 是一个开源的消息代理软件(面向消息的中间件),它的核心作用就是创建消息队列,异步接收和发送消息,MQ 的全程是:Message Queue 中文的意思是消息队列。
<!–more–>
1.1 使用场景

削峰填谷:用于应对间歇性流量提升对于系统的“破坏”,比如秒杀活动,可以把请求先发送到消息队列在平滑的交由系统去处理,当访问量大于一定数量的时候,还可以直接屏蔽后续操作,给前台的用户友好的显示;
延迟处理:可以进行事件后置,比如订单超时业务,用户下单 30 分钟未支付取消订单;
系统解耦:消息队列也可以帮开发人员完成业务的解耦,比如用户上传头像的功能,最初的设计是用户上传完之后才能发帖,后面有增加了经验系统,需要在上传头像之后增加经验值,到后来又上线了金币系统,上传头像之后可以增加金币,像这种需求的不断升级,如果在业务代码里面写死每次该业务代码是很不优雅的,这个时候如果使用消息队列,那么只需要增加一个订阅器用于介绍用户上传头像的消息,再执行经验的增加和金币的增加是非常简单的,并且在不改动业务模块业务代码的基础上可以轻松实现,如果后期需要撤销某个模块了,只需要删除订阅器即可,就这样就降低了系统开发的耦合性;

1.2 为什么使用 RabbitMQ?
现在市面上比较主流的消息队列还有 Kafka、RocketMQ、RabbitMQ,它们的介绍和区别如下:

Kafka 是 LinkedIn 开源的分布式发布 - 订阅消息系统,目前归属于 Apache 定级项目。Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8 版本开始支持复制,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布 / 订阅)、可靠性、安全。AMQP 协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RocketMQ 是阿里开源的消息中间件,它是纯 Java 开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ 思路起源于 Kafka,但并不是 Kafka 的一个 Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog 分发等场景。

简单总结:Kafka 的性能最好,适用于对消息吞吐量达,对消息丢失不敏感的系统;RocketMQ 借鉴了 Kafka 并提高了消息的可靠性,修复了 Kafka 的不足;RabbitMQ 性能略低于 Kafka,并实现了 AMQP(Advanced Message Queuing Protocol)高级消息队列协议的标准,有非常好的稳定性。
支持语言对比

RocketMQ 支持语言:Java、C++、Golang
Kafka 支持语言:Java、Scala
RabbitMQ 支持语言:C#、Java、Js/NodeJs、Python、Ruby、Erlang、Perl、Clojure、Golang

1.3 RabbitMQ 特点
RabbitMQ 的特点是易用、扩展性好(集群访问)、高可用,具体如下:

可靠性:持久化、消息确认、事务等保证了消息的可靠性;
伸缩性:集群服务,可以很方便的添加服务器来提高系统的负载;
高可用:集群状态下部分节点出现问题依然可以运行;
多语言支持:RabbitMQ 几乎支持了所有的语言,比如 Java、.Net、Nodejs、Golang 等;
易用的管理页面:RabbitMQ 提供了易用了网页版的管理监控系统,可以很方便的完成 RabbitMQ 的控制和查看;
插件机制:RabbitMQ 提供了许多插件,可以丰富和扩展 Rabbit 的功能,用户也可编写自己的插件;

1.4 RabbitMQ 基础知识
在了解消息通讯之前首先要了解 3 个概念:生产者、消费者和代理。
生产者:消息的创建者,负责创建和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
(一)消息发送原理
首先你必须连接到 Rabbit 才能发布和消费消息,那怎么连接和发送消息的呢?
你的应用程序和 Rabbit Server 之间会创建一个 TCP 连接,一旦 TCP 打开,并通过了认证,认证就是你试图连接 Rabbit 之前发送的 Rabbit 服务器连接信息和用户名和密码,有点像程序连接数据库,使用 Java 有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和 Rabbit 就创建了一条 AMQP 信道(Channel)。
信道是创建在“真实”TCP 上的虚拟连接,AMQP 命令都是通过信道发送出去的,每个信道都会有一个唯一的 ID,不论是发布消息,订阅队列或者接收消息都是通过信道完成的。
(二)为什么不通过 TCP 直接发送命令?
对于操作系统来说创建和销毁 TCP 会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条 TCP 会话,这就造成了 TCP 连接的巨大浪费,而且操作系统每秒能创建的 TCP 也是有限的,因此很快就会遇到系统瓶颈。
如果我们每个请求都使用一条 TCP 连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

(三)RabbitMQ 名称解释
ConnectionFactory(连接管理器):应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:

1.5 交换器分类
RabbitMQ 的 Exchange(交换器)分为四类:

direct(默认)
headers
fanout
topic

其中 headers 交换器允许你匹配 AMQP 消息的 header 而非路由键,除此之外 headers 交换器和 direct 交换器完全一致,但性能却很差,几乎用不到,所以我们这里不做解释。
1.5.1 direct 交换器
direct 为默认的交换器类型,也非常的简单,如果路由键匹配的话,消息就投递到相应的队列,如下图:

1.5.2 fanout 交换器
fanout 有别于 direct 交换器,fanout 是一种发布 / 订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
注意:对于 fanout 交换器来说 routingKey(路由键)是无效的,这个参数是被忽略的。
1.5.3 topic 交换器
topic 交换器运行和 fanout 类似,但是可以更灵活的匹配自己想要订阅的信息,这个时候 routingKey 路由键就排上用场了,使用路由键进行消息(规则)匹配。
topic 路由器的关键在于定义路由键,定义 routingKey 名称不能超过 255 字节,使用“.”作为分隔符,例如:com.mq.rabbit.error。
匹配规则
匹配表达式可以用“*”和“#”匹配任何字符,具体规则如下:

“*”匹配一个分段 (用“.”分割) 的内容;
“#”匹配所有字符;

例如发布了一个“cn.mq.rabbit.error”的消息:
能匹配上的路由键:

cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
#

不能匹配上的路由键:

cn.mq.*
*.error
*

1.6 消息持久化
RabbitMQ 队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证 Rabbit 在重启的时候不丢失呢?答案就是消息持久化。
当你把消息发送到 Rabbit 服务器的时候,你需要选择你是否要进行持久化,但这并不能保证 Rabbit 能从崩溃中恢复,想要 Rabbit 消息能恢复必须满足 3 个条件:

投递消息的时候 durable 设置为 true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数 2 设置为 true 持久化;
设置投递模式 deliveryMode 设置为 2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数 3 设置为存储纯文本到磁盘;
消息已经到达持久化交换器上;
消息已经到达持久化的队列;

持久化工作原理
Rabbit 会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit 会把这条消息标识为等待垃圾回收。
持久化的缺点
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用 SSD 硬盘可以使事情得到缓解,但他仍然吸干了 Rabbit 的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
所以使用者要根据自己的情况,选择适合自己的方式。
学习更多 RabbitMQ 知识,访问:https://gitbook.cn/gitchat/ac…
二、在 Docker 中安装 RabbitMQ
(1)下载镜像
https://hub.docker.com/r/libr…

alpine 轻量版
management 带插件的版本

从镜像的大小也可以很直观的看出来 alpine 是轻量版。
使用命令:
docker pull rabbitmq:3.7.7-management
下载带 management 插件的版本。
(2)运行 RabbitMQ
使用命令:
docker run -d –hostname myrabbit –name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management

-d 后台运行
–hostname 主机名称
–name 容器名称
-p 15672:15672 http 访问端口,映射本地端口到容器端口
-p 5672:5672 amqp 端口,映射本地端口到容器端口

正常启动之后,访问:http://localhost:15672/
登录网页管理页面,用户名密码:guest/guest,登录成功如下图:

三、RabbitMQ 集成
3.1 添加依赖
如果用 Idea 创建新项目,可以直接在创建 Spring Boot 的时候,点击“Integration”面板,选择 RabbitMQ 集成,如下图:

如果是老 Maven 项目,直接在 pom.xml 添加如下代码:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 配置 RabbitMQ 信息
在 application.properties 设置如下信息:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
3.3 代码
3.3 代码实现
本节分别来看三种交换器:direct、fanout、topic 的实现代码。
3.3.1 Direct Exchange
3.3.1.1 配置队列
创建 DirectConfig.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
final static String QUEUE_NAME = “direct”; // 队列名称
final static String EXCHANGE_NAME = “mydirect”; // 交换器名称
@Bean
public Queue queue() {
// 声明队列 参数一:队列名称;参数二:是否持久化
return new Queue(DirectConfig.QUEUE_NAME, false);
}
// 配置默认的交换机,以下部分都可以不配置,不设置使用默认交换器(AMQP default)
@Bean
DirectExchange directExchange() {
// 参数一:交换器名称;参数二:是否持久化;参数三:是否自动删除消息
return new DirectExchange(DirectConfig.EXCHANGE_NAME, false, false);
}
// 绑定“direct”队列到上面配置的“mydirect”路由器
@Bean
Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(DirectConfig.QUEUE_NAME);
}
}
3.3.1.2 发送消息
创建 Sender.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 消息发送者 - 生产消息
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void driectSend(String message) {
System.out.println(“Direct 发送消息:” + message);
// 参数一:交换器名称,可以省略(省略存储到 AMQP default 交换器);参数二:路由键名称(direct 模式下路由键 = 队列名称);参数三:存储消息
this.rabbitTemplate.convertAndSend(“direct”, message);
}
}
注意:

在 direct 交换器中,路由键名称就是队列的名称;
发送消息“convertAndSend”的时候,第一个参数为交换器的名称,非必填可以忽略,如果忽略则会把消息发送到默认交换器“AMQP default”;

3.3.1.3 消费消息
创建 Receiver.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 消息接收者 - 消费消息
*/
@Component
@RabbitListener(queues = “direct”)
public class Receiver {
@Autowired
private AmqpTemplate rabbitTemplate;
@RabbitHandler
/**
* 监听消费消息
*/
public void process(String message) {
System.out.println(“Direct 消费消息:” + message);
}
}
3.3.1.4 测试代码
使用 Spring Boot 中的默认测试框架 JUnit 进行单元测试,不了解 JUnit 的可以参考我的上一篇文章,创建 MQTest.java 代码如下:
package com.example.rabbitmq.mq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
import static org.junit.Assert.*;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
@Autowired
private Sender sender;
@Test
public void driectTest() {
SimpleDateFormat sf = new SimpleDateFormat(“yyyy-MM-dd”);
sender.driectSend(“Driect Data:” + sf.format(new Date()));
}
}
执行之后,效果如下图:

表示消息已经被发送并被消费了。
3.3.2 Fanout Exchange
3.3.2.1 配置队列
创建 FanoutConfig.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
final static String QUEUE_NAME = “fanout”; // 队列名称
final static String QUEUE_NAME2 = “fanout2”; // 队列名称
final static String EXCHANGE_NAME = “myfanout”; // 交换器名称
@Bean
public Queue queueFanout() {
return new Queue(FanoutConfig.QUEUE_NAME);
}
@Bean
public Queue queueFanout2() {
return new Queue(FanoutConfig.QUEUE_NAME2);
}
// 配置交换器
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FanoutConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue(Queue queueFanout, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout).to(fanoutExchange);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue2(Queue queueFanout2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout2).to(fanoutExchange);
}
}
3.3.2.2 发送消息
创建 FanoutSender.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message) {
System.out.println(“ 发送消息:” + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME, message);
}
public void send2(String message) {
System.out.println(“ 发送消息 2:” + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME2, message);
}
}
3.3.2.3 消费消息
创建两个监听类,第一个 FanoutReceiver.java 代码如下:
package com.example.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@RabbitListener(queues = “fanout”)
public class FanoutReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println(“Fanout(FanoutReceiver)消费消息:” + msg);
}
}
第二个 FanoutReceiver2.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = “fanout2”)
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println(“Fanout(FanoutReceiver2)消费消息:” + message);
}
}
3.3.2.4 测试代码
创建 FanoutTest.java 代码如下:
package com.example.rabbitmq.mq;
import com.example.rabbitmq.RabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender sender;

@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat(“yyyy-MM-dd”);
sender.send(“Time1 => ” + sf.format(new Date()));
sender.send2(“Date2 => ” + sf.format(new Date()));
}
}
运行测试代码,输出结果如下:
发送消息:Time1 => 2018-09-11
发送消息 2:Date2 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Date2 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Date2 => 2018-09-11
总结:可以看出 fanout 会把消息分发到所有订阅到该交换器的队列,fanout 模式是忽略路由键的。
3.3.3 Topic Exchange
3.3.3.1 配置队列
@Configuration
public class TopicConfig {
final static String QUEUE_NAME = “log”;
final static String QUEUE_NAME2 = “log.all”;
final static String QUEUE_NAME3 = “log.all.error”;
final static String EXCHANGE_NAME = “topicExchange”; // 交换器名称
@Bean
public Queue queuetopic() {
return new Queue(TopicConfig.QUEUE_NAME);
}
@Bean
public Queue queuetopic2() {
return new Queue(TopicConfig.QUEUE_NAME2);
}
@Bean
public Queue queuetopic3() {
return new Queue(TopicConfig.QUEUE_NAME3);
}
// 配置交换器
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TopicConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器,并设置路由键(log.#)
@Bean
Binding bindingtopicExchangeQueue(Queue queuetopic, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic).to(topicExchange).with(“log.#”);
}
// 绑定队列到交换器,并设置路由键(log.*)
@Bean
Binding bindingtopicExchangeQueue2(Queue queuetopic2, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic2).to(topicExchange).with(“log.*”);
}
// 绑定队列到交换器,并设置路由键(log.*.error)
@Bean
Binding bindingtopicExchangeQueue3(Queue queuetopic3, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic3).to(topicExchange).with(“log.*.error”);
}
}
3.3.3.2 发布消息
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void topicSender(String message) {
String routingKey = “log.all.error”;
System.out.println(routingKey + ” 发送消息:” + message);
this.rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, message);
}
}
3.3.3.3 消费消息
@Component
@RabbitListener(queues = “log”)
public class TopicReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println(“log.# 消费消息:” + msg);
}
}
@Component
@RabbitListener(queues = “log.all”)
public class TopicReceiver2 {
@RabbitHandler
public void process(String msg) {
System.out.println(“log.* 消费消息:” + msg);
}
}
@Component
@RabbitListener(queues = “log.all.error”)
public class TopicReceiver3 {
@RabbitHandler
public void process(String msg) {
System.out.println(“log.*.error 消费消息:” + msg);
}
}
3.3.3.4 测试代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void Test() {
SimpleDateFormat sf = new SimpleDateFormat(“yyyy-MM-dd”);
fanoutSender.send(“Time1 => ” + sf.format(new Date()));
fanoutSender.send2(“Date2 => ” + sf.format(new Date()));
}
}
输出结果:
log.all.error 发送消息:time => 2018-09-11
log.# 消费消息:time => 2018-09-11
log.*.error 消费消息:time => 2018-09-11
总结:在 Topic Exchange 中“#”可以匹配所有内容,而“*”则是匹配一个字符段的内容。
以上示例代码 Github 地址:https://github.com/vipstone/s…
参考文档
阿里 RocketMQ 优势对比:https://juejin.im/entry/5a0ab…

正文完
 0

Spring Boot(十三)RabbitMQ安装与集成

32次阅读

共计 13643 个字符,预计需要花费 35 分钟才能阅读完成。

一、前言
RabbitMQ 是一个开源的消息代理软件(面向消息的中间件),它的核心作用就是创建消息队列,异步接收和发送消息,MQ 的全程是:Message Queue 中文的意思是消息队列。
<!–more–>
1.1 使用场景

削峰填谷:用于应对间歇性流量提升对于系统的“破坏”,比如秒杀活动,可以把请求先发送到消息队列在平滑的交由系统去处理,当访问量大于一定数量的时候,还可以直接屏蔽后续操作,给前台的用户友好的显示;
延迟处理:可以进行事件后置,比如订单超时业务,用户下单 30 分钟未支付取消订单;
系统解耦:消息队列也可以帮开发人员完成业务的解耦,比如用户上传头像的功能,最初的设计是用户上传完之后才能发帖,后面有增加了经验系统,需要在上传头像之后增加经验值,到后来又上线了金币系统,上传头像之后可以增加金币,像这种需求的不断升级,如果在业务代码里面写死每次该业务代码是很不优雅的,这个时候如果使用消息队列,那么只需要增加一个订阅器用于介绍用户上传头像的消息,再执行经验的增加和金币的增加是非常简单的,并且在不改动业务模块业务代码的基础上可以轻松实现,如果后期需要撤销某个模块了,只需要删除订阅器即可,就这样就降低了系统开发的耦合性;

1.2 为什么使用 RabbitMQ?
现在市面上比较主流的消息队列还有 Kafka、RocketMQ、RabbitMQ,它们的介绍和区别如下:

Kafka 是 LinkedIn 开源的分布式发布 - 订阅消息系统,目前归属于 Apache 定级项目。Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8 版本开始支持复制,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布 / 订阅)、可靠性、安全。AMQP 协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RocketMQ 是阿里开源的消息中间件,它是纯 Java 开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ 思路起源于 Kafka,但并不是 Kafka 的一个 Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog 分发等场景。

简单总结:Kafka 的性能最好,适用于对消息吞吐量达,对消息丢失不敏感的系统;RocketMQ 借鉴了 Kafka 并提高了消息的可靠性,修复了 Kafka 的不足;RabbitMQ 性能略低于 Kafka,并实现了 AMQP(Advanced Message Queuing Protocol)高级消息队列协议的标准,有非常好的稳定性。
支持语言对比

RocketMQ 支持语言:Java、C++、Golang
Kafka 支持语言:Java、Scala
RabbitMQ 支持语言:C#、Java、Js/NodeJs、Python、Ruby、Erlang、Perl、Clojure、Golang

1.3 RabbitMQ 特点
RabbitMQ 的特点是易用、扩展性好(集群访问)、高可用,具体如下:

可靠性:持久化、消息确认、事务等保证了消息的可靠性;
伸缩性:集群服务,可以很方便的添加服务器来提高系统的负载;
高可用:集群状态下部分节点出现问题依然可以运行;
多语言支持:RabbitMQ 几乎支持了所有的语言,比如 Java、.Net、Nodejs、Golang 等;
易用的管理页面:RabbitMQ 提供了易用了网页版的管理监控系统,可以很方便的完成 RabbitMQ 的控制和查看;
插件机制:RabbitMQ 提供了许多插件,可以丰富和扩展 Rabbit 的功能,用户也可编写自己的插件;

1.4 RabbitMQ 基础知识
在了解消息通讯之前首先要了解 3 个概念:生产者、消费者和代理。
生产者:消息的创建者,负责创建和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
(一)消息发送原理
首先你必须连接到 Rabbit 才能发布和消费消息,那怎么连接和发送消息的呢?
你的应用程序和 Rabbit Server 之间会创建一个 TCP 连接,一旦 TCP 打开,并通过了认证,认证就是你试图连接 Rabbit 之前发送的 Rabbit 服务器连接信息和用户名和密码,有点像程序连接数据库,使用 Java 有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和 Rabbit 就创建了一条 AMQP 信道(Channel)。
信道是创建在“真实”TCP 上的虚拟连接,AMQP 命令都是通过信道发送出去的,每个信道都会有一个唯一的 ID,不论是发布消息,订阅队列或者接收消息都是通过信道完成的。
(二)为什么不通过 TCP 直接发送命令?
对于操作系统来说创建和销毁 TCP 会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条 TCP 会话,这就造成了 TCP 连接的巨大浪费,而且操作系统每秒能创建的 TCP 也是有限的,因此很快就会遇到系统瓶颈。
如果我们每个请求都使用一条 TCP 连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

(三)RabbitMQ 名称解释
ConnectionFactory(连接管理器):应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:

1.5 交换器分类
RabbitMQ 的 Exchange(交换器)分为四类:

direct(默认)
headers
fanout
topic

其中 headers 交换器允许你匹配 AMQP 消息的 header 而非路由键,除此之外 headers 交换器和 direct 交换器完全一致,但性能却很差,几乎用不到,所以我们这里不做解释。
1.5.1 direct 交换器
direct 为默认的交换器类型,也非常的简单,如果路由键匹配的话,消息就投递到相应的队列,如下图:

1.5.2 fanout 交换器
fanout 有别于 direct 交换器,fanout 是一种发布 / 订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
注意:对于 fanout 交换器来说 routingKey(路由键)是无效的,这个参数是被忽略的。
1.5.3 topic 交换器
topic 交换器运行和 fanout 类似,但是可以更灵活的匹配自己想要订阅的信息,这个时候 routingKey 路由键就排上用场了,使用路由键进行消息(规则)匹配。
topic 路由器的关键在于定义路由键,定义 routingKey 名称不能超过 255 字节,使用“.”作为分隔符,例如:com.mq.rabbit.error。
匹配规则
匹配表达式可以用“*”和“#”匹配任何字符,具体规则如下:

“*”匹配一个分段 (用“.”分割) 的内容;
“#”匹配所有字符;

例如发布了一个“cn.mq.rabbit.error”的消息:
能匹配上的路由键:

cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
#

不能匹配上的路由键:

cn.mq.*
*.error
*

1.6 消息持久化
RabbitMQ 队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证 Rabbit 在重启的时候不丢失呢?答案就是消息持久化。
当你把消息发送到 Rabbit 服务器的时候,你需要选择你是否要进行持久化,但这并不能保证 Rabbit 能从崩溃中恢复,想要 Rabbit 消息能恢复必须满足 3 个条件:

投递消息的时候 durable 设置为 true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数 2 设置为 true 持久化;
设置投递模式 deliveryMode 设置为 2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数 3 设置为存储纯文本到磁盘;
消息已经到达持久化交换器上;
消息已经到达持久化的队列;

持久化工作原理
Rabbit 会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit 会把这条消息标识为等待垃圾回收。
持久化的缺点
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用 SSD 硬盘可以使事情得到缓解,但他仍然吸干了 Rabbit 的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
所以使用者要根据自己的情况,选择适合自己的方式。
学习更多 RabbitMQ 知识,访问:https://gitbook.cn/gitchat/ac…
二、在 Docker 中安装 RabbitMQ
(1)下载镜像
https://hub.docker.com/r/libr…

alpine 轻量版
management 带插件的版本

从镜像的大小也可以很直观的看出来 alpine 是轻量版。
使用命令:
docker pull rabbitmq:3.7.7-management
下载带 management 插件的版本。
(2)运行 RabbitMQ
使用命令:
docker run -d –hostname myrabbit –name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management

-d 后台运行
–hostname 主机名称
–name 容器名称
-p 15672:15672 http 访问端口,映射本地端口到容器端口
-p 5672:5672 amqp 端口,映射本地端口到容器端口

正常启动之后,访问:http://localhost:15672/
登录网页管理页面,用户名密码:guest/guest,登录成功如下图:

三、RabbitMQ 集成
3.1 添加依赖
如果用 Idea 创建新项目,可以直接在创建 Spring Boot 的时候,点击“Integration”面板,选择 RabbitMQ 集成,如下图:

如果是老 Maven 项目,直接在 pom.xml 添加如下代码:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 配置 RabbitMQ 信息
在 application.properties 设置如下信息:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
3.3 代码
3.3 代码实现
本节分别来看三种交换器:direct、fanout、topic 的实现代码。
3.3.1 Direct Exchange
3.3.1.1 配置队列
创建 DirectConfig.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
final static String QUEUE_NAME = “direct”; // 队列名称
final static String EXCHANGE_NAME = “mydirect”; // 交换器名称
@Bean
public Queue queue() {
// 声明队列 参数一:队列名称;参数二:是否持久化
return new Queue(DirectConfig.QUEUE_NAME, false);
}
// 配置默认的交换机,以下部分都可以不配置,不设置使用默认交换器(AMQP default)
@Bean
DirectExchange directExchange() {
// 参数一:交换器名称;参数二:是否持久化;参数三:是否自动删除消息
return new DirectExchange(DirectConfig.EXCHANGE_NAME, false, false);
}
// 绑定“direct”队列到上面配置的“mydirect”路由器
@Bean
Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(DirectConfig.QUEUE_NAME);
}
}
3.3.1.2 发送消息
创建 Sender.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 消息发送者 - 生产消息
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void driectSend(String message) {
System.out.println(“Direct 发送消息:” + message);
// 参数一:交换器名称,可以省略(省略存储到 AMQP default 交换器);参数二:路由键名称(direct 模式下路由键 = 队列名称);参数三:存储消息
this.rabbitTemplate.convertAndSend(“direct”, message);
}
}
注意:

在 direct 交换器中,路由键名称就是队列的名称;
发送消息“convertAndSend”的时候,第一个参数为交换器的名称,非必填可以忽略,如果忽略则会把消息发送到默认交换器“AMQP default”;

3.3.1.3 消费消息
创建 Receiver.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 消息接收者 - 消费消息
*/
@Component
@RabbitListener(queues = “direct”)
public class Receiver {
@Autowired
private AmqpTemplate rabbitTemplate;
@RabbitHandler
/**
* 监听消费消息
*/
public void process(String message) {
System.out.println(“Direct 消费消息:” + message);
}
}
3.3.1.4 测试代码
使用 Spring Boot 中的默认测试框架 JUnit 进行单元测试,不了解 JUnit 的可以参考我的上一篇文章,创建 MQTest.java 代码如下:
package com.example.rabbitmq.mq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
import static org.junit.Assert.*;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
@Autowired
private Sender sender;
@Test
public void driectTest() {
SimpleDateFormat sf = new SimpleDateFormat(“yyyy-MM-dd”);
sender.driectSend(“Driect Data:” + sf.format(new Date()));
}
}
执行之后,效果如下图:

表示消息已经被发送并被消费了。
3.3.2 Fanout Exchange
3.3.2.1 配置队列
创建 FanoutConfig.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
final static String QUEUE_NAME = “fanout”; // 队列名称
final static String QUEUE_NAME2 = “fanout2”; // 队列名称
final static String EXCHANGE_NAME = “myfanout”; // 交换器名称
@Bean
public Queue queueFanout() {
return new Queue(FanoutConfig.QUEUE_NAME);
}
@Bean
public Queue queueFanout2() {
return new Queue(FanoutConfig.QUEUE_NAME2);
}
// 配置交换器
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FanoutConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue(Queue queueFanout, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout).to(fanoutExchange);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue2(Queue queueFanout2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout2).to(fanoutExchange);
}
}
3.3.2.2 发送消息
创建 FanoutSender.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message) {
System.out.println(“ 发送消息:” + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME, message);
}
public void send2(String message) {
System.out.println(“ 发送消息 2:” + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME2, message);
}
}
3.3.2.3 消费消息
创建两个监听类,第一个 FanoutReceiver.java 代码如下:
package com.example.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@RabbitListener(queues = “fanout”)
public class FanoutReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println(“Fanout(FanoutReceiver)消费消息:” + msg);
}
}
第二个 FanoutReceiver2.java 代码如下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = “fanout2”)
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println(“Fanout(FanoutReceiver2)消费消息:” + message);
}
}
3.3.2.4 测试代码
创建 FanoutTest.java 代码如下:
package com.example.rabbitmq.mq;
import com.example.rabbitmq.RabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender sender;

@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat(“yyyy-MM-dd”);
sender.send(“Time1 => ” + sf.format(new Date()));
sender.send2(“Date2 => ” + sf.format(new Date()));
}
}
运行测试代码,输出结果如下:
发送消息:Time1 => 2018-09-11
发送消息 2:Date2 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Date2 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Date2 => 2018-09-11
总结:可以看出 fanout 会把消息分发到所有订阅到该交换器的队列,fanout 模式是忽略路由键的。
3.3.3 Topic Exchange
3.3.3.1 配置队列
@Configuration
public class TopicConfig {
final static String QUEUE_NAME = “log”;
final static String QUEUE_NAME2 = “log.all”;
final static String QUEUE_NAME3 = “log.all.error”;
final static String EXCHANGE_NAME = “topicExchange”; // 交换器名称
@Bean
public Queue queuetopic() {
return new Queue(TopicConfig.QUEUE_NAME);
}
@Bean
public Queue queuetopic2() {
return new Queue(TopicConfig.QUEUE_NAME2);
}
@Bean
public Queue queuetopic3() {
return new Queue(TopicConfig.QUEUE_NAME3);
}
// 配置交换器
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TopicConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器,并设置路由键(log.#)
@Bean
Binding bindingtopicExchangeQueue(Queue queuetopic, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic).to(topicExchange).with(“log.#”);
}
// 绑定队列到交换器,并设置路由键(log.*)
@Bean
Binding bindingtopicExchangeQueue2(Queue queuetopic2, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic2).to(topicExchange).with(“log.*”);
}
// 绑定队列到交换器,并设置路由键(log.*.error)
@Bean
Binding bindingtopicExchangeQueue3(Queue queuetopic3, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic3).to(topicExchange).with(“log.*.error”);
}
}
3.3.3.2 发布消息
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void topicSender(String message) {
String routingKey = “log.all.error”;
System.out.println(routingKey + ” 发送消息:” + message);
this.rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, message);
}
}
3.3.3.3 消费消息
@Component
@RabbitListener(queues = “log”)
public class TopicReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println(“log.# 消费消息:” + msg);
}
}
@Component
@RabbitListener(queues = “log.all”)
public class TopicReceiver2 {
@RabbitHandler
public void process(String msg) {
System.out.println(“log.* 消费消息:” + msg);
}
}
@Component
@RabbitListener(queues = “log.all.error”)
public class TopicReceiver3 {
@RabbitHandler
public void process(String msg) {
System.out.println(“log.*.error 消费消息:” + msg);
}
}
3.3.3.4 测试代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void Test() {
SimpleDateFormat sf = new SimpleDateFormat(“yyyy-MM-dd”);
fanoutSender.send(“Time1 => ” + sf.format(new Date()));
fanoutSender.send2(“Date2 => ” + sf.format(new Date()));
}
}
输出结果:
log.all.error 发送消息:time => 2018-09-11
log.# 消费消息:time => 2018-09-11
log.*.error 消费消息:time => 2018-09-11
总结:在 Topic Exchange 中“#”可以匹配所有内容,而“*”则是匹配一个字符段的内容。
以上示例代码 Github 地址:https://github.com/vipstone/s…
参考文档
阿里 RocketMQ 优势对比:https://juejin.im/entry/5a0ab…

正文完
 0