唠个嗑
网络上收罗了屡次想晓得 RabbitMQ 事实业务种怎么实现音讯的可靠性的,然而大多都不太现实,站在各位大佬伟人的肩膀上钻研了一段时间,我也整顿了一套简略可行性的计划,包含音讯异样解决。这篇文章想次要讲一些业务解决计划,我的项目中退出 RabbitMQ 中间件很简略,然而依据具体业务实现音讯的可靠性,这个须要多加思考。当然上面也会通过测试代码来剖析,文末也会附上源码地址。
1、筹备
1.1、环境筹备
之前博客上写过一篇编译装置的办法 地址,大家能够参考,因为 RabbitMQ 底层语言的起因可能略微麻烦点,那就没有方法了吗?如果你是先搞测试,再在我的项目中应用的话,那能够应用 docker 装置,2 行代码,如下
docker pull rabbitmq
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
1.2、实践筹备
RabbitMQ 的类型包含:direct、topic、fanout、headers、system(翻源码看到的)
这里次要通过 topic 来剖析,bindingkey 能够通过通配符 # 和 * 来匹配多个 路由键(routingKey),
bindingkey 是绑定交换机(exchange)和队列(queue)的,生产者(publisher)发消息的时候会携带 routingKey、exchange 和 音讯发送给 RabbitMQ,
连贯胜利后理论是组件 exchange 接管了生产者的音讯,而后通过 bindingkey 匹配 routingKey,决定送给哪个 queue,每个消费者都会有 queue,所以 queue 接管到音讯 后就能够确保消费者接能够收到音讯了,最初消费者再生产。
再具体的内容能够查看大佬 erlie 的总结 地址
2、音讯确认
RabbitMQ 根底配置
pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: 192.168.1.105
port: 5672
username: guest
password: guest
virtual-host: /
# 交换机、队列和绑定键申明
test:
exchange: test.exchange
one:
queue: one.test.queue
binding-key: one.test.key
consumer
@RabbitListener(bindings=@QueueBinding(
// 配置交换器
exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
// 配置路由键
key="${test.one.binding-key}",
// 配置队列名称
value=@Queue(value="${test.one.queue}",autoDelete="true")
))
public void test(String msg) {log.info("test 收到的音讯为:[{}]", msg);
// 业务代码...
}
publisher
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${test.exchange}")
private String exchange;
@Value("${test.one.binding-key}")
private String routingKey;
public void test() {rabbitTemplate.convertAndSend(exchange, routingKey, "test msg");
}
通过下面默认的配置基本上就能够应用 RabbitMQ 了, 然而这不是本篇的重点. 咱们要晓得音讯发送和到生产的过程中呈现问题怎么办?这就须要咱们分段确认音讯是否接管胜利,如果失败了该如何解决,先想想能够怎么做。
咱们先捋一下思路,音讯发送给 RabbitMQ,如果连贯 RabbitMQ 失败,则记录该音讯,如果连贯胜利然而 exchange 接管失败则记录下该音讯,如果 exchange 接管胜利然而 queue 接管 exchange 的音讯失败则记录下该音讯,音讯从生产者到 queue 有 3 个地位可能因网络抖动或其余起因呈现问题,那咱们在这三个地位记录下问题后,对立通过打算定时获取记录的音讯并且从新发送,如果重发三次还没有胜利则标记该记录为异样音讯。
2.1、生产者音讯确认和回调
这里分两步:
- 音讯确认指的是 RabbitMQ(exchange)确认接管到了消费者发送的音讯
- 音讯回调指的是 queue 接管 exchange 的音讯失败,则回调通知 RabbitMQ 失败的音讯
2.1.1、音讯确认
开启配置
spring:
rabbitmq:
#publisher-confirms: true #已过期
publisher-confirm-type: correlated #开启生产者音讯确认
还有另外 2 种模式:
- none 值是禁用公布确认模式,是默认值;
- simple 值经测试有两种成果,其一成果和 correlated 值一样会触发回调办法,其二在公布音讯胜利后应用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 办法期待 broker 节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是 waitForConfirmsOrDie 办法如果返回 false 则会敞开 channel,则接下来无奈发送音讯到 broker。
当然还需新建一个类实现 RabbitTemplate.ConfirmCallback,重写办法 confirm,该办法有三个参数 correlationData、ack、cause,次要说下 ack,值为 true 示意 exchange 胜利接管到音讯,false 示意 exchange 接管音讯失败,这里 2 种后果能够别离解决,比方 false 能够把接管失败的音讯入库,而后通过定时器来解决,比拟懂的同学当初可能就有疑难了,correlationData 只能失去 msgId, 基本没有具体的音讯,这里能够施展你杰出的想象力,能够通过对象封装失去,也能够通过存内存或者磁盘存储失去,办法总比艰难多。
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {String msgId = correlationData.getId();
if (ack) {log.info("胜利发送给 mq, msgId:[{}]", msgId);
} else {log.error("发送给 mq 失败, msgId:[{}], 失败理由 cause:[{}]", msgId, cause);
// 音讯从生产者没有到 exchange,那存库
saveToDB(msgId,...);
}
}
这里还差一步,就是原生的 rabbitTemplate 怎么晓得音讯确认时应用刚建的类呢,注入即可
rabbitTemplate.setConfirmCallback(刚建的类); 如有疑难可看文末源码。
2.1.2、音讯回调
spring:
rabbitmq:
publisher-returns: true #开启生产者音讯回调
同上,须要新建类并实现 RabbitTemplate.ReturnCallback, 并且重写办法 returnedMessage,最初须要注入如下内容
rabbitTemplate.setReturnCallback(刚建的类);
// 要想使 returnCallback 失效,必须设置为 true
rabbitTemplate.setMandatory(true);
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
String msg = new String(message.getBody());
log.error("音讯回调 msgId:[{}], msg:[{}] 不能被正确路由,replyCode:[{}], replyText:[{}], exchange:[{}], routingKey:[{}]", msgId, msg, replyCode, replyText, exchange, routingKey);
// 音讯从 exchange 没有到 queue,那存库
saveToDB(msgId, exchange, routingKey, msg);
}
如果音讯发送时走到了回调办法 returnedMessage 中,阐明目前的音讯有问题是须要解决的,同上,入库。定时器来解决。
当然音讯的发送办法 rabbitTemplate.convertAndSend() 会多一个参数 correlationData
具体解决办法能够参考源码,这里只提供思路。
2.2、消费者音讯确认
消费者音讯确认是确认生产了队列中的音讯,如果呈现问题 RabbitMQ 会有重试机制,长时间失败则须要人工干预,这个和生产者的确认是先后关系,理论是没有关联关系的,说到这,说下我之前转的牛角尖,始终想寻找 exchange 如何确认消费者胜利生产音讯,然而无果,起初细想,RabbitMQ 应该设计的就是消费者和 queue 交互,没必要和 exchange 交互。如有大佬晓得 exchange 如何 ack 消费者生产音讯能够通知小弟,不胜感激。
圆规正转,上消费者代码
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #开启消费者音讯确认; none:主动确认、auto:依据状况确认
@RabbitListener(bindings=@QueueBinding(
// 配置交换器
exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
// 配置路由键
key="${test.one.binding-key}",
// 配置队列名称
value=@Queue(value="${test.one.queue}",autoDelete="true")
))
public void test(Message message, Channel channel) {String msg = new String(message.getBody());
log.info("test 收到的音讯为:[{}], msgId:[{}]", msg, message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
try {
// 业务解决
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {e.printStackTrace();
try {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ioException) {ioException.printStackTrace();
}
}
}
次要说 3 个办法:
1、basicAck 是确认音讯,须要传递两个参数
- deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会建设起一个 Channel,RabbitMQ 会用 basic.deliver 办法向消费者推送音讯,这个办法携带了一个 delivery tag,它代表了 RabbitMQ 向该 Channel 投递的这条音讯的惟一标识 ID,是一个枯燥递增的正整数,delivery tag 的范畴仅限于 Channel
- multiple:为了缩小网络流量,手动确认能够被批处理,当该参数为 true 时,则能够一次性确认 delivery_tag 小于等于传入值的所有音讯,false 则只确认传入值等于 delivery_tag 的音讯
2、basicNack 是回绝音讯,能够回绝多条,比 basicAck 多一个布尔值的参数,如果为 true,被 nack 后从新入队列而后从新生产生产;如果为 false 被 nack 就丢了。
3、basicReject 只能回绝一条音讯,reject 后音讯间接丢了。
总结
这里简略实现 RabbitMQ 音讯牢靠的形式是通过把音讯发送时呈现问题后间接入库,而后通过打算定时查问再从新发送给 RabbitMQ, 如果 exchange 胜利 ack 后则标记为重发胜利,如果重发 3 次还是失败则标记异样,须要人工解决。
探讨
这种解决其实不算是最优的计划,技术上还能够有如下计划
- 生产者发消息时记录该条音讯,并设该记录 1 分钟后失效,留 1 分钟给 exchange 确认并间接标记该音讯记录为胜利,而后打算工作定时扫无效且未确认的音讯,再发送给 RabbitMQ,如果确认后则标记为胜利,否则 3 次后标记为失败。
- 还和小编写的计划相似,记录失败的音讯,然而定时工作获取到失败的音讯后,间接调用消费者的服务解决,不通过 RabbitMQ,然而这就须要保护音讯和消费者服务的关系了,略微简单些。
最初心愿能够帮到看官,如果记录的不对烦请评论指出,一起探讨
https://github.com/charmsongo/springboot-samples/tree/master/springboot-rabbitmq-songo