发布订阅模式
在之前的文章里,创建了 work queue。work queue 中,每一个 task 都会派发给一个 worker。在本章中,我们会完成完全不一样的事情 – 我们会派发一条 message 给多个消费者。我们称之为发布订阅模式。
为了更好来说明,我们将要构建一个简单的日志系统。会由两部分代码构成,第一部分来发送日志 message,第二部分会接受并打印日志。
在我们的日志系统中,每一个接收程序都会收到日志 message。这种方式下,我们可以运行一个接收程序将日志保存到磁盘,同时使用另外一个接收程序将日志打印到屏幕。
本质上来说,发布的日志 message 会广播到所有运行的接收者。
Exchanges
在之前的章节我们通过 queue 收发 message。现在开始介绍 Rabbit 中的 full messaging model。
首先让我们快速的回忆一下之前的章节
producer 是一个发送 message 的用户程序。
queue 是保存 message 的缓冲区
consumer 是接收 message 的用户程序
RabbitMQ 的 messaging model 的核心思想是 producer 不会直接向 queue 发送 message。实际上,很多时候 producer 也不知道 message 会发送到哪些 queue。
这里,producer 将 message 发送到 exchange。exchange 是一个非常简单的东西。一方面它从 producer 侧接收 message,另一方面它把 message 推送到 queue 去。exchange 必须知道对接收到的 message 接着要去做什么。是转发到特定的 queue?还是转发到多个 queue?还是干脆丢弃掉。这个规则取决于定义时 exchange 的类型。
exchange 有四种可选的类型:direct, topic, headers 和 fanout. 今天我们聚焦于最后一种 -fanout。让我们创建一个 fanout 类型的 exchange,命名为 logs
channel.exchangeDeclare(“logs”,”fanout”);
fanout 类型的 exchange 是非常简单的。可以从名字上大概猜出其用途,它广播所有的 message 到它所知道的 queue 去。这也正是日志应用所期望的。
列出所有的 exhange,可以使用 rabbitmqctl 命令 sudo rabbitmqctl list_exchanges,在列表总会出现一些 amq.* 的 exchange,和默认的 exchange。这些是默认自动创建的,我们不会使用到它们。没有名字的 exchange。在之前的章节里我们没有提到过 exchanges,我们直接将 message 发送到 queue。其实我们是用到了默认的 exchange,用空字符串”“来标识。回想一下,我们像下面这样发布 message:channel.basicPublish(“”,”hello”,null,message.getBytes()); 第一个参数就是 exchange 的名字。空字符串代表了没有名字的 exchange:message 被路由到了由 routingKey 指定名字的 queue。
现在,我们可以向有名字的 exchange 发布 message。
channel.basicPublish(“logs”,””,null,message.getBytes());
Temporary Queue
之前我们使用 queue 时都会指定名字,如 hello 和 task_queue。给一个 queue 命名是很重要的,因为我们要给 worker 指出相同的 queue。当需要在生产者和消费者间共享一个 queue 时,就必须给 queue 取好名字。
但是在我们日志应用中,情况却有所不同。我们需要接收到所有的 log message。我们也关注当前流动的 message。我们需要搞定 2 个事情。
首先,当连接到 Rabbit 时,我们需要一个全新的,空的 queue。因此我们可以自己创建一个随意名字的 queue,或是由服务器选择随意的 queue 名字,这当然是更好的选择。
其次,当我们断开接收者时,该 queue 可以被自动删除。
在 java 客户端中,当我们使用无参的 queueDeclare() 时,我们创建的是使用自动生成名字的一个不持久的,自动删除 queue:
String queueName = channel.queueDeclare().getQueue();
可以通过这里来学习到 exclusive 标志和其他 queue 的相关属性。
这时 queue 就具有一个随机的名字,比如像 amq.gen-JzTY20BRgKO-HjmUJj0wLg.
Bindings
我们已经创建了一个 fanout exchange 和 queue. 现在我们要设置 exchange,让它把 message 发送到我们的 queue。exchange 和 queue 这种关系的建立我们称之为 binding.
channel.queueBind(queueName,”logs”,””);
从现在开始 logs 这个 exchange 就会将 message 推向我们的队列了。
可以使用命令 rabbitmqctl list_bindings 来列出当前所有的 binding。
开始执行
生产者程序,和之前章节的代码变化不大,主要的变化是我们将 message 发送到 exchange 而不是一个 queue。你发现我们在发送的时候会填上一个 routingKey, 这个值在 fanout 类型的 exchange 中是被忽略的。下面是生产者 EmitLog.java 的代码
public class EmitLog {
private static final String EXCHANGE_NAME = “logs”;
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, “fanout”);
String message = argv.length < 1 ? “info: Hello World!” :
String.join(” “, argv);
channel.basicPublish(EXCHANGE_NAME, “”, null, message.getBytes(“UTF-8″));
System.out.println(” [x] Sent ‘” + message + “‘”);
}
}
}
如你所见,在建立 connection 之后我们声明了 exchange. 这一步是必要的,发布 Message 到一个不存在的 exchange 是不允许的。
如果没有 queue 绑定到 exchange 的时候,发布的 message 是会丢失的,但在现在这个场景是 OK 的。下面是 ReceiveLogs.java 的代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = “logs”;
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, “fanout”);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, “”);
System.out.println(” [*] Waiting for messages. To exit press CTRL+C”);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), “UTF-8″);
System.out.println(” [x] Received ‘” + message + “‘”);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
首先进行编译
javac -cp $CP EmitLog.java ReceiveLog.java
如果要把日志保存到文件,则
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果要在控制台看日志, 在另一个终端
java -cp $CP ReceiveLogs
最后来发送日志
java -cp $CP EmitLog
使用 rabbitmqctl list_bindings, 来确认程序创建了我们在代码中指定的 binding 和 queue. 运行两个 ReceiveLogs 程序,你会看到像下面的输出
sudo rabbitmqctl list_bindings
# => Listing bindings …
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => …done.