RabbitMQ音讯的事务机制

 在应用RabbitMQ的时候,咱们能够通过音讯长久化操作来解决因为服务器的异样奔溃导致的音讯失落,除此之外咱们还会遇到一个问题,当音讯的发布者在将音讯发送进来之后,音讯到底有没有正确达到broker代理服务器呢?如果不进行非凡配置的话,默认状况下公布操作是不会返回任何信息给生产者的,也就是默认状况下咱们的生产者是不晓得音讯有没有正确达到broker的,如果在音讯达到broker之前曾经失落的话,长久化操作也解决不了这个问题,因为音讯基本就没达到代理服务器,你怎么进行长久化,那么这个问题该怎么解决呢?

RabbitMQ为咱们提供了两种形式:

  • 通过AMQP事务机制实现,这也是AMQP协定层面提供的解决方案;
  • 通过将channel设置成confirm模式来实现;

AMQP事物机制管制

 RabbitMQ中与事务机制无关的办法有三个:txSelect(), txCommit()以及txRollback(), txSelect()用于将以后channel设置成transaction模式,txCommit()用于提交事务,txRollback()用于回滚事务,在通过txSelect()开启事务之后,咱们便能够公布音讯给broker代理服务器了,如果txCommit()提交胜利了,则音讯肯定达到了broker了,如果在txCommit()执行之前broker异样解体或者因为其余起因抛出异样,这个时候咱们便能够捕捉异样通过txRollback()回滚事务。

SendTx.java

