RabbitMQ 音讯的事务机制
在应用 RabbitMQ 的时候,咱们能够通过音讯长久化操作来解决因为服务器的异样奔溃导致的音讯失落,除此之外咱们还会遇到一个问题,当音讯的发布者在将音讯发送进来之后,音讯到底有没有正确达到 broker 代理服务器呢?如果不进行非凡配置的话,默认状况下公布操作是不会返回任何信息给生产者的,也就是默认状况下咱们的生产者是不晓得音讯有没有正确达到 broker 的,如果在音讯达到 broker 之前曾经失落的话,长久化操作也解决不了这个问题,因为音讯基本就没达到代理服务器,你怎么进行长久化,那么这个问题该怎么解决呢?
RabbitMQ 为咱们提供了两种形式:
- 通过 AMQP 事务机制实现,这也是 AMQP 协定层面提供的解决方案;
- 通过将 channel 设置成 confirm 模式来实现;
AMQP 事物机制管制
RabbitMQ 中与事务机制无关的办法有三个:txSelect()
, txCommit()
以及 txRollback(),
txSelect()
用于将以后 channel 设置成 transaction 模式,txCommit()
用于提交事务,txRollback()
用于回滚事务,在通过 txSelect()
开启事务之后,咱们便能够公布音讯给 broker 代理服务器了,如果 txCommit()
提交胜利了,则音讯肯定达到了 broker 了,如果在 txCommit()
执行之前 broker 异样解体或者因为其余起因抛出异样,这个时候咱们便能够捕捉异样通过 txRollback()
回滚事务。
SendTx.java
try {
// 通过工厂创立连贯
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启事务
channel.txSelect();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创立音讯
String message = "Hello World!";
// 将产生的音讯放入队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("[x] Sent'" + message + "'");
// 模拟程序异样
int i = 1 / 0;
// 提交事务
channel.txCommit();} catch (IOException | TimeoutException e) {e.printStackTrace();
try {
// 回滚事务
channel.txRollback();} catch (IOException e1) {e1.printStackTrace();
}
}
事务的确可能解决 producer 与 broker 之间音讯确认的问题,只有音讯胜利被 broker 承受,事务提交能力胜利,否则咱们便能够在捕捉异样进行事务回滚操作同时进行音讯重发,然而应用事务机制的话会升高 RabbitMQ 的性能,那么有没有更好的办法既能保障 producer 晓得音讯曾经正确送到,又能基本上不带来性能上的损失呢?从 AMQP 协定的层面看是没有更好的办法,然而 RabbitMQ 提供了一个更好的计划,行将 channel 信道设置成 confirm 模式。
confirm 确认模式
通过 AMQP 协定层面为咱们提供了事务机制解决了这个问题,然而采纳事务机制实现会升高 RabbitMQ 的音讯吞吐量,此时解决 AMQP 协定层面可能实现音讯事物管制外,咱们还有第二种形式即:Confirm 模式。
Confirm 确认模式原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道下面公布的音讯都会被指派一个惟一的 ID(从 1 开始),一旦音讯被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(蕴含音讯的惟一 ID), 这就使得生产者晓得音讯曾经正确达到目标队列了,如果音讯和队列是可长久化的,那么确认音讯会将音讯写入磁盘之后收回,broker 回传给生产者的确认音讯中 deliver-tag 域蕴含了确认音讯的序列号,此外 broker 也能够设置 basic.ack 的 multiple 域,示意到这个序列号之前的所有音讯都曾经失去了解决。
confirm 模式最大的益处在于他是异步的,一旦公布一条音讯,生产者应用程序就能够在等信道返回确认的同时持续发送下一条音讯,当音讯最终失去确认之后,生产者利用便能够通过回调办法来解决该确认音讯,如果 RabbitMQ 因为本身外部谬误导致音讯失落,就会发送一条 nack 音讯,生产者应用程序同样能够在回调办法中解决该 nack 音讯。
在 channel 被设置成 confirm 模式之后,所有被 publish 的后续音讯都将被 confirm(即 ack)或者被 nack 一次。然而没有对音讯被 confirm 的快慢做任何保障,并且同一条音讯不会既被 confirm 又被 nack。
留神: 两种事物管制模式不能同时开启!
Confirm 确认机制代码实现
实现生产者 confirm 机制有三种形式:
- 一般 confirm 模式:每发送一条音讯后,调用 waitForConfirms()办法,期待服务器端 confirm。实际上是一种串行 confirm 了。
- 批量 confirm 模式:每发送一批音讯后,调用 waitForConfirmsOrDie()办法,期待服务器端 confirm。
- 异步 confirm 模式:提供一个回调办法,服务端 confirm 了一条或者多条音讯后 Client 端会回调这个办法。
同步 Confirm
SendConfirmSync.java
try {
// 通过工厂创立连贯
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启 confirm 确认模式
channel.confirmSelect();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创立音讯
String message = "Hello World!";
// 将产生的音讯放入队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("[x] Sent'" + message + "'");
// 确认音讯是否发送胜利 - 单条
if (channel.waitForConfirms())
System.out.println("音讯发送胜利!");
else
System.out.println("音讯发送失败!");
// 确认音讯是否发送胜利 - 多条
// 直到所有音讯都确认,只有有一个未确认就会 IOException
channel.waitForConfirmsOrDie();
System.out.println("音讯发送胜利!");
}
以上代码能够看出,应用同步的形式须要等所有的音讯发送胜利当前才会执行前面代码,只有有一个音讯未被确认就会抛出 IO 异样。解决办法能够应用异步确认。
异步 confirm
异步 confirm 模式的编程实现最简单,Channel 对象提供的 ConfirmListener()
回调办法只蕴含 deliveryTag
(以后 Chanel 收回的音讯序号),咱们须要本人为每一个 Channel 保护一个unconfirm
的音讯序号汇合,每 publish 一条数据,汇合中元素加 1,每回调一次 handleAck
办法,unconfirm
汇合删掉相应的一条 (multiple=false)
或多条 (multiple=true)
记录。从程序运行效率上看,这个 unconfirm
汇合最好采纳有序汇合 SortedSet 存储构造。实际上,waitForConfirms()
办法也是通过 SortedSet 保护音讯序号的。
SendConfirmAsync.java
package com.xxxx.confirm.async.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
* 信道确认模式 - 异步 - 生产者
*/
public class Send {
// 队列名称
public static final String QUEUE_NAME = "confirm_async";
public static void main(String[] args) {
// 定义连贯工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setHost("192.168.10.100");
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
try {
// 保护信息发送回执 deliveryTag
final SortedSet<Long> confirmSet=Collections.synchronizedSortedSet(new TreeSet<Long>());
// 创立连贯
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启 confirm 确认模式
channel.confirmSelect();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 增加 channel 监听
channel.addConfirmListener(new ConfirmListener() {
// 已确认
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// multiple=true 已确认多条 false 已确认单条
if (multiple) {System.out.println("handleAck--success-->multiple" + deliveryTag);
// 革除前 deliveryTag 项标识 id
confirmSet.headSet(deliveryTag + 1L).clear();} else {System.out.println("handleAck--success-->single" + deliveryTag);
confirmSet.remove(deliveryTag);
}
}
// 未确认
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// multiple=true 未确认多条 false 未确认单条
if (multiple) {System.out.println("handleNack--failed-->multiple-->" + deliveryTag);
// 革除前 deliveryTag 项标识 id
confirmSet.headSet(deliveryTag + 1L).clear();} else {System.out.println("handleNack--failed-->single" + deliveryTag);
confirmSet.remove(deliveryTag);
}
}
});
// 循环发送音讯演示音讯确认
while (true) {
// 创立音讯
String message = "Hello World!";
// 获取 unconfirm 的音讯序号 deliveryTag
Long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
// 将音讯序号 deliveryTag 增加至 SortedSet
confirmSet.add(seqNo);
}
} catch (IOException | TimeoutException e) {e.printStackTrace();
} finally {
try {
// 敞开通道
if (null != channel && channel.isOpen())
channel.close();
// 敞开连贯
if (null != connection && connection.isOpen())
connection.close();} catch (TimeoutException e) {e.printStackTrace();
} catch (IOException e) {e.printStackTrace();
}
}
}
}
异步模式的长处就是执行效率高,不须要期待音讯执行完,只须要监听音讯即可。
Spring 集成 RabbitMQ
官网:https://spring.io/projects/sp…
为什么应用 spring AMQP?
- 基于 Spring 之上社区沉闷
- 对 AMQP 协定进行了高度的封装
- 极大的简化了 RabbitMQ 的操作
- 易用性、可扩大
创立聚合我的项目
创立父我的项目 spring-rabbitmq
创立 rabbitmq-provider
鼠标右键 spring-rabbitmq 我的项目 new -> Module
创立 rabbitmq-consumer
鼠标右键 spring-rabbitmq 我的项目 new -> Module
父我的项目依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modules>
<module>rabbitmq-provider</module>
<module>rabbitmq-consumer</module>
</modules>
<groupId>com.xxxx</groupId>
<artifactId>spring-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>srping-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
</project>
编写生产者
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xxxx</groupId>
<artifactId>spring-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.xxxx</groupId>
<artifactId>rabbitmq-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-provider</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: shop
password: shop
virtual-host: /shop
server:
port: 8081
RabbitmqConfig.java
package com.xxxx.rabbitmqprovider.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
/**
* 申明队列
* @return
*/
@Bean
public Queue queue(){return new Queue("topics");
}
/**
* 申明交换机(主题模式)* @return
*/
@Bean
public TopicExchange topicExchange(){return new TopicExchange("topicExchange");
}
/**
* 将队列绑定到交换机
* @return
*/
@Bean
public Binding binding(){return BindingBuilder.bind(queue()).to(topicExchange()).with("topic.msg");
}
}
Send.java
package com.xxxx.rabbitmqprovider.send;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
String message = "Hello World!";
/**
* 第一个参数:交换机名称
* 第二个参数:路由 key 名称
* 第三个参数:发送的音讯
*/
rabbitTemplate.convertAndSend("topicExchange", "topic.msg", message);
System.out.println("发送:" + message);
}
}
RabbitmqProviderTestApplication.java
package com.xxxx.rabbitmqprovider;
import com.xxxx.rabbitmqprovider.send.Sender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class {
@Autowired
private Sender sender;
@Test
public void testSend(){sender.send();
}
}
编写消费者
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xxxx</groupId>
<artifactId>spring-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.xxxx</groupId>
<artifactId>rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: shop
password: shop
virtual-host: /shop
server:
port: 8082
Consumer.java
package com.xxxx.rabbitmqconsumer.revc;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听队列
@RabbitListener(queues = "topics")
public class Consumer {
// 示意接管音讯后的解决办法
@RabbitHandler
public void recv(String message){System.out.println("接管音讯:"+message);
}
}
测试
总结
当然这是官网最简略的例子,当前如果我的项目是基于配置来做的话要把握以下:
- pom 中援用 jar
-
先配置 rabbitmq 的配置
- 先配置 ConnectionFactory
- 配置 RabbitAmdmin
- 配置 RabbitTemplate 这里通常在配置一个 Message Convert 应用 JSON 进行数据格式的传输
- 配置 Exchange
- 配置 Queue
- 配置一个音讯解决的 bean 或者通过 Spring 扫描,这个 Bean 最初继承 MessageListener 来解决 JSON 数据
- 配置 Listener Container
ion.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class {
@Autowired
private Sender sender;
@Test
public void testSend(){sender.send();
}
}
### 编写消费者
pom.xml
<?xml version=”1.0″ encoding=”UTF-8″?>
<project xmlns=”http://maven.apache.org/POM/4.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xxxx</groupId>
<artifactId>spring-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.xxxx</groupId>
<artifactId>rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: shop
password: shop
virtual-host: /shop
server:
port: 8082
Consumer.java
package com.xxxx.rabbitmqconsumer.revc;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听队列
@RabbitListener(queues = “topics”)
public class Consumer {
// 示意接管音讯后的解决办法
@RabbitHandler
public void recv(String message){System.out.println("接管音讯:"+message);
}
}
### 测试
![](https://img-blog.csdnimg.cn/20201224151341348.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0Jsb2dfcm9va2ll,size_16,color_FFFFFF,t_70#pic_center)
![](https://img-blog.csdnimg.cn/20201224151400394.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0Jsb2dfcm9va2ll,size_16,color_FFFFFF,t_70#pic_center)
### 总结
当然这是官网最简略的例子,当前如果我的项目是基于配置来做的话要把握以下:1. pom 中援用 jar
2. 先配置 rabbitmq 的配置
1. 先配置 ConnectionFactory
2. 配置 RabbitAmdmin
3. 配置 RabbitTemplate 这里通常在配置一个 Message Convert 应用 JSON 进行数据格式的传输
4. 配置 Exchange
5. 配置 Queue
6. 配置一个音讯解决的 bean 或者通过 Spring 扫描,这个 Bean 最初继承 MessageListener 来解决 JSON 数据