RabbitMQ-总结

72次阅读

共计 8791 个字符,预计需要花费 22 分钟才能阅读完成。

Connection & Channel

Connection 代表一个 TCP 连接,Channel 是建立在 Connection 上的虚拟连接。RabbitMQ 每条指令都是通过 Channel 完成的。

对于 OS 而言,创建和销毁 TCP 连接的代价非常高,在高峰期很容易遇到瓶颈。程序中一般会有多个线程需要与 RabbitMQ 建立通信,消费或生产消息,通过 TCP 连接复用来减少性能开销。

Connection 可以创建多个 Channel,但是 Channel 不是线程安全的所以不能在线程间共享。

Connection 在创建时可以传入一个 ExecutorService,这个线程池时给该 Connection 上的 Consumer 用的。

Channel.isOpen 以及 Connection.isOpen 方法是同步的,因此如果在发送消息时频繁调用会产生竞争。我们可以认为在 createChannel 方法后 Channel 以及处于开启状态。若在使用过程中 Channel 关闭了,那么只要捕获抛出的 ShutDownSignalException 就可以了,同时建议捕获 IOException 以及 SocketException 防止连接意外关闭。

Exchange & Queue

消费者和生产者都可以声明一个已经存在的 Exchange 或者 Queue,前提是参数完全匹配现有的 Exchange 或者 Queue,否则会抛出异常。

QueueDeclare 参数:
exclusive: 排他队列,只有同一个 ConnectionChannel 可以访问,且在 Connection 关闭或者客户端退出后自动删除,即使 durabletrue

queuePurge(String queue):清空队列

Exchange 可以绑定另一个 ExchangeexchangeBind(String destination, String source, String routeKey), 从 sourcedestination

若业务允许,则最好预先创建好 Exchange 以及 Queue 并进行绑定 (rabbitmqadmin),防止 Exchange 没有绑定 Queue 或 绑定错误的 Queue 而导致消息丢失(关键信息应当使用 mandatory 参数)。

Alternate Exchange: 在 Channel.exchangeDeclare 时添加 alternate-exchange 参数或在 Policy 中声明。mandatorytrue 时,未被路由的消息会被发送到 Alternate Exchange。建议 Exchange Type 设置为 fanout,否则当 RoutingKey 依然不匹配就会被返回 Producer

P.S. 有些书上讲备份交换器和 mandatory 参数一起使用 mandatory 参数失效是错的,当 RoutingKey 不匹配 Alternate Exchange 依然会被返回 Producer
(rabbitmq v3.7 测试)

Map<String, Object> arg = new HashMap<String, Object>() {{put("alternate-exchange", "alt");
}};
channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
channel.exchangeDeclare("alt", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("notSend", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "key");
channel.queueBind("notSend", "alt", "");

Publish & Consume

Publish Confirm

消息发送到服务器后可能还没来的及刷到磁盘中,服务器就挂掉,从而造成消息丢失。Publish Confirm 能够在消息确实到达服务器(开启持久化的消息会在刷入磁盘之后)之后返回一个确认给 Publisher。

通过 channel.confirmSelectedChannel 设置为 Confirm 模式,并为 Channel 添加一个 ConfirmLister 来监听返回的确认。

SortedSet<Long> unconfirmedSet = new TreeSet<>();
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {System.out.println("handleAck:" + deliveryTag + " " + multiple);
    if (multiple) {unconfirmedSet.headSet(deliveryTag - 1).clear();} else {unconfirmedSet.remove(deliveryTag);
    }
}, (deliveryTag, multiple) -> {System.out.println("handleNack:" + deliveryTag + " " + multiple);
    if (multiple) {unconfirmedSet.headSet(deliveryTag - 1).clear();} else {unconfirmedSet.remove(deliveryTag);
    }
});
while (true) {long seq = channel.getNextPublishSeqNo();
    channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8));
    unconfirmedSet.add(seq);
    Thread.sleep(1000);
}

除了异步处理的方式之外还有批量确认以及事务的方法。批量确认的速度在大量连续发送的情况下和异步的方法差不多。不管怎样这两种消息确认的方法都要比事务的方式快 7 倍左右。

Consumer

一般应当实现 Consumer 接口或者继承 DefaultConsumer ,Consumer 通过 consumerTag 来进行区分。

