RabbitMQ-队列

8次阅读

共计 3667 个字符,预计需要花费 10 分钟才能阅读完成。

AMQP 消息路由必须包含三部分,交换器、队列、绑定。如下图所示,生产者把消息发送给交换器,交换器再路由到符合条件的队列上,最终被消费者接收。绑定决定了消息如何从路由器路由到相应的队列。这一篇,主要是了解一下队列。

相关概念

当新增队列的时候,需要定义一下 4 中属性,分布是 Name、Durability、Auto delete、Arguments。

  • Name:队列名称,不能用 amq. 开头命名。
  • Durability:持久化,如果为 durable,那 broker 重启不会丢失,如果为 transient,broker 重启后会丢失。(win 系统仅仅重启 rabbitmq 是不行的)
  • Auto delete:最后一个消费者退订时被自动删除
  • Arguments:队列的其他参数,如上图,比如消息的 TTL 等。

定义一个队列的方法如下,exclusive 的参数,下面的临时队列会说明。

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

命名

我们在使用队列之前,需要先声明队列。如果队列不存在,则创建队列。如果已存在则不会创建,但是和已存在的队列属性不一致,则会有 406 (PRECONDITION_FAILED)的通道级异常。

参数设置

参数的设置有两种,一种是通过分组,一个是一个个队列设置。分组的方式更加灵活、非侵入性,不需要修改和重新部署应用程序,是官方推荐的方式。参数的描述如下:

参数 描述
x-message-ttl 消息的存活时间,单位为毫秒
x-expires 队列的存活时间,单位为毫秒
x-max-length 队列的最大消息数
x-max-length-bytes 消息的最大字节数
x-overflow 消息达到最大数的策略,drop-head 或者 reject-publish
x-dead-letter-exchange 死信队列的交换器
x-dead-letter-routing-key 死信队列的路由键
x-max-priority 消息的优先级设置
x-queue-mode 消息的延迟
x-queue-master-locator 用于主从

临时队列

当我们需要一个临时队列的时候,我们可以先定义队列,使用完再删除,或者直接定义 Durability 的属性为 transient,等 broker 重启的时候就消失,但是感觉没有很方便。特别是使用后删除,如果客户端失败,这个队列就一直存在。我们可以用以下方法来自动删除:

  • Exclusive:独占队列
  • x-expires:设置队列的过期时间
  • Auto delete:队列设置自动删除
public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("queue1", false, true, false, null);
    channel.queueDeclare("queue2", false, false, true, null);
    channel.basicConsume("queue2", true, null, consumerTag -> {});
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-expires",5000);
    channel.queueDeclare("queue3", false, false, false, arguments);
}

queue1 是独占队列,queue2 是自动删除,queue3 设置了 5 秒的过期时间。
运行后如下图,五秒后 queue3 消失,停止程序运行,queue1 和 queue2 消失。
需要注意的是,如果把 queue2 的 basicConsume 方法调用注释掉,由于没有消费者,队列并不会消失。
独占队列只能由其声明连接使用(从声明连接使用、清除、删除等)。其他队列如果想使用独占队列将导致通道级异常 RESOURCE_LOCKED,该异常带有一条错误消息,表明无法获得对锁定队列的独占访问。

推模式和拉模式

消费者通过两种方式来接收消息:

  • 通过 consume 订阅,是队列往消费端推送消息。只要队列有消息,就可以持续收到。
  • 通过 get 获取消息,是消费者主动从队列拉取消息,每次只能获取一条,如果想获取更多消息,可以用 while 循环。但是吞吐量相对比较低。

我们先往队列里发送消息,其中 queue1 发送 4 条,queue2 发送 3 条。这边还没讲到如何发送消息,可以通过 http://127.0.0.1:15672/ 的 web 控制台来发送消息。

queue1 是拉模式,queue2 是推模式。

queue1 的代码:

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("queue1", false, false, false, null);
    GetResponse response = channel.basicGet("queue1", true);
    System.out.println("queue1 Received'" + new String(response.getBody()) + "'");
    GetResponse response2 = channel.basicGet("queue1", true);
    System.out.println("queue1 Received'" + new String(response.getBody()) + "'");
}

运行结果如下,调用 get 两次。

queue2 的代码:

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare("queue2", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("queue2 Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume("queue2", true, deliverCallback, consumerTag -> {});
}

调用 consume 一次。

最终结果如下:queue1 因为 get 两次,所以还有 2 条消息。queue2 的 3 条消息都消费完。

多个消费者

在上面的例子中,我们先把消息发送给队列,此时没有消费者,消息就会在队列里一直等。那如果有消费者,且有多个消费者,消息是如何发布的呢?
我们启动上面的 queue2 应用程序两次,再发送消息,可以看到,两个应用程序是交替消费数据的。整个步骤如下:

  1. 消息 A 发送到 queue2 队列
  2. rabbitmq 把消息 A 发送给消费者 1
  3. 消费者 1 收到消息 A 并确认
  4. rabbitmq 把消息 A 从 queue2 队列删除
  5. 消息 B 发送到 queue2 队列
  6. rabbitmq 把消息 B 发送给消费者 2
  7. 消费者 2 收到消息 B 并确认
  8. rabbitmq 把消息 B 从 queue2 队列删除
正文完
 0