乐趣区

关于java:RabbitMQ教程-5-Topics

为不便更好交换,可关注公众号:Java 课代表,每日一更,等你来呦!

5 Topics

在后面的教程中,咱们改良了日志零碎,为了解决 fanout exchange 的无脑播送,咱们应用direct 替换,从而实现了选择性接管日志。

只管应用 direct exchange 改良了零碎,他还是有局限性——它不能基于多种准则来路由音讯。

在咱们的日志零碎中,咱们既想依据日志级别订阅日志,还想依据日志源订阅日志。你可能从 syslog unix 工具中理解过这个概念,它基于日志的级别和设施来路由日志。

这将带来极大的灵活性——咱们可能想要监听来自 ’cron’ 的严重错误日志和来自 ’kern’ 的所有日志。

为了在日志零碎中实现该性能,咱们须要学习一下略微简单一点的 topic exchange。

Topic exchange

发送给 topic exchange 的音讯不能应用随便的 routing_key——它必须是一组逗号宰割的单词。能够是任意单词,但通常与音讯的某些个性相干。这些routing key 都是非法的:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。routing key里的单词数量没有限度,但最长不能超过 255 字节。

binding key的格局也是一样。topic exchange的解决逻辑和 direct 差不多——通过指定 routing key 发送的音讯会被散发到所有与匹配的binding key 绑定的队列上。

  • *(星号) 代表一个特定的单词
  • (井号) 代表 0 个或多个单词

上面的例子能够很容易地解释这一点:

图中,咱们将发送一些形容动物特色的音讯。音讯将会应用蕴含三个单词 (两个点宰割) 的routing key。第一个单词形容速度,第二个形容色彩,第三个形容种类: "<speed>.<colour>.<species>"

创立三个绑定:Q1应用 "*.orange.*"Q2 应用 "*.*.rabbit""lazy.#"作为bingding key

下面的绑定总结如下:

  • Q1对所有黄色动物感兴趣
  • Q2想订阅所有兔子和所有懒惰动物的音讯

routing key"quick.orange.rabbit" 的音讯会同时发送给两个队列,
"lazy.orange.elephant"的音讯也会发给两个队列。另外,"quick.orange.fox"会发给 Q1"lazy.brown.fox" 会发给 Q2"lazy.pink.rabbit" 只会发给 Q2 一次,即便有两条匹配到 Q2 的绑定,"quick.brown.fox"匹配不到任何绑定,所以会被抛弃。

如果咱们突破规定,应用一个或四个单词 (作为routing key) 发送音讯会怎么,比方“orange” 或者“quick.orange.male.rabbit”?当然,这些音讯不会匹配任何binding,会被抛弃。

不过,对于 "lazy.orange.male.rabbit",只管它有四个单词,依然会匹配最初一个bingding(lazy.#) 并发送到Q2

Topic exchange

Topic exchange 十分弱小,它能够变成和其余 exchange 一样工作

当一个 queue 被binding key “#” 绑定后——他将会收到所有音讯,疏忽routing key——就像 fanout exchange 一样

当绑定中不存在特殊字符:“*”和“#”时,topic exchange 会像 direct exchange 一样工作

整合代码(Putting it all together)

咱们将会在日志零碎中应用 topic exchange。咱们假如日志的 routing key 由两个单词组成:”< 设施 >.< 级别 >”

代码简直和后面的教程一样

EmitLogTopic.java 源码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println("[x] Sent'" + routingKey + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsTopic.java 源码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println("[*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("[x] Received'" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
  }
}

编译并运行样例代码,记得像教程 1 那样引入 classpath——windows 应用 %CP%。

编译:

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接管日志音讯

java -cp $CP ReceiveLogsTopic "#"

接管设施 ”kern” 的所有日志音讯

java -cp $CP ReceiveLogsTopic "kern.*"

或者只关怀“critical”级别的日志

java -cp $CP ReceiveLogsTopic "*.critical"

能够创立多个绑定

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

应用routing key “kern.critical” 发送日志音讯:

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

祝你程序玩得欢快。留神代码并没有设置 routing key 或者 binding key,你须要玩的时候通过参数指定。

(残缺源码参考:EmitLogTopic.java 和 ReceiveLogsTopic.java)

下一篇,在 教程 6 中将介绍如何利用往返音讯实现近程过程调用(Remote Procedure Call)


【举荐浏览】
RabbitMQ 教程 4. 路由 (Routing)
RabbitMQ 教程 3. 公布 / 订阅(Publish/Subscribe)
RabbitMQ 教程 2. 工作队列(Work Queue)
RabbitMQ 教程 1.“Hello World”
Freemarker 教程(一)- 模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
深入浅出 MySQL 优先队列(你肯定会踩到的 order by limit 问题)


码字不易,欢送点赞关注和分享。
搜寻:【Java 课代表】,关注公众号,每日一更,及时获取更多 Java 干货。

退出移动版