乐趣区

关于java:RabbitMQ消息丢失-消息重复-消息积压的原因解决方案网上学不到的使用心得

前言

首先说一点,企业中最罕用的实际上既不是 RocketMQ,也不是 Kafka,而是 RabbitMQ。

RocketMQ 很弱小,但次要是阿里推广本人的云产品而开源进去的一款音讯队列,其实中小企业用 RocketMQ 的没有设想中那么多。

深层次的起因在于兔宝在中小企业遍及更早,禁受的考验也更久,很容易产生「回头客」,当初随 RabbitMQ 成长的一批人才现在大部分都已成为企业中的中坚骨干,技术选型亲睐 RabbitMQ 的几率就更高。

至于 Kafka,次要还是用在大数据和日志采集方面,除了一些公司有特定的需要会应用外,对音讯收发准确率要求较高的公司仍然是以 RabbitMQ 作为企业级音讯队列的首选。

工作这么多年我本身的感触是,RabbitMQ 经久不衰,除非后续其余消息中间件有不同凡响的应用体验,否则仍然是 RabbitMQ 的占有率更高。

所以筹备进入软件行业的小伙伴,我倡议有必要零碎的先把 RabbitMQ 学好,而后再学习其余消息中间件扩大视线,他们的原理大同小异,是能够举一反三的。

两个概念

RabbitMQ 防止音讯失落的办法次要是利用音讯确认机制和手动签收机制,所以有必要把这两个概念搞清楚。

1、音讯确认机制

次要是生产者应用的机制,用来确认音讯是否被胜利生产。

配置如下:

spring: 
    rabbitmq:
        address: 192.168.x.x:xxxx
        virtual-host: /
        username: guest
        password: guest
        connection-timeout: 5000
        publisher-confirms: true # 音讯胜利确认
        publisher-returns: true # 音讯失败确认
        template: 
            mandatory: true # 手动签收机制

这样,当你实现 RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback 这两个接口的办法后,就能够针对性地进行音讯确认的日志记录,之后做进一步的音讯发送弥补,以达到靠近 100% 投递的目标。

伪代码如下:

