共计 20463 个字符,预计需要花费 52 分钟才能阅读完成。
目录
Windows 装置 RabbitMQ
环境工具下载
rabbitMQ 是 Erlang 语言开发的所以先下载 Erlang;
RabbitMQ 官网地址: https://www.rabbitmq.com/
Erlang 下载: https://www.erlang.org/downloads
Erlang 环境装置
间接运行: otp_win64_23.0.exe 程序始终 next 即可,如需扭转装置地位自行抉择,装置实现后对系统环境变量新增 ERLANG_HOME 地址为:
C:\Program Files\erl-23.0
双击零碎变量 path,点击“新建”,将 %ERLANG_HOME%\bin 退出到 path 中。
win+ R 键,输出 cmd,再输出 erl,看到 erlang 版本号就阐明 erlang 装置胜利了。
RabbitMQ 装置
间接运行: rabbitmq-server-3.8.8 程序始终 next 即可,如需扭转装置地位自行抉择.
RabbitMQ Web 治理端安裝
进入装置后的 RabbitMQ 的 sbin 目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
Cmd 命令执行: rabbitmq-plugins enable rabbitmq_managementr
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin>rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@LX-P1DMPLUV:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LX-P1DMPLUV...
Plugin configuration unchanged
常用命令:
# 启动 RabbitMQ
rabbitmq-service start
# 进行 RabbitMQ
rabbitmq-service stop
# 启用 RabbitMQ Web 可视化界面插件
rabbitmq-plugins enable rabbitmq_management
# 停用 RabbitMQ Web 可视化界面插件
rabbitmq-plugins disable rabbitmq_management
# 查看 RabbitMQ 状态
rabbitmqctl status
拜访治理端页面,默认账号密码为: guest
可视化界面: http://127.0.0.1:15672/#/
RabbitMQ 新增超级管理员
进入装置后的 RabbitMQ 的 sbin 目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
<# 创立用户 root 用户 明码为 123456
rabbitmqctl add_user root 123456
# 为该用户调配所有权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# 设置该用户为管理员角色
rabbitmqctl set_user_tags root administrator
RabbitMQ 特点
RabbitMQ 是一款应用 Erlang 语言开发的,实现 AMQP(高级音讯队列协定) 的开源消息中间件。首先要晓得一些 RabbitMQ 的特点:
- 可靠性: 反对长久化,传输确认,公布确认等保障了 MQ 的可靠性。
- 灵便的散发音讯策略: 在音讯进入 MQ 前由 Exchange(交换机) 进行路由音讯。
- 散发音讯策略: 简略模式、工作队列模式、公布订阅模式、路由模式、通配符模式。
- 反对集群: 多台 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker。
- 多种协定:RabbitMQ 反对多种音讯队列协定,比方 STOMP、MQTT 等等。
- 反对多种语言客户端:RabbitMQ 简直反对所有罕用编程语言,包含 Java、.NET、Ruby 等等。
- 可视化治理界面:RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和治理音讯 Broker。
- 插件机制:RabbitMQ 提供了许多插件,能够通过插件进行扩大,也能够编写本人的插件。
RabbitMQ 3 种罕用交换机
- Direct Exchange 直连型交换机: 依据音讯携带的路由键将音讯投递给对应队列。
- Fanout Exchange 扇型交换机: 这个交换机没有路由键概念,就算你绑了路由键也是忽视的。这个交换机在接管到音讯后,会间接转发到绑定到它下面的所有队列。
- Topic Exchange 主题交换机: 这个交换机其实跟直连交换机流程差不多,然而它的特点就是在它的路由键和绑定键之间是有规定的
RabbitMQ 5 种罕用模式
- Simple Work Queue 简略工作队列: 该模式是很少用到的一个场景,个别都会通过 Exchange 进行音讯调配到队列从而为当前扩大预留一个入口。
- Publish/Subscribe 公布订阅模式: 该模式性能最好,拿到音讯间接放入队列。
- Routing 路由模式: 该模式通过 routing key 进行全字匹配,匹配上将相干音讯放入相干队列。
- Topics 主题模式: 该模式通过 routng key 进行含糊匹配,匹配上将相干信息放入相干队列。
- Header 模式: 通过 message header 头部信息进行比对,能够依据定义全匹配、局部匹配等规定。
RabbitMQ 名词解释
- Producer/Publisher: 生产者,投递音讯的一方。
- Consumer: 消费者,接管音讯的一方。
- Message 音讯: 理论的数据,如 demo 中的 order 订单音讯载体。
- Queue 队列: 是 RabbitMQ 的外部对象,用于存储音讯,最终将音讯传输到消费者。
- Exchange 交换机: 在 RabbitMQ 中,生产者发送音讯到交换机,由交换机将音讯路由到一个或者多个队列中
- RoutingKey 路由键: 生产者将音讯发给交换器的时候,个别会指定一个 RoutingKey,用来指定这个音讯的路由规定。
- Binding 绑定:RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候个别会指定一个绑定键(BindingKey),这样 RabbitMQ 就晓得如何正确地将音讯路由到队列。
MQ 实用场景
异步解决场景
如当一个站点新增用户时须要走以下流程:验证账号信息 -> 用户入库 -> 发送注册胜利欢送邮箱给用户;
从该流程中剖析用户注册胜利后首先冀望的是可能胜利登录上站点,而对于是否收到注册胜利的邮件对于用户而言并不重要,
而邮件发送对于如遇到网络问题可能导致发送邮件迟缓素来导致整个用户注册流程响应很慢;
对于告诉邮件发送对于性能而言并不重要的时候,这个时候就能够将该业务放在 MQ 中异步执行从而能够从肯定水平上晋升整个流程的性能。
利用解耦
如当一个站点新增用户时须要走以下流程:验证账号信息 -> 用户入库 -> 发送注册胜利欢送邮箱给用户;
通常通过零碎划分会划分为:用户模块,音讯模块;
以 Spring Cloud 的为例依照原始做法会在用户入库胜利后会通过 Feign 调用音讯模块的发送邮件接口,然而如果音讯模块全集群宕机就会导致 Feign 申请失败从而导致业务不可用;
应用 MQ 就不会造成上述的问题,因为咱们在用户注册实现后想音讯模块对应的邮件发送业务队列去发送音讯即可,队列会监督音讯模块实现,如果完不成队列会始终监督,直到实现为止
流量削峰
秒杀和抢购等场景常常应用 MQ 进行流量削峰。流动开始时流量暴增,用户的申请写入 MQ,超过 MQ 最大长度抛弃申请,业务零碎接管 MQ 中的音讯进行解决,达到流量削峰、保证系统可用性的目标。
影响:MQ 是排队执行的所以对性能有肯定的影响,并且申请过多后会导致申请被抛弃问题
音讯通信
点对点或者订阅公布模式,通过音讯进行通信。如微信的音讯发送与接管、聊天室等。
SpringBoot 中应用 RabbitMQ
工程创立 & 筹备
阐明该工程依照包辨别同时负责生产者与消费者
POM 导入依赖:
<dependencies>
<!-- RabbitMQ 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 导入 Web 服务不便测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 代码简化工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
创立 SpringBoot 启动类:
@SpringBootApplication
public class SimpleRabbitMQCaseApplication {public static void main(String[] args) {SpringApplication.run(SimpleRabbitMQCaseApplication.class,args);
}
}
创立 applicatin.yaml:
server:
port: 8021
spring:
application:
name: rabbitmq-simple-case
#配置 rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123456
virtual-host: / # 虚构 host 能够不设置, 应用 server 默认 host
listener:
simple:
concurrency: 10 # 生产端的监听个数 (即 @RabbitListener 开启几个线程去解决数据。)
max-concurrency: 10 # 生产端的监听最大个数
prefetch: 5
acknowledge-mode: auto # MANUAL: 手动解决 AUTO: 主动解决
default-requeue-rejected: true # 生产不胜利的音讯回绝入队
retry:
enabled: true # 开启音讯重试
max-attempts: 5 # 重试次数
max-interval: 10000 # 重试最大间隔时间
initial-interval: 2000 # 重试初始间隔时间
简略队列生产生产
生产者:
/**
* 简略队列音讯生产
* @author wuwentao
*/
@RestController
@RequestMapping("/simple/queue")
@AllArgsConstructor
public class SimpleQueueProducer {
private RabbitTemplate rabbitTemplate;
// 发送到的队列名称
public static final String AMQP_SIMPLE_QUEUE = "amqp.simple.queue";
/**
* 发送简略音讯
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){rabbitTemplate.convertAndSend(AMQP_SIMPLE_QUEUE, message);
return "OK";
}
}
消费者:
/**
* 简略队列音讯消费者
* @author wuwentao
*/
@Component
@Slf4j
public class SimpleQueueConsumer {
/**
* 监听一个简略的队列,队列不存在时候会创立
* @param content 音讯
*/
@RabbitListener(queuesToDeclare = @Queue(name = SimpleQueueProducer.AMQP_SIMPLE_QUEUE))
public void consumerSimpleMessage(String content, Message message, Channel channel) throws IOException {
// 通过 Message 对象解析音讯
String messageStr = new String(message.getBody());
log.info("通过参数模式接管的音讯:{}" ,content);
//log.info("通过 Message:{}" ,messageStr); // 可通过 Meessage 对象解析音讯
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认音讯生产胜利
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 手动确认音讯生产失败
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/simple/queue/sendMessage?message= 这是一条简略的音讯序号 1
http://localhost:8021/simple/queue/sendMessage?message= 这是一条简略的音讯序号 2
http://localhost:8021/simple/queue/sendMessage?message= 这是一条简略的音讯序号 3
控制台打印生产信息:
2022-08-22 09:45:26.846 INFO 14400 --- [ntContainer#0-1] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯: 这是一条简略的音讯序号 1
2022-08-22 09:45:29.064 INFO 14400 --- [tContainer#0-10] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯: 这是一条简略的音讯序号 2
2022-08-22 09:45:31.441 INFO 14400 --- [ntContainer#0-4] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯: 这是一条简略的音讯序号 3
注意事项: 在 YAML 中开启的配置 acknowledge-mode 为 auto 也是默认的所以音讯不须要手动确认默认没有异样则生产胜利,如果须要定制 ACK 形式能够将 acknowledge-mode 批改为 MANUAL 则要在生产实现后自行 ACK 或 NACK 否则将导致音讯反复生产
Fanout Exchange 扇形交换机 播送模式
fanout 模式也叫播送模式,每一条音讯能够被绑定在同一个交换机上的所有队列的消费者生产
生产者:
@RestController
@RequestMapping("/exchange/fanout")
@AllArgsConstructor
public class ExchangeFanoutProducer {
private RabbitTemplate rabbitTemplate;
// 扇形交换机定义
public static final String EXCHANGE_FANOUT = "exchange.fanout";
// 绑定扇形交换机的队列 1
public static final String EXCHANGE_FANOUT_QUEUE_1 = "exchange.fanout.queue1";
// 绑定扇形交换机的队列 2
public static final String EXCHANGE_FANOUT_QUEUE_2 = "exchange.fanout.queue2";
/**
* 发送扇形音讯音讯可能被所有绑定该交换机的队列给生产
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
// routingkey 在 fanout 模式不应用,会在 direct 和 topic 模式应用, 所以这里给空
rabbitTemplate.convertAndSend(EXCHANGE_FANOUT,"", message);
return "OK";
}
}
消费者:
这里定义两个消费者同时绑定同一个扇形交换机, 这里次要申明交换机 Type 为 ExchangeTypes.FANOUT
/**
* 扇形交换机队列消费者
* @author wuwentao
*/
@Component
@Slf4j
public class ExchangeFanoutConsumer {
/**
* 创立交换机并且绑定队列(队列 1)*
* @param content 内容
* @param channel 通道
* @param message 音讯
* @throws IOException ioexception
* @throws TimeoutException 超时异样
*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),
value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_1, durable = "true")
))
@RabbitHandler
public void exchangeFanoutQueue1(String content, Channel channel, Message message) {log.info("EXCHANGE_FANOUT_QUEUE_1 队列接管到音讯:{}",content);
}
/**
* 创立交换机并且绑定队列(队列 2)*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),
value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_2, durable = "true")
))
@RabbitHandler
public void exchangeFanoutQueue2(String content, Channel channel, Message message) {log.info("EXCHANGE_FANOUT_QUEUE_2 队列接管到音讯:{}",content);
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/fanout/sendMessage?message= 这是一条扇形交换机中的音讯序号 1
http://localhost:8021/exchange/fanout/sendMessage?message= 这是一条扇形交换机中的音讯序号 2
http://localhost:8021/exchange/fanout/sendMessage?message= 这是一条扇形交换机中的音讯序号 3
控制台打印生产信息:
2022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#1-2] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2 队列接管到音讯:这是一条扇形交换机中的音讯序号 1
2022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#0-7] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1 队列接管到音讯:这是一条扇形交换机中的音讯序号 1
2022-08-22 10:10:49.151 INFO 12016 --- [tContainer#0-10] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1 队列接管到音讯:这是一条扇形交换机中的音讯序号 2
2022-08-22 10:10:49.151 INFO 12016 --- [ntContainer#1-4] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2 队列接管到音讯:这是一条扇形交换机中的音讯序号 2
2022-08-22 10:10:54.254 INFO 12016 --- [ntContainer#0-6] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1 队列接管到音讯:这是一条扇形交换机中的音讯序号 3
2022-08-22 10:10:54.255 INFO 12016 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2 队列接管到音讯:这是一条扇形交换机中的音讯序号 3
Direct Exchange 直连型交换机,
直连交换机与扇形交换机的区别在于, 队列都是绑定同一个交换机, 然而在队列上会增加 routingkey 标识, 消费者会依据对应的不同 routingkey 生产对应的音讯。
生产者:
@RestController
@RequestMapping("/exchange/direct")
@AllArgsConstructor
public class ExchangeDirectProducer {
private RabbitTemplate rabbitTemplate;
// 直连交换机定义
public static final String EXCHANGE_DIRECT = "exchange.direct";
// 直连交换机队列定义 1
public static final String EXCHANGE_DIRECT_QUEUE_1 = "exchange.direct.queue1";
// 直连交换机队列定义 2
public static final String EXCHANGE_DIRECT_QUEUE_2 = "exchange.direct.queue2";
// 直连交换机路由 KEY 定义 1
public static final String EXCHANGE_DIRECT_ROUTING_KEY_1 = "exchange.direct.routing.key1";
// 直连交换机路由 KEY 定义 2
public static final String EXCHANGE_DIRECT_ROUTING_KEY_2 = "exchange.direct.routing.key2";
/**
* 发送音讯到直连交换机并且指定对应 routingkey
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message,
@RequestParam(value = "routingkey") int routingkey){if(routingkey == 1){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_1, message);
} else if (routingkey == 2){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_2, message);
}else{return "非法参数!";}
return "OK";
}
}
消费者:
这里定义多个消费者同时绑定同一个直连交换机, 这里次要申明交换机 Type 为 ExchangeTypes.DIRECT, 不申明则默认为 DIRECT。
/**
* 直连交换机队列消费者
* @author wuwentao
*/
@Component
@Slf4j
public class ExchangeDirectConsumer {
/**
* 创立交换机并且绑定队列 1(绑定 routingkey1)*
* @param content 内容
* @param channel 通道
* @param message 音讯
* @throws IOException ioexception
* @throws TimeoutException 超时异样
*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_1, durable = "true"),
key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_1
))
@RabbitHandler
public void exchangeDirectRoutingKey1(String content, Channel channel, Message message) {log.info("队列 1 KEY1 接管到音讯:{}",content);
}
/**
* 创立交换机并且绑定队列 2(绑定 routingkey2)*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_2, durable = "true"),
key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_2
))
@RabbitHandler
public void exchangeDirectRoutingKey2(String content, Channel channel, Message message) {log.info("队列 2 KEY2 接管到音讯:{}",content);
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/direct/sendMessage?routingkey=1&message= 这是一条发给路由 key 为 1 的音讯
http://localhost:8021/exchange/direct/sendMessage?routingkey=2&message= 这是一条发给路由 key 为 2 的音讯
控制台打印生产信息:
2022-08-22 10:37:22.173 INFO 4380 --- [ntContainer#0-1] c.g.b.s.consumer.ExchangeDirectConsumer : 队列 1 KEY1 接管到音讯:这是一条发给路由 key 为 1 的音讯
2022-08-22 10:37:26.882 INFO 4380 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeDirectConsumer : 队列 2 KEY2 接管到音讯:这是一条发给路由 key 为 2 的音讯
Topic Exchange 主题交换机
这个交换机其实跟直连交换机流程差不多,然而它的特点就是在它的路由键和绑定键之间是有规定的;规定如下:
Topic 交换机接管的音讯 RoutingKey 必须是多个单词,以 . 宰割
Topic 交换机与队列绑定时的 routingKey 能够指定通配符
#:代表 0 个或多个词
*:代表 1 个词
生产者:
@RestController
@RequestMapping("/exchange/topic")
@AllArgsConstructor
public class ExchangeTopicProducer {
private RabbitTemplate rabbitTemplate;
// 主題交换机定义
public static final String EXCHANGE_TOPIC = "exchange.topic";
// 主題交换机队列定义 1
public static final String EXCHANGE_TOPIC_QUEUE_1 = "exchange.topic.queue1";
// 主題交换机队列定义 1
public static final String EXCHANGE_TOPIC_QUEUE_2 = "exchange.topic.queue2";
// 主題交换机队列路由 Key 定义 1
public static final String EXCHANGE_TOPIC_ROUTING1_KEY_1 = "#.routingkey.#";
// 主題交换机队列路由 Key 定义 2
public static final String EXCHANGE_TOPIC_ROUTING2_KEY_2 = "routingkey.*";
// 案例 KEY1 能够被 EXCHANGE_TOPIC_ROUTING1_KEY_1 匹配不能被 EXCHANGE_TOPIC_ROUTING2_KEY_2 匹配
public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";
// 案例 KEY2 同时能够被 EXCHANGE_TOPIC_ROUTING1_KEY_1 和 EXCHANGE_TOPIC_ROUTING2_KEY_2 匹配
public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";
/**
* 发送音讯到主题交换机并且指定对应可通配 routingkey
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message,
@RequestParam(value = "routingkey") int routingkey){if(routingkey == 1){
// 同时匹配 topic.routingkey.case1 和 routingkey.case2
rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_1, message);
} else if (routingkey == 2){
// 只能匹配 routingkey.case2
rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_2, message);
}else{return "非法参数!";}
return "OK";
}
}
消费者:
这里定义多个消费者同时绑定同一个直主题交换机, 这里次要申明交换机 Type 为 ExchangeTypes.TOPIC, 当 routingkey 发送的音讯可能被消费者给匹配仅可能接管到音讯。
@Component
@Slf4j
public class ExchangeTopicConsumer {
/**
* 创立交换机并且绑定队列 1(绑定 routingkey1)*
* @param content 内容
* @param channel 通道
* @param message 音讯
* @throws IOException ioexception
* @throws TimeoutException 超时异样
*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),
value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_1, durable = "true"),
key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING1_KEY_1
))
@RabbitHandler
public void exchangeTopicRoutingKey1(String content, Channel channel, Message message) {log.info("# 号统配符号队列 1 接管到音讯:{}",content);
}
/**
* 创立交换机并且绑定队列 2(绑定 routingkey2)*/
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),
value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_2, durable = "true"),
key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING2_KEY_2
))
@RabbitHandler
public void exchangeTopicRoutingKey2(String content, Channel channel, Message message) {log.info("* 号统配符号队列 2 接管到音讯:{}",content);
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/topic/sendMessage?routingkey=1&message= 前后多重匹配
http://localhost:8021/exchange/topic/sendMessage?routingkey=2&message= 后一个词匹配
控制台打印生产信息:
2022-08-22 15:10:50.444 INFO 1376 --- [ntContainer#4-8] c.g.b.s.consumer.ExchangeTopicConsumer : #号统配符号队列 1 接管到音讯: 前后多重匹配
2022-08-22 15:10:55.118 INFO 1376 --- [ntContainer#5-8] c.g.b.s.consumer.ExchangeTopicConsumer : * 号统配符号队列 2 接管到音讯: 后一个词匹配
2022-08-22 15:10:55.119 INFO 1376 --- [ntContainer#4-9] c.g.b.s.consumer.ExchangeTopicConsumer : #号统配符号队列 1 接管到音讯: 后一个词匹配
手动 ACK 与音讯确认机制
新增 SpringBoot 配置文件 YAML, 这里次要将主动 ACK 批改为手工 ACK 并且开启音讯确认模式与音讯回退:
spring:
rabbitmq:
listener:
acknowledge-mode: manual # MANUAL: 手动解决 AUTO: 主动解决
# NONE 值是禁用公布确认模式,是默认值
# CORRELATED 值是公布音讯胜利到交换器后会触发回调办法,如 1 示例
# SIMPLE 值经测试有两种成果,其一成果和 CORRELATED 值一样会触发回调办法,其二在公布音讯胜利后应用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 办法期待 broker 节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是 waitForConfirmsOrDie 办法如果返回 false 则会敞开 channel,则接下来无奈发送音讯到 broker;
publisher-confirm-type: simple #音讯确认机制
publisher-returns: true # 音讯回退确认机制
定义音讯回调确认实现类:
/**
* 消费者确认收到音讯后,手动 ack 回执回调解决
* @author wuwentao
*/
@Slf4j
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("===================================================");
log.info("音讯确认机制回调函数参数信息如下:");
log.info("ACK 状态:{}",ack);
log.info("投递失败起因:{}",cause);
log.info("===================================================");
}
}
消费者:
/**
* RabbitMQ Message 回调地址消费者测试
* @author wuwentao
*/
@Component
@Slf4j
public class MessagesCallbackConsumer {
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = MessagesCallbackProducer.MESSAGE_CALLBACK_EXCHANGE, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = MessagesCallbackProducer.MESSAGE_CALLBACK_QUEUE, durable = "true"),
key = MessagesCallbackProducer.MESSAGE_CALLBACK_ROUTINGKEY
))
@RabbitHandler
public void consumer(String content, Channel channel, Message message) throws IOException {if("胜利".equals(content)){log.info("音讯解决胜利:{}",content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认音讯生产胜利
}else{if(message.getMessageProperties().getRedelivered()){log.info("音讯已被解决过了请勿反复解决音讯:{}",content);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 回绝该音讯,音讯会被抛弃,不会重回队列
}else{log.info("音讯解决失败期待重新处理:{}",content);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
生产者:
/**
* 音讯回调机制测试
* @author wuwentao
*/
@RestController
@RequestMapping("/message/callback")
@AllArgsConstructor
public class MessagesCallbackProducer {
private RabbitTemplate rabbitTemplate;
private MessageConfirmCallback messageConfirmCallback;
// 发送到的队列名称
public static final String MESSAGE_CALLBACK_QUEUE = "amqp.message.callback.queue";
public static final String MESSAGE_CALLBACK_EXCHANGE = "amqp.message.callback.exchange";
public static final String MESSAGE_CALLBACK_ROUTINGKEY = "amqp.message.callback.routingkey";
/**
* 测试音讯确认机制
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
// 设置失败和确认回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(messageConfirmCallback);
// 构建回调 id 为 uuid
String callBackId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(callBackId);
if("失败的音讯".equals(message)){
// 写一个不存的替换机器 和不存在的路由 KEY
rabbitTemplate.convertAndSend("sdfdsafadsf","123dsfdasf",message,
msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},correlationData);
}else{
rabbitTemplate.convertAndSend(MESSAGE_CALLBACK_EXCHANGE,MESSAGE_CALLBACK_ROUTINGKEY,message,
msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},correlationData);
}
return "OK";
}
}
测试生成音讯拜访接口地址:
# 发送找不到交换机的音讯
http://localhost:8021/message/callback/sendMessage?message= 失败的音讯
# 发送手动 ACK 胜利的音讯
http://localhost:8021/message/callback/sendMessage?message= 胜利
# 发送手动 ACK 失败的音讯
http://localhost:8021/message/callback/sendMessage?message= 失败
控制台打印生产信息:
2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:
2022-08-24 09:11:50.123 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK 状态:false
2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'sdfdsafadsf' in vhost '/', class-id=60, method-id=40)
2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.704 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK 状态:true
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:null
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.735 INFO 11440 --- [ntContainer#6-1] c.g.b.s.c.MessagesCallbackConsumer : 音讯解决胜利: 胜利
2022-08-24 09:12:16.680 INFO 11440 --- [ntContainer#6-4] c.g.b.s.c.MessagesCallbackConsumer : 音讯解决失败期待重新处理: 失败
2022-08-24 09:12:16.688 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK 状态:true
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:null
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:16.693 INFO 11440 --- [ntContainer#6-7] c.g.b.s.c.MessagesCallbackConsumer : 音讯已被解决过了请勿反复解决音讯: 失败