共计 3091 个字符,预计需要花费 8 分钟才能阅读完成。
咱们晓得能够通过长久化(交换机、队列和音讯长久化)来保障咱们在服务器解体是,重启服务器音讯数据不会失落,然而咱们无奈确认当音讯的发送者在将音讯发送进来之后,音讯到底有没有正确达到 Broker 代理服务器,如果不进行非凡配置的话,默认状况下公布操作是不会返回任何信息给生产者的,也就是默认状况下咱们的生产者是不晓得音讯有没有正确达到 Broker 的,如果音讯在达到 Broker 之前失落的话,长久化操作也解决不了这个问题,因为音讯基本就没达到代理服务器,这个是没有方法进行长久化的。这篇介绍应用 AMQP 协定层面提供的解决方案。
一、应用 java 原生事务
RabbitMQ 的事务机制与事务无关的次要有三个办法:
- txSelect()
- txCommit()
- txRollback()
txSelect 次要用于将以后 channel 设置成 transaction 模式,txCommit 用于提交事务,txRollback 用于回滚事务。
当咱们应用 txSelect 提交开始事务之后,咱们就能够公布音讯给 Broke 代理服务器,如果 txCommit 提交胜利了,则音讯肯定达到了 Broke 了,如果在 txCommit 执行之前 Broker 出现异常解体或者因为其余起因抛出异样,这个时候咱们便能够捕捉异样通过 txRollback 办法进行回滚事务了。这套事务次要代码为:
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
即音讯的散发过程为:
- Client 发送 Tx.Select
- Broker 发送 Tx.Select-Ok(在它之后,发送音讯)
- Client 发送 Tx.Commit
- Broker 发送 Tx.Commit-Ok
二、联合 SpringBoot 应用事务
在 SpringBoot 中次要是通过封装的 RabbitTemplate 模板来实现音讯的发送,这里次要也是分为两种状况,应用 RabbitTemplate 同步发送,或者异步发送。
留神:公布确认和事务。(两者不可同时应用) 在 channel 为事务时,不可引入确认模式;同样 channel 为确认模式下,不可应用事务。
所以在应用事务时,在 application.properties 中,须要将确认模式更改为 false。
# 反对公布确认
spring.rabbitmq.publisher-confirms=false
A、同步
通过设置 RabbitTemplate 的 channelTransacted 为 true,来设置事务环境,使得能够应用 RabbitMQ 事务。如下:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplateNew() {RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setChannelTransacted(true);
return template;
}
这里与后面咱们解说的原生事务是统一的,而当发送音讯出现异常时,就会响应执行事务回滚。
B、异步
方才咱们解说的是同步的状况,当初咱们解说一下异步的模式。在异步当中,次要应用 MessageListener 接口,它是 Spring AMQP 异步音讯投递的监听器接口。而 MessageListener 的实现类 SimpleMessageListenerContainer 则是作为了整个异步音讯投递的外围类存在。
接下来咱们开始介绍应用异步的办法,同样示意须要的内部事务,用户须要在容器配置的时候指定 PlatformTransactionManager 的实现。代码如下:
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setTransactionManager(rabbitTransactionManager());
container.setChannelTransacted(true);
// 开启手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueues(transitionQueue());
container.setMessageListener(new TransitionConsumer());
return container;
}
这段代码咱们是增加在 config 下的 RabbitConfig.java 下,通过配置事务管理器,将 channelTransacted 属性被设置为 true。
在容器中配置事务时,如果提供了 transactionManager,channelTransaction 必须为 true,使得如果监听器解决失败,并且抛出异样,那么事务将进行回滚,那么音讯将返回给音讯代理;如果为 false,内部的事务依然能够提供给监听容器,造成的影响是在回滚的业务操作中也会提交音讯传输的操作。
通过应用 RabbitTransactionManager,这个事务管理器是 PlatformTransactionManager 接口的实现,它只能在一个 Rabbit ConnectionFactory 中应用。
留神:这种策略不可能提供 XA 事务,例如在音讯和数据库之间共享事务。
除了下面的代码外,还有 RabbitTransactionManager 和 TransitionConsumer 须要增加,代码如下:
/**
* 申明 transition2 队列
*
* @return
*/
@Bean
public Queue transitionQueue() {return new Queue("transition2");
}
/**
* 事务管理
*
* @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager() {return new RabbitTransactionManager(connectionFactory());
}
/**
* 自定义消费者
*/
public class TransitionConsumer implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {byte[] body = message.getBody();
System.out.println("TransitionConsumer:" + new String(body));
// 确认音讯胜利生产
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 除以 0,模仿异样,进行事务回滚
// int t = 1 / 0;
}
}