共计 4263 个字符,预计需要花费 11 分钟才能阅读完成。
为不便更好交换,可关注公众号:Java 课代表,每日一更,等你来呦!
4 路由(Routing)
在上一篇教程中咱们创立了一个繁难的日志零碎。能够将日志音讯播送给多个接收者。
本教程中,咱们将给它增加一个新个性——让独自订阅某一部分音讯 (子集) 成为可能。比方,咱们只把严重错误写入到磁盘文件中(只保留严重错误日志能够节俭磁盘),同时依然将所有日志都在终端输入。
绑定(Bindings)
在后面的例子中,咱们曾经创立了绑定,你可能还记得如下代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定是替换 (exchange) 和队列 (queue) 之间的一种关系。能够简略地了解为:这个队列对来自这个替换的音讯感兴趣。
绑定能够有一个 routingKey
参数。为了防止和 basic_publish
办法的参数混同 (课代表注:Channel#queueBind(String queue, String exchange, String routingKey) 办法的 routingKey
参数用于实现绑定关系,而 Channel#basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)办法的 routingKey 用来路由音讯),咱们将称其为binding key
:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key 的含意取决于替换的类型。咱们后面应用过的 fanout
类型的替换会疏忽它的值。
Direct 替换(Direct exchange)
咱们后面教程中的日志记录零碎会播送所有音讯给所有消费者。咱们心愿能够依据音讯的紧急性进行过滤。比方,心愿某个利用只记录严重错误到磁盘,不记录正告和信息类日志音讯以节约磁盘。
之前咱们应用的 fanout 替换灵活性并不高——他只是无脑播送。
接下来将应用 direct exchange
代替,direct exchange
的路由算法很简略——音讯会进入那个队列的 binding key
与音讯的 routing key
齐全匹配的队列。
为了说明该点,思考如下设置:
在上图的设置中,能够看到 direct exchange X
绑定了两个queue
.Q1
应用 orange
作为 binding key
,Q2
有两个绑定,一个是black
,另一个是green
.
在该设置下,应用 orange
作为 routing key
的音讯,发送到 exchange
后将被路由到 Q1
. 应用black
或green
作为 routing key
的音讯将被路由到Q2
。其余音讯将会被抛弃。
多重绑定(Multiple bindings)
应用雷同的 binding key
绑定到多个 queue
是齐全非法的。在上图中,咱们将 X
和Q1
通过 black
作为 binding key
绑定起来。这样一来,direct exchange
会像 fanout
一样播送音讯到所有匹配的队列,(课代表注:fanout
是无脑播送到所有 queue
,direct
是发送给所有匹配的queue
)。
发送日志(Emitting logs)
咱们将把这个模型利用到日志零碎上。应用 direct exchange
替换 fanout
来发送音讯。咱们将提供日志级别作为routing key
。这样一来,接管程序就能够依据他想要的级别来接管音讯。上面先来看一下发送日志。
跟之前一样,首先要创立一个exchange
:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
而后就能够发送音讯了:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化起见,咱们假如日志级别分为:’info’, ‘warning’, ‘error’。
订阅(Subscribing)
接管音讯程序和先前教程中的一样失常工作,不过有一点例外,咱们须要为感兴趣的日志级别创立绑定。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
代码整合(Putting it all together)
EmitLogDirect.java 残缺代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println("[x] Sent'" + severity + "':'" + message + "'");
}
}
//..
}
ReceiveLogsDirect.java 残缺代码:
import com.rabbitmq.client.*;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received'" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
跟后面一样编译(参考第一节的编译和 classpath 设置)。为了使用方便,咱们在运行例子时应用环境变量 $CP(Windows 下是 %CP%)作为 classpath。
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果你只想保留 ‘warning’ 和 ’error’ (不想要 ‘info’) 级别的日志到文件,只须要关上终端,输出:
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果想在屏幕上看到所有音讯,关上新终端,输出:
java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C
最初作为例子,发送一个 ’error’ 级别的日志音讯:
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(残缺源码参见 (EmitLogDirect.java source) 和 (ReceiveLogsDirect.java source))
持续学习第 5 篇教程,理解如何基于模式来侦听音讯。
【举荐浏览】
RabbitMQ 教程 3. 公布 / 订阅 (Publish/Subscribe)
RabbitMQ 教程 2. 工作队列(Work Queue)
RabbitMQ 教程 1.“Hello World”
Freemarker 教程(一)- 模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
深入浅出 MySQL 优先队列(你肯定会踩到的 order by limit 问题)
码字不易,欢送点赞关注和分享。
搜寻:【Java 课代表】,关注公众号,每日一更,及时获取更多 Java 干货。