关于rabbitmq:三RabbitMQ消息确认机制AMQP事务

43次阅读

共计 3091 个字符,预计需要花费 8 分钟才能阅读完成。

咱们晓得能够通过长久化(交换机、队列和音讯长久化)来保障咱们在服务器解体是,重启服务器音讯数据不会失落,然而咱们无奈确认当音讯的发送者在将音讯发送进来之后,音讯到底有没有正确达到 Broker 代理服务器,如果不进行非凡配置的话,默认状况下公布操作是不会返回任何信息给生产者的,也就是默认状况下咱们的生产者是不晓得音讯有没有正确达到 Broker 的,如果音讯在达到 Broker 之前失落的话,长久化操作也解决不了这个问题,因为音讯基本就没达到代理服务器,这个是没有方法进行长久化的。这篇介绍应用 AMQP 协定层面提供的解决方案。

一、应用 java 原生事务

RabbitMQ 的事务机制与事务无关的次要有三个办法:

  • txSelect()
  • txCommit()
  • txRollback()

txSelect 次要用于将以后 channel 设置成 transaction 模式,txCommit 用于提交事务,txRollback 用于回滚事务。

当咱们应用 txSelect 提交开始事务之后,咱们就能够公布音讯给 Broke 代理服务器,如果 txCommit 提交胜利了,则音讯肯定达到了 Broke 了,如果在 txCommit 执行之前 Broker 出现异常解体或者因为其余起因抛出异样,这个时候咱们便能够捕捉异样通过 txRollback 办法进行回滚事务了。这套事务次要代码为:

    channel.txSelect();
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    channel.txCommit();

即音讯的散发过程为:

  1. Client 发送 Tx.Select
  2. Broker 发送 Tx.Select-Ok(在它之后,发送音讯)
  3. Client 发送 Tx.Commit
  4. Broker 发送 Tx.Commit-Ok

二、联合 SpringBoot 应用事务

在 SpringBoot 中次要是通过封装的 RabbitTemplate 模板来实现音讯的发送,这里次要也是分为两种状况,应用 RabbitTemplate 同步发送,或者异步发送。

留神:公布确认和事务。(两者不可同时应用) 在 channel 为事务时,不可引入确认模式;同样 channel 为确认模式下,不可应用事务。

所以在应用事务时,在 application.properties 中,须要将确认模式更改为 false。

# 反对公布确认
spring.rabbitmq.publisher-confirms=false

A、同步

通过设置 RabbitTemplate 的 channelTransacted 为 true,来设置事务环境,使得能够应用 RabbitMQ 事务。如下:

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplateNew() {RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setChannelTransacted(true);
        return template;
    }

这里与后面咱们解说的原生事务是统一的,而当发送音讯出现异常时,就会响应执行事务回滚。

B、异步

方才咱们解说的是同步的状况,当初咱们解说一下异步的模式。在异步当中,次要应用 MessageListener 接口,它是 Spring AMQP 异步音讯投递的监听器接口。而 MessageListener 的实现类 SimpleMessageListenerContainer 则是作为了整个异步音讯投递的外围类存在。

接下来咱们开始介绍应用异步的办法,同样示意须要的内部事务,用户须要在容器配置的时候指定 PlatformTransactionManager 的实现。代码如下:

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setTransactionManager(rabbitTransactionManager());
        container.setChannelTransacted(true);
        // 开启手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueues(transitionQueue());
        container.setMessageListener(new TransitionConsumer());
        return container;
    }

这段代码咱们是增加在 config 下的 RabbitConfig.java 下,通过配置事务管理器,将 channelTransacted 属性被设置为 true。

在容器中配置事务时,如果提供了 transactionManager,channelTransaction 必须为 true,使得如果监听器解决失败,并且抛出异样,那么事务将进行回滚,那么音讯将返回给音讯代理;如果为 false,内部的事务依然能够提供给监听容器,造成的影响是在回滚的业务操作中也会提交音讯传输的操作。

通过应用 RabbitTransactionManager,这个事务管理器是 PlatformTransactionManager 接口的实现,它只能在一个 Rabbit ConnectionFactory 中应用。

留神:这种策略不可能提供 XA 事务,例如在音讯和数据库之间共享事务。

除了下面的代码外,还有 RabbitTransactionManager 和 TransitionConsumer 须要增加,代码如下:

    /**
     * 申明 transition2 队列
     * 
     * @return
     */
    @Bean
    public Queue transitionQueue() {return new Queue("transition2");
    }
    
    /**
     * 事务管理
     * 
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager() {return new RabbitTransactionManager(connectionFactory());
    }

    /**
     * 自定义消费者
     */
    public class TransitionConsumer implements ChannelAwareMessageListener {

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {byte[] body = message.getBody();
            System.out.println("TransitionConsumer:" + new String(body));
            // 确认音讯胜利生产
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // 除以 0,模仿异样,进行事务回滚
            // int t = 1 / 0;
        }
    }

正文完
 0