乐趣区

RabbitMQ-消息拒绝

RabbitMQ – 消息确认中提到了当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和 rabbitmq 的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加 rabbitmq 的负担。rabbitmq 提供了另外一种优雅的方式来拒绝消息,方法如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException

第一个参数 deliveryTag,消息确认中提过,传递标识。第二个参数 requeue,为 true 的话,消息会重新发给下一个消费者,为 false 的话,就不发给消费者,相当于说,消息我确认了。

重新投递

重新投递,就是 requeue 为 true 的情况。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("reject", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("reject message'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
    };
    // 接收消息
    channel.basicConsume("reject", false, deliverCallback1, consumerTag -> {});
}

我们通过 web 控制台发送一条消息,在控制台打印如下,由于当前消费者把消息拒绝了,所以 rabbitmq 重新投递,又发给这个消费者,消费者又拒绝,所以一直打印,相对于死循环了。从 deliveryTag 可以看出,这条消息每次重新投递,就会递增。

消费者端口连接,web 控制有一条消息

不重新投递

不重新投递,就是 requeue 为 false 的情况。把上面的代码 requeue 设置为 false 再运行一次。
从控制台可以看出,就打印了一次。

而 web 控制台,消息数是 0

批量拒绝

rabbitmq 提供了批量确认,也提供了批量拒绝。方法如下,deliveryTag 是传递标识,multiple 是否批量确认,requeue 是否重新投递。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

当 multiple 为 true 的时候,开启批量确认,我们看看下面的例子。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("reject", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("reject message'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), true, false);
        }
    };
    // 接收消息
    channel.basicConsume("reject", false, deliverCallback1, consumerTag -> {});
}

先发送两条消息

web 控制台如下:

再发送一条消息

web 控制台

虽然我们只拒绝了第三条,还是把所有的都拒绝了。
如果把 multiple 设置为 false,可以看到,仅仅拒绝了第三条消息,这边不做演示。

退出移动版