关于java:RabbitMQ-如何对消费端限流

33次阅读

共计 4297 个字符,预计需要花费 11 分钟才能阅读完成。

作者:海向 \
出处:www.cnblogs.com/haixiang/p/10905189.html

1. 为什么要对生产端限流

假如一个场景,首先,咱们 Rabbitmq 服务器积压了有上万条未解决的音讯,咱们轻易关上一个消费者客户端,会呈现这样状况: 巨量的音讯霎时全副推送过去,然而咱们单个客户端无奈同时解决这么多数据!

当数据量特地大的时候,咱们对生产端限流必定是不迷信的,因为有时候并发量就是特地大,有时候并发量又特地少,咱们无奈束缚生产端,这是用户的行为。所以咱们应该对生产端限流,用于放弃生产端的稳固,当音讯数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至间接解体。

2. 限流的 API 解说

RabbitMQ 提供了一种 qos(服务质量保障)性能,即在非主动确认音讯的前提下,如果肯定数目的音讯(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行生产新的音讯。

/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize:0,单条音讯大小限度,0 代表不限度
  • prefetchCount:一次性生产的音讯数量。会通知 RabbitMQ 不要同时给一个消费者推送多于 N 个音讯,即一旦有 N 个音讯还没有 ack,则该 consumer 将 block 掉,直到有音讯 ack。
  • global:true、false 是否将下面设置利用于 channel,简略点说,就是下面限度是 channel 级别的还是 consumer 级别。当咱们设置为 false 的时候失效,设置为 true 的时候没有了限流性能,因为 channel 级别尚未实现。
  • 留神:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不钻研。特地留神一点,prefetchCount 在 no_ask=false 的状况下才失效,即在自动应答的状况下这两个值是不失效的。

3. 如何对生产端进行限流

  • 首先第一步,咱们既然要应用生产端限流,咱们须要敞开主动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步咱们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
  • 第三步在消费者的 handleDelivery 生产办法中手动 ack,并且设置批量解决 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

这是生产端代码,与前几章的生产端代码没有做任何扭转,次要的操作集中在生产端。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class QosProducer {public static void main(String[] args) throws Exception {
        //1. 创立一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2. 通过连贯工厂来创立连贯
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创立 Channel
        Channel channel = connection.createChannel();

        //4. 申明
        String exchangeName = "test_qos_exchange";
        String routingKey = "item.add";

        //5. 发送
        String msg = "this is qos msg";
        for (int i = 0; i < 10; i++) {
            String tem = msg + ":" + i;
            channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
            System.out.println("Send message :" + tem);
        }

        //6. 敞开连贯
        channel.close();
        connection.close();}
}

这里咱们创立一个消费者,通过以下代码来验证限流成果以及 global 参数设置为 true 时不起作用.。咱们通过Thread.sleep(5000); 来让 ack 即解决音讯的过程慢一些,这样咱们就能够从后盾管理工具中清晰察看到限流状况。

import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {public static void main(String[] args) throws Exception {
        //1. 创立一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. 通过连贯工厂来创立连贯
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创立 Channel
        final Channel channel = connection.createChannel();

        //4. 申明
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);

        channel.basicQos(0, 3, false);

        // 个别不必代码绑定,在治理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        //5. 创立消费者并接管音讯
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {Thread.sleep(5000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                System.out.println("[x] Received'" + message + "'");

                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        };
        //6. 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, false, consumer);
        channel.basicConsume(queueName, false, consumer1);
    }
}

咱们从下图中发现 Unacked值始终都是 3,每过 5 秒 生产一条音讯即 Ready 和 Total 都缩小 3,而 Unacked的值在这里代表消费者正在解决的音讯,通过咱们的试验发现了消费者一次性最多解决 3 条音讯,达到了消费者限流的预期性能。

当咱们将 void basicQos(int prefetchSize, int prefetchCount, boolean global) 中的 global 设置为 true的时候咱们发现并没有了限流的作用。

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿(2022 最新版)

2. 劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4.Spring Boot 2.6 正式公布,一大波新个性。。

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0