共计 2297 个字符,预计需要花费 6 分钟才能阅读完成。
RabbitMQ – 消息确认中提到了当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和 rabbitmq 的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加 rabbitmq 的负担。rabbitmq 提供了另外一种优雅的方式来拒绝消息,方法如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException
第一个参数 deliveryTag,消息确认中提过,传递标识。第二个参数 requeue,为 true 的话,消息会重新发给下一个消费者,为 false 的话,就不发给消费者,相当于说,消息我确认了。
重新投递
重新投递,就是 requeue 为 true 的情况。
public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与 rabbitmq 服务器的连接
Connection connection = factory.newConnection();
// 创建一个 Channel
Channel channel = connection.createChannel();
// 通过 Channel 定义队列
channel.queueDeclare("reject", false, false, false, null);
// 异步回调处理
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("reject message'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
};
// 接收消息
channel.basicConsume("reject", false, deliverCallback1, consumerTag -> {});
}
我们通过 web 控制台发送一条消息,在控制台打印如下,由于当前消费者把消息拒绝了,所以 rabbitmq 重新投递,又发给这个消费者,消费者又拒绝,所以一直打印,相对于死循环了。从 deliveryTag 可以看出,这条消息每次重新投递,就会递增。
消费者端口连接,web 控制有一条消息
不重新投递
不重新投递,就是 requeue 为 false 的情况。把上面的代码 requeue 设置为 false 再运行一次。
从控制台可以看出,就打印了一次。
而 web 控制台,消息数是 0
批量拒绝
rabbitmq 提供了批量确认,也提供了批量拒绝。方法如下,deliveryTag 是传递标识,multiple 是否批量确认,requeue 是否重新投递。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
当 multiple 为 true 的时候,开启批量确认,我们看看下面的例子。
public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与 rabbitmq 服务器的连接
Connection connection = factory.newConnection();
// 创建一个 Channel
Channel channel = connection.createChannel();
// 通过 Channel 定义队列
channel.queueDeclare("reject", false, false, false, null);
// 异步回调处理
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("reject message'" + message + "'" + delivery.getEnvelope().getDeliveryTag());
if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), true, false);
}
};
// 接收消息
channel.basicConsume("reject", false, deliverCallback1, consumerTag -> {});
}
先发送两条消息
web 控制台如下:
再发送一条消息
web 控制台
虽然我们只拒绝了第三条,还是把所有的都拒绝了。
如果把 multiple 设置为 false,可以看到,仅仅拒绝了第三条消息,这边不做演示。