目录
Windows装置RabbitMQ
环境工具下载
rabbitMQ是Erlang语言开发的所以先下载Erlang;
RabbitMQ官网地址: https://www.rabbitmq.com/Erlang下载: https://www.erlang.org/downloads
Erlang环境装置
间接运行: otp_win64_23.0.exe 程序始终next即可,如需扭转装置地位自行抉择,装置实现后对系统环境变量新增ERLANG_HOME地址为:
C:\Program Files\erl-23.0
双击零碎变量path,点击“新建”,将%ERLANG_HOME%\bin退出到path中。
win+R键,输出cmd,再输出erl,看到erlang版本号就阐明erlang装置胜利了。
RabbitMQ装置
间接运行: rabbitmq-server-3.8.8 程序始终next即可,如需扭转装置地位自行抉择.
RabbitMQ Web治理端安裝
进入装置后的RabbitMQ的sbin目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
Cmd命令执行: rabbitmq-plugins enable rabbitmq_managementr
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin>rabbitmq-plugins enable rabbitmq_managementEnabling plugins on node rabbit@LX-P1DMPLUV:rabbitmq_managementThe following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatchApplying plugin configuration to rabbit@LX-P1DMPLUV...Plugin configuration unchanged
常用命令:
# 启动RabbitMQrabbitmq-service start# 进行RabbitMQrabbitmq-service stop# 启用RabbitMQ Web可视化界面插件rabbitmq-plugins enable rabbitmq_management# 停用RabbitMQ Web可视化界面插件rabbitmq-plugins disable rabbitmq_management# 查看RabbitMQ状态rabbitmqctl status
拜访治理端页面,默认账号密码为: guest
可视化界面: http://127.0.0.1:15672/#/
RabbitMQ新增超级管理员
进入装置后的RabbitMQ的sbin目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
<# 创立用户root用户 明码为123456rabbitmqctl add_user root 123456# 为该用户调配所有权限rabbitmqctl set_permissions -p / root ".*" ".*" ".*"# 设置该用户为管理员角色rabbitmqctl set_user_tags root administrator
RabbitMQ特点
RabbitMQ是一款应用Erlang语言开发的,实现AMQP(高级音讯队列协定)的开源消息中间件。首先要晓得一些RabbitMQ的特点:
- 可靠性:反对长久化,传输确认,公布确认等保障了MQ的可靠性。
- 灵便的散发音讯策略:在音讯进入MQ前由Exchange(交换机)进行路由音讯。
- 散发音讯策略:简略模式、工作队列模式、公布订阅模式、路由模式、通配符模式。
- 反对集群:多台RabbitMQ服务器能够组成一个集群,造成一个逻辑Broker。
- 多种协定:RabbitMQ反对多种音讯队列协定,比方 STOMP、MQTT 等等。
- 反对多种语言客户端:RabbitMQ简直反对所有罕用编程语言,包含 Java、.NET、Ruby 等等。
- 可视化治理界面:RabbitMQ提供了一个易用的用户界面,使得用户能够监控和治理音讯 Broker。
- 插件机制:RabbitMQ提供了许多插件,能够通过插件进行扩大,也能够编写本人的插件。
RabbitMQ 3种罕用交换机
- Direct Exchange 直连型交换机:依据音讯携带的路由键将音讯投递给对应队列。
- Fanout Exchange 扇型交换机:这个交换机没有路由键概念,就算你绑了路由键也是忽视的。 这个交换机在接管到音讯后,会间接转发到绑定到它下面的所有队列。
- Topic Exchange 主题交换机:这个交换机其实跟直连交换机流程差不多,然而它的特点就是在它的路由键和绑定键之间是有规定的
RabbitMQ 5种罕用模式
- Simple Work Queue 简略工作队列:该模式是很少用到的一个场景,个别都会通过Exchange进行音讯调配到队列从而为当前扩大预留一个入口。
- Publish/Subscribe 公布订阅模式:该模式性能最好,拿到音讯间接放入队列。
- Routing 路由模式:该模式通过routing key 进行全字匹配,匹配上将相干音讯放入相干队列。
- Topics 主题模式:该模式通过routng key进行含糊匹配,匹配上将相干信息放入相干队列。
- Header 模式:通过message header头部信息进行比对,能够依据定义全匹配、局部匹配等规定。
RabbitMQ名词解释
- Producer/Publisher:生产者,投递音讯的一方。
- Consumer:消费者,接管音讯的一方。
- Message音讯:理论的数据,如demo中的order订单音讯载体。
- Queue队列:是RabbitMQ的外部对象,用于存储音讯,最终将音讯传输到消费者。
- Exchange交换机:在RabbitMQ中,生产者发送音讯到交换机,由交换机将音讯路由到一个或者多个队列中
- RoutingKey路由键:生产者将音讯发给交换器的时候,个别会指定一个RoutingKey,用来指定这个音讯的路由规定。
- Binding绑定:RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候个别会指定一个绑定键(BindingKey),这样RabbitMQ就晓得如何正确地将音讯路由到队列。
MQ实用场景
异步解决场景
如当一个站点新增用户时须要走以下流程:验证账号信息->用户入库->发送注册胜利欢送邮箱给用户;
从该流程中剖析用户注册胜利后首先冀望的是可能胜利登录上站点,而对于是否收到注册胜利的邮件对于用户而言并不重要,
而邮件发送对于如遇到网络问题可能导致发送邮件迟缓素来导致整个用户注册流程响应很慢;
对于告诉邮件发送对于性能而言并不重要的时候,这个时候就能够将该业务放在MQ中异步执行从而能够从肯定水平上晋升整个流程的性能。
利用解耦
如当一个站点新增用户时须要走以下流程:验证账号信息->用户入库->发送注册胜利欢送邮箱给用户;
通常通过零碎划分会划分为:用户模块,音讯模块;
以Spring Cloud的为例依照原始做法会在用户入库胜利后会通过Feign调用音讯模块的发送邮件接口,然而如果音讯模块全集群宕机就会导致Feign申请失败从而导致业务不可用;
应用MQ就不会造成上述的问题,因为咱们在用户注册实现后想音讯模块对应的邮件发送业务队列去发送音讯即可,队列会监督音讯模块实现,如果完不成队列会始终监督,直到实现为止
流量削峰
秒杀和抢购等场景常常应用 MQ 进行流量削峰。流动开始时流量暴增,用户的申请写入 MQ,超过 MQ 最大长度抛弃申请,业务零碎接管 MQ 中的音讯进行解决,达到流量削峰、保证系统可用性的目标。
影响:MQ是排队执行的所以对性能有肯定的影响,并且申请过多后会导致申请被抛弃问题
音讯通信
点对点或者订阅公布模式,通过音讯进行通信。如微信的音讯发送与接管、聊天室等。
SpringBoot中应用RabbitMQ
工程创立&筹备
阐明该工程依照包辨别同时负责生产者与消费者
POM导入依赖:
<dependencies> <!-- RabbitMQ依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- 导入Web服务不便测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 代码简化工具 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency></dependencies>
创立SpringBoot启动类:
@SpringBootApplicationpublic class SimpleRabbitMQCaseApplication { public static void main(String[] args) { SpringApplication.run(SimpleRabbitMQCaseApplication.class,args); }}
创立applicatin.yaml:
server: port: 8021spring: application: name: rabbitmq-simple-case #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: root password: 123456 virtual-host: / # 虚构host 能够不设置,应用server默认host listener: simple: concurrency: 10 # 生产端的监听个数(即@RabbitListener开启几个线程去解决数据。) max-concurrency: 10 # 生产端的监听最大个数 prefetch: 5 acknowledge-mode: auto # MANUAL:手动解决 AUTO:主动解决 default-requeue-rejected: true # 生产不胜利的音讯回绝入队 retry: enabled: true # 开启音讯重试 max-attempts: 5 # 重试次数 max-interval: 10000 # 重试最大间隔时间 initial-interval: 2000 # 重试初始间隔时间
简略队列生产生产
生产者:
/** * 简略队列音讯生产 * @author wuwentao */@RestController@RequestMapping("/simple/queue")@AllArgsConstructorpublic class SimpleQueueProducer { private RabbitTemplate rabbitTemplate; // 发送到的队列名称 public static final String AMQP_SIMPLE_QUEUE = "amqp.simple.queue"; /** * 发送简略音讯 * @param message 音讯内容 */ @GetMapping("/sendMessage") public String sendMessage(@RequestParam(value = "message") String message){ rabbitTemplate.convertAndSend(AMQP_SIMPLE_QUEUE, message); return "OK"; }}
消费者:
/** * 简略队列音讯消费者 * @author wuwentao */@Component@Slf4jpublic class SimpleQueueConsumer { /** * 监听一个简略的队列,队列不存在时候会创立 * @param content 音讯 */ @RabbitListener(queuesToDeclare = @Queue(name = SimpleQueueProducer.AMQP_SIMPLE_QUEUE)) public void consumerSimpleMessage(String content, Message message, Channel channel) throws IOException { // 通过Message对象解析音讯 String messageStr = new String(message.getBody()); log.info("通过参数模式接管的音讯:{}" ,content); //log.info("通过Message:{}" ,messageStr); // 可通过Meessage对象解析音讯 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认音讯生产胜利 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 手动确认音讯生产失败 }}
测试生成音讯拜访接口地址:
http://localhost:8021/simple/queue/sendMessage?message=这是一条简略的音讯序号1http://localhost:8021/simple/queue/sendMessage?message=这是一条简略的音讯序号2http://localhost:8021/simple/queue/sendMessage?message=这是一条简略的音讯序号3
控制台打印生产信息:
2022-08-22 09:45:26.846 INFO 14400 --- [ntContainer#0-1] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯:这是一条简略的音讯序号12022-08-22 09:45:29.064 INFO 14400 --- [tContainer#0-10] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯:这是一条简略的音讯序号22022-08-22 09:45:31.441 INFO 14400 --- [ntContainer#0-4] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯:这是一条简略的音讯序号3
注意事项:在YAML中开启的配置acknowledge-mode为auto也是默认的所以音讯不须要手动确认默认没有异样则生产胜利,如果须要定制ACK形式能够将acknowledge-mode批改为MANUAL则要在生产实现后自行ACK或NACK否则将导致音讯反复生产
Fanout Exchange 扇形交换机 播送模式
fanout模式也叫播送模式,每一条音讯能够被绑定在同一个交换机上的所有队列的消费者生产
生产者:
@RestController@RequestMapping("/exchange/fanout")@AllArgsConstructorpublic class ExchangeFanoutProducer { private RabbitTemplate rabbitTemplate; // 扇形交换机定义 public static final String EXCHANGE_FANOUT = "exchange.fanout"; // 绑定扇形交换机的队列1 public static final String EXCHANGE_FANOUT_QUEUE_1 = "exchange.fanout.queue1"; // 绑定扇形交换机的队列2 public static final String EXCHANGE_FANOUT_QUEUE_2 = "exchange.fanout.queue2"; /** * 发送扇形音讯音讯可能被所有绑定该交换机的队列给生产 * @param message 音讯内容 */ @GetMapping("/sendMessage") public String sendMessage(@RequestParam(value = "message") String message){ // routingkey 在fanout模式不应用,会在direct和topic模式应用,所以这里给空 rabbitTemplate.convertAndSend(EXCHANGE_FANOUT,"", message); return "OK"; }}
消费者:
这里定义两个消费者同时绑定同一个扇形交换机,这里次要申明交换机Type为ExchangeTypes.FANOUT
/** * 扇形交换机队列消费者 * @author wuwentao */@Component@Slf4jpublic class ExchangeFanoutConsumer { /** * 创立交换机并且绑定队列(队列1) * * @param content 内容 * @param channel 通道 * @param message 音讯 * @throws IOException ioexception * @throws TimeoutException 超时异样 */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT), value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_1, durable = "true") )) @RabbitHandler public void exchangeFanoutQueue1(String content, Channel channel, Message message) { log.info("EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:{}",content); } /** * 创立交换机并且绑定队列(队列2) */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT), value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_2, durable = "true") )) @RabbitHandler public void exchangeFanoutQueue2(String content, Channel channel, Message message) { log.info("EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:{}",content); }}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的音讯序号1http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的音讯序号2http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的音讯序号3
控制台打印生产信息:
2022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#1-2] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:这是一条扇形交换机中的音讯序号12022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#0-7] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:这是一条扇形交换机中的音讯序号12022-08-22 10:10:49.151 INFO 12016 --- [tContainer#0-10] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:这是一条扇形交换机中的音讯序号22022-08-22 10:10:49.151 INFO 12016 --- [ntContainer#1-4] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:这是一条扇形交换机中的音讯序号22022-08-22 10:10:54.254 INFO 12016 --- [ntContainer#0-6] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:这是一条扇形交换机中的音讯序号32022-08-22 10:10:54.255 INFO 12016 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:这是一条扇形交换机中的音讯序号3
Direct Exchange 直连型交换机,
直连交换机与扇形交换机的区别在于,队列都是绑定同一个交换机,然而在队列上会增加routingkey标识,消费者会依据对应的不同routingkey生产对应的音讯。
生产者:
@RestController@RequestMapping("/exchange/direct")@AllArgsConstructorpublic class ExchangeDirectProducer { private RabbitTemplate rabbitTemplate; // 直连交换机定义 public static final String EXCHANGE_DIRECT = "exchange.direct"; // 直连交换机队列定义1 public static final String EXCHANGE_DIRECT_QUEUE_1 = "exchange.direct.queue1"; // 直连交换机队列定义2 public static final String EXCHANGE_DIRECT_QUEUE_2 = "exchange.direct.queue2"; // 直连交换机路由KEY定义1 public static final String EXCHANGE_DIRECT_ROUTING_KEY_1 = "exchange.direct.routing.key1"; // 直连交换机路由KEY定义2 public static final String EXCHANGE_DIRECT_ROUTING_KEY_2 = "exchange.direct.routing.key2"; /** * 发送音讯到直连交换机并且指定对应routingkey * @param message 音讯内容 */ @GetMapping("/sendMessage") public String sendMessage(@RequestParam(value = "message") String message, @RequestParam(value = "routingkey") int routingkey){ if(routingkey == 1){ rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_1, message); } else if (routingkey == 2){ rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_2, message); }else{ return "非法参数!"; } return "OK"; }}
消费者:
这里定义多个消费者同时绑定同一个直连交换机,这里次要申明交换机Type为ExchangeTypes.DIRECT,不申明则默认为DIRECT。
/** * 直连交换机队列消费者 * @author wuwentao */@Component@Slf4jpublic class ExchangeDirectConsumer { /** * 创立交换机并且绑定队列1(绑定routingkey1) * * @param content 内容 * @param channel 通道 * @param message 音讯 * @throws IOException ioexception * @throws TimeoutException 超时异样 */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT), value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_1, durable = "true"), key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_1 )) @RabbitHandler public void exchangeDirectRoutingKey1(String content, Channel channel, Message message) { log.info("队列1 KEY1接管到音讯:{}",content); } /** * 创立交换机并且绑定队列2(绑定routingkey2) */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT), value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_2, durable = "true"), key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_2 )) @RabbitHandler public void exchangeDirectRoutingKey2(String content, Channel channel, Message message) { log.info("队列2 KEY2接管到音讯:{}",content); }}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/direct/sendMessage?routingkey=1&message=这是一条发给路由key为1的音讯http://localhost:8021/exchange/direct/sendMessage?routingkey=2&message=这是一条发给路由key为2的音讯
控制台打印生产信息:
2022-08-22 10:37:22.173 INFO 4380 --- [ntContainer#0-1] c.g.b.s.consumer.ExchangeDirectConsumer : 队列1 KEY1接管到音讯:这是一条发给路由key为1的音讯2022-08-22 10:37:26.882 INFO 4380 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeDirectConsumer : 队列2 KEY2接管到音讯:这是一条发给路由key为2的音讯
Topic Exchange 主题交换机
这个交换机其实跟直连交换机流程差不多,然而它的特点就是在它的路由键和绑定键之间是有规定的;规定如下:
Topic交换机接管的音讯RoutingKey必须是多个单词,以 . 宰割
Topic交换机与队列绑定时的routingKey能够指定通配符
#:代表0个或多个词*:代表1个词
生产者:
@RestController@RequestMapping("/exchange/topic")@AllArgsConstructorpublic class ExchangeTopicProducer { private RabbitTemplate rabbitTemplate; // 主題交换机定义 public static final String EXCHANGE_TOPIC = "exchange.topic"; // 主題交换机队列定义1 public static final String EXCHANGE_TOPIC_QUEUE_1 = "exchange.topic.queue1"; // 主題交换机队列定义1 public static final String EXCHANGE_TOPIC_QUEUE_2 = "exchange.topic.queue2"; // 主題交换机队列路由Key定义1 public static final String EXCHANGE_TOPIC_ROUTING1_KEY_1 = "#.routingkey.#"; // 主題交换机队列路由Key定义2 public static final String EXCHANGE_TOPIC_ROUTING2_KEY_2 = "routingkey.*"; // 案例KEY1 能够被EXCHANGE_TOPIC_ROUTING1_KEY_1匹配不能被EXCHANGE_TOPIC_ROUTING2_KEY_2匹配 public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1"; // 案例KEY2 同时能够被EXCHANGE_TOPIC_ROUTING1_KEY_1和EXCHANGE_TOPIC_ROUTING2_KEY_2匹配 public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2"; /** * 发送音讯到主题交换机并且指定对应可通配routingkey * @param message 音讯内容 */ @GetMapping("/sendMessage") public String sendMessage(@RequestParam(value = "message") String message, @RequestParam(value = "routingkey") int routingkey){ if(routingkey == 1){ // 同时匹配 topic.routingkey.case1 和 routingkey.case2 rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_1, message); } else if (routingkey == 2){ // 只能匹配 routingkey.case2 rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_2, message); }else{ return "非法参数!"; } return "OK"; }}
消费者:
这里定义多个消费者同时绑定同一个直主题交换机,这里次要申明交换机Type为ExchangeTypes.TOPIC,当routingkey发送的音讯可能被消费者给匹配仅可能接管到音讯。
@Component@Slf4jpublic class ExchangeTopicConsumer { /** * 创立交换机并且绑定队列1(绑定routingkey1) * * @param content 内容 * @param channel 通道 * @param message 音讯 * @throws IOException ioexception * @throws TimeoutException 超时异样 */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC), value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_1, durable = "true"), key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING1_KEY_1 )) @RabbitHandler public void exchangeTopicRoutingKey1(String content, Channel channel, Message message) { log.info("#号统配符号队列1接管到音讯:{}",content); } /** * 创立交换机并且绑定队列2(绑定routingkey2) */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC), value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_2, durable = "true"), key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING2_KEY_2 )) @RabbitHandler public void exchangeTopicRoutingKey2(String content, Channel channel, Message message) { log.info("*号统配符号队列2接管到音讯:{}",content); }}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/topic/sendMessage?routingkey=1&message=前后多重匹配http://localhost:8021/exchange/topic/sendMessage?routingkey=2&message=后一个词匹配
控制台打印生产信息:
2022-08-22 15:10:50.444 INFO 1376 --- [ntContainer#4-8] c.g.b.s.consumer.ExchangeTopicConsumer : #号统配符号队列1接管到音讯:前后多重匹配2022-08-22 15:10:55.118 INFO 1376 --- [ntContainer#5-8] c.g.b.s.consumer.ExchangeTopicConsumer : *号统配符号队列2接管到音讯:后一个词匹配2022-08-22 15:10:55.119 INFO 1376 --- [ntContainer#4-9] c.g.b.s.consumer.ExchangeTopicConsumer : #号统配符号队列1接管到音讯:后一个词匹配
手动ACK与音讯确认机制
新增SpringBoot配置文件YAML,这里次要将主动ACK批改为手工ACK并且开启音讯确认模式与音讯回退:
spring: rabbitmq: listener: acknowledge-mode: manual # MANUAL:手动解决 AUTO:主动解决 # NONE值是禁用公布确认模式,是默认值 # CORRELATED值是公布音讯胜利到交换器后会触发回调办法,如1示例 # SIMPLE值经测试有两种成果,其一成果和CORRELATED值一样会触发回调办法,其二在公布音讯胜利后应用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie办法期待broker节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是waitForConfirmsOrDie办法如果返回false则会敞开channel,则接下来无奈发送音讯到broker; publisher-confirm-type: simple #音讯确认机制 publisher-returns: true # 音讯回退确认机制
定义音讯回调确认实现类:
/** * 消费者确认收到音讯后,手动ack回执回调解决 * @author wuwentao */@Slf4j@Componentpublic class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("==================================================="); log.info("音讯确认机制回调函数参数信息如下:"); log.info("ACK状态:{}",ack); log.info("投递失败起因:{}",cause); log.info("==================================================="); }}
消费者:
/** * RabbitMQ Message 回调地址消费者测试 * @author wuwentao */@Component@Slf4jpublic class MessagesCallbackConsumer { @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = MessagesCallbackProducer.MESSAGE_CALLBACK_EXCHANGE, durable = "true", type = ExchangeTypes.DIRECT), value = @Queue(value = MessagesCallbackProducer.MESSAGE_CALLBACK_QUEUE, durable = "true"), key = MessagesCallbackProducer.MESSAGE_CALLBACK_ROUTINGKEY )) @RabbitHandler public void consumer(String content, Channel channel, Message message) throws IOException { if("胜利".equals(content)){ log.info("音讯解决胜利:{}",content); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认音讯生产胜利 }else{ if(message.getMessageProperties().getRedelivered()){ log.info("音讯已被解决过了请勿反复解决音讯:{}",content); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 回绝该音讯,音讯会被抛弃,不会重回队列 }else{ log.info("音讯解决失败期待重新处理:{}",content); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }}
生产者:
/** * 音讯回调机制测试 * @author wuwentao */@RestController@RequestMapping("/message/callback")@AllArgsConstructorpublic class MessagesCallbackProducer { private RabbitTemplate rabbitTemplate; private MessageConfirmCallback messageConfirmCallback; // 发送到的队列名称 public static final String MESSAGE_CALLBACK_QUEUE = "amqp.message.callback.queue"; public static final String MESSAGE_CALLBACK_EXCHANGE = "amqp.message.callback.exchange"; public static final String MESSAGE_CALLBACK_ROUTINGKEY = "amqp.message.callback.routingkey"; /** * 测试音讯确认机制 * @param message 音讯内容 */ @GetMapping("/sendMessage") public String sendMessage(@RequestParam(value = "message") String message){ // 设置失败和确认回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(messageConfirmCallback); //构建回调id为uuid String callBackId = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(callBackId); if("失败的音讯".equals(message)){ // 写一个不存的替换机器 和不存在的路由KEY rabbitTemplate.convertAndSend("sdfdsafadsf","123dsfdasf",message, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return msg; },correlationData); }else{ rabbitTemplate.convertAndSend(MESSAGE_CALLBACK_EXCHANGE,MESSAGE_CALLBACK_ROUTINGKEY,message, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return msg; },correlationData); } return "OK"; }}
测试生成音讯拜访接口地址:
# 发送找不到交换机的音讯http://localhost:8021/message/callback/sendMessage?message=失败的音讯# 发送手动ACK胜利的音讯http://localhost:8021/message/callback/sendMessage?message=胜利# 发送手动ACK失败的音讯http://localhost:8021/message/callback/sendMessage?message=失败
控制台打印生产信息:
2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:2022-08-24 09:11:50.123 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK状态:false2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'sdfdsafadsf' in vhost '/', class-id=60, method-id=40)2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================2022-08-24 09:12:02.704 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK状态:true2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:null2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================2022-08-24 09:12:02.735 INFO 11440 --- [ntContainer#6-1] c.g.b.s.c.MessagesCallbackConsumer : 音讯解决胜利:胜利2022-08-24 09:12:16.680 INFO 11440 --- [ntContainer#6-4] c.g.b.s.c.MessagesCallbackConsumer : 音讯解决失败期待重新处理:失败2022-08-24 09:12:16.688 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK状态:true2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:null2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================2022-08-24 09:12:16.693 INFO 11440 --- [ntContainer#6-7] c.g.b.s.c.MessagesCallbackConsumer : 音讯已被解决过了请勿反复解决音讯:失败