唠个嗑
网络上收罗了屡次想晓得 RabbitMQ 事实业务种怎么实现音讯的可靠性的,然而大多都不太现实,站在各位大佬伟人的肩膀上钻研了一段时间,我也整顿了一套简略可行性的计划,包含音讯异样解决。这篇文章想次要讲一些业务解决计划,我的项目中退出 RabbitMQ 中间件很简略,然而依据具体业务实现音讯的可靠性,这个须要多加思考。当然上面也会通过测试代码来剖析,文末也会附上源码地址。
1、筹备
1.1、环境筹备
之前博客上写过一篇编译装置的办法 地址,大家能够参考,因为 RabbitMQ 底层语言的起因可能略微麻烦点,那就没有方法了吗?如果你是先搞测试,再在我的项目中应用的话,那能够应用 docker 装置,2 行代码,如下
docker pull rabbitmqdocker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
1.2、实践筹备
RabbitMQ 的类型包含:direct、topic、fanout、headers、system(翻源码看到的)
这里次要通过 topic 来剖析,bindingkey 能够通过通配符 # 和 * 来匹配多个 路由键 (routingKey),
bindingkey 是绑定交换机(exchange)和队列(queue)的, 生产者(publisher)发消息的时候会携带 routingKey、exchange 和 音讯发送给 RabbitMQ,
连贯胜利后理论是组件 exchange 接管了生产者的音讯,而后通过 bindingkey 匹配 routingKey,决定送给哪个 queue,每个消费者都会有 queue,所以 queue接管到音讯 后就能够确保消费者接能够收到音讯了,最初消费者再生产。
再具体的内容能够查看大佬 erlie 的总结 地址
2、音讯确认
RabbitMQ 根底配置
pom
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --></parent> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
application.yml
spring: rabbitmq: host: 192.168.1.105 port: 5672 username: guest password: guest virtual-host: /# 交换机、队列和绑定键申明test: exchange: test.exchange one: queue: one.test.queue binding-key: one.test.key
consumer
@RabbitListener(bindings=@QueueBinding( //配置交换器 exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC), //配置路由键 key="${test.one.binding-key}", //配置队列名称 value=@Queue(value="${test.one.queue}",autoDelete="true")))public void test(String msg) { log.info("test 收到的音讯为:[{}]", msg); //业务代码...}
publisher
@Autowiredprivate RabbitTemplate rabbitTemplate;@Value("${test.exchange}")private String exchange;@Value("${test.one.binding-key}")private String routingKey;public void test() { rabbitTemplate.convertAndSend(exchange, routingKey, "test msg");}
通过下面默认的配置基本上就能够应用 RabbitMQ 了,然而这不是本篇的重点.咱们要晓得音讯发送和到生产的过程中呈现问题怎么办?这就须要咱们分段确认音讯是否接管胜利,如果失败了该如何解决,先想想能够怎么做。
咱们先捋一下思路,音讯发送给 RabbitMQ ,如果连贯 RabbitMQ 失败,则记录该音讯,如果连贯胜利然而 exchange 接管失败则记录下该音讯,如果 exchange 接管胜利然而 queue 接管 exchange 的音讯失败则记录下该音讯,音讯从生产者到 queue 有 3 个地位可能因网络抖动或其余起因呈现问题,那咱们在这三个地位记录下问题后,对立通过打算定时获取记录的音讯并且从新发送,如果重发三次还没有胜利则标记该记录为异样音讯。
2.1、生产者音讯确认和回调
这里分两步:
- 音讯确认指的是 RabbitMQ(exchange) 确认接管到了消费者发送的音讯
- 音讯回调指的是 queue 接管 exchange 的音讯失败,则回调通知 RabbitMQ 失败的音讯
2.1.1、音讯确认
开启配置
spring: rabbitmq: #publisher-confirms: true #已过期 publisher-confirm-type: correlated #开启生产者音讯确认
还有另外 2 种模式:
- none 值是禁用公布确认模式,是默认值;
- simple 值经测试有两种成果,其一成果和 correlated 值一样会触发回调办法,其二在公布音讯胜利后应用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 办法期待 broker 节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是 waitForConfirmsOrDie 办法如果返回false则会敞开 channel,则接下来无奈发送音讯到 broker。
当然还需新建一个类实现 RabbitTemplate.ConfirmCallback,重写办法 confirm,该办法有三个参数 correlationData、ack、cause,次要说下 ack,值为 true 示意 exchange 胜利接管到音讯,false 示意 exchange 接管音讯失败,这里 2 种后果能够别离解决,比方 false 能够把接管失败的音讯入库,而后通过定时器来解决,比拟懂的同学当初可能就有疑难了,correlationData 只能失去 msgId,基本没有具体的音讯,这里能够施展你杰出的想象力,能够通过对象封装失去,也能够通过存内存或者磁盘存储失去,办法总比艰难多。
@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData.getId(); if (ack) { log.info("胜利发送给 mq, msgId:[{}]", msgId); } else { log.error("发送给 mq 失败, msgId:[{}], 失败理由cause:[{}]", msgId, cause); //音讯从生产者没有到 exchange,那存库 saveToDB(msgId,...); }}
这里还差一步,就是原生的 rabbitTemplate 怎么晓得音讯确认时应用刚建的类呢,注入即可
rabbitTemplate.setConfirmCallback(刚建的类);如有疑难可看文末源码。
2.1.2、音讯回调
spring: rabbitmq: publisher-returns: true #开启生产者音讯回调
同上,须要新建类并实现 RabbitTemplate.ReturnCallback,并且重写办法 returnedMessage, 最初须要注入如下内容
rabbitTemplate.setReturnCallback(刚建的类);// 要想使 returnCallback 失效,必须设置为truerabbitTemplate.setMandatory(true);
@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); String msg = new String(message.getBody()); log.error("音讯回调 msgId:[{}], msg:[{}] 不能被正确路由,replyCode:[{}], replyText:[{}], exchange:[{}], routingKey:[{}]", msgId, msg, replyCode, replyText, exchange, routingKey); //音讯从 exchange 没有到 queue, 那存库 saveToDB(msgId, exchange, routingKey, msg);}
如果音讯发送时走到了回调办法 returnedMessage 中,阐明目前的音讯有问题是须要解决的,同上,入库。定时器来解决。
当然音讯的发送办法 rabbitTemplate.convertAndSend() 会多一个参数 correlationData
具体解决办法能够参考源码,这里只提供思路。
2.2、消费者音讯确认
消费者音讯确认是确认生产了队列中的音讯,如果呈现问题 RabbitMQ 会有重试机制,长时间失败则须要人工干预,这个和生产者的确认是先后关系,理论是没有关联关系的,说到这,说下我之前转的牛角尖,始终想寻找 exchange 如何确认消费者胜利生产音讯,然而无果,起初细想,RabbitMQ 应该设计的就是消费者和 queue 交互,没必要和 exchange 交互。如有大佬晓得 exchange 如何 ack 消费者生产音讯能够通知小弟,不胜感激。
圆规正转,上消费者代码
spring: rabbitmq: listener: simple: acknowledge-mode: manual #开启消费者音讯确认; none:主动确认、auto:依据状况确认
@RabbitListener(bindings=@QueueBinding( //配置交换器 exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC), //配置路由键 key="${test.one.binding-key}", //配置队列名称 value=@Queue(value="${test.one.queue}",autoDelete="true"))) public void test(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("test 收到的音讯为:[{}], msgId:[{}]", msg, message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")); try { //业务解决 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (IOException ioException) { ioException.printStackTrace(); } } }
次要说 3 个办法:
1、basicAck 是确认音讯,须要传递两个参数
- deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会建设起一个 Channel ,RabbitMQ 会用 basic.deliver 办法向消费者推送音讯,这个办法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条音讯的惟一标识 ID,是一个枯燥递增的正整数,delivery tag 的范畴仅限于 Channel
- multiple:为了缩小网络流量,手动确认能够被批处理,当该参数为 true 时,则能够一次性确认 delivery_tag 小于等于传入值的所有音讯,false则只确认传入值等于 delivery_tag 的音讯
2、basicNack 是回绝音讯,能够回绝多条,比 basicAck 多一个布尔值的参数,如果为 true,被 nack 后从新入队列而后从新生产生产;如果为 false 被 nack 就丢了。
3、basicReject 只能回绝一条音讯,reject 后音讯间接丢了。
总结
这里简略实现 RabbitMQ 音讯牢靠的形式是通过把音讯发送时呈现问题后间接入库,而后通过打算定时查问再从新发送给 RabbitMQ,如果 exchange 胜利 ack 后则标记为重发胜利,如果重发 3 次还是失败则标记异样,须要人工解决。
探讨
这种解决其实不算是最优的计划,技术上还能够有如下计划
- 生产者发消息时记录该条音讯,并设该记录 1 分钟后失效,留 1 分钟给exchange 确认并间接标记该音讯记录为胜利,而后打算工作定时扫无效且未确认的音讯,再发送给 RabbitMQ ,如果确认后则标记为胜利,否则 3 次后标记为失败。
- 还和小编写的计划相似,记录失败的音讯,然而定时工作获取到失败的音讯后,间接调用消费者的服务解决,不通过 RabbitMQ, 然而这就须要保护音讯和消费者服务的关系了,略微简单些。
最初心愿能够帮到看官,如果记录的不对烦请评论指出,一起探讨
https://github.com/charmsongo/springboot-samples/tree/master/springboot-rabbitmq-songo