try {    // 通过工厂创立连贯    connection = factory.newConnection();    // 获取通道    channel = connection.createChannel();    // 开启事务    channel.txSelect();    // 申明队列    channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 创立音讯    String message = "Hello World!";    // 将产生的音讯放入队列    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));    System.out.println(" [x] Sent '" + message + "'");    // 模拟程序异样    int i = 1 / 0;    // 提交事务    channel.txCommit();} catch (IOException | TimeoutException e) {    e.printStackTrace();    try {        // 回滚事务        channel.txRollback();    } catch (IOException e1) {        e1.printStackTrace();    }}

 事务的确可能解决producer与broker之间音讯确认的问题,只有音讯胜利被broker承受,事务提交能力胜利,否则咱们便能够在捕捉异样进行事务回滚操作同时进行音讯重发,然而应用事务机制的话会升高RabbitMQ的性能,那么有没有更好的办法既能保障producer晓得音讯曾经正确送到,又能基本上不带来性能上的损失呢?从AMQP协定的层面看是没有更好的办法,然而RabbitMQ提供了一个更好的计划,行将channel信道设置成confirm模式。

confirm确认模式

 通过AMQP协定层面为咱们提供了事务机制解决了这个问题,然而采纳事务机制实现会升高RabbitMQ的音讯吞吐量,此时解决AMQP协定层面可能实现音讯事物管制外,咱们还有第二种形式即:Confirm模式。

Confirm确认模式原理

 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道下面公布的音讯都会被指派一个惟一的ID(从1开始),一旦音讯被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(蕴含音讯的惟一ID),这就使得生产者晓得音讯曾经正确达到目标队列了,如果音讯和队列是可长久化的,那么确认音讯会将音讯写入磁盘之后收回,broker回传给生产者的确认音讯中deliver-tag域蕴含了确认音讯的序列号,此外broker也能够设置basic.ack的multiple域,示意到这个序列号之前的所有音讯都曾经失去了解决。

 confirm模式最大的益处在于他是异步的,一旦公布一条音讯,生产者应用程序就能够在等信道返回确认的同时持续发送下一条音讯,当音讯最终失去确认之后,生产者利用便能够通过回调办法来解决该确认音讯,如果RabbitMQ因为本身外部谬误导致音讯失落,就会发送一条nack音讯,生产者应用程序同样能够在回调办法中解决该nack音讯。

 在channel 被设置成 confirm 模式之后,所有被 publish 的后续音讯都将被 confirm(即 ack) 或者被nack一次。然而没有对音讯被 confirm 的快慢做任何保障,并且同一条音讯不会既被 confirm又被nack 。

 留神:两种事物管制模式不能同时开启!

Confirm确认机制代码实现

 实现生产者confirm 机制有三种形式:

  • 一般confirm模式:每发送一条音讯后,调用waitForConfirms()办法,期待服务器端confirm。实际上是一种串行confirm了。
  • 批量confirm模式:每发送一批音讯后,调用waitForConfirmsOrDie()办法,期待服务器端confirm。
  • 异步confirm模式:提供一个回调办法,服务端confirm了一条或者多条音讯后Client端会回调这个办法。

同步Confirm

SendConfirmSync.java

try {    // 通过工厂创立连贯    connection = factory.newConnection();    // 获取通道    channel = connection.createChannel();    // 开启confirm确认模式    channel.confirmSelect();    // 申明队列    channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 创立音讯    String message = "Hello World!";    // 将产生的音讯放入队列    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));    System.out.println(" [x] Sent '" + message + "'");    // 确认音讯是否发送胜利-单条    if (channel.waitForConfirms())        System.out.println("音讯发送胜利!");    else        System.out.println("音讯发送失败!");    // 确认音讯是否发送胜利-多条    // 直到所有音讯都确认,只有有一个未确认就会IOException    channel.waitForConfirmsOrDie();    System.out.println("音讯发送胜利!");}

 以上代码能够看出,应用同步的形式须要等所有的音讯发送胜利当前才会执行前面代码,只有有一个音讯未被确认就会抛出IO异样。解决办法能够应用异步确认。

异步confirm

 异步confirm模式的编程实现最简单,Channel对象提供的ConfirmListener()回调办法只蕴含deliveryTag(以后Chanel收回的音讯序号),咱们须要本人为每一个Channel保护一个unconfirm的音讯序号汇合,每publish一条数据,汇合中元素加1,每回调一次handleAck办法,unconfirm汇合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm汇合最好采纳有序汇合SortedSet存储构造。实际上,waitForConfirms()办法也是通过SortedSet保护音讯序号的。

SendConfirmAsync.java

package com.xxxx.confirm.async.send;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Collections;import java.util.SortedSet;import java.util.TreeSet;import java.util.concurrent.TimeoutException;/** * 信道确认模式-异步-生产者 */public class Send {    // 队列名称    public static final String QUEUE_NAME = "confirm_async";    public static void main(String[] args) {        // 定义连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setPort(5672);        factory.setHost("192.168.10.100");        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        Connection connection = null;        Channel channel = null;        try {            // 保护信息发送回执deliveryTag            final SortedSet<Long> confirmSet=Collections.synchronizedSortedSet(new TreeSet<Long>());            // 创立连贯            connection = factory.newConnection();            // 获取通道            channel = connection.createChannel();            // 开启confirm确认模式            channel.confirmSelect();            // 申明队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            // 增加channel 监听            channel.addConfirmListener(new ConfirmListener() {                // 已确认                @Override                public void handleAck(long deliveryTag, boolean multiple) throws IOException {                    // multiple=true已确认多条 false已确认单条                    if (multiple) {                        System.out.println("handleAck--success-->multiple" + deliveryTag);                        // 革除前 deliveryTag 项标识id                        confirmSet.headSet(deliveryTag + 1L).clear();                    } else {                        System.out.println("handleAck--success-->single" + deliveryTag);                        confirmSet.remove(deliveryTag);                    }                }                // 未确认                @Override                public void handleNack(long deliveryTag, boolean multiple) throws IOException {                    // multiple=true未确认多条 false未确认单条                    if (multiple) {                        System.out.println("handleNack--failed-->multiple-->" + deliveryTag);                        // 革除前 deliveryTag 项标识id                        confirmSet.headSet(deliveryTag + 1L).clear();                    } else {                        System.out.println("handleNack--failed-->single" + deliveryTag);                        confirmSet.remove(deliveryTag);                    }                }            });            // 循环发送音讯演示音讯确认            while (true) {                // 创立音讯                String message = "Hello World!";                // 获取unconfirm的音讯序号deliveryTag                Long seqNo = channel.getNextPublishSeqNo();                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));                // 将音讯序号deliveryTag增加至SortedSet                confirmSet.add(seqNo);            }        } catch (IOException | TimeoutException e) {            e.printStackTrace();        } finally {            try {                // 敞开通道                if (null != channel && channel.isOpen())                    channel.close();                // 敞开连贯                if (null != connection && connection.isOpen())                    connection.close();            } catch (TimeoutException e) {                e.printStackTrace();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

异步模式的长处就是执行效率高,不须要期待音讯执行完,只须要监听音讯即可。

Spring集成RabbitMQ

官网:https://spring.io/projects/sp...

为什么应用spring AMQP?

  • 基于Spring之上社区沉闷
  • 对AMQP协定进行了高度的封装
  • 极大的简化了RabbitMQ的操作
  • 易用性、可扩大

创立聚合我的项目

创立父我的项目spring-rabbitmq

创立rabbitmq-provider

鼠标右键spring-rabbitmq我的项目new -> Module

创立rabbitmq-consumer

鼠标右键spring-rabbitmq我的项目new -> Module

父我的项目依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.1.7.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <modules>        <module>rabbitmq-provider</module>        <module>rabbitmq-consumer</module>    </modules>    <groupId>com.xxxx</groupId>    <artifactId>spring-rabbitmq</artifactId>    <version>0.0.1-SNAPSHOT</version>    <packaging>pom</packaging>    <name>srping-rabbitmq</name>    <description>Demo project for Spring Boot</description>    <properties>        <java.version>1.8</java.version>    </properties></project>

编写生产者

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>com.xxxx</groupId>        <artifactId>spring-rabbitmq</artifactId>        <version>0.0.1-SNAPSHOT</version>    </parent>    <groupId>com.xxxx</groupId>    <artifactId>rabbitmq-provider</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>rabbitmq-provider</name>    <description>Demo project for Spring Boot</description>    <properties>        <java.version>1.8</java.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>        </dependency>    </dependencies></project>

application.yml

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: shop    password: shop    virtual-host: /shopserver:  port: 8081

RabbitmqConfig.java

package com.xxxx.rabbitmqprovider.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitmqConfig {    /**     * 申明队列     * @return     */    @Bean    public Queue queue(){        return new Queue("topics");    }    /**     * 申明交换机(主题模式)     * @return     */    @Bean    public TopicExchange topicExchange(){        return new TopicExchange("topicExchange");    }    /**     * 将队列绑定到交换机     * @return     */    @Bean    public Binding binding(){        return BindingBuilder.bind(queue()).to(topicExchange()).with("topic.msg");    }}

Send.java

package com.xxxx.rabbitmqprovider.send;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class Sender {    @Autowired    private RabbitTemplate rabbitTemplate;    public void send() {        String message = "Hello World!";        /**         * 第一个参数:交换机名称         * 第二个参数:路由key名称         * 第三个参数:发送的音讯         */        rabbitTemplate.convertAndSend("topicExchange", "topic.msg", message);        System.out.println("发送:" + message);    }}

RabbitmqProviderTestApplication.java

package com.xxxx.rabbitmqprovider;import com.xxxx.rabbitmqprovider.send.Sender;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTest(classes = RabbitmqProviderApplication.class)public class  {    @Autowired    private Sender sender;    @Test    public void testSend(){        sender.send();    }}

编写消费者

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>com.xxxx</groupId>        <artifactId>spring-rabbitmq</artifactId>        <version>0.0.1-SNAPSHOT</version>    </parent>    <groupId>com.xxxx</groupId>    <artifactId>rabbitmq-consumer</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>rabbitmq-consumer</name>    <description>Demo project for Spring Boot</description>    <properties>        <java.version>1.8</java.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>        </dependency>    </dependencies></project>

application.yml

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: shop    password: shop    virtual-host: /shopserver:  port: 8082

Consumer.java

package com.xxxx.rabbitmqconsumer.revc;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component//监听队列@RabbitListener(queues = "topics")public class Consumer {    //示意接管音讯后的解决办法    @RabbitHandler    public void recv(String message){        System.out.println("接管音讯:"+message);    }}

测试

总结

当然这是官网最简略的例子,当前如果我的项目是基于配置来做的话要把握以下:

  1. pom中援用jar
  2. 先配置rabbitmq的配置

    1. 先配置ConnectionFactory
    2. 配置RabbitAmdmin
  3. 配置RabbitTemplate这里通常在配置一个Message Convert应用JSON进行数据格式的传输
  4. 配置Exchange
  5. 配置Queue
  6. 配置一个音讯解决的bean或者通过Spring扫描,这个Bean最初继承MessageListener 来解决JSON数据
  7. 配置Listener Container

ion.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class {

@Autowiredprivate Sender sender;@Testpublic void testSend(){    sender.send();}

}

### 编写消费者pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent>    <groupId>com.xxxx</groupId>    <artifactId>spring-rabbitmq</artifactId>    <version>0.0.1-SNAPSHOT</version></parent><groupId>com.xxxx</groupId><artifactId>rabbitmq-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-consumer</name><description>Demo project for Spring Boot</description><properties>    <java.version>1.8</java.version></properties><dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-test</artifactId>    </dependency></dependencies>

</project>

application.yml

spring:
rabbitmq:

host: 127.0.0.1port: 5672username: shoppassword: shopvirtual-host: /shop

server:
port: 8082

Consumer.java

package com.xxxx.rabbitmqconsumer.revc;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
//监听队列
@RabbitListener(queues = "topics")
public class Consumer {

//示意接管音讯后的解决办法@RabbitHandlerpublic void recv(String message){    System.out.println("接管音讯:"+message);}

}

### 测试![](https://img-blog.csdnimg.cn/20201224151341348.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0Jsb2dfcm9va2ll,size_16,color_FFFFFF,t_70#pic_center)![](https://img-blog.csdnimg.cn/20201224151400394.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0Jsb2dfcm9va2ll,size_16,color_FFFFFF,t_70#pic_center)### 总结当然这是官网最简略的例子,当前如果我的项目是基于配置来做的话要把握以下:1. pom中援用jar2. 先配置rabbitmq的配置   1. 先配置ConnectionFactory   2. 配置RabbitAmdmin3. 配置RabbitTemplate这里通常在配置一个Message Convert应用JSON进行数据格式的传输4. 配置Exchange5. 配置Queue6. 配置一个音讯解决的bean或者通过Spring扫描,这个Bean最初继承MessageListener 来解决JSON数据