RabbitMQ – 队列中提到,接收消息的时候,有两个方式,一个是 consume,一个是 get,这两个方法都有一个 autoAck 的参数。当我们设置为 true 的时候,说明消费者会通过 AMQP 显示的向 rabbitmq 发送一个确认,rabbitmq 自动视其确认了消息,然后把消息从队列中删除。下面用 consume 的方式做些例子来理解 autoAck 的参数设置。
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
不确认
先往 ack 队列发送 5 条数据,可以看到 ready 是 5,total 是 5。
运行以下代码,autoAck 设置为 false,且不对消息确认。
public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与 rabbitmq 服务器的连接
Connection connection = factory.newConnection();
// 创建一个 Channel
Channel channel = connection.createChannel();
// 通过 Channel 定义队列
channel.queueDeclare("ack", false, false, false, null);
// 异步回调处理
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("ack Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 接收消息
channel.basicConsume("ack", false, deliverCallback, consumerTag -> {});
}
运行结果如下,打印了 5 条消息:
在 web 控制台可以看出,ready 是 0,unacked 是 5,即未确认的消息数是 5 条。
把应用停止掉,即关闭消费者和 rabbitmq 的连接,web 控制台如下,unacked 的 5 条数据回到了 ready。
综上,当 autoAck 为 false 时,消息分为两个部分,一个是未投放放消费者的(ready),一个是投放给消费者但未确认的。如果未确认信息的消费者断开了连接,这部分消息会回到 ready 重新投递给消费者,确保了消息的可靠性。需要注意的是,如果消费者一直没有断开连接,也没有进行确认,那这个消息会一直等待确认中。
确认
确认有两种,一个是自动确认,一个是手动确认。自动确认的话,就是把 autoAck 设置为 true。
当 rabbitmq 向消费者传递消息的时候,会带有一个 deliveryTag 的传递标识,标记惟一地标识信道上的传递,交付标记的作用域是每个信道,所以必须在接收消息的信道上进行确认。交付标记是递增的正整数,所以我们看到是 1,2,3 这样的递增数字。
上面例子中,有注释了一行代码,就是用来手动确认的,第一个参数就是传递标记,第二个参数是是否批量确认。
上面的打印结果输出了 deliveryTag 的值,从 1 到 5。把注释删了再运行后,可以看到 rabbitmq 把确认后的消息删除。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
批量确认
basicAck 方法中,有个参数是 multiple,是用来批量确认的。当设置为 true 的时候,RabbitMQ 将确认所有未完成的传递标记,包括确认中指定的标记。与其他与确认相关的内容一样,这是按每个信道确定范围的。比如收到标记为 1、2 的没确认,标记为 3 的确认,那前面两个也会一起确认。
我们看看下面的例子,每处理三个消息确认一次。
public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与 rabbitmq 服务器的连接
Connection connection = factory.newConnection();
// 创建一个 Channel
Channel channel = connection.createChannel();
// 通过 Channel 定义队列
channel.queueDeclare("multiple", false, false, false, null);
// 异步回调处理
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("multiple Received'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
}
};
// 接收消息
channel.basicConsume("multiple", false, deliverCallback, consumerTag -> {});
}
启动后,发送两个消息,可以看到打印了两次
控制台显示未确认 2 个
再发送一条数据,打印了第三个
web 控制台看出,已经确认并删除了队列的消息
使用哪种确认方式
自动确认,这种模式通常被称为“发了就忘”,当消费端处理异常时,则服务器发送的消息将得不到正确的处理。因此,自动消息确认应该被认为是不安全的。
手动确认模式中,通常使用消息预取,它限制了通道上未完成 (“进行中”) 交付的数量。但是,对于自动确认,没有这样的限制,所以消费者有可能由于处理消息太慢,导致内存积压、堆耗尽,导致程序无法运行。因此,自动确认模式仅适用于能够高效、稳定地处理配送的消费者。