为不便更好交换,可关注公众号:Java课代表,每日一更,等你来呦!

5 Topics

在后面的教程中,咱们改良了日志零碎,为了解决fanout exchange 的无脑播送,咱们应用direct替换,从而实现了选择性接管日志。

只管应用direct exchange 改良了零碎,他还是有局限性——它不能基于多种准则来路由音讯。

在咱们的日志零碎中,咱们既想依据日志级别订阅日志,还想依据日志源订阅日志。你可能从syslog unix 工具中理解过这个概念,它基于日志的级别和设施来路由日志。

这将带来极大的灵活性——咱们可能想要监听来自'cron'的严重错误日志和来自'kern'的所有日志。

为了在日志零碎中实现该性能,咱们须要学习一下略微简单一点的 topic exchange。

Topic exchange

发送给topic exchange的音讯不能应用随便的routing_key——它必须是一组逗号宰割的单词。能够是任意单词,但通常与音讯的某些个性相干。这些routing key都是非法的:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。routing key里的单词数量没有限度,但最长不能超过 255 字节。

binding key的格局也是一样。topic exchange的解决逻辑和direct差不多——通过指定routing key 发送的音讯会被散发到所有与匹配的binding key绑定的队列上。

  • *(星号) 代表一个特定的单词
  • (井号) 代表0个或多个单词

上面的例子能够很容易地解释这一点:

图中,咱们将发送一些形容动物特色的音讯。音讯将会应用蕴含三个单词(两个点宰割)的routing key。第一个单词形容速度,第二个形容色彩,第三个形容种类: "<speed>.<colour>.<species>"

创立三个绑定:Q1应用"*.orange.*"Q2应用"*.*.rabbit""lazy.#"作为bingding key

下面的绑定总结如下:

  • Q1对所有黄色动物感兴趣
  • Q2想订阅所有兔子和所有懒惰动物的音讯

routing key"quick.orange.rabbit"的音讯会同时发送给两个队列,
"lazy.orange.elephant"的音讯也会发给两个队列。另外, "quick.orange.fox"会发给Q1"lazy.brown.fox"会发给Q2"lazy.pink.rabbit"只会发给Q2一次,即便有两条匹配到Q2的绑定,"quick.brown.fox"匹配不到任何绑定,所以会被抛弃。

如果咱们突破规定,应用一个或四个单词(作为routing key) 发送音讯会怎么,比方“orange”或者“quick.orange.male.rabbit”?当然,这些音讯不会匹配任何binding,会被抛弃。

不过,对于"lazy.orange.male.rabbit",只管它有四个单词,依然会匹配最初一个bingding(lazy.#)并发送到Q2

Topic exchange

Topic exchange 十分弱小,它能够变成和其余 exchange 一样工作

当一个queue被binding key "#"绑定后——他将会收到所有音讯,疏忽routing key——就像 fanout exchange 一样

当绑定中不存在特殊字符:“*”和“#”时,topic exchange 会像direct exchange 一样工作

整合代码(Putting it all together)

咱们将会在日志零碎中应用 topic exchange。咱们假如日志的routing key 由两个单词组成:"<设施>.<级别>”

代码简直和后面的教程一样

EmitLogTopic.java 源码:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class EmitLogTopic {  private static final String EXCHANGE_NAME = "topic_logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    try (Connection connection = factory.newConnection();         Channel channel = connection.createChannel()) {        channel.exchangeDeclare(EXCHANGE_NAME, "topic");        String routingKey = getRouting(argv);        String message = getMessage(argv);        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");    }  }  //..}

ReceiveLogsTopic.java 源码:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic {  private static final String EXCHANGE_NAME = "topic_logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "topic");    String queueName = channel.queueDeclare().getQueue();    if (argv.length < 1) {        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");        System.exit(1);    }    for (String bindingKey : argv) {        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);    }    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");    DeliverCallback deliverCallback = (consumerTag, delivery) -> {        String message = new String(delivery.getBody(), "UTF-8");        System.out.println(" [x] Received '" +            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");    };    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });  }}

编译并运行样例代码,记得像教程1那样引入 classpath——windows 应用 %CP%。

编译:

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接管日志音讯

java -cp $CP ReceiveLogsTopic "#"

接管设施"kern"的所有日志音讯

java -cp $CP ReceiveLogsTopic "kern.*"

或者只关怀“critical”级别的日志

java -cp $CP ReceiveLogsTopic "*.critical"

能够创立多个绑定

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

应用routing key "kern.critical" 发送日志音讯:

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

祝你程序玩得欢快。留神代码并没有设置routing key 或者binding key,你须要玩的时候通过参数指定。

(残缺源码参考:EmitLogTopic.java 和 ReceiveLogsTopic.java)

下一篇,在 教程6 中将介绍如何利用往返音讯实现近程过程调用(Remote Procedure Call)


【举荐浏览】
RabbitMQ教程 4.路由(Routing)
RabbitMQ教程 3.公布/订阅(Publish/Subscribe)
RabbitMQ教程 2.工作队列(Work Queue)
RabbitMQ教程 1.“Hello World”
Freemarker 教程(一)-模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
深入浅出 MySQL 优先队列(你肯定会踩到的order by limit 问题)


码字不易,欢送点赞关注和分享。
搜寻:【Java课代表】,关注公众号,每日一更,及时获取更多Java干货。