RabbitMQ-消息预取

7次阅读

共计 3684 个字符,预计需要花费 10 分钟才能阅读完成。

RabbitMQ – 消息确认这篇文章中,提到了消息预取,避免了 rabbitmq 一直往消费端发送数据,导致消费端出现无限制的缓冲区问题。消息预取定义了信道上或者消费者允许的最大未确认的消息数量。一旦未确认数达到了设置的值,RabbitMQ 将停止传递更多消息,除非至少有一条未完成的消息得到确认。
使用消息预取的时候,会调用 chanel 的 basicQos 方法,prefetchCount 是未确认的消息数,global 默认值为 false,是限制消费者未确认的消息数,设置为 true 的时候,是限制信道上的未确认消息数。

void basicQos(int prefetchCount, boolean global) throws IOException;

消费者限制

global 设置为 false,当每个消费者有 2 个未确认的消息时,不能再发消息给消费者。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("qos", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback1 Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
   /* DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback2 Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };*/
    channel.basicQos(2, false);
    // 接收消息
    channel.basicConsume("qos", false, deliverCallback1, consumerTag -> {});
    /*channel.basicConsume("qos", false, deliverCallback2, consumerTag -> {});*/
}

运行后,往队列发送了 4 条消息,可以看到,未发送(ready)有 2 个,未确认 2 个。

控制台确实只收到了两个消息。

如果把注释放开,同时有两个消费者,可以看到,未发送(ready)有 0 个,未确认 4 个。

控制台结果如下,两个都消费了两个。

信道限制

把上面两个消费者的 global 改为 true,改为信道限制的方式。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("qos", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback1 Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback2 Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    channel.basicQos(2, true);
    // 接收消息
    channel.basicConsume("qos", false, deliverCallback1, consumerTag -> {});
    channel.basicConsume("qos", false, deliverCallback2, consumerTag -> {});
}

可以看到,未发送(ready)有 2 个,未确认 2 个。

每个消费者都只消费了一个。因为此时,信道上未确认的消息数是 2。

混合模式

即设置了信道限制又设置了消费者限制,那结果是怎么样的呢?
先设置消费端只能有 2 个未确认的消息,通道只能有 3 个未确认的消息。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("qos", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback1 Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("deliverCallback2 Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
    };
    channel.basicQos(2, false);
    channel.basicQos(3, true);
    // 接收消息
    channel.basicConsume("qos", false, deliverCallback1, consumerTag -> {});
    channel.basicConsume("qos", false, deliverCallback2, consumerTag -> {});
}

运行后控制台如下,打印了三个消息,说明整个信道就只能有三个未确认的消息,第一个消费者有两个未确认的消息后不再接收,由第二个消费者接收。

web 控制台信息,确实只有 3 个未确认的消息,还有 1 个待发送。

注意,如果换了顺序呢?
把什么的代码

channel.basicQos(2, false);
channel.basicQos(3, true);

换成,先控制信道的未确认的消息是 3 个,再控制消费者未确认的消息是 2 个

channel.basicQos(3, true);
channel.basicQos(2, false);

运行后,控制台如下,每个消费者都 2 个未确认的消息。此时信道的限制不生效了。

正文完
 0