人生终将是场单人旅途,孤单之前是迷茫,孤单过后是成长。
楔子
本篇是音讯队列 RabbitMQ
的第四弹。
RabbitMQ
我曾经写了三篇了,根底的收发音讯和根底的概念我都曾经写了,学任何货色都是这样,先根底的上手能用,而后遇到问题再去解决,无奈了解就去深刻源码,随着工夫的积攒对这一门技术的了解也会随之进步。
根底操作曾经纯熟后,置信大家不可避免的会生出向那更高处攀登的心来,明天我就列举一些 RabbitMQ
比拟高级的用法,有些用失去有些用不上,然而肯定要有所理解,因为大部分状况咱们都是面向面试学习~
- 如何保障音讯的可靠性?
- 音讯队列如何进行限流?
-
如何设置延时队列进行延时生产?
祝有好播种,先赞后看,高兴有限。
本文代码: 码云地址 GitHub 地址
1. ???? 如何保障音讯的可靠性?
先来看看咱们的万年老图,从图上咱们大略能够看进去一个音讯会经验四个节点,只有保障这四个节点的可靠性能力保障整个零碎的可靠性。
- 生产者收回后保障达到了 MQ。
- MQ 收到音讯保障散发到了音讯对应的 Exchange。
- Exchange 散发音讯入队之后保障音讯的持久性。
- 消费者收到音讯之后保障音讯的正确生产。
经验了这四个保障,咱们能力保障音讯的可靠性,从而保障音讯不会失落。
2. ???? 生产者发送音讯到 MQ 失败
咱们的生产者发送音讯之后可能因为网络闪断等各种起因导致咱们的音讯并没有发送到 MQ 之中,然而这个时候咱们生产端又不晓得咱们的音讯没有收回去,这就会造成音讯的失落。
为了解决这个问题,RabbitMQ
引入了 事务机制 和发送方确认机制(publisher confirm),因为事务机制过于消耗性能所以个别不必,这里我着重讲述 发送方确认机制。
这个机制很好了解,就是 音讯发送到 MQ 那端之后,MQ 会回一个确认收到的音讯给咱们。
关上此性能须要配置,接下来我来演示一下配置:
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 关上音讯确认机制
publisher-confirm-type: correlated
咱们只须要在配置外面关上音讯确认即可 (true 是返回客户端,false是主动删除)。
生产者:
public void sendAndConfirm() {User user = new User();
log.info("Message content :" + user);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user,correlationData);
log.info("音讯发送结束。");
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("CorrelationData content :" + correlationData);
log.info("Ack status :" + ack);
log.info("Cause content :" + cause);
if(ack){log.info("音讯胜利发送,订单入库,更改订单状态");
}else{log.info("音讯发送失败:"+correlationData+", 出现异常:"+cause);
}
}
});
}
生产者代码里咱们看到又多了一个参数:CorrelationData
,这个参数是用来做音讯的惟一标识,同时咱们关上音讯确认之后须要对 rabbitTemplate
多设置一个setConfirmCallback
,参数是一个匿名类,咱们音讯确认胜利 or 失败之后的解决就是写在这个匿名类外面。
比方一条订单音讯,当音讯确认达到 MQ 确认之后再行入库或者批改订单的节点状态,如果音讯没有胜利达到 MQ 能够进行一次记录或者将订单状态批改。
Tip:音讯确认失败不只有音讯没发过来会触发,音讯发过来然而找不到对应的 Exchange,也会触发。
3. ????MQ 接管失败或者路由失败
生产者的发送音讯解决好了之后,咱们就能够来看看 MQ 端的解决,MQ 可能呈现两个问题:
- 音讯找不到对应的 Exchange。
- 找到了 Exchange 然而找不到对应的 Queue。
这两种状况都能够用 RabbitMQ
提供的 mandatory
参数来解决,它会设置音讯投递失败的策略,有两种策略:主动删除或返回到客户端。
咱们既然要做可靠性,当然是设置为返回到客户端。
配置:
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 关上音讯确认机制
publisher-confirm-type: correlated
# 关上音讯返回
publisher-returns: true
template:
mandatory: true
咱们只须要在配置外面关上音讯返回即可,template.mandatory: true
这一步不要少~
生产者:
public void sendAndReturn() {User user = new User();
log.info("Message content :" + user);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("被退回的音讯为:{}", message);
log.info("replyCode:{}", replyCode);
log.info("replyText:{}", replyText);
log.info("exchange:{}", exchange);
log.info("routingKey:{}", routingKey);
});
rabbitTemplate.convertAndSend("fail",user);
log.info("音讯发送结束。");
}
这里咱们能够拿到被退回音讯的所有信息,而后再进行解决,比方放到一个新的队列独自解决,路由失败个别都是配置问题了。
4. ???? 音讯入队之后 MQ 宕机
到这一步根本都是一些很小概率的问题了,比方 MQ 忽然宕机了或者被敞开了,这种问题就必须要对音讯做长久化,以便 MQ 重新启动之后音讯还能从新恢复过来。
音讯的长久化要做,然而不能只做音讯的长久化,还要做队列的长久化和 Exchange 的长久化。
@Bean
public DirectExchange directExchange() {
// 三个结构参数:name durable autoDelete
return new DirectExchange("directExchange", false, false);
}
@Bean
public Queue erduo() {
// 其三个参数:durable exclusive autoDelete
// 个别只设置一下长久化即可
return new Queue("erduo",true);
}
创立 Exchange 和队列时只有设置好长久化,发送的音讯默认就是长久化音讯。
设置长久化时肯定要将 Exchange 和队列都设置上长久化:
单单只设置 Exchange 长久化,重启之后队列会失落。单单只设置队列的长久化,重启之后 Exchange 会隐没,既而音讯也失落,所以如果不两个一块设置长久化将毫无意义。
Tip: 这些都是 MQ 宕机引起的问题,如果呈现服务器宕机或者磁盘损坏则下面的伎俩通通有效,必须引入镜像队列,做异地多活来抵挡这种不可抗因素。
5. ???? 消费者无奈失常生产
最初一步会出问题的中央就在消费者端了,不过这个解决问题的办法咱们之前的文章曾经说过了,就是消费者的音讯确认。
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 手动确认音讯
listener:
simple:
acknowledge-mode: manual
关上手动音讯确认之后,只有咱们这条音讯没有胜利生产,无论两头是呈现消费者宕机还是代码异样,只有连贯断开之后这条信息还没有被生产那么这条音讯就会被从新放入队列再次被生产。
当然这也可能会呈现反复生产的状况,不过在分布式系统中幂等性是肯定要做的,所以个别反复生产都会被接口的幂等给拦掉。
所谓幂等性就是:一个操作屡次执行产生的后果与一次执行产生的后果统一。
幂等性相干内容不在本章探讨范畴~ 所以我就不多做论述了。
6. ???? 音讯可靠性案例
这个图是我很早之前画的,是为了记录过后应用 RabbitMQ
做音讯可靠性的具体做法,这里我正好拿进去做个例子给大家看一看。
这个例子中的音讯是先入库的,而后生产者从 DB 外面拿到数据包装成音讯发给 MQ,通过消费者生产之后对 DB 数据的状态进行更改,而后从新入库。
这两头有任何步骤失败,数据的状态都是没有更新的,这时通过一个定时工作不停的去刷库,找到有问题的数据将它从新扔到生产者那里进行从新投递。
这个计划其实和网上的很多计划大同小异,根底的可靠性保障之后,定时工作做一个兜底进行一直的扫描,力求 100% 可靠性。
后记
越写越长,因为篇幅缘故限流和延时队列放到下一篇了,我会尽快收回来供大家浏览,讲真,我真的不是故意多水一篇的!!!
最初再给优狐打个广告,最近掘金在 GitHub 下面建设了一个开源打算 – open-source,旨在收录各种好玩的好用的开源库,如果大家有想要自荐或者分享的开源库都能够参加进去,为这个开源打算做一份奉献,同时这个开源库的 Start
也在稳步增长中,参加进去也能够减少本人我的项目的曝光度,两全其美。
同时这个开源库还有一个兄弟我的项目 – open-source-translation,旨在招募技术文章翻译志愿者进行技术文章的翻译工作,
争做最棒开源翻译,翻译业界高质量文稿,为技术人的成长献一份力。
最近这段时间事件挺多,优狐令我八月底之前降级到三级,所以各位读者的赞对我很重要,心愿大家可能高抬贵手,帮我一哈~
好了,以上就是本期的全部内容,感激你能看到这里,欢送对本文点赞珍藏与评论,???? 你们的每个点赞都是我创作的最大能源。
我是耳朵,一个始终想做常识输入的伪文艺程序员,咱们下期见。
本文代码:码云地址 GitHub 地址