1.音讯公布确认的计划
2.音讯的回退
3.备份交换机
1.音讯公布确认的计划
在后面的文章中,零碎学习音讯队列——RabbitMQ的音讯公布确认,咱们肯定水平上学习了音讯的公布确认的根底,然而在生产环境中,因为RabbitMq的重启,RabbitMQ在重启过程中投递失败,导致音讯失落,须要手动解决和复原。那么咱们该如何保障当RabbitMQ不可用的时候,音讯的稳固投递呢?
咱们采取上面的计划:
咱们将要发送音讯做一个长久化,发送音讯的时候,咱们长久化一份到数据库或者缓存中,当发送音讯失败的时候,咱们进行一次从新发送。所以在发送音讯的时候,咱们要进行代码业务逻辑的解决:
yml:
server: port: 11000spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest publisher-confirm-type: correlated
publisher-confirm-type这个参数一共有三种配置办法:
NONE:禁用公布确认,是默认值。CORRELATED:公布音讯后,替换机会触发回调办法。SIMPLE:有两种成果:1:和CORRELATED一样会触发回调办法2:公布音讯胜利后应用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 办法期待 broker 节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是waitForConfirmsOrDie 办法如果返回 false 则会敞开 channel,则接下来无奈发送音讯到 broker。
回调办法类:
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback { /** * 交换机是否收到音讯的回调办法 * CorrelationData 音讯相干数据 * ack 交换机是否收到音讯 * cause 交换机未收到音讯的起因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("交换机曾经收到 id 为:{}的音讯", correlationData.getId()); } else { log.info("交换机还未收到 id 为:{}音讯,因为起因:{}", correlationData.getId(), cause); } }}
队列配置类:
@Configurationpublic class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @Autowired private MyCallBack myCallBack; @Autowired private RabbitTemplate rabbitTemplate; //依赖注入 rabbitTemplate 之后再设置它的回调对象 @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(myCallBack); } //申明业务 Exchange @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } // 申明确认队列 @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } // 申明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key1"); }}
生产者:
@RestController@RequestMapping("/confirm")@Slf4jpublic class ProducerController { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message) { //指定音讯 id 为 1 CorrelationData correlationData1 = new CorrelationData("1"); //这个key1是有交换机的key,会发送胜利 String routingKey = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1); //这个交换机不存在,会发送失败 CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"1", routingKey, message + routingKey, correlationData2); CorrelationData correlationData3 = new CorrelationData("3"); //这个key2是没有交换机的key,会发送失败 routingKey = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData3); log.info("发送音讯内容:{}", message); }}
消费者:
@Component@Slf4jpublic class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @RabbitListener(queues =CONFIRM_QUEUE_NAME) public void receiveMsg(Message message){ String msg=new String(message.getBody()); log.info("承受到队列 confirm.queue 音讯:{}",msg); }}
咱们发送信息:
http://localhost:11000/confir...能够啊
咱们发送三条音讯:
一条是有交换机有队列的音讯
二条是没有交换机的音讯
三条是有交换机没有队列的音讯
后果如下:
咱们能够看出:
第一条音讯失常生产
第二条音讯找不到交换机,抛异样了
第三条音讯绑定键找不到队列,这条音讯间接被抛弃了
2.音讯的回退
咱们发现第三条音讯的反馈并不是很好,在仅仅开启了生产者确认机制的状况下,交换机收到音讯后,会间接给生产者发送确认音讯,如果该音讯不可路由,那么音讯会间接被摈弃,此时生产者是不晓得这条音讯被抛弃的。所以咱们这里要引入音讯的回退机制,如果音讯不能路由到队列,就会有一个告诉,通过设置mandatory参数能够将不可到达队列的音讯返回给生产者。
回调解决逻辑:
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { /** * 交换机是否收到音讯的回调办法 * CorrelationData 音讯相干数据 * ack 交换机是否收到音讯 * cause 交换机未收到音讯的起因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("交换机曾经收到 id 为:{}的音讯", correlationData.getId()); } else { log.info("交换机还未收到 id 为:{}音讯,因为起因:{}", correlationData.getId(), cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new String(message.getBody()), exchange, replyText, routingKey); }}
批改一下后面那个配置类的办法:
//依赖注入 rabbitTemplate 之后再设置它的回调对象 @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(myCallBack); /** * true: * 交换机无奈将音讯进行路由时,会将该音讯返回给生产者 * false: * 如果发现音讯无奈进行路由,则间接抛弃 */ rabbitTemplate.setMandatory(true); //设置回退音讯交给谁解决 rabbitTemplate.setReturnCallback(myCallBack); }
持续发送音讯:http://localhost:11000/confir...能够啊
咱们发现,交换机路由不到的队列,也会有反馈了:
3.备份交换机
有了后面那个mandatory参数和回退音讯,咱们对于无奈投递到目的地的音讯,能够进行解决了。然而咱们在解决这些日志的时候,顶多就是打印了一下日志,而后触发报警,接着手动进行解决。通过日志收集这些无奈达到路由的音讯十分不优雅,而且手动复制日志非常容易出错。而且mandatory参数设置,还得减少配置类,减少了复杂性。
如果咱们不想失落音讯,又不想减少配置类,该怎么做呢?在后面学习死信队列的时候零碎学习音讯队列——RabbitMQ的死信队列,咱们能够为队列设置死信交换机来解决那些失败的音讯。
RabbitMQ中有备份交换机这种存在,它就像死信交换机一样,能够用来解决那些路由不到的音讯,当交换机接管到一份不可路由的音讯的时候,咱们就会把这条音讯转发到备份交换机中,由备份交换机进行对立解决。
@Configurationpublic class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String WARNING_QUEUE_NAME = "warning.queue"; // 申明确认队列 @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //申明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key1"); } //申明备份 Exchange @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } //申明确认 Exchange 交换机的备份交换机 @Bean("confirmExchange") public DirectExchange confirmExchange(){ //设置该交换机的备份交换机 ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .durable(true) .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); return (DirectExchange)exchangeBuilder.build(); } // 申明正告队列 @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } // 申明报警队列绑定关系 @Bean public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); } // 申明备份队列 @Bean("backQueue") public Queue backQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } // 申明备份队列绑定关系 @Bean public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); }}
咱们发现,不可路由的音讯被发现后,就被送到了报警的备份队列外面。
而且这种配置的优先级,比mandatory参数更高。