前情提要:rabbitmq 治理界面查看姿态
一、疾速搭建/根本信息发送和生产
1、引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、application.yml
spring: rabbitmq: host: ipXXX port: 5672 username: 账户XXX password: 明码XXX virtual-host: /wen # 交换器名称
以 direct模式为例
1、配置文件import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/@Configurationpublic class RabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue emailQueue() { // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效 // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。 // return new Queue("TestDirectQueue",true,true,false); //个别设置一下队列的长久化就好,其余两个就是默认false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效 // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。 // return new Queue("TestDirectQueue",true,true,false); //个别设置一下队列的长久化就好,其余两个就是默认false return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效 // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。 // return new Queue("TestDirectQueue",true,true,false); //个别设置一下队列的长久化就好,其余两个就是默认false return new Queue("weixin.fanout.queue", true); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则30秒后过期 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } //Direct交换机 起名:TestDirectExchange @Bean public DirectExchange fanoutOrderExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("fanout_exchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); } @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with(""); }}2、生产者package com.pit.barberShop.common.MQ.Rabbit.fanout;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @author :wenye * @date :Created in 2021/6/15 21:41 * @description:播送模式 * @version: $ */@RestController@RequestMapping("/rabbitmq")public class ProducerFanout { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定义交换机 private String exchangeName = "fanout_exchange"; // 2: 路由key private String routeKey = ""; @RequestMapping("/fanout") public void markerFanout() { String message ="shua"; // 发送音讯 rabbitTemplate.convertAndSend(exchangeName, routeKey, message); } @RequestMapping("/ttl") public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒 byte[] msgBytes = "测试音讯主动过期".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message); return "ok"; }}3、消费者package com.pit.barberShop.common.MQ.Rabbit.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.stereotype.Component;/** * @author :wenye * @date :Created in 2021/6/15 22:07 * @description:fanout消费者 * @version: $ */@Componentpublic class ConsumerFanout { @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。 value = @Queue(value = "sms.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_exchange", // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式 type = ExchangeTypes.DIRECT) )) public void messagerevice(String message){ // 此处省略发邮件的逻辑 System.out.println("sms-two111------------->" + message); } @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "fanout_exchange", // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式 type = ExchangeTypes.DIRECT) )) public void messageWXrevice(String message){ // 此处省略发邮件的逻辑 System.out.println("weixin----two---------->" + message); }}
二、过期工夫
1、生产者发送音讯时设置过期工夫 public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒 byte[] msgBytes = "测试音讯主动过期".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message); return "ok"; }2、队列中的所有音讯设置过期工夫配置中增加@Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则30秒后过期 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则30秒后过期 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); }
三、音讯确认机制配置
参考:https://blog.csdn.net/qq33098...
默认是自动应答
spring: rabbitmq: # 开启发送确认 publisher-confirms: true # 开启发送失败退回 publisher-returns: true
目前回调存在ConfirmCallback和ReturnCallback两者。他们的区别在于
如果音讯没有到exchange,则ConfirmCallback回调,ack=false,
如果音讯达到exchange,则ConfirmCallback回调,ack=true
exchange到queue胜利,则不回调ReturnCallback
rabbitMQ 音讯生产者发送音讯的流程
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;@Slf4j@Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /** * correlationData:对象外部只有一个 id 属性,用来示意以后音讯的唯一性。 * ack:音讯投递到broker 的状态,true示意胜利。 * cause:示意投递失败的起因。 **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause){ if (!ack) { log.error("音讯发送异样!"); } else { log.info("发送者爸爸曾经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } }}import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;@Slf4j@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback { //重写 returnedMessage() 办法,办法有五个参数message(音讯体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey路由(队列) @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); }}配置文件1、避免反复签发ack须要在配置类中重写 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //此处也设置为手动ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; }2、从新创立设置交换器和队列属性@Bean public Queue chongfuQueue() { // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效 // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。 // return new Queue("TestDirectQueue",true,true,false); //个别设置一下队列的长久化就好,其余两个就是默认false return new Queue("chongfu.fanout.queue", true);} //Direct交换机 起名:TestDirectExchange@Beanpublic DirectExchange chongfuExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("chongfu_exchange", true, false);}@Beanpublic Binding bindingDirect4() { return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with("");}生产者public void markerchongfu() { /** * 确保音讯发送失败后能够从新返回到队列中 * 留神:yml须要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消费者确认收到音讯后,手动ack回执回调解决 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 音讯投递到队列失败回调解决 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 发送音讯 */ String s = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帅哥", message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(s)); }消费者@RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。 value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "chongfu_exchange", // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式 type = ExchangeTypes.DIRECT) )) public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到音讯:{}", msg);// log.info("序号:{}", message.getMessageProperties().getDeliveryTag());// System.out.println(msg); //TODO 具体业务 // 收到音讯 basicAck() channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("音讯已反复解决失败,回绝再次接管..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 回绝音讯 } else { log.error("音讯行将再次返回队列解决..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
生产音讯有三种回执办法
1、basicAck
basicAck:示意胜利确认,应用此回执办法后,音讯会被rabbitmq broker 删除。
void basicAck(long deliveryTag, boolean multiple)
- deliveryTag:示意音讯投递序号,每次生产音讯或者音讯从新投递后,deliveryTag都会减少。手动音讯确认模式下,咱们能够对指定deliveryTag的音讯进行ack、nack、reject等操作。
- multiple:是否批量确认,值为 true 则会一次性 ack所有小于以后音讯 deliveryTag 的音讯。
举个栗子: 假如我先发送三条音讯deliveryTag别离是5、6、7,可它们都没有被确认,当我发第四条音讯此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的音讯全副进行确认。
2、basicNack
basicNack :示意失败确认,个别在生产音讯业务异样时用到此办法,能够将音讯从新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:示意音讯投递序号。
- multiple:是否批量确认。
- requeue:值为 true 音讯将从新入队列。
3、basicReject
basicReject:回绝音讯,与basicNack区别在于不能进行批量操作,其余用法很类似。
void basicReject(long deliveryTag, boolean requeue)
- deliveryTag:示意音讯投递序号。
- requeue:值为 true 音讯将从新入队列。
四、死信队列
死信队列其实和一般的队列没啥大的区别,都须要创立本人的Queue、Exchange,而后通过RoutingKey绑定到Exchange下来,只不过死信队列的RoutingKey和Exchange要作为参数,绑定到失常的队列下来,一种利用场景是失常队列外面的音讯被basicNack或者reject时,音讯就会被路由到失常队列绑定的死信队列中,还有一种还有罕用的场景就是开启了主动签收,而后消费者生产音讯时出现异常,超过了重试次数,那么这条音讯也会进入死信队列,如果配置了话,
例子
//模仿异样用的交换器 ,topic交换器会通配符匹配,当然字符串截然不同也会匹配 @Bean TopicExchange emailExchange() { return new TopicExchange("demoTopicExchange"); } //死信队列 @Bean public Queue deadLetterQueue() { return new Queue("demo.dead.letter"); } //死信交换器 @Bean TopicExchange deadLetterExchange() { return new TopicExchange("demoDeadLetterTopicExchange"); } //绑定死信队列 @Bean Binding bindingDeadLetterQueue() { return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter"); }生产者@RequestMapping("/sixin") public void sendEmailMessage() { CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData); log.info("---发送 email 音讯---{}---messageId---{}","111",correlationData.getId()); }消费者 /** * 邮件消费者 * @param message * @param channel * @throws IOException */ @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。 value = @Queue(value = "demo.email",autoDelete = "false", arguments = { @Argument(name = "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"), @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"), @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") }), key = "demo.email", // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "demoTopicExchange", // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式 type = ExchangeTypes.TOPIC) )) public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException { try { log.info("---承受到音讯---{}",msg); //被动异样 int m=1/0; //手动签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { //异样,ture 从新入队,或者false,进入死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } } /** * 死信消费者,主动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack * @param message */ @RabbitListener(queues = "demo.dead.letter") public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException { //能够思考数据库记录,每次进来查数量,达到肯定的数量,进行预警,人工染指解决 log.info("接管到死信音讯:---{}---音讯ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation")); //回复ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
同样也可应用java类配置
@Bean public Queue emailQueue() { Map<String, Object> arguments = new HashMap<>(2); // 绑定死信交换机 arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange"); // 绑定死信的路由key arguments.put("x-dead-letter-routing-key", "demo.dead.letter"); arguments.put("x-message-ttl", 3000); return new Queue(emailQueue,true,false,false,arguments); } @Bean TopicExchange emailExchange() { return new TopicExchange(topicExchange); } @Bean Binding bindingEmailQueue() { return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#"); }
五、长久化机制和内存磁盘监控
1、长久化
RabbitMQ的长久化队列分为:
1:队列长久化
2:音讯长久化
3:交换机长久化
不论是长久化的音讯还是非长久化的音讯都能够写入到磁盘中,只不过非长久的是等内存不足的状况下才会被写入到磁盘中。
2、内存磁盘监控
六、分布式事务
七、配置详解
rabbitmq: addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连贯到的server的地址,多个以逗号分隔(优先取addresses,而后再取host)# port: ##集群配置 addresses之间用逗号隔开 # addresses: ip:port,ip:port password: admin username: 123456 virtual-host: / # 连贯到rabbitMQ的vhost requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s publisher-confirms: #是否启用 公布确认 publisher-reurns: # 是否启用公布返回 connection-timeout: #连贯超时,单位毫秒,0示意无穷大,不超时 cache: channel.size: # 缓存中放弃的channel数量 channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时工夫,单位毫秒;如果为0,则总是创立一个新channel connection.size: # 缓存的连接数,只有是CONNECTION模式时失效 connection.mode: # 连贯工厂缓存模式:CHANNEL 和 CONNECTION listener: simple.auto-startup: # 是否启动时主动启动容器 simple.acknowledge-mode: # 示意音讯确认形式,其有三种配置形式,别离是none、manual和auto;默认auto simple.concurrency: # 最小的消费者数量 simple.max-concurrency: # 最大的消费者数量 simple.prefetch: # 指定一个申请能解决多少个音讯,如果有事务的话,必须大于等于transaction数量. simple.transaction-size: # 指定一个事务处理的音讯数量,最好是小于等于prefetch的数量. simple.default-requeue-rejected: # 决定被回绝的音讯是否从新入队;默认是true(与参数acknowledge-mode有关系) simple.idle-event-interval: # 多少长时间公布闲暇容器工夫,单位毫秒 simple.retry.enabled: # 监听重试是否可用 simple.retry.max-attempts: # 最大重试次数 simple.retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离 simple.retry.multiplier: # 利用于上一重试距离的乘数 simple.retry.max-interval: # 最大重试工夫距离 simple.retry.stateless: # 重试是有状态or无状态 template: mandatory: # 启用强制信息;默认false receive-timeout: # receive() 操作的超时工夫 reply-timeout: # sendAndReceive() 操作的超时工夫 retry.enabled: # 发送重试是否可用 retry.max-attempts: # 最大重试次数 retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离 retry.multiplier: # 利用于上一重试距离的乘数 retry.max-interval: #最大重试工夫距离