消费消息有两种方式,一种是 Push,一种是 Get。

Push 是由 RabbitMQ 以轮询的方式将消息推送到 Consumer,方法为 basicConsume。一般一个 Channel 对应一个 Consumer

Get 由客户端主动从 RabbitMQ 拉取一条消息,方法为 basicGet。__不能循环执行 basicGet 来代替 basicConsume,不然会严重影响性能。__

消息确认:autoAckfalseRabbitMQ 会等待 basicAck 的显式确认。除非 Consumer 连接断开否则一直等待确认。当 Consumer 显式调用 basicReject 或者 basicNack 并将 requeue 设为 true 后会将消息重新入队投递。一般我们在业务处理完之后再 ack .
mandatory : 当 Exchange 无法匹配 QueueExchange 时,mandatorytrue 的消息会被返回给 Producer,否则会被丢弃。通过 Channel.addReturnListener 来添加 ReturnListener 监视器。

TTL

  1. queueDeclare 时添加 x-message-ttl 参数,单位毫秒。

    Map<String, Object> arg = new HashMap<String, Object>() {{put("x-message-ttl", "1000000");
    }};
    channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
  2. 使用 AMQP.BasicProperties.Builder 创建 AMQP.BasicProperties 并设置 expiration 参数。

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    builder.expiration("100000");
    channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));

Dead Letter Exchange (DLX)

Dead Letter(死信):

  • Basic.Reject / Basic.Nack 并且 requeuetrue
  • 消息 TTL 过期
  • 队列达到最大长度

当消息成为 Dead Letter 之后,RabbitMQ 会自动把这个消息发到 DLX 上。

// 当发送到 normalQueue 中的消息成为 Dead Letter 之后会自动以
// dead-letter 为 routingKey 发送到 dlxQueue Exchange
Map<String, Object> arg = new HashMap<String, Object>() {{put("x-dead-letter-exchange", "dlx");
    put("x-dead-letter-routing-key", "dead-letter");
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("dlx", "direct", true, false, false, null);
channel.queueDeclare("dlxQueue", true, false, false, null);

DLX 其他用法:延迟队列,消息 发送到一个暂存的、没有 ConsumerQueue 并设置 TTL,Consumer 消费 DLX 绑定的 Queue 的消息,建议给暂存的 Queue 设置一个最大的 TTL,防止消息没有设置 TTL 而一直堆积在 Queue 中。

Priority

消息的消费可以有优先级,Queue 的最大优先级可以通过 x-max-priority 进行设置。

Map<String, Object> arg = new HashMap<String, Object>() {{put("x-max-priority", 5);
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("normalExchange", "direct", true, false, null);

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(2);
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));

Durability

Exchange , Queue , 消息都可以进行持久化。在消息发送到 Exchange 之后会立刻路由到 Queue 中,因此未持久化的 Exchange 在重启后会丢失 Exchange 元数据以及绑定,对 Queue 和消息的持久化无影响。

未持久化的 Queue 在重启后会丢失,包括 Queue 中的消息,不管消息是否设置了持久化。

未持久化的消息在重启后会丢失,即使所在的 Queue 已持久化。

channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化
channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化
channel.queueBind("normalQueue", "normalExchange", "key");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);    // 消息持久化
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));

Qos

Qos 的作用时负载均衡。当一个队列有两个 Consumer,一个性能很好 A,另一个不那么好 B,RabbitMQ 会轮询,将消息平均地分给这两个 Consumer。可见 B 上的堆积的消息会越来越多,而 A 上的线程可能会空闲。Qos 的作用就是防止一个 Consumer 堆积了过多的消息,把这些消息分给其他 Consumer。

global 参数:

channel.basicQos(3, false);  // each Consumer limit 3
channel.basicQos(5, true);   // this channel limit to 5

global 参数会让 RabbitMQ 调用更多资源,尽量不要设置(默认值为 false)。

Relibility

RabbitMQ 支持最少一次和最多一次。
最少一次:

- 启用 Publisher Confirm 或者 事务保证消息能够到达服务器。- 启用 mandatory 参数保证消息不回被 Exchange 丢掉。- 消息和 Queue 开启持久化。- Consumer autoAck off, 并确保消息在处理完之后再 ack

