唠个嗑

网络上收罗了屡次想晓得 RabbitMQ 事实业务种怎么实现音讯的可靠性的,然而大多都不太现实,站在各位大佬伟人的肩膀上钻研了一段时间,我也整顿了一套简略可行性的计划,包含音讯异样解决。这篇文章想次要讲一些业务解决计划,我的项目中退出 RabbitMQ 中间件很简略,然而依据具体业务实现音讯的可靠性,这个须要多加思考。当然上面也会通过测试代码来剖析,文末也会附上源码地址。

1、筹备

1.1、环境筹备

之前博客上写过一篇编译装置的办法 地址,大家能够参考,因为 RabbitMQ 底层语言的起因可能略微麻烦点,那就没有方法了吗?如果你是先搞测试,再在我的项目中应用的话,那能够应用 docker 装置,2 行代码,如下

docker pull rabbitmqdocker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

1.2、实践筹备

RabbitMQ 的类型包含:direct、topic、fanout、headers、system(翻源码看到的)

这里次要通过 topic 来剖析,bindingkey 能够通过通配符 # 和 * 来匹配多个 路由键 (routingKey),
bindingkey 是绑定交换机(exchange)和队列(queue)的, 生产者(publisher)发消息的时候会携带 routingKey、exchange 和 音讯发送给 RabbitMQ,
连贯胜利后理论是组件 exchange 接管了生产者的音讯,而后通过 bindingkey 匹配 routingKey,决定送给哪个 queue,每个消费者都会有 queue,所以 queue接管到音讯 后就能够确保消费者接能够收到音讯了,最初消费者再生产。

再具体的内容能够查看大佬 erlie 的总结 地址

2、音讯确认

RabbitMQ 根底配置

pom

<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.2.5.RELEASE</version>    <relativePath/> <!-- lookup parent from repository --></parent>    <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

application.yml

spring:  rabbitmq:    host: 192.168.1.105    port: 5672    username: guest    password: guest    virtual-host: /# 交换机、队列和绑定键申明test:  exchange: test.exchange  one:    queue: one.test.queue    binding-key: one.test.key

consumer

