rabbitmq官网上提供了6个demo,分别从是hello world、工作队列、发布/订阅、路由、主题、rpc这六个demo。基本上看完这6哥demo之后,对rabbitmq应该就有了清晰的认识,并且可以达到基本数量应用的程度。下面我挑选最常用的路由和主题这两个demo,为大家翻译下。个人加谷歌翻译,有不合适的地方,欢迎大家批评指正。Routing—路由在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。在本教程中,我们将在他的基础上添加一个功能 - 只订阅一部分消息。例如,我们只将严重错误的消息导入日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定
在前面的例子中,我们已经创建了绑定。您可能会回想一下代码:
[code lang="java"]channel.queueBind(queueName,EXCHANGE_NAME,“”);[/code]
binding是exchange和queue之间的桥梁。可以简单地理解为:queue对来自该exchange的消息感兴趣。bindings可以采用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为 binding key。下面就是我们如何使用一个key创建一个bindings:
[code lang="java"]channel.queueBind(queueName,EXCHANGE_NAME,“black”);[/code]
binding key的含义取决于exchange的类型。但此key对于exchange为fanout的类型无效。(因为fanout类型的exchange是将消息发给全部queue)
direct exchange
我们之前教程的日志记录系统将所有消息广播给所有消费者。我们希望将其扩展一个功能:可以根据消息的严重性进行过滤。例如,我们可能需要一个将日志消息中仅仅是严重错误的写入磁盘,而不会在warn或info级别的日志消息中浪费磁盘空间。我们正在使用一个fanout exchange,这没有给我们很大的灵活性 - 它只能进行盲目的广播。我们将使用direct exchange。direct exchange背后的路由算法很简单 - 消息进入队列,其 绑定密钥与消息的路由密钥完全匹配。为了说明这一点,请参考下面的图:

在这个图中,我们可以看到有两个队列绑定的direct exchange X. 第一个队列用bindingKey:orange绑定,第二个队列有两个绑定,一个bindingKey为black,另一个为green。在这种图中,使用routing将orange发布到exchange的消息 将被路由到队列Q1。带有black 或green路由键的消息将进入Q2。所有其他消息将被丢弃。
多个绑定multiply bindings

使用相同的bindingKey绑定多个queue是完全合法的。在我们的例子中,我们可以使用绑定键black添加X和Q1之间的绑定。在这种情况下,直接交换就像fanout类型一样,将消息广播到所有匹配的队列。带有路由键black的消息将传送到 Q1和Q2。
发布日志Emitting Logs
我们将把这个模型用于我们的日志系统。这次我们不用fanout的exchange,而是将消息发送到direck exchange。我们将日志严重级别作为key。这样接收程序将能够选择想要接收的严重程度。我们先关注发布日志。与往常一样,我们需要先创建一个exchange:
channel.exchangeDeclare(EXCHANGE_NAME,“direct”);
我们准备发送一条消息:
channel.basicPublish(EXCHANGE_NAME,severity,null,message.getBytes());
为了简化,我们将假设“严重级别”可以是’info’,‘warning’,’error’之一。
订阅Subscribing
接收消息的方式与上一个教程中的一样,只有一个例外 - 我们将为每个我们感兴趣的严重级别创建一个新绑定。
String queueName = channel.queueDeclare().getQueue();for(String severity:argv){ channel.queueBind(queueName,EXCHANGE_NAME,severity);}
把它放在一起

EmitLogDirect.java类的代码:
[code lang="java"]import com.rabbitmq.client.*;import java.io.IOException;public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //..}[/code]
ReceiveLogsDirect.java的代码:
[code lang="java"]import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }}[/code]
接下来官网里面是用java命令分别运行这两个class文件。我们如果是在ide中的话,直接运行即可(如果直接run的话,需要配置下参数)