Policy

Policy 可以很方便的批量设置 Exchange 以及 Queue 的属性,但是 Policy 的优先级较低,请注意。

Policy 可以通过 HTTP API,web console,以及 cli 的方式。

rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
  • vhost : 指定 vhost
  • proiority : 如果一个 Queue 或者 Exchange 有多个 Policy 的情况下,只有 priority 最大的那个 Policy 才会生效。
  • apply-to : 应用到

    • Exchange and Queue
    • Exchange
    • Queue
  • name : Policy 的名字
  • pattern : Exchange 或者 Queue 名字的正则表达式
  • defination : 属性值,可以通过 management > Admin > Policies 的查看。

Cluster

RabbitMQ 会把所有的元数据存储到所有的节点上,但是队列是分散在集群中所有的节点上的。

Build A Cluster with docker

我们尝试使用 Docker Compose 创建一个由 3 个服务组成的集群

version: "3"
services:
  node1:
    image: rabbitmq:3.7-management-alpine
    container_name: node1
    hostname: node1
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5673:5672"
      - "15673:15672"
  node2:
    image: rabbitmq:3.7-management-alpine
    container_name: node2
    hostname: node2
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5674:5672"
      - "15674:15672"
  node3:
    image: rabbitmq:3.7-management-alpine
    container_name: node3
    hostname: node3
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5675:5672"
      - "15675:15672"

通过设置 hostname,容器内部的 rabbitmq 的 nodename 就变成类似 rabbitmq@node1。同时集群中的 RabbitMQ 需要相同的 RABBITMQ_ERLANG_COOKIE 来进行互相认证。

启动服务:

docker-compose up -d

然后将 node2,node3 加入 node1 , 注意,加入集群之前 RabbitMQ 必须停止:

# 停止 rabbitmq
docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node3 rabbitmqctl stop_app
# 加入 node1
docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1
docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1
# 重新启动 
docker-compose exec node2 rabbitmqctl start_app
docker-compose exec node3 rabbitmqctl start_app

在任意一个节点上查询集群状态:

docker-compose exec node2 rabbitmqctl cluster_status

可以看到如下状态:

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]

手动下线节点

将节点从在线状态下线, 首先停止节点,然后重置节点。

docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node2 rabbitmqctl reset
docker-compose exec node2 rabbitmqctl stop_app

在重新启动服务器之后可以发现该节点已经脱离了集群。

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node2]}]},
 {running_nodes,[rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]}]}]

节点类型

RabbitMQ 的节点类型有两种,一种是 disc,第二种是 ram。RabbitMQ 要求集群中至少要有一个磁盘节点,储存了所有的元数据。当集群中的唯一一个磁盘节点崩溃后,集群可以继续收发消息,但是不能创建队列等操作。

RabbitMQ 在加入集群时默认为磁盘模式,如果要以内存模式加入:

docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram

更改节点类型:

docker-compose exec node 2 rabbitmqctl change cluster_node_type desc

Mirror Queue

RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 机制。请注意,开启 Publisher Confirmed 或者事务的情况下,只有所有的 Slave 都 ACK 之后才会返回 ACK 给客户端。

开启 Mirror Queue 主要通过设置 Policy 其中最主要的是 defination

  • ha-mode: Mirror Queue 的模式

    • all : 默认的模式,表示在集群中的所有节点上进行镜像
    • exactly : 在指定数量的节点上进行镜像,数量由 ha-params 指定。
    • nodes : 在指定的节点上进行镜像,节点名称由 ha-params 指定。
  • ha-params : 如上所述
  • ha-sync-mode : 消息的同步模式

    • automatic : 当新的 Slave 加入集群之后会自动同步消息。
    • manual: 默认,当加入新的 Slave 之后不会自动把消息同步到新的 Slave 上。指导调用命令显式同步。
  • ha-promote-on-shutdown:

    • when-synced: 默认, 如果主动停止 master,那么 slave 不会自动接管。也就是说会期望 master 会重启启动,这可以保证消息不会丢失。
    • always: 不管 master 是因为什么原因停止的,slave 会立刻接管,有可能有一部分数据没有从 master 同步到 slave.
  • ha-promote-on-failure: 默认 always , 不推荐设置为 when-synced

正文完
 0