@Component
@Slf4j
public class RabbitMQSender implements RabbitTemplate.ConfirmCallback, 
RabbitTemplate.ReturnCallback {
    
    /**
     * 发送音讯
     */
    public void sendOrder(Order order) {rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        
        // 发送音讯
        rabbitTemplate.convertAndSend(xx, xx, order, xx);
    }
    
    
    /**
     * 胜利接管后的回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
    
        // 如果胜利接管了,这里能够对日志表的音讯收发状态做更新。// ....
        
    }
    
    
    /**
     * 失败后的回调
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    
        // 如果失败了,这里能够对日志表的音讯收发状态做更新,之后通过任务调度去弥补发送。// ....
        
    }
}

2、音讯签收机制

RabbitMQ 的音讯是主动签收的,你能够了解为快递签收了,那么这个快递的状态就从发送变为已签收,惟一的区别是快递公司会对物流轨迹有记录,而 MQ 签收后就从队列中删除了。

企业级开发中,RabbitMQ 咱们根本都开启手动签收形式,这样能够无效防止音讯的失落。

前文中曾经在生产者开启了手动签收机制,那么作为生产方,也要设置手动签收。

配置如下:

spring: 
    rabbitmq:
        address: 192.168.x.x:xxxx
        virtual-host: /
        username: guest
        password: guest
        connection-timeout: 5000
        listener: 
            simple: 
                concurrency: 5 # 并发数量
                max-concurrency: 10 # 最大并发数量
                acknowledge-mode: manual # 开启手动签收
                prefetch: 1 # 限度每次只生产一个(一个线程),下面配置 5,也就是能一次接管 5 个

生产监听时,手动签收就一行代码,伪代码如下:

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    // ....
    
    // 手动签收
    channel.basicAck(tag, false);
    
}

音讯失落

两个概念搞清楚后,就能够来学习音讯失落的问题和解决计划了。

1、呈现起因

音讯失落的起因无非有三种:

1)、音讯收回后,中途网络故障,服务器没收到;

2)、音讯收回后,服务器收到了,还没长久化,服务器宕机;

3)、音讯收回后,服务器收到了,生产方还未解决业务逻辑,服务却挂掉了,而音讯也主动签收,等于啥也没干。

这三种状况,(1) 和 (2)是因为生产方未开启音讯确认机制导致,(3)是因为生产方未开启手动签收机制导致。

2、解决方案

1)、生产方发送音讯时,要 try…catch,在 catch 中捕捉异样,并将 MQ 发送的要害内容记录到日志表中,日志表中要有音讯发送状态,若发送失败,由定时工作定期扫描重发并更新状态;

2)、生产方 publisher 必须要退出确认回调机制,确认胜利发送并签收的音讯,如果进入失败回调办法,就批改数据库音讯的状态,期待定时工作重发;

3)、生产方要开启手动签收 ACK 机制,生产胜利才将音讯移除,失败或因异常情况而尚未解决,就从新入队。

其实这就是后面论述两个概念时曾经讲过的内容,也是靠近 100% 音讯投递的企业级计划之一,次要目标就是为了解决音讯失落的问题。

音讯反复

1、呈现起因

音讯反复大体上有两种状况会呈现:

1)、音讯生产胜利,事务已提交,签收时后果服务器宕机或网络起因导致签收失败,音讯状态会由 unack 转变为 ready,从新发送给其余生产方;

2)、音讯生产失败,因为 retry 重试机制,从新入队又将音讯发送进来。

2、解决方案

网上大体上能收罗到的办法有三种:

1)、生产方业务接口做好幂等;

2)、消息日志表保留 MQ 发送时的惟一音讯 ID,生产方能够依据这个惟一 ID 进行判断防止音讯反复;

3)、生产方的 Message 对象有个 getRedelivered()办法返回 Boolean,为 TRUE 就示意反复发送过去的。

我这里只举荐第一种,业务办法幂等这是最间接无效的形式,(2)还要和数据库产生交互,(3)有可能导致第一次生产失败但第二次生产胜利的状况被砍掉。

音讯积压

1、呈现起因

音讯积压呈现的场景个别有两种:

1)、生产方的服务挂掉,导致始终无奈生产音讯;

2)、生产方的服务节点太少,导致生产能力有余,从而呈现积压,这种状况极可能就是生产方的流量过大导致。

2、解决方案

1)、既然生产能力有余,那就扩大更多生产节点,晋升生产能力;

2)、建设专门的队列生产服务,将音讯批量取出并长久化,之后再缓缓生产。

(1)就是最间接的形式,也是音讯积压最罕用的解决方案,但有些企业思考到服务器老本压力,会抉择第(2)种计划进行曲折,先通过一个独立服务把要生产的音讯存起来,比方存到数据库,之后再缓缓解决这些音讯即可。

应用心得

这里独自讲一下自己在工作中应用 RabbitMQ 的一些心得,心愿能有所帮忙。

1)、音讯失落、音讯反复、音讯积压三个问题中,实际上次要解决的还是音讯失落,因为大部分公司遇不到音讯积压的场景,而略微有水准的公司外围业务都会解决幂等问题,所以简直不存在音讯反复的可能;

2)、音讯失落的最常见企业级计划之一就是定时工作弥补,因为不论是 SOA 还是微服务的架构,必然会有分布式任务调度的存在,天然也就成为 MQ 最间接的弥补形式,如果 MQ 肯定要实现 100% 投递,这种是最广泛的计划。但我实际上不举荐中小企业应用该计划,因为凭空减少保护老本,而且没有肯定规模的我的项目齐全没必要,大家都小看了 RabbitMQ 自身的性能,比方咱们公司,撑持一个三甲医院,也就是三台 8 核 16G 服务器的集群,上线至今 3 年毫无压力;

3)、不要科学网上和培训机构解说的生产者音讯确认机制,也就是后面两个概念中讲到的 ConfirmCallback 和 ReturnCallback,这种机制非常升高 MQ 性能,咱们团队曾遇到过一次流量高峰期带来的 MQ 传输及生产性能大幅升高的状况,起初发现是音讯确认机制导致,敞开后立马恢复正常,从此以后都不再应用这种机制,MQ 运行非常顺畅。同时咱们会建设后盾治理实现人工弥补,通过辨认业务状态判断生产方是否解决了业务逻辑,毕竟这种状况都是多数,性能和运维老本,在这一块咱们抉择了性能;

4)、我工作这些年应用 RabbitMQ 没见过主动签收形式,肯定是开启手动签收;

5)、手动签收形式你在网上看到的教程简直都是解决完业务逻辑之后再手动签收,但实际上这种用法是不迷信的,在分布式的架构中,MQ 用来解耦和转发是十分常见的,如果是领取业务,往往在回调告诉中通过 MQ 转发到其余服务,其余服务如果业务解决不胜利,那么手动签收也不执行,这个音讯又会入队发给其余消费者,这样就可能在流量洪峰阶段因为偶尔的业务解决失败造成梗塞,甚至题目所讲的三种问题同时呈现,这样就会得失相当。

不迷信的用法:在解决完业务逻辑后再手动签收,否则不签收,就好比客人进店了你得买货色,否则不让走。

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    // 解决业务
    doBusiness(order);
    
    // 手动签收
    channel.basicAck(tag, false);
    
}

迷信的用法:不管业务逻辑是否解决胜利,最终都要将音讯手动签收,MQ 的使命不是保障客人进店了必须生产,不生产就不让走,而是客人能进来就行,哪怕是轻易看看也算工作实现。

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    try {
        // 解决业务
        doBusiness(order);
    } catch(Exception ex) {// 记录日志,通过后盾治理或其余形式人工解决失败的业务。} finally {
        // 手动签收
        channel.basicAck(tag, false);
    }
    
}

可能有人会问你这样不是和主动签收没区别吗,NO,你要晓得如果主动签收,呈现音讯失落你连记录日志的可能都没有。

另外,为什么肯定要这么做,因为 MQ 是中间件,自身就是辅助工具,就是一个滴滴司机,保障给你送到顺便说个再见就行,没必要还下车给你搬东西。

如果强加给 MQ 过多压力,只会造成自身业务的畸形。咱们应用 MQ 的目标就是解耦和转发,不再做多余的事件,保障 MQ 自身是晦涩的、职责繁多的即可。

总结

本篇次要讲了 RabbitMQ 的三种常见问题及解决方案,同时分享了一些作者自己工作中应用的心得,我想网上是很难找到的,如果哪一天用到了,无妨再关上看看,兴许能防止一些生产环境可能呈现的问题。

我总结下来就是三点:

1)、音讯 100% 投递会减少运维老本,中小企业视状况应用,非必要不应用;

2)、音讯确认机制影响性能,非必要不应用;

3)、消费者先保障音讯能签收,业务解决失败能够人工弥补。

工作中怕的永远不是一个技术不会应用,而是遇到问题不晓得有什么解决思路。

分享

多年工作及学习过程中在云笔记中记录了很多内容,我空闲之余都做了下整顿,本篇也是其中之一,有感兴趣的敌人能够在评论中获取,什么时候用到了打开说不定就能节俭很多工夫。


原创文章纯手打,感觉有一滴滴帮忙就请举手之劳点个 吧~

继续分享工作中的实在教训和心得体会,喜爱的话就点个 关注 吧~

更多最新技术文章可关注 GZH:【Java 分享客栈】

退出移动版