作者:海向\
出处:www.cnblogs.com/haixiang/p/10905189.html

1. 为什么要对生产端限流

假如一个场景,首先,咱们 Rabbitmq 服务器积压了有上万条未解决的音讯,咱们轻易关上一个消费者客户端,会呈现这样状况: 巨量的音讯霎时全副推送过去,然而咱们单个客户端无奈同时解决这么多数据!

当数据量特地大的时候,咱们对生产端限流必定是不迷信的,因为有时候并发量就是特地大,有时候并发量又特地少,咱们无奈束缚生产端,这是用户的行为。所以咱们应该对生产端限流,用于放弃生产端的稳固,当音讯数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至间接解体。

2.限流的 API 解说

RabbitMQ 提供了一种 qos (服务质量保障)性能,即在非主动确认音讯的前提下,如果肯定数目的音讯(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行生产新的音讯。

/*** Request specific "quality of service" settings.* These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize:0,单条音讯大小限度,0代表不限度
  • prefetchCount:一次性生产的音讯数量。会通知 RabbitMQ 不要同时给一个消费者推送多于 N 个音讯,即一旦有 N 个音讯还没有 ack,则该 consumer 将 block 掉,直到有音讯 ack。
  • global:true、false 是否将下面设置利用于 channel,简略点说,就是下面限度是 channel 级别的还是 consumer 级别。当咱们设置为 false 的时候失效,设置为 true 的时候没有了限流性能,因为 channel 级别尚未实现。
  • 留神:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不钻研。特地留神一点,prefetchCount 在 no_ask=false 的状况下才失效,即在自动应答的状况下这两个值是不失效的。

3.如何对生产端进行限流

  • 首先第一步,咱们既然要应用生产端限流,咱们须要敞开主动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步咱们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
  • 第三步在消费者的 handleDelivery 生产办法中手动 ack,并且设置批量解决 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

这是生产端代码,与前几章的生产端代码没有做任何扭转,次要的操作集中在生产端。

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class QosProducer {    public static void main(String[] args) throws Exception {        //1. 创立一个 ConnectionFactory 并进行设置        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        factory.setVirtualHost("/");        factory.setUsername("guest");        factory.setPassword("guest");        //2. 通过连贯工厂来创立连贯        Connection connection = factory.newConnection();        //3. 通过 Connection 来创立 Channel        Channel channel = connection.createChannel();        //4. 申明        String exchangeName = "test_qos_exchange";        String routingKey = "item.add";        //5. 发送        String msg = "this is qos msg";        for (int i = 0; i < 10; i++) {            String tem = msg + " : " + i;            channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());            System.out.println("Send message : " + tem);        }        //6. 敞开连贯        channel.close();        connection.close();    }}

这里咱们创立一个消费者,通过以下代码来验证限流成果以及 global 参数设置为 true 时不起作用.。咱们通过Thread.sleep(5000); 来让 ack 即解决音讯的过程慢一些,这样咱们就能够从后盾管理工具中清晰察看到限流状况。

import com.rabbitmq.client.*;import java.io.IOException;public class QosConsumer {    public static void main(String[] args) throws Exception {        //1. 创立一个 ConnectionFactory 并进行设置        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        factory.setVirtualHost("/");        factory.setUsername("guest");        factory.setPassword("guest");        factory.setAutomaticRecoveryEnabled(true);        factory.setNetworkRecoveryInterval(3000);        //2. 通过连贯工厂来创立连贯        Connection connection = factory.newConnection();        //3. 通过 Connection 来创立 Channel        final Channel channel = connection.createChannel();        //4. 申明        String exchangeName = "test_qos_exchange";        String queueName = "test_qos_queue";        String routingKey = "item.#";        channel.exchangeDeclare(exchangeName, "topic", true, false, null);        channel.queueDeclare(queueName, true, false, false, null);        channel.basicQos(0, 3, false);        //个别不必代码绑定,在治理界面手动绑定        channel.queueBind(queueName, exchangeName, routingKey);        //5. 创立消费者并接管音讯        Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                String message = new String(body, "UTF-8");                System.out.println("[x] Received '" + message + "'");                channel.basicAck(envelope.getDeliveryTag(), true);            }        };        //6. 设置 Channel 消费者绑定队列        channel.basicConsume(queueName, false, consumer);        channel.basicConsume(queueName, false, consumer1);    }}

咱们从下图中发现 Unacked值始终都是 3 ,每过 5 秒 生产一条音讯即 Ready 和 Total 都缩小 3,而 Unacked的值在这里代表消费者正在解决的音讯,通过咱们的试验发现了消费者一次性最多解决 3 条音讯,达到了消费者限流的预期性能。

当咱们将void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 设置为 true的时候咱们发现并没有了限流的作用。

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2022最新版)

2.劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4.Spring Boot 2.6 正式公布,一大波新个性。。

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!