关于rabbitmq:三RabbitMQ消息确认机制AMQP事务

咱们晓得能够通过长久化(交换机、队列和音讯长久化)来保障咱们在服务器解体是,重启服务器音讯数据不会失落,然而咱们无奈确认当音讯的发送者在将音讯发送进来之后,音讯到底有没有正确达到Broker代理服务器,如果不进行非凡配置的话,默认状况下公布操作是不会返回任何信息给生产者的,也就是默认状况下咱们的生产者是不晓得音讯有没有正确达到Broker的,如果音讯在达到Broker之前失落的话,长久化操作也解决不了这个问题,因为音讯基本就没达到代理服务器,这个是没有方法进行长久化的。这篇介绍应用AMQP协定层面提供的解决方案。

一、应用java原生事务

RabbitMQ的事务机制与事务无关的次要有三个办法:

  • txSelect()
  • txCommit()
  • txRollback()

txSelect次要用于将以后channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。

当咱们应用txSelect提交开始事务之后,咱们就能够公布音讯给Broke代理服务器,如果txCommit提交胜利了,则音讯肯定达到了Broke了,如果在txCommit执行之前Broker出现异常解体或者因为其余起因抛出异样,这个时候咱们便能够捕捉异样通过txRollback办法进行回滚事务了。这套事务次要代码为:

    channel.txSelect();
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    channel.txCommit();

即音讯的散发过程为:

  1. Client发送Tx.Select
  2. Broker发送Tx.Select-Ok(在它之后,发送音讯)
  3. Client发送Tx.Commit
  4. Broker发送Tx.Commit-Ok

二、联合SpringBoot应用事务

在SpringBoot中次要是通过封装的RabbitTemplate模板来实现音讯的发送,这里次要也是分为两种状况,应用RabbitTemplate同步发送,或者异步发送。

留神:公布确认和事务。(两者不可同时应用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可应用事务。

所以在应用事务时,在application.properties中,须要将确认模式更改为false。

# 反对公布确认
spring.rabbitmq.publisher-confirms=false

A、同步

通过设置RabbitTemplate的channelTransacted为true,来设置事务环境,使得能够应用RabbitMQ事务。如下:

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplateNew() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setChannelTransacted(true);
        return template;
    }

这里与后面咱们解说的原生事务是统一的,而当发送音讯出现异常时,就会响应执行事务回滚。

B、异步

方才咱们解说的是同步的状况,当初咱们解说一下异步的模式。在异步当中,次要应用MessageListener 接口,它是 Spring AMQP 异步音讯投递的监听器接口。而MessageListener的实现类SimpleMessageListenerContainer则是作为了整个异步音讯投递的外围类存在。

接下来咱们开始介绍应用异步的办法,同样示意须要的内部事务,用户须要在容器配置的时候指定PlatformTransactionManager的实现。代码如下:

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setTransactionManager(rabbitTransactionManager());
        container.setChannelTransacted(true);
        // 开启手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueues(transitionQueue());
        container.setMessageListener(new TransitionConsumer());
        return container;
    }

这段代码咱们是增加在config下的RabbitConfig.java下,通过配置事务管理器,将channelTransacted属性被设置为true。

在容器中配置事务时,如果提供了transactionManager,channelTransaction必须为true,使得如果监听器解决失败,并且抛出异样,那么事务将进行回滚,那么音讯将返回给音讯代理;如果为false,内部的事务依然能够提供给监听容器,造成的影响是在回滚的业务操作中也会提交音讯传输的操作。

通过应用RabbitTransactionManager,这个事务管理器是PlatformTransactionManager接口的实现,它只能在一个Rabbit ConnectionFactory中应用。

留神:这种策略不可能提供XA事务,例如在音讯和数据库之间共享事务。

除了下面的代码外,还有RabbitTransactionManager和TransitionConsumer须要增加,代码如下:

    /**
     * 申明transition2队列
     * 
     * @return
     */
    @Bean
    public Queue transitionQueue() {
        return new Queue("transition2");
    }
    
    /**
     * 事务管理
     * 
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager() {
        return new RabbitTransactionManager(connectionFactory());
    }

    /**
     * 自定义消费者
     */
    public class TransitionConsumer implements ChannelAwareMessageListener {

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            byte[] body = message.getBody();
            System.out.println("TransitionConsumer: " + new String(body));
            // 确认音讯胜利生产
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // 除以0,模仿异样,进行事务回滚
            // int t = 1 / 0;
        }
    }