乐趣区

关于rabbitmq:如何保证-RabbitMQ-的消息可靠性

前言

我的项目开发中常常会应用音讯队列来实现异步解决、利用解耦、流量管制等性能。尽管音讯队列的呈现解决了一些场景下的问题,然而同时也引出了一些问题,其中应用音讯队列时如何保障音讯的可靠性就是一个常见的问题。如果在我的项目中遇到须要保障音讯肯定被生产的场景时,如何保障音讯不失落,如何保障音讯的可靠性?

最近在写我的项目时应用 RabbitMQ 音讯队列中间件时也遇到了须要保障音讯可靠性的场景,所以将如何放弃音讯可靠性的计划记录下来,上面将解说一下如何保障 RabbitMQ 的音讯可靠性。

如何保障 RabbitMQ 的音讯可靠性

先放一张 RabbitMQ 是如何消息传递的图:

生产者 Producer 将音讯发送到指定的 交换机 Exchange,交换机依据路由规定路由到绑定的 队列 Queue 中,最初和消费者建设连贯后,将音讯推送给 消费者 Consumer

那么音讯会在哪些环节失落呢,列出可能呈现音讯失落的场景有:

  • 生产者将音讯发送到 RabbitMQ Server 异样:可能因为网络问题造成 RabbitMQ 服务端无奈收到音讯,造成生产者发送音讯失落场景。
  • RabbitMQ Server 中音讯在交换机中无奈路由到指定队列:可能因为代码层面或配置层面谬误导致音讯路由到指定队列失败,造成生产者发送音讯失落场景。
  • RabbitMQ Server 中存储的音讯失落:可能因为 RabbitMQ Server 宕机导致音讯未齐全长久化或队列失落导致音讯失落等长久化问题,造成 RabbitMQ Server 存储的音讯失落场景。
  • 消费者生产音讯异样:可能在消费者接管到音讯后,还没来得及生产音讯,消费者宕机或故障等问题,造成消费者无奈生产音讯导致音讯失落的场景。

以上就是 RabbitMQ 可能呈现音讯失落的场景,接下来将顺次解说如何防止这些音讯失落的场景问题。


因为在我的项目开发中应用的是 RabbitMQ,所以应用 Spring Boot 集成的 AMQP 依赖即可应用 RabbitMQ。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1. 保障生产者发送音讯到 RabbitMQ Server

为了防止因为网络故障或闪断问题导致音讯无奈失常发送到 RabbitMQ Server 的状况,RabbitMQ 提供了两种计划让生产者能够感知到音讯是否正确无误的发送到 RabbitMQ Server 中,这两种计划别离是 事务机制 发送方确认机制。上面别离介绍一下这两种机制如何实现。

事务机制

先说配置和应用:

  1. 配置类中配置事务管理器

    /**
     * 音讯队列配置类
     *
     * @author 单程车票
     */
    @Configuration
    public class RabbitMQConfig {
        /**
         * 配置事务管理器
         */
        @Bean
        public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
        }
    }
  2. 通过增加事务注解 + 开启事务实现事务机制

    /**
     * 音讯业务实现类
     *
     * @author 单程车票
     */
    @Service
    public class RabbitMQServiceImpl {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Transactional // 事务注解
        public void sendMessage() {
            // 开启事务
            rabbitTemplate.setChannelTransacted(true);
            // 发送音讯
            rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
        }
    }

通过下面的配置即可实现事务机制,执行流程为:在生产者发送音讯之前,开启事务,而后发送音讯,如果音讯发送至 RabbitMQ Server 失败后,进行事务回滚,从新发送。如果 RabbitMQ Server 接管到音讯,则提交事务。

能够发现事务机制其实是同步操作,存在阻塞生产者的状况直到 RabbitMQ Server 应答,这样其实会很大水平上 升高发送音讯的性能 ,所以 个别不会应用事务机制来保障生产者的音讯可靠性,而是应用发送方确认机制。


发送方确认机制

先说配置和应用:

  1. 配置文件

    spring:
      rabbitmq:
        publisher-confirm-type: correlated  # 开启发送方确认机制

    配置属性有三种别离为:

    • none:示意禁用发送方确认机制
    • correlated:示意开启发送方确认机制
    • simple:示意开启发送方确认机制,并反对 waitForConfirms()waitForConfirmsOrDie() 的调用。

    这里个别应用 correlated 开启发送方确认机制即可,至于 simplewaitForConfirms() 办法调用是指 串行确认办法,即生产者发送音讯后,调用该办法期待 RabbitMQ Server 确认,如果返回 false 或超时未返回则进行音讯重传。因为串行性能较差,这里个别都是用异步 confirm 模式。

  2. 通过调用 setConfirmCallback() 实现异步 confirm 模式感知音讯发送后果

    /**
     * 音讯业务实现类
     *
     * @author 单程车票
     */
    @Service
    public class RabbitMQServiceImpl {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void sendMessage() {
            // 发送音讯
            rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
            // 设置音讯确认回调办法
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 * MQ 确认回调办法
                 * @param correlationData 音讯的惟一标识
                 * @param ack 音讯是否胜利收到
                 * @param cause 失败起因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    // 记录日志
                    log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
                    if (!ack) {
                        // 出错解决
                        ...
                    }
                }
            });
        }
    }

