乐趣区

关于java:RabbitMQ教程-4路由Routing

为不便更好交换,可关注公众号:Java 课代表,每日一更,等你来呦!

4 路由(Routing)

在上一篇教程中咱们创立了一个繁难的日志零碎。能够将日志音讯播送给多个接收者。

本教程中,咱们将给它增加一个新个性——让独自订阅某一部分音讯 (子集) 成为可能。比方,咱们只把严重错误写入到磁盘文件中(只保留严重错误日志能够节俭磁盘),同时依然将所有日志都在终端输入。

绑定(Bindings)

在后面的例子中,咱们曾经创立了绑定,你可能还记得如下代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是替换 (exchange) 和队列 (queue) 之间的一种关系。能够简略地了解为:这个队列对来自这个替换的音讯感兴趣。

绑定能够有一个 routingKey参数。为了防止和 basic_publish 办法的参数混同 (课代表注:Channel#queueBind(String queue, String exchange, String routingKey) 办法的 routingKey 参数用于实现绑定关系,而 Channel#basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)办法的 routingKey 用来路由音讯),咱们将称其为binding key

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key 的含意取决于替换的类型。咱们后面应用过的 fanout 类型的替换会疏忽它的值。

Direct 替换(Direct exchange)

咱们后面教程中的日志记录零碎会播送所有音讯给所有消费者。咱们心愿能够依据音讯的紧急性进行过滤。比方,心愿某个利用只记录严重错误到磁盘,不记录正告和信息类日志音讯以节约磁盘。

之前咱们应用的 fanout 替换灵活性并不高——他只是无脑播送。

接下来将应用 direct exchange 代替,direct exchange的路由算法很简略——音讯会进入那个队列的 binding key 与音讯的 routing key 齐全匹配的队列。

为了说明该点,思考如下设置:

在上图的设置中,能够看到 direct exchange X 绑定了两个queue.Q1 应用 orange 作为 binding keyQ2 有两个绑定,一个是black,另一个是green.

在该设置下,应用 orange 作为 routing key 的音讯,发送到 exchange 后将被路由到 Q1. 应用blackgreen作为 routing key 的音讯将被路由到Q2。其余音讯将会被抛弃。

多重绑定(Multiple bindings)

应用雷同的 binding key 绑定到多个 queue 是齐全非法的。在上图中,咱们将 XQ1通过 black 作为 binding key 绑定起来。这样一来,direct exchange 会像 fanout 一样播送音讯到所有匹配的队列,(课代表注:fanout是无脑播送到所有 queuedirect 是发送给所有匹配的queue)。

发送日志(Emitting logs)

咱们将把这个模型利用到日志零碎上。应用 direct exchange 替换 fanout 来发送音讯。咱们将提供日志级别作为routing key。这样一来,接管程序就能够依据他想要的级别来接管音讯。上面先来看一下发送日志。

跟之前一样,首先要创立一个exchange:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

而后就能够发送音讯了:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化起见,咱们假如日志级别分为:’info’, ‘warning’, ‘error’。

订阅(Subscribing)

接管音讯程序和先前教程中的一样失常工作,不过有一点例外,咱们须要为感兴趣的日志级别创立绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

代码整合(Putting it all together)

EmitLogDirect.java 残缺代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println("[x] Sent'" + severity + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsDirect.java 残缺代码:

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println("[*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("[x] Received'" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
  }
}

跟后面一样编译(参考第一节的编译和 classpath 设置)。为了使用方便,咱们在运行例子时应用环境变量 $CP(Windows 下是 %CP%)作为 classpath。

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

如果你只想保留 ‘warning’ 和 ’error’ (不想要 ‘info’) 级别的日志到文件,只须要关上终端,输出:

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果想在屏幕上看到所有音讯,关上新终端,输出:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

最初作为例子,发送一个 ’error’ 级别的日志音讯:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(残缺源码参见 (EmitLogDirect.java source) 和 (ReceiveLogsDirect.java source))

持续学习第 5 篇教程,理解如何基于模式来侦听音讯。


【举荐浏览】

RabbitMQ 教程 3. 公布 / 订阅 (Publish/Subscribe)
RabbitMQ 教程 2. 工作队列(Work Queue)
RabbitMQ 教程 1.“Hello World”
Freemarker 教程(一)- 模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
深入浅出 MySQL 优先队列(你肯定会踩到的 order by limit 问题)


码字不易,欢送点赞关注和分享。
搜寻:【Java 课代表】,关注公众号,每日一更,及时获取更多 Java 干货。

退出移动版