【译】RabbitMQ系列(四) – 路由模式

6次阅读

共计 3449 个字符,预计需要花费 9 分钟才能阅读完成。

路由模式
在之前的文章中我们建立了一个简单的日志系统。我们可以通过这个系统将日志 message 广播给很多接收者。
在本篇文章中,我们在这之上,添加一个新的功能,即允许接收者订阅 message 的一个子集。举个例子,我们将日志分成多个级别,一个接收者接收错误日志将之保存到磁盘,另一个接收者接收所有日志将之打印到控制台。
Bindings
在前面的章节中,我们已经接触过 binding 了,像下面的代码这样:
channel.queueBind(queueName,EXCHANGE_NAME,””);
binding 将 exchange 和 queue 关联在了一起。更形象的表示,如:queue 对 exchange 中的 message 感兴趣。
bindings 可以携带一个 routingKey 参数。为了避免和 basic_publish 的参数弄混,我们称之它为 binding_key. 我们像下面这样创建一个 binding
channel.queueBind(queueName,EXCHANGE_NAME,”black”);
binding key 的作用要看 exchange 的类型,对于 fanout 类型的 exchange,binding key 是直接忽略的。
Direct Exchange
在之前的日志系统中,message 会推送到所有的消费者去。我们想让系统依据 message 的日志级别进行过滤。比如一个消费者只接收严重级别的日志。
fanout 无法帮我们实现这样的功能,它只是无脑的进行广播。
我们使用 direct 类型的 exchange,它的路由算法是非常简单的 – 只要 message 的 routing_key 和 bind 的 binding_key 相同即进行转发。
为了进行说明,像下图这么来设置如图,可以看到有两个 queue 绑到了类型为 direct 的 exchange 上。第一个 queue 绑定用了 orange 这个 binding key,第二个则用了 black 和 green 两个 binding key。
那么结果就是有 routing key 为 orange 的 message 路由到了 Q1. 而 routing key 为 black 和 green 的 message 则路由到了 Q2,其他的消息则被丢弃了。
Multiple Bindings
若使用相同的 binding key 将多个 queue 绑定到 exchange 上,就和 fanout 的行为一样了,message 会广播到 binding key 相同的 queue 去。如图的设置中,一个 routing key 为 black 的 message 就会同时发送到 Q1 和 Q2。
Emitting logs
我们将在我们的日志系统上应用这个模型,使用 direct 类型的 exchange 去替代 fanout 类型的 exchange。提供日志的严重性作为 routing key。接收程序可以选择要接收日志的严重性级别。首先我们创建 exchange
channel.exchangeDeclare(EXCHANGE_NAME, “direct”);
然后就是发送 message
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
我们先假设 severity 取值 info | warning | error
Subscribing
接收 message 和上一章没什么区别,只是需要给各个 severity 创建新的 binding。
String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
开始执行

EmitLogDirect.java 代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

private static final String EXCHANGE_NAME = “direct_logs”;

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, “direct”);

String severity = getSeverity(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(“UTF-8″));
System.out.println(” [x] Sent ‘” + severity + “‘:'” + message + “‘”);
}
}
//..
}
ReceiveLogsDirect.java 代码如下:
import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = “direct_logs”;

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, “direct”);
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1) {
System.err.println(“Usage: ReceiveLogsDirect [info] [warning] [error]”);
System.exit(1);
}

for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(” [*] Waiting for messages. To exit press CTRL+C”);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), “UTF-8″);
System.out.println(” [x] Received ‘” +
delivery.getEnvelope().getRoutingKey() + “‘:'” + message + “‘”);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
编译代码
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果想把 warning 和 error 的日志保存到文件去,那么
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果想把所有的日志打印到控制台,那么
java -cp $CP ReceiveLogsDirect info warning error
发送 error 日志
java -cp $CP EmitLogDirect error “Run.Run. Or it will explode”

正文完
 0