乐趣区

关于rabbitmq:系统学习消息队列RabbitMQ的发布确认高级篇

1. 音讯公布确认的计划

2. 音讯的回退

3. 备份交换机

1. 音讯公布确认的计划
在后面的文章中,零碎学习音讯队列——RabbitMQ 的音讯公布确认,咱们肯定水平上学习了音讯的公布确认的根底,然而在生产环境中,因为 RabbitMq 的重启,RabbitMQ 在重启过程中投递失败,导致音讯失落,须要手动解决和复原。那么咱们该如何保障当RabbitMQ 不可用的时候,音讯的 稳固投递 呢?

咱们采取上面的计划:


咱们将要发送音讯做一个长久化,发送音讯的时候,咱们长久化一份到数据库或者缓存中 ,当发送音讯 失败 的时候,咱们进行一次 从新发送。所以在发送音讯的时候,咱们要进行代码业务逻辑的解决:

yml:

server:
  port: 11000
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated

publisher-confirm-type 这个参数一共有三种配置办法:

NONE:
禁用公布确认,是默认值。CORRELATED:
公布音讯后,替换机会触发回调办法。SIMPLE:
有两种成果:1:和 CORRELATED 一样会触发回调办法
2:公布音讯胜利后应用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 办法期待 broker 节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是 waitForConfirmsOrDie 办法如果返回 false 则会敞开 channel,则接下来无奈发送音讯到 broker。

回调办法类:

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    /**
     * 交换机是否收到音讯的回调办法
     * CorrelationData 音讯相干数据
     * ack 交换机是否收到音讯
     * cause 交换机未收到音讯的起因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("交换机曾经收到 id 为:{}的音讯", correlationData.getId());
        } else {log.info("交换机还未收到 id 为:{}音讯, 因为起因:{}", correlationData.getId(), cause);
        }
    }

}

队列配置类:

@Configuration
public class ConfirmQueueConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    
    @Autowired
    private MyCallBack myCallBack;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {rabbitTemplate.setConfirmCallback(myCallBack);
    }

    // 申明业务 Exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    // 申明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}

    // 申明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");
    }


}

生产者:

@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        // 指定音讯 id 为 1
        CorrelationData correlationData1 = new CorrelationData("1");
        // 这个 key1 是有交换机的 key,会发送胜利
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        // 这个交换机不存在,会发送失败
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"1", routingKey, message + routingKey, correlationData2);
        CorrelationData correlationData3 = new CorrelationData("3");
        // 这个 key2 是没有交换机的 key,会发送失败
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData3);
        log.info("发送音讯内容:{}", message);
    }
}

消费者:

@Component
@Slf4j
public class ConfirmConsumer {

    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues =CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message){String msg=new String(message.getBody());
        log.info("承受到队列 confirm.queue 音讯:{}",msg);
    }


}

咱们发送信息:
http://localhost:11000/confir… 能够啊

咱们发送 三条音讯
一条是 有交换机有队列的音讯
二条是 没有交换机的音讯
三条是 有交换机没有队列的音讯

后果如下:

咱们能够看出:
第一条音讯 失常生产
第二条音讯 找不到交换机,抛异样了
第三条音讯 绑定键找不到队列,这条音讯间接被抛弃了

2. 音讯的回退

咱们发现 第三条音讯的反馈并不是很好 ,在仅仅开启了生产者确认机制的状况下,交换机收到音讯后,会间接给生产者发送确认音讯, 如果该音讯不可路由,那么音讯会间接被摈弃,此时 生产者是不晓得这条音讯被抛弃的 。所以咱们这里要引入音讯的 回退机制 ,如果音讯 不能路由到队列 ,就会有一个 告诉 ,通过设置mandatory 参数 能够将不可到达队列的音讯返回给生产者。

回调解决逻辑:

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    /**
     * 交换机是否收到音讯的回调办法
     * CorrelationData 音讯相干数据
     * ack 交换机是否收到音讯
     * cause 交换机未收到音讯的起因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("交换机曾经收到 id 为:{}的音讯", correlationData.getId());
        } else {log.info("交换机还未收到 id 为:{}音讯, 因为起因:{}", correlationData.getId(), cause);
        }
    }


    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("消 息 {}, 被 交 换 机 {} 退 回,退 回 原 因 :{}, 路 由 key:{}",
                new String(message.getBody()),
                exchange,
                replyText,
                routingKey);
    }



}

批改一下后面那个配置类的办法:

    // 依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {rabbitTemplate.setConfirmCallback(myCallBack);
        /**
         * true:* 交换机无奈将音讯进行路由时,会将该音讯返回给生产者
         * false:* 如果发现音讯无奈进行路由,则间接抛弃
         */
        rabbitTemplate.setMandatory(true);
        // 设置回退音讯交给谁解决
        rabbitTemplate.setReturnCallback(myCallBack);

    }

持续发送音讯:http://localhost:11000/confir… 能够啊

咱们发现,交换机路由不到的队列,也会有反馈了:

3. 备份交换机

有了后面那个 mandatory 参数和回退音讯,咱们对于无奈投递到目的地的音讯,能够进行解决了。然而咱们在解决这些日志的时候,顶多就是打印了一下日志 ,而后触发报警,接着手动进行解决。 通过日志收集这些无奈达到路由的音讯十分不优雅 ,而且 手动复制日志非常容易出错。而且 mandatory 参数设置,还得减少配置类,减少了复杂性。

如果咱们不想失落音讯,又不想减少配置类,该怎么做呢?在后面学习死信队列的时候零碎学习音讯队列——RabbitMQ 的死信队列,咱们能够为 队列设置死信交换机 来解决那些失败的音讯。

RabbitMQ 中有 备份交换机 这种存在,它就 像死信交换机一样 ,能够用来 解决那些路由不到的音讯 ,当 交换机接管到一份不可路由的音讯的时候,咱们就会把这条音讯转发到备份交换机中,由备份交换机进行对立解决。

@Configuration
public class ConfirmQueueConfig {


    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // 申明确认队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}
    // 申明确认队列绑定关系
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

    // 申明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }
    
    // 申明确认 Exchange 交换机的备份交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        // 设置该交换机的备份交换机
        ExchangeBuilder exchangeBuilder =
            ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                    .durable(true)
                    .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); 
        return (DirectExchange)exchangeBuilder.build();}
    // 申明正告队列
    @Bean("warningQueue")
    public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}
    // 申明报警队列绑定关系
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                  @Qualifier("backupExchange") FanoutExchange
                                          backupExchange){return BindingBuilder.bind(queue).to(backupExchange);
    }
    // 申明备份队列
    @Bean("backQueue")
    public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}
    // 申明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);
    }

}

咱们发现,不可路由的音讯被发现后,就被送到了报警的备份队列外面。

而且这种配置的优先级,比 mandatory 参数更高。

退出移动版