关于java:四种策略确保-RabbitMQ-消息发送可靠性你用哪种

40次阅读

共计 5556 个字符,预计需要花费 14 分钟才能阅读完成。

@[toc]
微服务能够设计成音讯驱动的微服务,响应式零碎也能够基于消息中间件来做,从这个角度来说,在互联网利用开发中,消息中间件真的是太重要了。

明天,以 RabbitMQ 为例,松哥来和大家聊一聊音讯两头音讯发送可靠性的问题。

留神,以下内容我次要和大家探讨如何确保音讯生产者将音讯发送胜利,并不波及音讯生产的问题。

1. RabbitMQ 音讯发送机制

大家晓得,RabbitMQ 中的音讯发送引入了 Exchange(交换机)的概念,音讯的发送首先达到交换机上,而后再依据既定的路由规定,由交换机将音讯路由到不同的 Queue(队列)中,再由不同的消费者去生产。

大抵的流程就是这样,所以要确保音讯发送的可靠性,次要从两方面去确认:

  1. 音讯胜利达到 Exchange
  2. 音讯胜利达到 Queue

如果能确认这两步,那么咱们就能够认为音讯发送胜利了。

如果这两步中任一步骤呈现问题,那么音讯就没有胜利送达,此时咱们可能要通过重试等形式去从新发送音讯,多次重试之后,如果音讯还是不能到达,则可能就须要人工染指了。

通过下面的剖析,咱们能够确认,要确保音讯胜利发送,咱们只须要做好三件事就能够了:

  1. 确认音讯达到 Exchange。
  2. 确认音讯达到 Queue。
  3. 开启定时工作,定时投递那些发送失败的音讯。

2. RabbitMQ 的致力

下面提出的三个步骤,第三步须要咱们本人实现,前两步 RabbitMQ 则有现成的解决方案。

如何确保音讯胜利达到 RabbitMQ?RabbitMQ 给出了两种计划:

  1. 开启事务机制
  2. 发送方确认机制

这是两种不同的计划,不能够同时开启,只能抉择其中之一,如果两者同时开启,则会报如下谬误:

咱们别离来看。以下所有案例都在 Spring Boot 中开展,文末能够下载相干源码。

2.1 开启事务机制

Spring Boot 中开启 RabbitMQ 事务机制的形式如下:

首先须要先提供一个事务管理器,如下:

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}

接下来,在音讯生产者下面做两件事:增加事务注解并设置通信信道为事务模式:

@Service
public class MsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Transactional
    public void send() {rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
        int i = 1 / 0;
    }
}

这里留神两点:

  1. 发送音讯的办法上增加 @Transactional 注解标记事务。
  2. 调用 setChannelTransacted 办法设置为 true 开启事务模式。

这就 OK 了。

在下面的案例中,咱们在结尾来了个 1/0,这在运行时必然抛出异样,咱们能够尝试运行该办法,发现音讯并未发送胜利。

当咱们开启事务模式之后,RabbitMQ 生产者发送音讯会多出四个步骤:

  1. 客户端发出请求,将信道设置为事务模式。
  2. 服务端给出回复,批准将信道设置为事务模式。
  3. 客户端发送音讯。
  4. 客户端提交事务。
  5. 服务端给出响应,确认事务提交。

下面的步骤,除了第三步是原本就有的,其余几个步骤都是平白无故多进去的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。咱们能够想想,什么我的项目会用到消息中间件?一般来说都是一些高并发的我的项目,这个时候并发性能尤为重要。

所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保音讯发送胜利,这种形式,性能要远远高于事务模式,一起来看下。

2.2 发送方确认机制

2.2.1 单条音讯解决

首先咱们移除刚刚对于事务的代码,而后在 application.properties 中配置开启音讯发送方确认机制,如下:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

第一行是配置音讯达到交换器的确认回调,第二行则是配置音讯达到队列的回调。

第一行属性的配置有三个取值:

  1. none:示意禁用公布确认模式,默认即此。
  2. correlated:示意胜利公布音讯到交换器后会触发的回调办法。
  3. simple:相似 correlated,并且反对 waitForConfirms()waitForConfirmsOrDie() 办法的调用。

