共计 4078 个字符,预计需要花费 11 分钟才能阅读完成。
为不便更好交换,可关注公众号:Java 课代表,每日一更,等你来呦!
5 Topics
在后面的教程中,咱们改良了日志零碎,为了解决 fanout exchange
的无脑播送,咱们应用direct
替换,从而实现了选择性接管日志。
只管应用 direct exchange 改良了零碎,他还是有局限性——它不能基于多种准则来路由音讯。
在咱们的日志零碎中,咱们既想依据日志级别订阅日志,还想依据日志源订阅日志。你可能从 syslog unix 工具中理解过这个概念,它基于日志的级别和设施来路由日志。
这将带来极大的灵活性——咱们可能想要监听来自 ’cron’ 的严重错误日志和来自 ’kern’ 的所有日志。
为了在日志零碎中实现该性能,咱们须要学习一下略微简单一点的 topic exchange。
Topic exchange
发送给 topic exchange 的音讯不能应用随便的 routing_key
——它必须是一组逗号宰割的单词。能够是任意单词,但通常与音讯的某些个性相干。这些routing key
都是非法的:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。routing key
里的单词数量没有限度,但最长不能超过 255 字节。
binding key
的格局也是一样。topic exchange
的解决逻辑和 direct
差不多——通过指定 routing key
发送的音讯会被散发到所有与匹配的binding key
绑定的队列上。
- *(星号) 代表一个特定的单词
-
(井号) 代表 0 个或多个单词
上面的例子能够很容易地解释这一点:
图中,咱们将发送一些形容动物特色的音讯。音讯将会应用蕴含三个单词 (两个点宰割) 的routing key
。第一个单词形容速度,第二个形容色彩,第三个形容种类: "<speed>.<colour>.<species>"
。
创立三个绑定:Q1
应用 "*.orange.*"
,Q2
应用 "*.*.rabbit"
和"lazy.#"
作为bingding key
。
下面的绑定总结如下:
Q1
对所有黄色动物感兴趣Q2
想订阅所有兔子和所有懒惰动物的音讯
routing key
为 "quick.orange.rabbit"
的音讯会同时发送给两个队列,"lazy.orange.elephant"
的音讯也会发给两个队列。另外,"quick.orange.fox"
会发给 Q1
,"lazy.brown.fox"
会发给 Q2
,"lazy.pink.rabbit"
只会发给 Q2
一次,即便有两条匹配到 Q2
的绑定,"quick.brown.fox"
匹配不到任何绑定,所以会被抛弃。
如果咱们突破规定,应用一个或四个单词 (作为routing key
) 发送音讯会怎么,比方“orange”
或者“quick.orange.male.rabbit”
?当然,这些音讯不会匹配任何binding
,会被抛弃。
不过,对于 "lazy.orange.male.rabbit"
,只管它有四个单词,依然会匹配最初一个bingding
(lazy.#) 并发送到Q2
Topic exchange
Topic exchange 十分弱小,它能够变成和其余 exchange 一样工作
当一个 queue 被
binding key
“#” 绑定后——他将会收到所有音讯,疏忽routing key
——就像 fanout exchange 一样当绑定中不存在特殊字符:“*”和“#”时,topic exchange 会像 direct exchange 一样工作
整合代码(Putting it all together)
咱们将会在日志零碎中应用 topic exchange。咱们假如日志的 routing key 由两个单词组成:”< 设施 >.< 级别 >”
代码简直和后面的教程一样
EmitLogTopic.java 源码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("[x] Sent'" + routingKey + "':'" + message + "'");
}
}
//..
}
ReceiveLogsTopic.java 源码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received'" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
编译并运行样例代码,记得像教程 1 那样引入 classpath——windows 应用 %CP%。
编译:
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接管日志音讯
java -cp $CP ReceiveLogsTopic "#"
接管设施 ”kern” 的所有日志音讯
java -cp $CP ReceiveLogsTopic "kern.*"
或者只关怀“critical”级别的日志
java -cp $CP ReceiveLogsTopic "*.critical"
能够创立多个绑定
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
应用routing key
“kern.critical” 发送日志音讯:
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
祝你程序玩得欢快。留神代码并没有设置 routing key 或者 binding key,你须要玩的时候通过参数指定。
(残缺源码参考:EmitLogTopic.java 和 ReceiveLogsTopic.java)
下一篇,在 教程 6 中将介绍如何利用往返音讯实现近程过程调用(Remote Procedure Call)
【举荐浏览】
RabbitMQ 教程 4. 路由 (Routing)
RabbitMQ 教程 3. 公布 / 订阅(Publish/Subscribe)
RabbitMQ 教程 2. 工作队列(Work Queue)
RabbitMQ 教程 1.“Hello World”
Freemarker 教程(一)- 模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
深入浅出 MySQL 优先队列(你肯定会踩到的 order by limit 问题)
码字不易,欢送点赞关注和分享。
搜寻:【Java 课代表】,关注公众号,每日一更,及时获取更多 Java 干货。