提醒:本文最后更新于 2018-08-27 17:57,文中所关联的信息可能已发生改变,请知悉!
rabbitmq 官网上提供了 6 个 demo,分别从是 hello world、工作队列、发布 / 订阅、路由、主题、rpc 这六个 demo。
基本上看完这 6 哥 demo 之后,对 rabbitmq 应该就有了清晰的认识,并且可以达到基本数量应用的程度。
下面我挑选最常用的路由和主题这两个 demo,为大家翻译下。个人加谷歌翻译,有不合适的地方,欢迎大家批评指正。
Routing— 路由
在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。
在本教程中,我们将在他的基础上添加一个功能 – 只订阅一部分消息。例如,我们只将严重错误的消息导入日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定
在前面的例子中,我们已经创建了绑定。您可能会回想一下代码:
channel.queueBind(queueName,EXCHANGE_NAME,“”);
binding 是 exchange 和 queue 之间的桥梁。可以简单地理解为:queue 对来自该 exchange 的消息感兴趣。
bindings 可以采用额外的 routingKey 参数。为了避免与 basic_publish 参数混淆,我们将其称为 binding key。下面就是我们如何使用一个 key 创建一个 bindings:
channel.queueBind(queueName,EXCHANGE_NAME,“black”);
binding key 的含义取决于 exchange 的类型。但此 key 对于 exchange 为 fanout 的类型无效。(因为 fanout 类型的 exchange 是将消息发给全部 queue)
direct exchange
我们之前教程的日志记录系统将所有消息广播给所有消费者。我们希望将其扩展一个功能:可以根据消息的严重性进行过滤。例如,我们可能需要一个将日志消息中仅仅是严重错误的写入磁盘,而不会在 warn 或 info 级别的日志消息中浪费磁盘空间。
我们正在使用一个fanout exchange,这没有给我们很大的灵活性 – 它只能进行盲目的广播。
我们将使用 direct exchange。direct exchange 背后的路由算法很简单 – 消息进入队列,其 绑定密钥 与消息的 路由密钥 完全匹配。
为了说明这一点,请参考下面的图:
在这种图中,使用 routing 将 orange 发布到 exchange 的消息 将被路由到队列 Q1。带有black 或green 路由键的消息将进入Q2。所有其他消息将被丢弃。
多个绑定 multiply bindings
发布日志 Emitting Logs
我们将把这个模型用于我们的日志系统。这次我们不用 fanout 的 exchange, 而是将消息发送到direck exchange。我们将日志严重级别作为key。这样接收程序将能够选择想要接收的严重程度。我们先关注发布日志。
与往常一样,我们需要先创建一个 exchange:
channel.exchangeDeclare(EXCHANGE_NAME,“direct”);
我们准备发送一条消息:
channel.basicPublish(EXCHANGE_NAME,severity,null,message.getBytes());
为了简化,我们将假设“严重级别”可以是 ’info’,’warning’,’error’ 之一。
订阅 Subscribing
接收消息的方式与上一个教程中的一样,只有一个例外 – 我们将为每个我们感兴趣的严重级别创建一个新绑定。
String queueName = channel.queueDeclare().getQueue();
for(String severity:argv){channel.queueBind(queueName,EXCHANGE_NAME,severity);}
把它放在一起
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();}
//..
}
ReceiveLogsDirect.java 的代码:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
接下来官网里面是用 java 命令分别运行这两个 class 文件。
我们如果是在 ide 中的话,直接运行即可(如果直接 run 的话,需要配置下参数)