Connection & Channel
Connection
代表一个 TCP 连接,Channel
是建立在 Connection
上的虚拟连接。RabbitMQ 每条指令都是通过 Channel
完成的。
对于 OS 而言,创建和销毁 TCP 连接的代价非常高,在高峰期很容易遇到瓶颈。程序中一般会有多个线程需要与 RabbitMQ 建立通信,消费或生产消息,通过 TCP 连接复用来减少性能开销。
Connection
可以创建多个 Channel
,但是 Channel
不是线程安全的所以不能在线程间共享。
Connection
在创建时可以传入一个 ExecutorService
,这个线程池时给该 Connection
上的 Consumer
用的。
Channel.isOpen
以及 Connection.isOpen
方法是同步的,因此如果在发送消息时频繁调用会产生竞争。我们可以认为在 createChannel
方法后 Channel
以及处于开启状态。若在使用过程中 Channel
关闭了,那么只要捕获抛出的 ShutDownSignalException
就可以了,同时建议捕获 IOException
以及 SocketException
防止连接意外关闭。
Exchange & Queue
消费者和生产者都可以声明一个已经存在的 Exchange
或者 Queue
,前提是参数完全匹配现有的 Exchange
或者 Queue,否则会抛出异常。
QueueDeclare
参数:exclusive
: 排他队列,只有同一个 Connection
的 Channel
可以访问,且在 Connection
关闭或者客户端退出后自动删除,即使 durable
为 true
。
queuePurge(String queue)
:清空队列
Exchange
可以绑定另一个 Exchange
:exchangeBind(String destination, String source, String routeKey)
, 从 source
到 destination
若业务允许,则最好预先创建好
Exchange
以及Queue
并进行绑定 (rabbitmqadmin),防止 Exchange 没有绑定Queue
或 绑定错误的Queue
而导致消息丢失(关键信息应当使用mandatory
参数)。
Alternate Exchange
: 在 Channel.exchangeDeclare
时添加 alternate-exchange
参数或在 Policy
中声明。mandatory
为 true
时,未被路由的消息会被发送到 Alternate Exchange
。建议 Exchange Type
设置为 fanout
,否则当 RoutingKey
依然不匹配就会被返回 Producer
。
P.S. 有些书上讲备份交换器和
mandatory
参数一起使用mandatory
参数失效是错的,当RoutingKey
不匹配Alternate Exchange
依然会被返回Producer
。
(rabbitmq v3.7 测试)
Map<String, Object> arg = new HashMap<String, Object>() {{put("alternate-exchange", "alt");
}};
channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
channel.exchangeDeclare("alt", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("notSend", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "key");
channel.queueBind("notSend", "alt", "");
Publish & Consume
Publish Confirm
消息发送到服务器后可能还没来的及刷到磁盘中,服务器就挂掉,从而造成消息丢失。Publish Confirm 能够在消息确实到达服务器(开启持久化的消息会在刷入磁盘之后)之后返回一个确认给 Publisher。
通过 channel.confirmSelected
把 Channel
设置为 Confirm
模式,并为 Channel
添加一个 ConfirmLister
来监听返回的确认。
SortedSet<Long> unconfirmedSet = new TreeSet<>();
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {System.out.println("handleAck:" + deliveryTag + " " + multiple);
if (multiple) {unconfirmedSet.headSet(deliveryTag - 1).clear();} else {unconfirmedSet.remove(deliveryTag);
}
}, (deliveryTag, multiple) -> {System.out.println("handleNack:" + deliveryTag + " " + multiple);
if (multiple) {unconfirmedSet.headSet(deliveryTag - 1).clear();} else {unconfirmedSet.remove(deliveryTag);
}
});
while (true) {long seq = channel.getNextPublishSeqNo();
channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8));
unconfirmedSet.add(seq);
Thread.sleep(1000);
}
除了异步处理的方式之外还有批量确认以及事务的方法。批量确认的速度在大量连续发送的情况下和异步的方法差不多。不管怎样这两种消息确认的方法都要比事务的方式快 7 倍左右。
Consumer
一般应当实现 Consumer
接口或者继承 DefaultConsumer
,Consumer
通过 consumerTag
来进行区分。
消费消息有两种方式,一种是 Push
,一种是 Get。
Push
是由 RabbitMQ
以轮询的方式将消息推送到 Consumer
,方法为 basicConsume
。一般一个 Channel
对应一个 Consumer
。
Get
由客户端主动从 RabbitMQ
拉取一条消息,方法为 basicGet
。__不能循环执行 basicGet
来代替 basicConsume
,不然会严重影响性能。__
消息确认:autoAck
为 false
,RabbitMQ
会等待 basicAck
的显式确认。除非 Consumer
连接断开否则一直等待确认。当 Consumer
显式调用 basicReject
或者 basicNack
并将 requeue
设为 true
后会将消息重新入队投递。一般我们在业务处理完之后再 ack
.mandatory
: 当 Exchange
无法匹配 Queue
或 Exchange
时,mandatory
为 true
的消息会被返回给 Producer
,否则会被丢弃。通过 Channel.addReturnListener
来添加 ReturnListener
监视器。
TTL
-
queueDeclare
时添加x-message-ttl
参数,单位毫秒。Map<String, Object> arg = new HashMap<String, Object>() {{put("x-message-ttl", "1000000"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
-
使用
AMQP.BasicProperties.Builder
创建AMQP.BasicProperties
并设置expiration
参数。AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("100000"); channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));
Dead Letter Exchange (DLX)
Dead Letter(死信):
-
Basic.Reject
/Basic.Nack
并且requeue
为true
- 消息 TTL 过期
- 队列达到最大长度
当消息成为 Dead Letter 之后,RabbitMQ 会自动把这个消息发到 DLX 上。
// 当发送到 normalQueue 中的消息成为 Dead Letter 之后会自动以
// dead-letter 为 routingKey 发送到 dlxQueue Exchange
Map<String, Object> arg = new HashMap<String, Object>() {{put("x-dead-letter-exchange", "dlx");
put("x-dead-letter-routing-key", "dead-letter");
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("dlx", "direct", true, false, false, null);
channel.queueDeclare("dlxQueue", true, false, false, null);
DLX 其他用法:延迟队列,消息 发送到一个暂存的、没有 Consumer
的 Queue
并设置 TTL,Consumer
消费 DLX 绑定的 Queue
的消息,建议给暂存的 Queue
设置一个最大的 TTL,防止消息没有设置 TTL 而一直堆积在 Queue
中。
Priority
消息的消费可以有优先级,Queue
的最大优先级可以通过 x-max-priority
进行设置。
Map<String, Object> arg = new HashMap<String, Object>() {{put("x-max-priority", 5);
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("normalExchange", "direct", true, false, null);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(2);
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Durability
Exchange
, Queue
, 消息都可以进行持久化。在消息发送到 Exchange
之后会立刻路由到 Queue
中,因此未持久化的 Exchange
在重启后会丢失 Exchange
元数据以及绑定,对 Queue
和消息的持久化无影响。
未持久化的 Queue
在重启后会丢失,包括 Queue
中的消息,不管消息是否设置了持久化。
未持久化的消息在重启后会丢失,即使所在的 Queue
已持久化。
channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化
channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化
channel.queueBind("normalQueue", "normalExchange", "key");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); // 消息持久化
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Qos
Qos 的作用时负载均衡。当一个队列有两个 Consumer,一个性能很好 A,另一个不那么好 B,RabbitMQ 会轮询,将消息平均地分给这两个 Consumer。可见 B 上的堆积的消息会越来越多,而 A 上的线程可能会空闲。Qos 的作用就是防止一个 Consumer 堆积了过多的消息,把这些消息分给其他 Consumer。
global 参数:
channel.basicQos(3, false); // each Consumer limit 3
channel.basicQos(5, true); // this channel limit to 5
global 参数会让 RabbitMQ 调用更多资源,尽量不要设置(默认值为 false)。
Relibility
RabbitMQ 支持最少一次和最多一次。
最少一次:
- 启用 Publisher Confirm 或者 事务保证消息能够到达服务器。- 启用 mandatory 参数保证消息不回被 Exchange 丢掉。- 消息和 Queue 开启持久化。- Consumer autoAck off, 并确保消息在处理完之后再 ack
Policy
Policy 可以很方便的批量设置 Exchange 以及 Queue 的属性,但是 Policy 的优先级较低,请注意。
Policy 可以通过 HTTP API,web console,以及 cli 的方式。
rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
-
vhost
: 指定 vhost -
proiority
: 如果一个 Queue 或者 Exchange 有多个 Policy 的情况下,只有 priority 最大的那个 Policy 才会生效。 -
apply-to
: 应用到- Exchange and Queue
- Exchange
- Queue
-
name
: Policy 的名字 -
pattern
: Exchange 或者 Queue 名字的正则表达式 -
defination
: 属性值,可以通过 management > Admin > Policies 的查看。
Cluster
RabbitMQ 会把所有的元数据存储到所有的节点上,但是队列是分散在集群中所有的节点上的。
Build A Cluster with docker
我们尝试使用 Docker Compose 创建一个由 3 个服务组成的集群
version: "3"
services:
node1:
image: rabbitmq:3.7-management-alpine
container_name: node1
hostname: node1
environment:
RABBITMQ_ERLANG_COOKIE: secret_cookie_here
ports:
- "5673:5672"
- "15673:15672"
node2:
image: rabbitmq:3.7-management-alpine
container_name: node2
hostname: node2
environment:
RABBITMQ_ERLANG_COOKIE: secret_cookie_here
ports:
- "5674:5672"
- "15674:15672"
node3:
image: rabbitmq:3.7-management-alpine
container_name: node3
hostname: node3
environment:
RABBITMQ_ERLANG_COOKIE: secret_cookie_here
ports:
- "5675:5672"
- "15675:15672"
通过设置 hostname,容器内部的 rabbitmq 的 nodename 就变成类似 rabbitmq@node1。同时集群中的 RabbitMQ 需要相同的 RABBITMQ_ERLANG_COOKIE 来进行互相认证。
启动服务:
docker-compose up -d
然后将 node2,node3 加入 node1 , 注意,加入集群之前 RabbitMQ 必须停止:
# 停止 rabbitmq
docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node3 rabbitmqctl stop_app
# 加入 node1
docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1
docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1
# 重新启动
docker-compose exec node2 rabbitmqctl start_app
docker-compose exec node3 rabbitmqctl start_app
在任意一个节点上查询集群状态:
docker-compose exec node2 rabbitmqctl cluster_status
可以看到如下状态:
Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
{running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]},
{cluster_name,<<"rabbit@node2">>},
{partitions,[]},
{alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]
手动下线节点
将节点从在线状态下线, 首先停止节点,然后重置节点。
docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node2 rabbitmqctl reset
docker-compose exec node2 rabbitmqctl stop_app
在重新启动服务器之后可以发现该节点已经脱离了集群。
Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node2]}]},
{running_nodes,[rabbit@node2]},
{cluster_name,<<"rabbit@node2">>},
{partitions,[]},
{alarms,[{rabbit@node2,[]}]}]
节点类型
RabbitMQ 的节点类型有两种,一种是 disc,第二种是 ram。RabbitMQ 要求集群中至少要有一个磁盘节点,储存了所有的元数据。当集群中的唯一一个磁盘节点崩溃后,集群可以继续收发消息,但是不能创建队列等操作。
RabbitMQ 在加入集群时默认为磁盘模式,如果要以内存模式加入:
docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram
更改节点类型:
docker-compose exec node 2 rabbitmqctl change cluster_node_type desc
Mirror Queue
RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 机制。请注意,开启 Publisher Confirmed 或者事务的情况下,只有所有的 Slave 都 ACK 之后才会返回 ACK 给客户端。
开启 Mirror Queue 主要通过设置 Policy 其中最主要的是 defination
:
-
ha-mode
: Mirror Queue 的模式-
all
: 默认的模式,表示在集群中的所有节点上进行镜像 -
exactly
: 在指定数量的节点上进行镜像,数量由ha-params
指定。 -
nodes
: 在指定的节点上进行镜像,节点名称由ha-params
指定。
-
-
ha-params
: 如上所述 -
ha-sync-mode
: 消息的同步模式-
automatic
: 当新的 Slave 加入集群之后会自动同步消息。 -
manual
: 默认,当加入新的 Slave 之后不会自动把消息同步到新的 Slave 上。指导调用命令显式同步。
-
-
ha-promote-on-shutdown
:-
when-synced
: 默认, 如果主动停止 master,那么 slave 不会自动接管。也就是说会期望 master 会重启启动,这可以保证消息不会丢失。 -
always
: 不管 master 是因为什么原因停止的,slave 会立刻接管,有可能有一部分数据没有从 master 同步到 slave.
-
-
ha-promote-on-failure
: 默认always
, 不推荐设置为when-synced