RabbitMQ-消息确认

8次阅读

共计 2858 个字符,预计需要花费 8 分钟才能阅读完成。

RabbitMQ – 队列中提到,接收消息的时候,有两个方式,一个是 consume,一个是 get,这两个方法都有一个 autoAck 的参数。当我们设置为 true 的时候,说明消费者会通过 AMQP 显示的向 rabbitmq 发送一个确认,rabbitmq 自动视其确认了消息,然后把消息从队列中删除。下面用 consume 的方式做些例子来理解 autoAck 的参数设置。

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

不确认

先往 ack 队列发送 5 条数据,可以看到 ready 是 5,total 是 5。

运行以下代码,autoAck 设置为 false,且不对消息确认。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("ack", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ack Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    // 接收消息
    channel.basicConsume("ack", false, deliverCallback, consumerTag -> {});
}

运行结果如下,打印了 5 条消息:

在 web 控制台可以看出,ready 是 0,unacked 是 5,即未确认的消息数是 5 条。

把应用停止掉,即关闭消费者和 rabbitmq 的连接,web 控制台如下,unacked 的 5 条数据回到了 ready。

综上,当 autoAck 为 false 时,消息分为两个部分,一个是未投放放消费者的(ready),一个是投放给消费者但未确认的。如果未确认信息的消费者断开了连接,这部分消息会回到 ready 重新投递给消费者,确保了消息的可靠性。需要注意的是,如果消费者一直没有断开连接,也没有进行确认,那这个消息会一直等待确认中。

确认

确认有两种,一个是自动确认,一个是手动确认。自动确认的话,就是把 autoAck 设置为 true。
当 rabbitmq 向消费者传递消息的时候,会带有一个 deliveryTag 的传递标识,标记惟一地标识信道上的传递,交付标记的作用域是每个信道,所以必须在接收消息的信道上进行确认。交付标记是递增的正整数,所以我们看到是 1,2,3 这样的递增数字。
上面例子中,有注释了一行代码,就是用来手动确认的,第一个参数就是传递标记,第二个参数是是否批量确认。
上面的打印结果输出了 deliveryTag 的值,从 1 到 5。把注释删了再运行后,可以看到 rabbitmq 把确认后的消息删除。

void basicAck(long deliveryTag, boolean multiple) throws IOException;

批量确认

basicAck 方法中,有个参数是 multiple,是用来批量确认的。当设置为 true 的时候,RabbitMQ 将确认所有未完成的传递标记,包括确认中指定的标记。与其他与确认相关的内容一样,这是按每个信道确定范围的。比如收到标记为 1、2 的没确认,标记为 3 的确认,那前面两个也会一起确认。
我们看看下面的例子,每处理三个消息确认一次。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("multiple", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("multiple Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        }
    };
    // 接收消息
    channel.basicConsume("multiple", false, deliverCallback, consumerTag -> {});
}

启动后,发送两个消息,可以看到打印了两次

控制台显示未确认 2 个

再发送一条数据,打印了第三个

web 控制台看出,已经确认并删除了队列的消息

使用哪种确认方式

自动确认,这种模式通常被称为“发了就忘”,当消费端处理异常时,则服务器发送的消息将得不到正确的处理。因此,自动消息确认应该被认为是不安全的。
手动确认模式中,通常使用消息预取,它限制了通道上未完成 (“进行中”) 交付的数量。但是,对于自动确认,没有这样的限制,所以消费者有可能由于处理消息太慢,导致内存积压、堆耗尽,导致程序无法运行。因此,自动确认模式仅适用于能够高效、稳定地处理配送的消费者。

正文完
 0