乐趣区

RabbitMQ系列之怎么确保消息不丢失

1. 上一篇介绍了在 SpringBoot 中怎么使用 RabbitMQ 来实现 RPC 功能,分享了可能踩到的坑及解决办法;
2. 本篇主要介绍消息可能会存在丢失的场景及解决思路,基本上涵盖了可能会遇到的所有的场景。

一. 通过设置持久化

持久化可以提高 RabbitMQ 的可靠性,以防止在异常情况(比如:重启、关机、宕机等)下的数据丢失。RabbitMQ 持久化分为三部分:交换机的持久化、队列的持久化、消息的持久化。

1. 什么是交换机持久化

交换机持久化是指将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或执行代码去创建交换机了,交换机会自动被创建,相当于一直存在。

2. 怎么将交换机持久化

在创建交换机的时候将 durable 参数设置为 true 即可。比如,我声明一个类型为 direct 的交换机:

/**

 * 设置交换机,类型为 direct

 * @return DirectExchange

 */

@Bean

DirectExchange myExchange() {return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false);

}
说明
  • 通过将 durable 参数设置为true,则交换机的元数据会被存储在磁盘上,对于一个长期使用的交换机来说,建议将其设置为持久化。

3. 队列持久化

如果不将队列设置为持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,数据也会丢失。队列都没有了,消息也找不到地方存储了。

4. 怎么将队列持久化

同样,在创建队列的时候将 durable 参数设置为 true 即可。

/**
 * 创建队列
 */
@Bean
public Queue myQueue() {return new Queue(QueueConstants.RPC_QUEUE1);
}
说明
  • durable 参数默认为 false,只针对当前连接有效,当 RabbitMQ 服务重启后数据会丢失;
  • 队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失;
  • 如果要确保消息不会丢失,就需要设置消息的持久化。

5. 消息持久化

RabbitMQ 的消息是依附于队列存在的,所以要想消息持久化,那么前提是队列也必须设置持久化。

6. 怎么将消息持久化

在创建消息的时候,添加一个持久化消息的属性(将 delivery_mode 设置为 2)。

SpringBoot 中怎么设置消息持久化

在 SpringBoot 中使用 rabbitTemplate 发送的消息默认就是持久化的,因为默认已经设置为 delivery_mode = 2,下面我们通过查看源码来验证一下。

源码分析

1> sendAndReceive
生产者发送消息的时候会使用 rabbitTemplate 的 sendAndReceive 接口来发送消息:

@Nullable
    public Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException {return this.sendAndReceive(exchange, routingKey, message, (CorrelationData)null);
    }

2> Message
第三个参数 Message 有一个 MessageProperties 属性

打开 Message.class:

public class Message implements Serializable {
    private static final long serialVersionUID = -7177590352110605597L;
    private static final String ENCODING = Charset.defaultCharset().name();
    private static final Set<String> whiteListPatterns = new LinkedHashSet(Arrays.asList("java.util.*", "java.lang.*"));
    private final MessageProperties messageProperties;
    private final byte[] body;

3> MessageProperties
打开 MessageProperties.class:

static {
        DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
        DEFAULT_PRIORITY = 0;
    }

MessageDeliveryMode.class:

public enum MessageDeliveryMode {
    NON_PERSISTENT,
    PERSISTENT;

    private MessageDeliveryMode() {}

    public static int toInt(MessageDeliveryMode mode) {switch(mode) {
        case NON_PERSISTENT:
            return 1;
        case PERSISTENT:
            return 2;
        default:
            return -1;
        }
    }
结论

通过源码查看在 SpringBoot 中使用 rabbimqTemplate 发送的消息默认就是持久化的消息。

7. 总结

  1. 设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧会存在;
  2. 仅设置队列持久化,重启之后消息会丢失;
  3. 仅设置消息持久化,重启之后队列会消失,因此消息也就丢失了,所以只设置消息持久化而不设置队列持久化是没有意义的;
  4. 将所有的消息都设置为持久化(写入磁盘的速度比写入内存的速度慢的多),可能会影响 RabbitMQ 的性能,对于可靠性不是那么高的消息可以不采用持久化来提高 RabbitMQ 的吞吐量。

二. 生产者开启发送确认

场景

1. 不知道生产者发送的消息究竟是否已经到达 RabbitMQ Server;> 2. 不知道生产者发送的消息是否已经成功的分配到队列中去。

解决办法

开启消息发送确认,通过 ConfirmCallback 接口 和 ReturnCallback 接口 来保障。

备注
具体的操作方式可以参考 RabbitMQ 系列之消息确认机制 这篇文章。

三. 消费者开启消息确认(ACK)

场景

消费者收到消息还没来得及处理服务就宕机了。

解决办法

消费端开启消息确认(ACK),将消息设置为手动确认:

# 开启 ACK(消费者接收到消息时手动确认)spring.rabbitmq.listener.simple.acknowledge-mode=manual

这样虽然服务宕机,但是在重启之后,消费者仍然会消费到该条数据。
备注
具体的操作方式可以参考 RabbitMQ 系列之消息确认机制 这篇文章。

四. 使用 RabbitMQ 的镜集群像模式进行部署 #### 场景

持久化的消息成功存入 RabbitMQ 之后,如果在存入磁盘的这个过程中 RabbitMQ 服务节点宕机、异常重启等,消息还没来得及存入磁盘。

解决办法

可以使用 RabbitMQ 的镜集群像模式进行部署,如果主节点在这个特殊的时间段内挂掉了,会自动切换到从节点,这样就保证了高可用性,除非整个集群都挂掉。

备注
了解更多 关于 RabbitMQ 镜像集群模式 可以参考 RabbitMQ 系列之部署模式 这篇文章。

五. 消息补偿机制

场景

比如设置为持久化的消息,在保存到磁盘的过程中,当前队列节点挂了,存储节点的磁盘也挂了。

解决办法

  1. 由于系统功能复杂,加上网络不确定性太多,所以消息补偿机制需要建立在系统记录了详细的日志,比如消息发送日志,消息接收日志,存入数据库日志等的前提下;
  2. 通过手动触发或者定时扫描,从这些日志中提取出符合消息补偿要求的数据,进行消息补偿。

关注微信公众号

欢迎大家关注我的微信公众号阅读更多文章:

退出移动版