生产者发送音讯后通过调用 setConfirmCallback() 能够将信道设置为 confirm 模式,所有音讯会被指派一个音讯惟一标识,当音讯被发送到 RabbitMQ Server 后,Server 确认音讯后生产者会回调设置的办法,从而实现生产者能够感知到音讯是否正确无误的投递,从而实现发送方确认机制。并且该模式是异步的,发送音讯的吞吐量会失去很大晋升。

下面就是发送放确认机制的配置和应用,应用这种机制能够保障生产者的音讯可靠性投递,并且性能较好。


2. 保障音讯能从交换机路由到指定队列

在确保生产者能将音讯投递到交换机的前提下,RabbitMQ 同样提供了音讯投递失败的策略配置来确保音讯的可靠性,接下来通过配置来介绍一下音讯投递失败的策略。

先说配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 开启发送方确认机制
    publisher-returns: true   # 开启音讯返回
    template:
      mandatory: true     # 音讯投递失败返回客户端

mandatory 分为 true 失败后返回客户端 false 失败后主动删除 两种策略。显然设置为 false 无奈保障音讯的可靠性。

到这里的配置是能够保障生产者发送音讯的可靠性投递。

通过调用 setReturnCallback() 办法设置路由失败后的回调办法:

/**
 * 音讯业务实现类
 *
 * @author 单程车票
 */
@Service
public class RabbitMQServiceImpl {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage() {
        // 发送音讯
        rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
        // 设置音讯确认回调办法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * MQ 确认回调办法
             * @param correlationData 音讯的惟一标识
             * @param ack 音讯是否胜利收到
             * @param cause 失败起因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // 记录日志
                log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
                if (!ack) {
                    // 出错解决
                    ...
                }
            }
        });

        // 设置路由失败回调办法
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * MQ 没有将音讯投递给指定的队列回调办法
             * @param message 投递失败的音讯详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 音讯发给哪个交换机
             * @param routingKey 音讯用哪个路邮键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // 记录日志
                log.info("Fail Message["+message+"]==>replyCode["+replyCode+"]" +"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
                // 出错解决
                ...
            }
        });
    }
}

通过调用 setReturnCallback() 办法即可实现当交换机路由到指定队列失败后回调办法,拿到被退回的音讯信息,进行相应的解决如记录日志或重传等等。


3. 保障音讯在 RabbitMQ Server 中的长久化

对于音讯的长久化,只须要在发送音讯时将音讯长久化,并且在创立交换机和队列时也保障长久化即可。

配置如下:

/**
 * 音讯队列
 */
@Bean
public Queue queue() {
    // 四个参数:name(队列名)、durable(长久化)、exclusive(独占)、autoDelete(主动删除)return new Queue(MESSAGE_QUEUE, true);
}

/**
 * 间接交换机
 */
@Bean
public DirectExchange exchange() {
    // 四个参数:name(交换机名)、durable(长久化)、autoDelete(主动删除)、arguments(额定参数)return new DirectExchange(Direct_Exchange, true, false);
}

在创立交换机和队列时通过构造方法将长久化的参数都设置为 true 即可实现交换机和队列的长久化。

@Override
public void sendMessage() {
    // 结构音讯(将音讯长久化)Message message = MessageBuilder.withBody("单程车票".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
    // 向 MQ 发送音讯(音讯内容都为音讯表记录的 id)rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
}

在发送音讯前通过调用 MessageBuildersetDeliveryMode(MessageDeliveryMode.PERSISTENT) 在结构音讯时设置音讯长久化(MessageDeliveryMode.PERSISTENT)即可实现对音讯的长久化。

通过确保音讯、交换机、队列的长久化操作能够保障音讯的在 RabbitMQ Server 中不失落,从而保障可靠性,其实除了长久化之外还须要保障 RabbitMQ 的高可用性,否则 MQ 都宕机或磁盘受损都无奈确保音讯的可靠性,对于高可用性这里就不作过多阐明,有趣味的能够去理解一下。


4. 保障消费者生产的音讯不失落

在保障发送方和 RabbitMQ Server 的音讯可靠性的前提下,只须要保障消费者在生产音讯时异样音讯不失落即可保障音讯的可靠性。

RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 可能感知到消费者是否生产胜利音讯,默认状况下,消费者应答机制是自动应答的,也就是 RabbitMQ 将音讯推送给消费者,便会从队列删除该音讯,如果消费者在生产过程失败时,音讯就存在失落的状况。所以须要将消费者应答机制设置为手动应答,只有消费者确认生产胜利后才会删除音讯,从而防止音讯的失落。

