共计 5753 个字符,预计需要花费 15 分钟才能阅读完成。
1.MQ 的 Hellow World 程序
2.MQ 的音讯应答
3.MQ 的长久化
1.MQ 的 Hellow World 程序
依据上一篇文章零碎学习音讯队列——RabbitMQ 的根底概念,咱们学习了 mq 的基础理论,实践诚然重要,实际也尤其重要,咱们先来编写一个 RabbitMq 的 Hello World 程序,来感受一下队列。
在这里咱们创立一个音讯生产者,两个音讯消费者,轮询 进行散发音讯。
pom:
<!--rabbitmq 依赖客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
连贯工具类:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author sulingfeng
* @title: RabbitMqUtils
* @date 2022/6/14 9:33
*/
public class RabbitMqUtils {
// 失去一个连贯的工具类
public static Channel getChannel() throws Exception {
// 创立一个连贯工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
生产者:
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* @author sulingfeng
* @title: Producer
* @date 2022/6/13 19:55
*/
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1. 队列名称
* 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
* 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5. 其余参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 从控制台当中承受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){String message = scanner.next();
/**
* 发送一个音讯
* 1. 发送到那个交换机
* 2. 路由的 key 是哪个
* 3. 其余的参数信息
* 4. 发送音讯的音讯体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送音讯实现:"+message);
}
}
}
}
消费者:
import com.rabbitmq.client.*;
/**
* @author sulingfeng
* @title: Consumer
* @date 2022/6/13 20:06
*/
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
System.out.println("期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery)-> {String message = new String(delivery.getBody());
System.out.println(message);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
/**
* 消费者生产音讯
* 1. 生产哪个队列
* 2. 生产胜利之后是否要自动应答 true 代表自动应答 false 手动应答
* 3. 生产胜利的回调
* 4. 消费者未胜利生产的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
成果:
生产者:
消费者 1:
消费者 2:
2.MQ 的音讯应答
咱们构想这样一个问题。Rabbitmq 是如何保障音讯被稳固生产的?假如生产一个音讯要很长一段时间,然而消费者生产到一半就挂掉了,这个时候 Rabbitmq 就失落了这条音讯。
为了解决这样的问题,rabbitmq 引入了音讯应答机制 ,音讯应答就是: 消费者在承受到音讯并且生产结束之后,通知 mq 它曾经解决了,rabbitmq 能够把音讯删除了。
2.1)自动应答
自动应答就是音讯 发送之后就能够被认为发送胜利 ,这种模式须要在高吞吐量和数据传输安全性方面作出衡量,它没有对音讯是否胜利生产,发送音讯数量进行限度,可能会导致音讯失落,音讯大量积压,所以这种模式 仅实用在消费者能够高效并以某种速率解决这些音讯的状况下应用。
2.2)手动应答
每一条音讯都须要消费者手动应答 ,如果消费者 因为某种原因失去连贯 ,导致音讯未发送 ack 确认,Rabbitmq 晓得了生产未被解决, 会将其从新插入队列 ,如果此时 有其它的消费者能够解决,就将音讯发送给另外一个消费者。保障了音讯被确认生产的稳定性,也能够避免消费者音讯的积压。
手动应答代码演示:
生产者:
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
System.out.println("期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery)-> {String message = new String(delivery.getBody());
try {Thread.sleep(5000); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println(message);
/**
* 1. 音讯 tag
* 2. 是否批量应答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),true);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
/**
* 消费者生产音讯
* 1. 生产哪个队列
* 2. 生产胜利之后是否要自动应答 true 代表自动应答 false 手动应答
* 3. 生产胜利的回调
* 4. 消费者未胜利生产的回调
*/
Boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
消费者:
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个队列
* 1. 队列名称
* 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
* 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5. 其余参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 从控制台当中承受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){String message = scanner.next();
/**
* 发送一个音讯
* 1. 发送到那个交换机
* 2. 路由的 key 是哪个
* 3. 其余的参数信息
* 4. 发送音讯的音讯体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送音讯实现:"+message);
}
}
}
}
咱们当初开启两个消费者,而后生产者给消费者发送音讯,等发送了足够多的音讯之后,关掉一个消费者,让其中一个正在生产的音讯失落,看看队列会不会再进行重发。
生产者一共发了 14 条音讯:
消费者 2 生产到一半,进行服务 :
消费者 1 帮消费者 2 生产了消费者 2 未生产完的数据:
音讯确认的 api:
Channel.basicAck(用于必定确认)
RabbitMQ 已晓得该音讯并且胜利的解决音讯,能够将其抛弃了
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数,代表不解决该音讯了间接回绝,能够将其抛弃了
咱们在下面编写代码的时候,还发现了一个是否批量应答的标记位:
Multiple 的 true 和 false 代表的含意不同:
true:
代表批量应答
false:
代表单个应答
2.3)音讯的不偏心散发
咱们发现发送音讯的时候,消费者解决的速度 有快有慢 ,在这种场景下, 轮询发送的效率如同不是最优的 ,所以咱们能够进行 不偏心的散发,哪个消费者解决地快,就先发送给哪个消费者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
有了这个值,rabbitmq 就会把该任务分配给没有那么忙的消费者,谁解决地快,谁就生产。
留神,这个配置要在生产者和消费者同时配置。
咱们发送二十条音讯:
消费者 1:
消费者 2:
此时就是依照消费者的能力生产音讯了。
同时咱们也要留神一下,这个值还有一个概念,那就是 预取值:
音讯在发送的时候,实质上是异步生产的,因而消费者这里必定会有一个音讯的缓冲区,开发人员心愿限度该缓冲区的大小,以防止缓冲区外面无限度寄存未确认音讯的问题。这个时候,就能够应用 basic.qos 办法设置 ” 预取值 ”, 该值就定义了容许未确认音讯的最大数量 。例如取值为 4,则 rabbitMQ 会在未确认信息为 4 之后,就不发送信息。 找到一个适合的值是一个重复试探的过程,大略在 100~300 之间,取值为 1 最激进,然而这样吞吐量会变得很低。
3.MQ 的长久化
刚刚咱们解决了消费者如何不失落生产的状况,然而咱们无奈防止 Rabbitmq 自身挂掉的状况,如何保障 Rabbitmq 服务停掉后,自身存储在 Rabbotmq 外面的音讯不失落,咱们须要把队列和音讯都标记为长久化。
3.1)队列如何长久化
咱们之前创立的队列都是非长久化的,所以如果要创立同名的长久化队列,咱们须要把以前的队列删除了才能够。
在申明队列的时候,同时要把 durable 参数设置为 true.
/**
* 申明一个队列
* 1. 队列名称
* 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
* 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
* 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
* 5. 其余参数
*/
Boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
咱们会发现队列曾经长久化了:
3.2)音讯如何长久化
要想让音讯长久化,须要在发送音讯的时候,减少 MessageProperties.PERSISTENT_BASIC 参数。
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,message.getBytes());
System.out.println("发送音讯实现:"+message);
其实将音讯标记为长久化并不能保障齐全不丢音讯,可能 Rabbitmq 在将音讯保留到磁盘的时候,音讯还没存储完,然而队列就挂了,如果须要更无效的长久化机制,能够参考前面的公布确认章节。