共计 1880 个字符,预计需要花费 5 分钟才能阅读完成。
如何保障应用 rabbitmq 时,音讯不失落?
其实这个问题实用于任何 mq,尽管不同的 mq 具体操作上有区别,但大体上须要从三个方面思考:生产者发送、broker 存储和消费者接管均保障音讯牢靠。
咱们来看看 rabbitmq 是怎么做到这三点的。
一、生产者音讯牢靠
生产者保障音讯牢靠有两种形式,事务
和confirm 机制
。
1. 事务管制
rabbitmq 容许通过事务的形式发送音讯。
要害代码如下:
// == 将信道设置成事务模式
channel.txSelect
try {// 这里发送音讯} catch (Exception e) {
// == 事务回滚
channel.txRollback
}
// == 提交事务
channel.txCommit
如果呈现问题能够回滚事务,之后做重发之类的操作。
问题在于事务是同步的,后续的操作都会期待事务执行而阻塞在这里,影响性能。
2.confirm 机制
-
单条 confirm
// == 信道开启音讯确认 channel.confirmSelect(); // 单条发送 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); if (channel.waitForConfirms()){// 音讯发送胜利}else {// 音讯发送失败}
每条数据通过
channel.waitForConfirms()
获取发送后果 -
批量 confirm
// == 信道开启音讯确认 channel.confirmSelect(); // 批量发送 for (int i = 0; i < 10; i++) {channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } // 直到所有信息都公布,只有有一个未确认就会 IOException channel.waitForConfirmsOrDie();
一个批次通过
channel.waitForConfirmsOrDie()
获取发送后果。
批次发送失败,须要整个批次重发;生产端须要做好反复音讯的解决,个别在业务层面做好幂等。
-
异步 confirm
前两种都是同步的确认,异步确认能将效率能最大化。channel.addConfirmListener(new ConfirmListener() { // == 音讯失败解决 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //deliveryTag;惟一音讯标签 //multiple:是否批量 System.err.println("-------no ack!-----------"); } // == 音讯胜利解决 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.err.println("-------ack!-----------"); } });
无论胜利还是失败,都会回调 listener 中的一个办法。
二、broker 音讯牢靠
长久化
首先要开启长久化,分为队列长久化和音讯长久化。
- 队列长久化:
在创立队列时将 channel.queueDeclare()第二个参数改为 true。 - 音讯长久化:
在应用信道发送音讯时 channel.basicPublish()将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN 示意长久化音讯。
同时开启以上两种长久化机制,这样音讯达到 broker 时,rabbitmq 会做落盘操作。
但 rabbitmq 的音讯落盘也有很小的工夫距离,有可能刚写入零碎缓存服务器就挂了。
因而还要开启 rabbitmq 的镜像集群,在写入主的同时也将数据同步到从服务。
三、消费者音讯牢靠
音讯确认模式有:
- AcknowledgeMode.NONE:主动确认(默认)。
- AcknowledgeMode.AUTO:依据状况确认。
- AcknowledgeMode.MANUAL:手动确认。
手动 ack
消费者胜利接管音讯时,默认会主动向 broker 发送 ack,示意音讯接管。
咱们须要改为手动 ack 模式。
一张图总结
通过上述探讨,音讯可靠性须要就义肯定的性能,从而升高 mq 的吞吐量,因而须要在可靠性和吞吐量之间寻求一个均衡。
考虑一下:以后业务场景是否对音讯的可靠性有这么高的要求,同时实时性也要保障呢?如果实时性要求不高,是否通过生产、生产两端核查的形式,达到可靠性要求呢?
附录
P6-P7 常识合辑