上面来看看如何配置消费者手动应答:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 开启发送方确认机制
    publisher-returns: true   # 开启音讯返回
    template:
      mandatory: true     # 音讯投递失败返回客户端
    listener:
      simple:
        acknowledge-mode: manual  # 开启手动确认生产机制

通过 listener.simple.acknowledge-mode = manual 即可将消费者应答机制设置为手动应答。

之后只须要在生产音讯时,通过调用 channel.basicAck()channel.basicNack() 来依据业务的执行胜利抉择是手动确认生产还是手动抛弃音讯。

/**
 * 监听生产队列的音讯
 */
@RabbitListener(queues = RabbitMQConfig.MESSAGE_QUEUE)
public void onMessage(Message message, Channel channel) {
    // 获取音讯索引
    long index = message.getMessageProperties().getDeliveryTag();
    // 解析音讯
    byte[] body = message.getBody();
    ...
    try {
        // 业务解决
        ...
        // 业务执行胜利则手动确认
        channel.basicAck(index, false);
    }catch (Exception e) {
        // 记录日志
        log.info("出现异常:{}", e.getMessage());
        try {
            // 手动抛弃信息
            channel.basicNack(index, false, false);
        } catch (IOException ex) {log.info("抛弃音讯异样");
        }
    }
}

这里阐明一下 basicAck()basicNack() 的参数阐明:

  • void basicAck(long deliveryTag, boolean multiple) 办法(会抛异样):

    • deliveryTag:该音讯的 index
    • multiple:是否批量解决(true 示意将一次性 ack 所有小于 deliveryTag 的音讯)
  • void basicNack(long deliveryTag, boolean multiple, boolean requeue) 办法(会抛异样):

    • deliveryTag:该音讯的 index
    • multiple:是否批量解决(true 示意将一次性 ack 所有小于 deliveryTag 的音讯)
    • requeue:被回绝的是否从新入队列(true 示意增加在队列的末端;false 示意抛弃)

通过设置手动确认消费者应答机制即可保障消费者在生产信息时的音讯可靠性。

Spring Boot 提供的音讯重试机制

除了消费者应答机制外,Spring Boot 也提供了一种重试机制,只须要通过配置即可实现音讯重试从而确保音讯的可靠性,这里简略介绍一下:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 开启主动确认生产机制
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 初始失败期待时长为 5 秒
          multiplier: 1  # 失败的期待时长倍数(下次期待时长 = multiplier * 上次等待时间)max-attempts: 3 # 最大重试次数
          stateless: true # true 无状态;false 有状态(如果业务中蕴含事务,这里改为 false)

通过配置在消费者的办法上如果执行失败或执行异样 只须要抛出异样(肯定要出现异常才会触发重试,留神:不要捕捉异样) 即可实现音讯重试,这样也能够保障音讯的可靠性。


总要有总结

下面就是我在我的项目中对于如何保障 RabbitMQ 的音讯可靠性的配置和实现计划了。上面想聊聊我在理论应用音讯队列实现音讯可靠性时遇到的问题。

消费者生产音讯须要保障幂等性

因为实现了音讯可靠性导致音讯重发或音讯重试造成消费者可能会存在音讯被反复生产的状况,这种状况就须要保障音讯不被反复生产,也就是音讯保障幂等性。

实现幂等性的办法有很多:借助数据库的乐观锁或乐观锁、借助 redis 的分布式锁、借助 redis 实现 token 机制等等都能够很好的保障音讯的幂等性。

应用音讯队列很难做到 100% 的音讯可靠性

我在我的项目理论开发中应用 RabbitMQ 实现音讯可靠性,实际后的感触是音讯队列很难能做到 100% 的音讯可靠性,下面的实现计划中 RabbitMQ 提供的机制做到的是尽可能地减小音讯失落的几率。

大多数状况下音讯失落都是因为代码呈现谬误,那么这样无论进行多少次重发都是无奈解决问题的,这样只会减少 CPU 的开销,所以我认为更好的解决办法是通过记录日志的形式期待后续回溯时更好的发现问题并解决问题。对于一些不是很须要保障百分百可靠性的场景,都能够通过记录日志的形式来保障音讯可靠性即可。

我在我的项目中采纳的是音讯落库的形式,先将音讯落库,而后生产者将音讯发送给 MQ,应用数据库记录音讯的生产状况,对于重试屡次依然无奈生产胜利的音讯,后续通过定时任务调度的形式对这些无奈生产胜利的音讯进行弥补。我认为这样能够尽可能地保障音讯的可靠性。然而同样这样也带来了问题就是音讯落库须要数据库磁盘 IO 的开销,增大数据库压力同时升高了性能。

总之,在实现音讯的可靠性时,应该依据我的项目的需要来思考如何解决。对于音讯要求可靠性低的只须要在出错时记录日志不便后续回溯解决出错问题即可,对于音讯可靠性要求高的则能够采纳音讯落库 + 定时工作的形式尽可能保障百分百的可靠性。

退出移动版