共计 4376 个字符,预计需要花费 11 分钟才能阅读完成。
为不便更好交换,可关注公众号:Java 课代表,每日一更,等你来呦!
3 公布 / 订阅(Publish/Subscribe)
在上一节中,咱们创立了一个工作队列。其目标是将每个工作只分发给一个 worker。本节咱们将换一种玩法:咱们投递一条音讯,让所有消费者都能接管到。这种模式称为公布 / 订阅(Publish/Subscribe)。
为了演示这种模式,咱们将构建一个日志记录零碎。它蕴含两个利用——第一个发送日志音讯,第二个接管并打印日志音讯。
在咱们的日志记录零碎中,每个运行中的接管程序都能接管到消(课代表注:雷同的一条音讯会被每个消费者收到)。这样一来,咱们就能够让一个接受者保留日志到硬盘;另一个在屏幕上打印日志。
实际上,已公布的日志音讯将会被播送给所有接收者。
替换(Exchanges)
在后面的教程中,咱们间接通过队列 (queue) 来发送和接管音讯。当初是时候介绍一下 RabbitMQ 中的残缺音讯模型了。
先对后面介绍过的内容做个简略回顾:
- 生产者 (producer) 是用来发送音讯的利用。
- 队列 (queue) 是一个音讯缓冲区。
- 消费者 (consumer) 是用来接管音讯的利用。
RabbitMQ 音讯模型的核心思想是:生产者 (producer) 从不间接将音讯发送给队列(queue)。实际上在大多数状况下,生产者甚至不晓得音讯会被散发到哪个队列。
相同,生产者只能够发消息给替换 (exchange)。替换非常简单,一方面它从生产者接管音讯,另一方面它将音讯推送到队列。替换必须确切地晓得如何解决收到的音讯。是该把音讯发给某个队列?还是发给多个队列?或者扔掉音讯?路由类型(exchange type) 定义了具体的行为规定。
有如下几种路由类型:direct, topic, headers 和 fanout。咱们先看一下最初一种,fanout:
channel.exchangeDeclare("logs", "fanout");
fanout 类型的替换非常简单。正如其名,它就是把收到的音讯播送给所有它所晓得的队列。这正是咱们的日志记录零碎须要的形式。
列出所有替换
为了列出服务器上的所有替换,能够应用 rabbitmqctl 命令:
sudo rabbitmqctl list_exchanges
列表中将会呈现一些名如 amq.* 的替换和默认 (没名字的) 替换。这些是默认创立的,目前不须要应用他们。
没名字的替换
在后面的教程中,咱们并不知道替换的存在,然而仍然能够发送音讯到队列。这是因为咱们应用了默认替换,用空字符 (“”) 来标识.
回忆一下咱们之前如公布音讯:
channel.basicPublish("","hello", null, message.getBytes());
第一个参数是替换的名字,空字符表明应用默认替换:如果音讯存在,则通过指定的 routingKey 将音讯路由到队列中。
当初咱们能够发送给指定名称的替换了:
channel.basicPublish("logs", "", null, message.getBytes());
长期队列(Temporary queues)
你可能还记得之前咱们应用有名称的队列(记得 hello 和 task_queue 吗?)。给队列命名至关重要,因为咱们须要让 worker 监听相应队列。当你想把队列在生产者和消费者之间共享时,必须给队列命名。
但这并不适用于咱们的日志记录零碎。咱们须要监听全副日志音讯,而非局部。而且咱们只关怀以后正在发送的音讯,历史音讯并不关怀。为此,咱们须要做两点:
首先,每次当咱们连贯到 RabbitMQ,咱们须要一个全新的队列。为此咱们能够每次创立一个随机命名的队列,或者更好的抉择是让服务器创立一个随机命名的队列。
其次,一旦队列没有消费者连贯,它将主动删除。
在 Java 客户端中,当咱们调用无参办法 queueDeclare() 时,就创立了一个非长久化,专用的(课代表注:连贯敞开时主动删除队列),主动删除的队列:
String queueName = channel.queueDeclare().getQueue();
理解更多对于 exclusive 标记和其余属性,查看 guide on queue。
此时,变量 queueName
是一个随机生成的队列名称字符串。它的值可能是:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
绑定(Bindings)
咱们曾经创立了一个 fanout 类型的替换,当初咱们须要通知替换该把音讯发送给哪个队列。替换和队列之间的这种关系咱们称为 绑定(binding)。
channel.queueBind(queueName, "logs", "");
如上代码会将名为 ”logs” 的替换的音讯发送到咱们的队列中。
列出绑定(Listing bindings)
猜猜看用什么工具能够列出绑定关系?
rabbitmqctl list_bindings
代码整合(Putting it all together)
公布日志音讯的生产者程序和后面教程中的代码没有多大区别。最大的改变是当初咱们把音讯发送给名为 ”logs” 的替换,而以前咱们发送给默认的匿名替换。发送音讯的时候,须要提供 routingKey
,不过对于 fanout 类型的替换,它会疏忽routingKey
的值。上面是发送日志程序的代码EmitLog.java
:
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("[x] Sent'" + message + "'");
}
}
}
(EmitLog.java 源文件)
如你所见,当咱们建设连贯之后,申明了替换。这一步是十分必要的
如果还没有队列被绑定到替换,音讯将会失落,不过这并不影响咱们以后的利用场景,如果以后没有消费者,咱们能够释怀地抛弃音讯。
ReceiveLogs.java
:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
(ReceiveLogs.java 源文件)
像之前那样编译。
javac -cp $CP EmitLog.java ReceiveLogs.java
如果想保留日志到文件,能够关上终端并输出:
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果想在屏幕上输入日志,关上一个新终端并运行:
java -cp $CP ReceiveLogs
要收回日志类型,输出:
java -cp $CP EmitLog
应用rabbitmqctl list_bindings
能够验证代码创立的绑定和队列是否正确。运行两个 ReceiveLogs.java
程序后,你应该能看到如下输入:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
对此的解释也很简略:logs 替换的音讯发送给了两个由服务端生成名字的队列。这正是咱们冀望的后果。
想要晓得如何监听泛滥音讯中的一部分(子集),请查阅教程 4。
举荐浏览
RabbitMQ 教程 1.“Hello World”
RabbitMQ 教程 2. 工作队列(Work Queue)
Freemarker 教程(一)- 模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
深入浅出 MySQL 优先队列(你肯定会踩到的 order by limit 问题)
码字不易,欢送点赞分享。
搜寻:【Java 课代表】,关注公众号,及时获取更多 Java 干货。