乐趣区

rabbitMq常用创建消息应用的maven demo项目(一)—路由routing

提醒:本文最后更新于 2018-08-27 17:57,文中所关联的信息可能已发生改变,请知悉!

rabbitmq 官网上提供了 6 个 demo,分别从是 hello world、工作队列、发布 / 订阅、路由、主题、rpc 这六个 demo。
基本上看完这 6 哥 demo 之后,对 rabbitmq 应该就有了清晰的认识,并且可以达到基本数量应用的程度。

下面我挑选最常用的路由和主题这两个 demo,为大家翻译下。个人加谷歌翻译,有不合适的地方,欢迎大家批评指正。

Routing— 路由

在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。

在本教程中,我们将在他的基础上添加一个功能 – 只订阅一部分消息。例如,我们只将严重错误的消息导入日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在前面的例子中,我们已经创建了绑定。您可能会回想一下代码:

channel.queueBind(queueName,EXCHANGE_NAME,“”);

binding 是 exchange 和 queue 之间的桥梁。可以简单地理解为:queue 对来自该 exchange 的消息感兴趣。

bindings 可以采用额外的 routingKey 参数。为了避免与 basic_publish 参数混淆,我们将其称为 binding key。下面就是我们如何使用一个 key 创建一个 bindings:

channel.queueBind(queueName,EXCHANGE_NAME,“black”);

binding key 的含义取决于 exchange 的类型。但此 key 对于 exchange 为 fanout 的类型无效。(因为 fanout 类型的 exchange 是将消息发给全部 queue)

direct exchange

我们之前教程的日志记录系统将所有消息广播给所有消费者。我们希望将其扩展一个功能:可以根据消息的严重性进行过滤。例如,我们可能需要一个将日志消息中仅仅是严重错误的写入磁盘,而不会在 warn 或 info 级别的日志消息中浪费磁盘空间。

我们正在使用一个fanout exchange,这没有给我们很大的灵活性 – 它只能进行盲目的广播。

我们将使用 direct exchangedirect exchange 背后的路由算法很简单 – 消息进入队列,其 绑定密钥 与消息的 路由密钥 完全匹配。

为了说明这一点,请参考下面的图:

 
在这个图中,我们可以看到有两个队列绑定的 direct exchange X. 第一个队列用 bindingKey:orange 绑定,第二个队列有两个绑定,一个 bindingKey 为black,另一个为green

在这种图中,使用 routing 将 orange 发布到 exchange 的消息 将被路由到队列 Q1。带有black 或green 路由键的消息将进入Q2。所有其他消息将被丢弃。

多个绑定 multiply bindings

使用相同的 bindingKey 绑定多个 queue 是完全合法的。在我们的例子中,我们可以使用绑定键 black 添加 XQ1之间的绑定。在这种情况下,直接 交换就像 fanout 类型 一样,将消息广播到所有匹配的队列。带有路由键 black 的 消息将传送到 Q1Q2

发布日志 Emitting Logs

我们将把这个模型用于我们的日志系统。这次我们不用 fanout 的 exchange, 而是将消息发送到direck exchange。我们将日志严重级别作为key。这样接收程序将能够选择想要接收的严重程度。我们先关注发布日志。

与往常一样,我们需要先创建一个 exchange:

channel.exchangeDeclare(EXCHANGE_NAME,“direct”);

我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME,severity,null,message.getBytes());

为了简化,我们将假设“严重级别”可以是 ’info’,’warning’,’error’ 之一。

订阅 Subscribing

接收消息的方式与上一个教程中的一样,只有一个例外 – 我们将为每个我们感兴趣的严重级别创建一个新绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity:argv){channel.queueBind(queueName,EXCHANGE_NAME,severity);}

把它放在一起

EmitLogDirect.java类的代码:
import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();}
    //..
}

ReceiveLogsDirect.java 的代码:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

接下来官网里面是用 java 命令分别运行这两个 class 文件。

我们如果是在 ide 中的话,直接运行即可(如果直接 run 的话,需要配置下参数)

退出移动版