@RabbitListener(bindings=@QueueBinding(                //配置交换器                exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),                //配置路由键                key="${test.one.binding-key}",                //配置队列名称                value=@Queue(value="${test.one.queue}",autoDelete="true")))public void test(String msg) {    log.info("test 收到的音讯为:[{}]", msg);    //业务代码...}

publisher

@Autowiredprivate RabbitTemplate rabbitTemplate;@Value("${test.exchange}")private String exchange;@Value("${test.one.binding-key}")private String routingKey;public void test() {    rabbitTemplate.convertAndSend(exchange, routingKey, "test msg");}

通过下面默认的配置基本上就能够应用 RabbitMQ 了,然而这不是本篇的重点.咱们要晓得音讯发送和到生产的过程中呈现问题怎么办?这就须要咱们分段确认音讯是否接管胜利,如果失败了该如何解决,先想想能够怎么做。
咱们先捋一下思路,音讯发送给 RabbitMQ ,如果连贯 RabbitMQ 失败,则记录该音讯,如果连贯胜利然而 exchange 接管失败则记录下该音讯,如果 exchange 接管胜利然而 queue 接管 exchange 的音讯失败则记录下该音讯,音讯从生产者到 queue 有 3 个地位可能因网络抖动或其余起因呈现问题,那咱们在这三个地位记录下问题后,对立通过打算定时获取记录的音讯并且从新发送,如果重发三次还没有胜利则标记该记录为异样音讯。

2.1、生产者音讯确认和回调

这里分两步:

  1. 音讯确认指的是 RabbitMQ(exchange) 确认接管到了消费者发送的音讯
  2. 音讯回调指的是 queue 接管 exchange 的音讯失败,则回调通知 RabbitMQ 失败的音讯

2.1.1、音讯确认

开启配置

spring:  rabbitmq:    #publisher-confirms: true #已过期    publisher-confirm-type: correlated #开启生产者音讯确认

还有另外 2 种模式:

  1. none 值是禁用公布确认模式,是默认值;
  2. simple 值经测试有两种成果,其一成果和 correlated 值一样会触发回调办法,其二在公布音讯胜利后应用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 办法期待 broker 节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是 waitForConfirmsOrDie 办法如果返回false则会敞开 channel,则接下来无奈发送音讯到 broker。

当然还需新建一个类实现 RabbitTemplate.ConfirmCallback,重写办法 confirm,该办法有三个参数 correlationData、ack、cause,次要说下 ack,值为 true 示意 exchange 胜利接管到音讯,false 示意 exchange 接管音讯失败,这里 2 种后果能够别离解决,比方 false 能够把接管失败的音讯入库,而后通过定时器来解决,比拟懂的同学当初可能就有疑难了,correlationData 只能失去 msgId,基本没有具体的音讯,这里能够施展你杰出的想象力,能够通过对象封装失去,也能够通过存内存或者磁盘存储失去,办法总比艰难多。

@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {   String msgId = correlationData.getId();   if (ack) {       log.info("胜利发送给 mq, msgId:[{}]", msgId);   } else {       log.error("发送给 mq 失败, msgId:[{}], 失败理由cause:[{}]", msgId, cause);       //音讯从生产者没有到 exchange,那存库       saveToDB(msgId,...);   }}

这里还差一步,就是原生的 rabbitTemplate 怎么晓得音讯确认时应用刚建的类呢,注入即可
rabbitTemplate.setConfirmCallback(刚建的类);如有疑难可看文末源码。

2.1.2、音讯回调

spring:  rabbitmq:    publisher-returns: true #开启生产者音讯回调

同上,须要新建类并实现 RabbitTemplate.ReturnCallback,并且重写办法 returnedMessage, 最初须要注入如下内容

rabbitTemplate.setReturnCallback(刚建的类);// 要想使 returnCallback 失效,必须设置为truerabbitTemplate.setMandatory(true);
@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {    String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");    String msg = new String(message.getBody());    log.error("音讯回调 msgId:[{}], msg:[{}] 不能被正确路由,replyCode:[{}], replyText:[{}], exchange:[{}], routingKey:[{}]", msgId, msg, replyCode, replyText, exchange, routingKey);    //音讯从 exchange 没有到 queue, 那存库    saveToDB(msgId, exchange, routingKey, msg);}

如果音讯发送时走到了回调办法 returnedMessage 中,阐明目前的音讯有问题是须要解决的,同上,入库。定时器来解决。

当然音讯的发送办法 rabbitTemplate.convertAndSend() 会多一个参数 correlationData

具体解决办法能够参考源码,这里只提供思路。

2.2、消费者音讯确认

消费者音讯确认是确认生产了队列中的音讯,如果呈现问题 RabbitMQ 会有重试机制,长时间失败则须要人工干预,这个和生产者的确认是先后关系,理论是没有关联关系的,说到这,说下我之前转的牛角尖,始终想寻找 exchange 如何确认消费者胜利生产音讯,然而无果,起初细想,RabbitMQ 应该设计的就是消费者和 queue 交互,没必要和 exchange 交互。如有大佬晓得 exchange 如何 ack 消费者生产音讯能够通知小弟,不胜感激。
圆规正转,上消费者代码

spring:  rabbitmq:    listener:      simple:        acknowledge-mode: manual #开启消费者音讯确认; none:主动确认、auto:依据状况确认
@RabbitListener(bindings=@QueueBinding(                    //配置交换器                    exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),                    //配置路由键                    key="${test.one.binding-key}",                    //配置队列名称                    value=@Queue(value="${test.one.queue}",autoDelete="true")))    public void test(Message message, Channel channel) {        String msg = new String(message.getBody());        log.info("test 收到的音讯为:[{}], msgId:[{}]", msg, message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));        try {            //业务解决            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {            e.printStackTrace();            try {                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);            } catch (IOException ioException) {                ioException.printStackTrace();            }        }    }

次要说 3 个办法:

1、basicAck 是确认音讯,须要传递两个参数

  1. deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会建设起一个 Channel ,RabbitMQ 会用 basic.deliver 办法向消费者推送音讯,这个办法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条音讯的惟一标识 ID,是一个枯燥递增的正整数,delivery tag 的范畴仅限于 Channel
  2. multiple:为了缩小网络流量,手动确认能够被批处理,当该参数为 true 时,则能够一次性确认 delivery_tag 小于等于传入值的所有音讯,false则只确认传入值等于 delivery_tag 的音讯

2、basicNack 是回绝音讯,能够回绝多条,比 basicAck 多一个布尔值的参数,如果为 true,被 nack 后从新入队列而后从新生产生产;如果为 false 被 nack 就丢了。

3、basicReject 只能回绝一条音讯,reject 后音讯间接丢了。

总结

这里简略实现 RabbitMQ 音讯牢靠的形式是通过把音讯发送时呈现问题后间接入库,而后通过打算定时查问再从新发送给 RabbitMQ,如果 exchange 胜利 ack 后则标记为重发胜利,如果重发 3 次还是失败则标记异样,须要人工解决。

探讨

这种解决其实不算是最优的计划,技术上还能够有如下计划

  1. 生产者发消息时记录该条音讯,并设该记录 1 分钟后失效,留 1 分钟给exchange 确认并间接标记该音讯记录为胜利,而后打算工作定时扫无效且未确认的音讯,再发送给 RabbitMQ ,如果确认后则标记为胜利,否则 3 次后标记为失败。
  2. 还和小编写的计划相似,记录失败的音讯,然而定时工作获取到失败的音讯后,间接调用消费者的服务解决,不通过 RabbitMQ, 然而这就须要保护音讯和消费者服务的关系了,略微简单些。
最初心愿能够帮到看官,如果记录的不对烦请评论指出,一起探讨

https://github.com/charmsongo/springboot-samples/tree/master/springboot-rabbitmq-songo