主题模式
在上一章我们改进了我们的日志系统,如果使用 fanout 我们只能简单进行广播,而使用 direct 则允许消费者可以进行一定程度的选择。但是 direct 还是有其局限性,其路由不支持多个条件。
在我们的日志系统中,消费者程序可能不止是基于日志的 severity,同时也想基于发送日志的源系统。你可能知道 linux 的 syslog 工具,它就是同时基于 severity(info/warn/crit…)和功能(auth/cron/kern…).
这就提供了很大的灵活性 - 我们想接收来自 cron 的严重错误日志和 kern 的所有日志。
下面我们就使用更复杂的 topic 来改进我们的日志系统。
Topic exchange
发送到 topic 类型 exchange 的 message 不可以具有模糊的 routing_key, 它必须具有以冒号分割的词。就像 ”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit” 等,限制长度 255 字节。
binding key 也采用相似的形势。topic exchange 的逻辑和 direct 相似,通过比较 message 的 routing key 和 bind 的 binding key,来匹配转发的 queue。但是 topic 的 binding 支持通配符:
”*“表示任何一个词
”#“表示 0 或 1 个词
通过上面图示的场景来解释会比较好理解。
例子中我们将发送描述动物的 message。message 会携带 routing key(包含三个词),第一个词表示 speed,第二个表示 color,第三个表示 species”<speed>.<colour>.<species>”.
创建了三个绑定:Q1 的 binding key 是”*.orange.*” Q2 的 binding key 是“*.*.rabbit”和 “lazy.#”.
以文字表述便是:
Q1 关心所有橘色的动物
Q2 关心所有的 rabbit 和所有的 lazy 动物
routing key 为“quick.orange.rabbit” 的 message 会同时发布到这两个 queue。routing key 为 ”lazy.orange.elephant” 的 message 会同时发布到这两个 queue。routing key 为”quick.orange.fox“只会发布到第一个 queue.routing key 为”lazy.brown.fox” 的 message 只会发布到第二个 queue.routing key 为 ”lazy.pink.rabbit” 的 message 虽然满足 Q2 的两个条件,但也只会发布到 Q2 一次。routing key 为 ”quick.brown.fox” 的 message 没有任何匹配,就会被丢失。
如果我们发送的 message 只有一个 word 或者多余三个 word,如 ”orange” 或者 ”quick.orange.male.rabbit” 会发生什么呢?这些 message 不会匹配任何 binding key,均会被丢弃掉。
另外 ”lazy.orange.male.rabbit” 虽然具有四个词,但是会匹配最后的 binding key,而被发送到第二个 queue。
Topic exhange 非常强大,同时可以模仿其他两种类型的 exchange。当 binding key 为 # 时,queue 会接收所有的 message。当 binding key 中没有使用通配符 (* 和 #) 时,topic 的行为和 direct 一致。
开始执行
我们将在日志系统中使用 topic exchange。我们的 routding key 采用两个词 “<facility>.<severity>”.EmitLogTopic.java 的代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = “topic_logs”;
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, “topic”);
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(“UTF-8″));
System.out.println(” [x] Sent ‘” + routingKey + “‘:'” + message + “‘”);
}
}
//..
}
ReceiveLogsTopic.java 的代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = “topic_logs”;
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, “topic”);
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println(“Usage: ReceiveLogsTopic [binding_key]…”);
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(” [*] Waiting for messages. To exit press CTRL+C”);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), “UTF-8″);
System.out.println(” [x] Received ‘” +
delivery.getEnvelope().getRoutingKey() + “‘:'” + message + “‘”);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
编译
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收所有日志
java -cp $CP ReceiveLogsTopic “#”
接收功能 ”kern” 的日志
java -cp $CP ReceiveLogsTopic “kern.*”
接收严重级别日志
java -cp $CP ReceiveLogsTopic “*.critical”
接收者使用两个绑定条件
java -cp $CP ReceiveLogsTopic “kern.*” “*.critical”
发送日志 message
java -cp $CP EmitLogTopic “kern.critical” “A critical kernal error”