关于java:RabbitMQ教程-3发布订阅PublishSubscribe

37次阅读

共计 4376 个字符,预计需要花费 11 分钟才能阅读完成。

为不便更好交换,可关注公众号:Java 课代表,每日一更,等你来呦!

3 公布 / 订阅(Publish/Subscribe)

在上一节中,咱们创立了一个工作队列。其目标是将每个工作只分发给一个 worker。本节咱们将换一种玩法:咱们投递一条音讯,让所有消费者都能接管到。这种模式称为公布 / 订阅(Publish/Subscribe)。

为了演示这种模式,咱们将构建一个日志记录零碎。它蕴含两个利用——第一个发送日志音讯,第二个接管并打印日志音讯。

在咱们的日志记录零碎中,每个运行中的接管程序都能接管到消(课代表注:雷同的一条音讯会被每个消费者收到)。这样一来,咱们就能够让一个接受者保留日志到硬盘;另一个在屏幕上打印日志。

实际上,已公布的日志音讯将会被播送给所有接收者。

替换(Exchanges)

在后面的教程中,咱们间接通过队列 (queue) 来发送和接管音讯。当初是时候介绍一下 RabbitMQ 中的残缺音讯模型了。

先对后面介绍过的内容做个简略回顾:

  • 生产者 (producer) 是用来发送音讯的利用。
  • 队列 (queue) 是一个音讯缓冲区。
  • 消费者 (consumer) 是用来接管音讯的利用。

RabbitMQ 音讯模型的核心思想是:生产者 (producer) 从不间接将音讯发送给队列(queue)。实际上在大多数状况下,生产者甚至不晓得音讯会被散发到哪个队列。

相同,生产者只能够发消息给替换 (exchange)。替换非常简单,一方面它从生产者接管音讯,另一方面它将音讯推送到队列。替换必须确切地晓得如何解决收到的音讯。是该把音讯发给某个队列?还是发给多个队列?或者扔掉音讯?路由类型(exchange type) 定义了具体的行为规定。

有如下几种路由类型:direct, topic, headers 和 fanout。咱们先看一下最初一种,fanout:

channel.exchangeDeclare("logs", "fanout");

fanout 类型的替换非常简单。正如其名,它就是把收到的音讯播送给所有它所晓得的队列。这正是咱们的日志记录零碎须要的形式。

列出所有替换

为了列出服务器上的所有替换,能够应用 rabbitmqctl 命令:

sudo rabbitmqctl list_exchanges

列表中将会呈现一些名如 amq.* 的替换和默认 (没名字的) 替换。这些是默认创立的,目前不须要应用他们。

没名字的替换

在后面的教程中,咱们并不知道替换的存在,然而仍然能够发送音讯到队列。这是因为咱们应用了默认替换,用空字符 (“”) 来标识.

回忆一下咱们之前如公布音讯:

channel.basicPublish("","hello", null, message.getBytes());

第一个参数是替换的名字,空字符表明应用默认替换:如果音讯存在,则通过指定的 routingKey 将音讯路由到队列中。

当初咱们能够发送给指定名称的替换了:

channel.basicPublish("logs", "", null, message.getBytes());

长期队列(Temporary queues)

你可能还记得之前咱们应用有名称的队列(记得 hello 和 task_queue 吗?)。给队列命名至关重要,因为咱们须要让 worker 监听相应队列。当你想把队列在生产者和消费者之间共享时,必须给队列命名。

但这并不适用于咱们的日志记录零碎。咱们须要监听全副日志音讯,而非局部。而且咱们只关怀以后正在发送的音讯,历史音讯并不关怀。为此,咱们须要做两点:

首先,每次当咱们连贯到 RabbitMQ,咱们须要一个全新的队列。为此咱们能够每次创立一个随机命名的队列,或者更好的抉择是让服务器创立一个随机命名的队列。

其次,一旦队列没有消费者连贯,它将主动删除。

在 Java 客户端中,当咱们调用无参办法 queueDeclare() 时,就创立了一个非长久化,专用的(课代表注:连贯敞开时主动删除队列),主动删除的队列:

String queueName = channel.queueDeclare().getQueue();

理解更多对于 exclusive 标记和其余属性,查看 guide on queue。

此时,变量 queueName 是一个随机生成的队列名称字符串。它的值可能是:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

绑定(Bindings)

咱们曾经创立了一个 fanout 类型的替换,当初咱们须要通知替换该把音讯发送给哪个队列。替换和队列之间的这种关系咱们称为 绑定(binding)

channel.queueBind(queueName, "logs", "");

如上代码会将名为 ”logs” 的替换的音讯发送到咱们的队列中。

列出绑定(Listing bindings)

猜猜看用什么工具能够列出绑定关系?

rabbitmqctl list_bindings

代码整合(Putting it all together)

公布日志音讯的生产者程序和后面教程中的代码没有多大区别。最大的改变是当初咱们把音讯发送给名为 ”logs” 的替换,而以前咱们发送给默认的匿名替换。发送音讯的时候,须要提供 routingKey,不过对于 fanout 类型的替换,它会疏忽routingKey 的值。上面是发送日志程序的代码EmitLog.java:

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println("[x] Sent'" + message + "'");
    }
  }
}

(EmitLog.java 源文件)

如你所见,当咱们建设连贯之后,申明了替换。这一步是十分必要的

如果还没有队列被绑定到替换,音讯将会失落,不过这并不影响咱们以后的利用场景,如果以后没有消费者,咱们能够释怀地抛弃音讯。

ReceiveLogs.java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println("[*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("[x] Received'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
  }
}

(ReceiveLogs.java 源文件)

像之前那样编译。

javac -cp $CP EmitLog.java ReceiveLogs.java

如果想保留日志到文件,能够关上终端并输出:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果想在屏幕上输入日志,关上一个新终端并运行:

java -cp $CP ReceiveLogs

要收回日志类型,输出:

java -cp $CP EmitLog

应用rabbitmqctl list_bindings 能够验证代码创立的绑定和队列是否正确。运行两个 ReceiveLogs.java 程序后,你应该能看到如下输入:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对此的解释也很简略:logs 替换的音讯发送给了两个由服务端生成名字的队列。这正是咱们冀望的后果。
想要晓得如何监听泛滥音讯中的一部分(子集),请查阅教程 4。


举荐浏览
RabbitMQ 教程 1.“Hello World”

RabbitMQ 教程 2. 工作队列(Work Queue)

Freemarker 教程(一)- 模板开发手册

下载的附件名总乱码?你该去读一下 RFC 文档了!

深入浅出 MySQL 优先队列(你肯定会踩到的 order by limit 问题)


码字不易,欢送点赞分享。
搜寻:【Java 课代表】,关注公众号,及时获取更多 Java 干货。

正文完
 0