共计 4297 个字符,预计需要花费 11 分钟才能阅读完成。
作者:海向 \
出处:www.cnblogs.com/haixiang/p/10905189.html
1. 为什么要对生产端限流
假如一个场景,首先,咱们 Rabbitmq 服务器积压了有上万条未解决的音讯,咱们轻易关上一个消费者客户端,会呈现这样状况: 巨量的音讯霎时全副推送过去,然而咱们单个客户端无奈同时解决这么多数据!
当数据量特地大的时候,咱们对生产端限流必定是不迷信的,因为有时候并发量就是特地大,有时候并发量又特地少,咱们无奈束缚生产端,这是用户的行为。所以咱们应该对生产端限流,用于放弃生产端的稳固,当音讯数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至间接解体。
2. 限流的 API 解说
RabbitMQ 提供了一种 qos(服务质量保障)性能,即在非主动确认音讯的前提下,如果肯定数目的音讯(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行生产新的音讯。
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
- prefetchSize:0,单条音讯大小限度,0 代表不限度
- prefetchCount:一次性生产的音讯数量。会通知 RabbitMQ 不要同时给一个消费者推送多于 N 个音讯,即一旦有 N 个音讯还没有 ack,则该 consumer 将 block 掉,直到有音讯 ack。
- global:true、false 是否将下面设置利用于 channel,简略点说,就是下面限度是 channel 级别的还是 consumer 级别。当咱们设置为 false 的时候失效,设置为 true 的时候没有了限流性能,因为 channel 级别尚未实现。
- 留神:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不钻研。特地留神一点,prefetchCount 在 no_ask=false 的状况下才失效,即在自动应答的状况下这两个值是不失效的。
3. 如何对生产端进行限流
- 首先第一步,咱们既然要应用生产端限流,咱们须要敞开主动 ack,将 autoAck 设置为 false
channel.basicConsume(queueName, false, consumer);
- 第二步咱们来设置具体的限流大小以及数量。
channel.basicQos(0, 15, false);
- 第三步在消费者的 handleDelivery 生产办法中手动 ack,并且设置批量解决 ack 回应为 true
channel.basicAck(envelope.getDeliveryTag(), true);
这是生产端代码,与前几章的生产端代码没有做任何扭转,次要的操作集中在生产端。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {public static void main(String[] args) throws Exception {
//1. 创立一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通过连贯工厂来创立连贯
Connection connection = factory.newConnection();
//3. 通过 Connection 来创立 Channel
Channel channel = connection.createChannel();
//4. 申明
String exchangeName = "test_qos_exchange";
String routingKey = "item.add";
//5. 发送
String msg = "this is qos msg";
for (int i = 0; i < 10; i++) {
String tem = msg + ":" + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message :" + tem);
}
//6. 敞开连贯
channel.close();
connection.close();}
}
这里咱们创立一个消费者,通过以下代码来验证限流成果以及 global
参数设置为 true
时不起作用.。咱们通过Thread.sleep(5000);
来让 ack 即解决音讯的过程慢一些,这样咱们就能够从后盾管理工具中清晰察看到限流状况。
import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {public static void main(String[] args) throws Exception {
//1. 创立一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通过连贯工厂来创立连贯
Connection connection = factory.newConnection();
//3. 通过 Connection 来创立 Channel
final Channel channel = connection.createChannel();
//4. 申明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.basicQos(0, 3, false);
// 个别不必代码绑定,在治理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 创立消费者并接管音讯
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {Thread.sleep(5000);
} catch (InterruptedException e) {e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("[x] Received'" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(queueName, false, consumer1);
}
}
咱们从下图中发现 Unacked
值始终都是 3,每过 5 秒 生产一条音讯即 Ready 和 Total 都缩小 3,而 Unacked
的值在这里代表消费者正在解决的音讯,通过咱们的试验发现了消费者一次性最多解决 3 条音讯,达到了消费者限流的预期性能。
当咱们将 void basicQos(int prefetchSize, int prefetchCount, boolean global)
中的 global 设置为 true
的时候咱们发现并没有了限流的作用。
近期热文举荐:
1.1,000+ 道 Java 面试题及答案整顿(2022 最新版)
2. 劲爆!Java 协程要来了。。。
3.Spring Boot 2.x 教程,太全了!
4.Spring Boot 2.6 正式公布,一大波新个性。。
5.《Java 开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞 + 转发哦!