5. 主题模式
在路由模式中, 应用 Direct 交换机,从而能够选择性接管日志。
尽管应用 Direct 交换机改良了咱们的零碎,但它不能基于多个规范进行路由。
这是不足灵活性的, 要在日志零碎中实现这一点,咱们须要理解更简单的 Topic 交换机。
主题交换机 Topic exchange
发送到 Topic 交换机的音讯, 它的的 routingKey, 必须是由点分隔的多个单词。单词能够是任何货色,但通常是与音讯相干的一些个性。
routingKey 能够有任意多的单词,最多 255 个字节。
bindingKey 也必须采纳雷同的模式。Topic 交换机的逻辑与直连交换机相似——应用特定 routingKey 发送的音讯将被传递到所有应用匹配 bindingKey 绑定的队列。bindingKey 有两个重要的非凡点:
*
能够通配单个单词。#
能够通配零个或多个单词。
如图所示:
如上图中, 将 routingKey 设置为 ”quick.orange.rabbit
“ 的音讯将被发送到两个队列。音讯 “lazy.orange.elephant
“也发送到它们两个。另外”quick.orange.fox
“只会发到第一个队列,”lazy.brown.fox
“只发给第二个。”lazy.pink.rabbit
“将只被传递到第二个队列一次,即便它匹配两个绑定。”quick.brown.fox
“ 不匹配任何绑定,因而将被抛弃。
生产者
public class Test1 {public static void main(String[] args) throws Exception {ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
// 参数 1: 交换机名
// 参数 2: 交换机类型
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
while (true) {System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
if ("exit".contentEquals(msg)) {break;}
System.out.print("输出 routingKey:");
String routingKey = new Scanner(System.in).nextLine();
// 参数 1: 交换机名
// 参数 2: routingKey, 路由键, 这里咱们用日志级别, 如 "error","info","warning"
// 参数 3: 其余配置属性
// 参数 4: 公布的音讯数据
ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
System.out.println("音讯已发送:"+routingKey+"-"+msg);
}
c.close();}
}
消费者
public class Test2 {public static void main(String[] args) throws Exception {ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
// 主动生成对列名,
// 非长久, 独占, 主动删除
String queueName = ch.queueDeclare().getQueue();
System.out.println("输出 bindingKey, 用空格隔开:");
String[] a = new Scanner(System.in).nextLine().split("\\s");
// 把该队列, 绑定到 topic_logs 交换机
// 容许应用多个 bindingKey
for (String bindingKey : a) {ch.queueBind(queueName, "topic_logs", bindingKey);
}
System.out.println("期待接收数据");
// 收到音讯后用来解决音讯的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody(), "UTF-8");
String routingKey = message.getEnvelope().getRoutingKey();
System.out.println("收到:"+routingKey+"-"+msg);
}
};
// 消费者勾销时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
ch.basicConsume(queueName, true, callback, cancel);
}
}
6.RPC 模式
该模式不常应用, 也较为简单, 理解即可.
RPC 模式顾名思义也就是近程调用模式.
RabbitMQ 去搭建一个 RPC 零碎:一个客户端和一个能够降级 (扩大) 的 RPC 服务器
总结:
RPC 的工作形式是这样的:
- 对于 RPC 申请,客户端发送一条带有两个属性的音讯:replyTo, 设置为仅为申请创立的匿名独占队列, 和 correlationId, 设置为每个申请的惟一 id 值。
- 申请被发送到 rpc_queue 队列。
- RPC 工作过程 (即: 服务器) 在队列上期待申请。当一个申请呈现时,它执行工作, 并应用 replyTo 字段中的队列将后果发回客户机。
- 客户机在回应音讯队列上期待数据。当音讯呈现时,它查看 correlationId 属性。如果匹配申请中的值,则向程序返回该响应数据。