乐趣区

关于算法:算法与数据结构体系课完结无密

download:算法与数据结构体系课 | 完结无密

RabbitMq 消息丢失原因及其解决打算

一、RabbitMQ 消息丢失原因

1.1、消费者丢失消息
消费者将数据发送到 rabbitmq 的时候,可能因为网络问题导致数据就在半路给搞丢了。

1. 使用事务(性能差)

​ RabbitMQ 客户端中与事务机制相干的方法有三个:channel.txSelect、channel.txCommit 和 channel.txRollback。channel.txSelect 用于将以后的信道设置成事务模式,channel.txCommit 用于提交事务,channel.txRollback 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,咱们便可能公布消息给 RabbitMQ 了,如果事务提交胜利,则消息肯定到达了 RabbitMQ 中,如果在事务提交执行之前因为 RabbitMQ 异样崩溃或者其余原因抛出异样,这个时候咱们便可能将其捕捉,进而通过执行 channel.txRollback 方法来实现事务回滚。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意分别。

​ 事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息胜利被 RabbitMQ 接收,事务才能提交胜利,否则便可在捕捉异样之后进行事务回滚,与此同时可能进行消息重发。然而使用事务机制会“吸干”RabbitMQ 的性能。

2. 发送回执确认(推荐)

​ 消费者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道下面公布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给消费者(蕴含消息的唯一 ID),这就使得消费者通晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后收回。RabbitMQ 回传给消费者的确认消息中的 deliveryTag 蕴含了确认消息的序号,此外 RabbitMQ 也可能设置 channel.basicAck 方法中的 multiple 参数,示意到这个序号之前的所有消息都已经失去了处理,注意辨别这里的确认和生产时候的确认之间的异同。

注意要点:

事务机制和 publisher confirm 机制两者是互斥的,不能共存。

事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含意是指消息被正确地发往至 RabbitMQ 的交换器,假如此交换器没有匹配的队列,那么消息也会丢失。

1.2、RabbitMQ 弄丢了数据 - 开启 RabbitMQ 的数据持久化
​ 为了防止 rabbitmq 自己弄丢了数据,这个你必须开启 rabbitmq 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 rabbitmq 自己挂了,复原之后会主动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq 还没持久化,自己就挂了,可能导致大量数据会丢失的,然而这个概率较小。

设置持久化有两个步骤,第一个是创建 queue 的时候将其设置为持久化的,这样就可能保障 rabbitmq 持久化 queue 的元数据,然而不会持久化 queue 里的数据;第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 rabbitmq 就会将消息持久化到磁盘下来。必须要同时设置这两个持久化才行,rabbitmq 哪怕是挂了,再次重启,也会从磁盘上重启复原 queue,复原这个 queue 里的数据。

而且持久化可能跟消费者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会告诉消费者 ack 了,所以哪怕是在持久化到磁盘之前,rabbitmq 挂了,数据丢了,消费者收不到 ack,你也是可能自己重发的。

若消费者那边的 confirm 机制未开启的情况下,哪怕是你给 rabbitmq 开启了持久化机制,也有一种可能,就是这个消息写到了 rabbitmq 中,然而还没来得及持久化到磁盘上,后果不巧,此时 rabbitmq 挂了,就会导致内存里的一点点数据会丢失。

1.3、生产端弄丢了数据
​ 为了保障消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可能指定 autoAck 参数,当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会主动把发送进来的消息置为确认,而后从内存(或者磁盘)中删除,而不管消费者是否真正地生产到了这些消息。

​ 采纳消息确认机制后,只需设置 autoAck 参数为 false,消费者就有足够的工夫处理消息(工作),不必担心处理消息过程中消费者过程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。

​ 这里咱们可能通过 RabbtiMQ 的 Web 治理平台上可能看到以后队列中的“Ready”状态和“Unacknowledged”状态的消息数,别离对应上文中的等待投递给消费者的消息数和已经投递给消费者然而未收到确认信号的消息数。

二、模拟生产端丢失数据
2.1、初始化工程
这里不做说了,可能看我《SpringBoot 集成 RabbitMQ》中的相干步骤。这里咱们间接写消费者和消费者。

2.2、编写消费者
咱们在之前的工程上进行相应的修改,通过 http 形式生产消息。咱们间接在 IndexController 中新增 simpleProducer 方法;

内容如下:

/**

  • @Author julyWhj
  • @Description IndexController$
  • @Date 2021/10/7 10:18 上午
    **/

@RestController
public class IndexController {

@Autowired
private SimpleProducer simpleProducer;
@GetMapping("/index")
public String index() {return "Hello RabbitMQ";}
@GetMapping("/producer")
public String simpleProducer() {for (int i = 0; i < 10; i++) {simpleProducer.sendOrderMessage(Simple.builder()
                .createTime(new Date())
                .name("JulyWhj")
                .age(i)
                .no("ID-000" + i)
                .phone("138XXXXXXXX")
                .build());
    }
    return "消息发送胜利";
}

}
复制代码
注意:

​ 这里咱们使用 Debug 形式启动,在消费者中加上断点,因为咱们的消费者和消费者在同一个服务中,生产的消息会被消费者间接生产掉。所以咱们加上断点,阻止消费者生产消息。

这里咱们看到,队列中消息条数为 0,出现了消息丢失的问题。

​ 因为这里咱们使用了消费者主动消息确认机制。当 autoAck 等于 true 时,RabbitMQ 会主动把发送进来的消息置为确认,而后从内存(或者磁盘)中删除,而不管消费者是否真正地生产到了这些消息。

2.4、使用手动消息确认机制
​ 咱们通过配置 spring.rabbitmq.listener.simple.acknowledge-mode=manual 属性,开启手动 ack 模式。默认配置 auto 主动确认。

在 application 配置文件中新增如下配置:

spring.rabbitmq.host=host
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

开启手动 ack 确认消息

spring.rabbitmq.listener.simple.acknowledge-mode=manual

三、RabbitMQ 消息可靠性保障策略
1、消费者开启消息确认机制

2、消息队列数据持久化

3、消费者手动 ack

4、消费者消息记录 + 定期弥补机制

5、服务幂等处理

6、消息挤压处理等

这里的内容咱们在后续文章中独自开展分析,这里只是给大家提供下需要注意的事项。包含 TTL 和 DLX 队列的使用等。

退出移动版