rabbitMq常用创建消息应用的maven demo项目(一)—路由routing

rabbitmq官网上提供了6个demo,分别从是hello world、工作队列、发布/订阅、路由、主题、rpc这六个demo。
基本上看完这6哥demo之后,对rabbitmq应该就有了清晰的认识,并且可以达到基本数量应用的程度。

下面我挑选最常用的路由和主题这两个demo,为大家翻译下。个人加谷歌翻译,有不合适的地方,欢迎大家批评指正。

Routing—路由

在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。

在本教程中,我们将在他的基础上添加一个功能 – 只订阅一部分消息。例如,我们只将严重错误的消息导入日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在前面的例子中,我们已经创建了绑定。您可能会回想一下代码:

[code lang="java"]channel.queueBind(queueName,EXCHANGE_NAME,“”);[/code]

binding是exchange和queue之间的桥梁。可以简单地理解为:queue对来自该exchange的消息感兴趣。

bindings可以采用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为 binding key。下面就是我们如何使用一个key创建一个bindings:

[code lang="java"]channel.queueBind(queueName,EXCHANGE_NAME,“black”);[/code]

binding key的含义取决于exchange的类型。但此key对于exchange为fanout的类型无效。(因为fanout类型的exchange是将消息发给全部queue)

direct exchange

我们之前教程的日志记录系统将所有消息广播给所有消费者。我们希望将其扩展一个功能:可以根据消息的严重性进行过滤。例如,我们可能需要一个将日志消息中仅仅是严重错误的写入磁盘,而不会在warn或info级别的日志消息中浪费磁盘空间。

我们正在使用一个fanout exchange,这没有给我们很大的灵活性 – 它只能进行盲目的广播。

我们将使用direct exchangedirect exchange背后的路由算法很简单 – 消息进入队列,其 绑定密钥与消息的路由密钥完全匹配。

为了说明这一点,请参考下面的图:

 

在这个图中,我们可以看到有两个队列绑定的direct exchange X. 第一个队列用bindingKey:orange绑定,第二个队列有两个绑定,一个bindingKey为black,另一个为green

在这种图中,使用routing将orange发布到exchange的消息 将被路由到队列Q1。带有black 或green路由键的消息将进入Q2。所有其他消息将被丢弃。

多个绑定multiply bindings

使用相同的bindingKey绑定多个queue是完全合法的。在我们的例子中,我们可以使用绑定键black添加XQ1之间的绑定。在这种情况下,直接交换就像fanout类型一样,将消息广播到所有匹配的队列。带有路由键black的消息将传送到 Q1Q2

发布日志Emitting Logs

我们将把这个模型用于我们的日志系统。这次我们不用fanout的exchange,而是将消息发送到direck exchange。我们将日志严重级别作为key。这样接收程序将能够选择想要接收的严重程度。我们先关注发布日志。

与往常一样,我们需要先创建一个exchange:

channel.exchangeDeclare(EXCHANGE_NAME,“direct”);

我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME,severity,null,message.getBytes());

为了简化,我们将假设“严重级别”可以是’info’,’warning’,’error’之一。

订阅Subscribing

接收消息的方式与上一个教程中的一样,只有一个例外 – 我们将为每个我们感兴趣的严重级别创建一个新绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity:argv){
  channel.queueBind(queueName,EXCHANGE_NAME,severity);
}

把它放在一起

EmitLogDirect.java类的代码:

[code lang="java"]
import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}
[/code]

ReceiveLogsDirect.java的代码:

[code lang="java"]
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}
[/code]

接下来官网里面是用java命令分别运行这两个class文件。

我们如果是在ide中的话,直接运行即可(如果直接run的话,需要配置下参数)

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理