共计 5556 个字符,预计需要花费 14 分钟才能阅读完成。
@[toc]
微服务能够设计成音讯驱动的微服务,响应式零碎也能够基于消息中间件来做,从这个角度来说,在互联网利用开发中,消息中间件真的是太重要了。
明天,以 RabbitMQ 为例,松哥来和大家聊一聊音讯两头音讯发送可靠性的问题。
留神,以下内容我次要和大家探讨如何确保音讯生产者将音讯发送胜利,并不波及音讯生产的问题。
1. RabbitMQ 音讯发送机制
大家晓得,RabbitMQ 中的音讯发送引入了 Exchange(交换机)的概念,音讯的发送首先达到交换机上,而后再依据既定的路由规定,由交换机将音讯路由到不同的 Queue(队列)中,再由不同的消费者去生产。
大抵的流程就是这样,所以要确保音讯发送的可靠性,次要从两方面去确认:
- 音讯胜利达到 Exchange
- 音讯胜利达到 Queue
如果能确认这两步,那么咱们就能够认为音讯发送胜利了。
如果这两步中任一步骤呈现问题,那么音讯就没有胜利送达,此时咱们可能要通过重试等形式去从新发送音讯,多次重试之后,如果音讯还是不能到达,则可能就须要人工染指了。
通过下面的剖析,咱们能够确认,要确保音讯胜利发送,咱们只须要做好三件事就能够了:
- 确认音讯达到 Exchange。
- 确认音讯达到 Queue。
- 开启定时工作,定时投递那些发送失败的音讯。
2. RabbitMQ 的致力
下面提出的三个步骤,第三步须要咱们本人实现,前两步 RabbitMQ 则有现成的解决方案。
如何确保音讯胜利达到 RabbitMQ?RabbitMQ 给出了两种计划:
- 开启事务机制
- 发送方确认机制
这是两种不同的计划,不能够同时开启,只能抉择其中之一,如果两者同时开启,则会报如下谬误:
咱们别离来看。以下所有案例都在 Spring Boot 中开展,文末能够下载相干源码。
2.1 开启事务机制
Spring Boot 中开启 RabbitMQ 事务机制的形式如下:
首先须要先提供一个事务管理器,如下:
@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}
接下来,在音讯生产者下面做两件事:增加事务注解并设置通信信道为事务模式:
@Service
public class MsgService {
@Autowired
RabbitTemplate rabbitTemplate;
@Transactional
public void send() {rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
int i = 1 / 0;
}
}
这里留神两点:
- 发送音讯的办法上增加
@Transactional
注解标记事务。 - 调用 setChannelTransacted 办法设置为 true 开启事务模式。
这就 OK 了。
在下面的案例中,咱们在结尾来了个 1/0,这在运行时必然抛出异样,咱们能够尝试运行该办法,发现音讯并未发送胜利。
当咱们开启事务模式之后,RabbitMQ 生产者发送音讯会多出四个步骤:
- 客户端发出请求,将信道设置为事务模式。
- 服务端给出回复,批准将信道设置为事务模式。
- 客户端发送音讯。
- 客户端提交事务。
- 服务端给出响应,确认事务提交。
下面的步骤,除了第三步是原本就有的,其余几个步骤都是平白无故多进去的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。咱们能够想想,什么我的项目会用到消息中间件?一般来说都是一些高并发的我的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保音讯发送胜利,这种形式,性能要远远高于事务模式,一起来看下。
2.2 发送方确认机制
2.2.1 单条音讯解决
首先咱们移除刚刚对于事务的代码,而后在 application.properties 中配置开启音讯发送方确认机制,如下:
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
第一行是配置音讯达到交换器的确认回调,第二行则是配置音讯达到队列的回调。
第一行属性的配置有三个取值:
- none:示意禁用公布确认模式,默认即此。
- correlated:示意胜利公布音讯到交换器后会触发的回调办法。
- simple:相似 correlated,并且反对
waitForConfirms()
和waitForConfirmsOrDie()
办法的调用。
接下来咱们要开启两个监听,具体配置如下:
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {return new Queue(JAVABOY_QUEUE_NAME);
}
@Bean
DirectExchange directExchange() {return new DirectExchange(JAVABOY_EXCHANGE_NAME);
}
@Bean
Binding binding() {return BindingBuilder.bind(queue())
.to(directExchange())
.with(JAVABOY_QUEUE_NAME);
}
@PostConstruct
public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {logger.info("{}: 音讯胜利达到交换器",correlationData.getId());
}else{logger.error("{}: 音讯发送失败", correlationData.getId());
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {logger.error("{}: 音讯未胜利路由到队列",returned.getMessage().getMessageProperties().getMessageId());
}
}
对于这个配置类,我说如下几点:
- 定义配置类,实现
RabbitTemplate.ConfirmCallback
和RabbitTemplate.ReturnsCallback
两个接口,这两个接口,前者的回调用来确定音讯达到交换器,后者则会在音讯路由到队列失败时被调用。 - 定义 initRabbitTemplate 办法并增加 @PostConstruct 注解,在该办法中为 rabbitTemplate 别离配置这两个 Callback。
这就能够了。
接下来咱们对音讯发送进行测试。
首先咱们尝试将音讯发送到一个不存在的交换机中,像上面这样:
rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
留神第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下谬误:
接下来咱们给定一个实在存在的交换器,然而给一个不存在的队列,像上面这样:
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
留神此时第二个参数是一个字符串,不是变量。
能够看到,音讯尽管胜利达到交换器了,然而没有胜利路由到队列(因为队列不存在)。
这是一条音讯的发送,咱们再来看看音讯的批量发送。
2.2.2 音讯批量解决
如果是音讯批量解决,那么发送胜利的回调监听是一样的,这里不再赘述。
这就是 publisher-confirm 模式。
相比于事务,这种模式下的音讯吞吐量会失去极大的晋升。
3. 失败重试
失败重试分两种状况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,然而音讯发送失败了。
两种重试咱们别离来看。
3.1 自带重试机制
后面所说的事务机制和发送方确认机制,都是发送方确认音讯发送胜利的方法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,然而这个重试机制就和 MQ 自身没有关系了,这是利用 Spring 中的 retry 机制来实现的,具体配置如下:
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2
从上往下配置含意顺次是:
- 开启重试机制。
- 重试起始间隔时间。
- 最大重试次数。
- 最大重试间隔时间。
- 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
配置实现后,再次启动 Spring Boot 我的项目,而后关掉 MQ,此时尝试发送音讯,就会发送失败,进而导致主动重试。
3.2 业务重试
业务重试次要是针对音讯没有达到交换器的状况。
如果音讯没有胜利达到交换器,依据咱们第二大节的解说,此时就会触发音讯发送失败回调,在这个回调中,咱们就能够做文章了!
整体思路是这样:
- 首先创立一张表,用来记录发送到中间件上的音讯,像上面这样:
每次发送音讯的时候,就往数据库中增加一条记录。这里的字段都很好了解,有三个我额定说下:
- status:示意音讯的状态,有三个取值,0,1,2 别离示意音讯发送中、音讯发送胜利以及音讯发送失败。
- tryTime:示意音讯的第一次重试工夫(音讯收回去之后,在 tryTime 这个工夫点还未显示发送胜利,此时就能够开始重试了)。
- count:示意音讯重试次数。
其余字段都很好了解,我就不一一啰嗦了。
- 在音讯发送的时候,咱们就往该表中保留一条音讯发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
- 在 confirm 回调办法中,如果收到音讯发送胜利的回调,就将该条音讯的 status 设置为 1(在音讯发送时为音讯设置 msgId,在音讯发送胜利回调时,通过 msgId 来惟一锁定该条音讯)。
- 另外开启一个定时工作,定时工作每隔 10s 就去数据库中捞一次音讯,专门去捞那些 status 为 0 并且曾经过了 tryTime 工夫记录,把这些音讯拎进去后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则批改该条音讯的 status 为 2,示意这条音讯发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则从新去发送音讯,并且为其 count 的值 +1。
大抵的思路就是下面这样,松哥这里就不给出代码了,松哥的 vhr 里边邮件发送就是这样的思路来解决的,残缺代码大家能够参考 vhr 我的项目(https://github.com/lenve/vhr)。
当然这种思路有两个弊病:
- 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候咱们并不需要 MQ 有很高的 Qos,所以这个利用时要看具体情况。
- 依照下面的思路,可能会呈现同一条音讯反复发送的状况,不过这都不是事,咱们在音讯生产时,解决好幂等性问题就行了。
当然,大家也要留神,音讯是否要确保 100% 发送胜利,也要看具体情况。
4. 小结
好啦,这就是对于音讯生产者的一些常见问题以及对应的解决方案,下篇文章松哥和大家探讨如果保障音讯生产胜利并解决幂等性问题。
本文波及到的相干源代码大家能够在这里下载:https://github.com/lenve/java…。