接下来咱们要开启两个监听,具体配置如下:

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
    public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Bean
    Queue queue() {return new Queue(JAVABOY_QUEUE_NAME);
    }
    @Bean
    DirectExchange directExchange() {return new DirectExchange(JAVABOY_EXCHANGE_NAME);
    }
    @Bean
    Binding binding() {return BindingBuilder.bind(queue())
                .to(directExchange())
                .with(JAVABOY_QUEUE_NAME);
    }

    @PostConstruct
    public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {logger.info("{}: 音讯胜利达到交换器",correlationData.getId());
        }else{logger.error("{}: 音讯发送失败", correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {logger.error("{}: 音讯未胜利路由到队列",returned.getMessage().getMessageProperties().getMessageId());
    }
}

对于这个配置类,我说如下几点:

  1. 定义配置类,实现 RabbitTemplate.ConfirmCallbackRabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定音讯达到交换器,后者则会在音讯路由到队列失败时被调用。
  2. 定义 initRabbitTemplate 办法并增加 @PostConstruct 注解,在该办法中为 rabbitTemplate 别离配置这两个 Callback。

这就能够了。

接下来咱们对音讯发送进行测试。

首先咱们尝试将音讯发送到一个不存在的交换机中,像上面这样:

rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

留神第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下谬误:

接下来咱们给定一个实在存在的交换器,然而给一个不存在的队列,像上面这样:

rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

留神此时第二个参数是一个字符串,不是变量。

能够看到,音讯尽管胜利达到交换器了,然而没有胜利路由到队列(因为队列不存在)。

这是一条音讯的发送,咱们再来看看音讯的批量发送。

2.2.2 音讯批量解决

如果是音讯批量解决,那么发送胜利的回调监听是一样的,这里不再赘述。

这就是 publisher-confirm 模式。

相比于事务,这种模式下的音讯吞吐量会失去极大的晋升。

3. 失败重试

失败重试分两种状况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,然而音讯发送失败了。

两种重试咱们别离来看。

3.1 自带重试机制

后面所说的事务机制和发送方确认机制,都是发送方确认音讯发送胜利的方法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,然而这个重试机制就和 MQ 自身没有关系了,这是利用 Spring 中的 retry 机制来实现的,具体配置如下:

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2

从上往下配置含意顺次是:

  • 开启重试机制。
  • 重试起始间隔时间。
  • 最大重试次数。
  • 最大重试间隔时间。
  • 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)

配置实现后,再次启动 Spring Boot 我的项目,而后关掉 MQ,此时尝试发送音讯,就会发送失败,进而导致主动重试。

3.2 业务重试

业务重试次要是针对音讯没有达到交换器的状况。

如果音讯没有胜利达到交换器,依据咱们第二大节的解说,此时就会触发音讯发送失败回调,在这个回调中,咱们就能够做文章了!

整体思路是这样:

  1. 首先创立一张表,用来记录发送到中间件上的音讯,像上面这样:

每次发送音讯的时候,就往数据库中增加一条记录。这里的字段都很好了解,有三个我额定说下:

  • status:示意音讯的状态,有三个取值,0,1,2 别离示意音讯发送中、音讯发送胜利以及音讯发送失败。
  • tryTime:示意音讯的第一次重试工夫(音讯收回去之后,在 tryTime 这个工夫点还未显示发送胜利,此时就能够开始重试了)。
  • count:示意音讯重试次数。

其余字段都很好了解,我就不一一啰嗦了。

  1. 在音讯发送的时候,咱们就往该表中保留一条音讯发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
  2. 在 confirm 回调办法中,如果收到音讯发送胜利的回调,就将该条音讯的 status 设置为 1(在音讯发送时为音讯设置 msgId,在音讯发送胜利回调时,通过 msgId 来惟一锁定该条音讯)。
  3. 另外开启一个定时工作,定时工作每隔 10s 就去数据库中捞一次音讯,专门去捞那些 status 为 0 并且曾经过了 tryTime 工夫记录,把这些音讯拎进去后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则批改该条音讯的 status 为 2,示意这条音讯发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则从新去发送音讯,并且为其 count 的值 +1。

大抵的思路就是下面这样,松哥这里就不给出代码了,松哥的 vhr 里边邮件发送就是这样的思路来解决的,残缺代码大家能够参考 vhr 我的项目(https://github.com/lenve/vhr)。

当然这种思路有两个弊病:

  1. 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候咱们并不需要 MQ 有很高的 Qos,所以这个利用时要看具体情况。
  2. 依照下面的思路,可能会呈现同一条音讯反复发送的状况,不过这都不是事,咱们在音讯生产时,解决好幂等性问题就行了。

当然,大家也要留神,音讯是否要确保 100% 发送胜利,也要看具体情况。

4. 小结

好啦,这就是对于音讯生产者的一些常见问题以及对应的解决方案,下篇文章松哥和大家探讨如果保障音讯生产胜利并解决幂等性问题。

本文波及到的相干源代码大家能够在这里下载:https://github.com/lenve/java…。

正文完
 0