共计 4841 个字符,预计需要花费 13 分钟才能阅读完成。
rabbitmq 六种工作模式
3. 公布订阅模式
即向多个消费者传递同一条信息
1).Exchanges 交换机
RabbitMQ 消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。
相同,生产者只能向交换机 (Exchange) 发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗? 它应该增加到多个队列中吗? 或者它应该被抛弃。这些规定由 exchange 的类型定义。
有几种可用的替换类型:direct、topic、header 和 fanout。
创立 fanout 交换机 logs: c.exchangeDeclare("logs", "fanout");
或c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
fanout 交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。
2). 绑定 Bindings
创立了一个 fanout 交换机和一个队列。当初咱们须要通知 exchange 向指定队列发送音讯。exchange 和队列之间的关系称为绑定。
// 指定的队列, 与指定的交换机关联起来 | |
// 称为绑定 -- binding | |
// 第三个参数时 routingKey, 因为是 fanout 交换机, 这里疏忽 routingKey | |
ch.queueBind(queueName, "logs", ""); |
3). 整体代码
1. 生产者
最重要的更改是,咱们当初心愿将音讯公布到 logs 交换机,而不是无名的日志交换机。咱们须要在发送时提供一个 routingKey,然而对于 fanout 交换机类型,该值会被疏忽。
public class Producer {public static void main(String[] args) throws Exception { | |
// 建设连贯 | |
ConnectionFactory f = new ConnectionFactory(); | |
f.setHost("192.168.64.140"); | |
f.setPort(5672); | |
f.setUsername("admin"); | |
f.setPassword("admin"); | |
Connection con = f.newConnection(); | |
Channel c = con.createChannel(); | |
// 定义 fanout 类型交换机:logs | |
//c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); | |
// 向交换机发送信息 | |
while (true){System.out.println("输出音讯:"); | |
String msg = new Scanner(System.in).nextLine(); | |
c.basicPublish("logs", | |
"", | |
null, msg.getBytes()); | |
} | |
} | |
} |
2. 消费者
如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题; 如果还没有消费者在听,咱们能够平安地抛弃这些信息。
public class Consumer {public static void main(String[] args) throws Exception { | |
// 建设连贯 | |
ConnectionFactory f = new ConnectionFactory(); | |
f.setHost("192.168.64.140"); | |
f.setPort(5672); | |
f.setUsername("admin"); | |
f.setPassword("admin"); | |
Connection con = f.newConnection(); | |
Channel c = con.createChannel(); | |
//1. 定义随机队列 2. 定义交换机 3. 绑定 | |
// 随机命名, 非长久, 独占, 主动删除 | |
String queue = UUID.randomUUID().toString(); | |
c.queueDeclare(queue, | |
false, true, true, null); | |
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); | |
// 第三个参数对公布订阅模式 fanout 交换机有效 | |
c.queueBind(queue, "logs", ""); | |
DeliverCallback deliverCallback = new DeliverCallback() { | |
@Override | |
public void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody()); | |
System.out.println("收到:"+msg); | |
} | |
}; | |
CancelCallback cancelCallback = new CancelCallback() { | |
@Override | |
public void handle(String consumerTag) throws IOException {}}; | |
// 失常的生产数据 | |
c.basicConsume(queue, | |
true, deliverCallback, | |
cancelCallback); | |
} | |
} |
4. 路由模式
路由模式与订阅模式不同之处在于, 咱们将向其增加一个个性—咱们将只订阅所有音讯中的一部分. 本文中已增加 err/info/warning 等报错提醒来示范.
1). 绑定 Bindings
绑定是交换机和队列之间的关系。这能够简略地了解为: 队列对来自此替换的音讯感兴趣。
绑定能够应用额定的 routingKey 参数。为了防止与 basic_publish 参数混同,咱们将其称为 bindingKey。这是咱们如何创立一个键绑定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey 的含意取决于交换机类型。咱们后面应用的 fanout 交换机齐全疏忽它。
2). 直连交换机 Direct exchange
上一节中的日志零碎向所有消费者播送所有音讯。咱们心愿扩大它,容许依据音讯的严重性过滤音讯。
后面咱们应用的是 fanout 交换机,这并没有给咱们太多的灵活性——它只能进行简略的播送。
咱们将用直连交换机 (Direct exchange) 代替。它背地的路由算法很简略——消息传递到 bindingKey 与 routingKey 齐全匹配的队列。
3). 多重绑定 Multiple bindings
应用雷同的 bindingKey 绑定多个队列是齐全容许的。能够应用 binding key “black” 将 X 与 Q1 和 Q2 绑定。在这种状况下,直连交换机的行为相似于 fanout,并将音讯播送给所有匹配的队列。一条路由键为 black 的音讯将同时发送到 Q1 和 Q2。
4). 更改
1. 发送音讯
咱们将提供日志级别作为 routingKey, 这样, 接管程序将可能抉择它心愿接管的级别
// 参数 1: 交换机名 | |
// 参数 2: routingKey, 路由键, 这里咱们用日志级别, 如 "error","info","warning" | |
// 参数 3: 其余配置属性 | |
// 参数 4: 公布的音讯数据 | |
ch.basicPublish("direct_logs", "error", null, message.getBytes()); |
2. 接管音讯
咱们将为感兴趣的每个日志级别创立一个新的绑定
ch.queueBind(queueName, "logs", "info"); | |
ch.queueBind(queueName, "logs", "warning"); |
5). 残缺代码
1. 生产者
public class Producer {public static void main(String[] args) throws Exception { | |
// 建设连贯 | |
ConnectionFactory f = new ConnectionFactory(); | |
f.setHost("192.168.64.140"); | |
f.setPort(5672); | |
f.setUsername("admin"); | |
f.setPassword("admin"); | |
Connection con = f.newConnection(); | |
Channel c = con.createChannel(); | |
// 定义 fanout 类型交换机:logs | |
//c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); | |
// 向交换机发送信息 | |
while (true){System.out.println("输出音讯:"); | |
String msg = new Scanner(System.in).nextLine(); | |
System.out.println("输出路由键:"); | |
String key = new Scanner(System.in).nextLine(); | |
c.basicPublish("direct_logs", | |
key, // 路由键关键词 | |
null, | |
msg.getBytes()); | |
} | |
} | |
} |
2. 消费者
public class Consumer {public static void main(String[] args) throws Exception { | |
// 建设连贯 | |
ConnectionFactory f = new ConnectionFactory(); | |
f.setHost("192.168.64.140"); | |
f.setPort(5672); | |
f.setUsername("admin"); | |
f.setPassword("admin"); | |
Connection con = f.newConnection(); | |
Channel c = con.createChannel(); | |
//1. 定义随机队列 2. 定义交换机 3. 绑定 | |
// 随机命名, 非长久, 独占, 主动删除 | |
String queue = UUID.randomUUID().toString(); | |
c.queueDeclare(queue, | |
false, true, true, null); | |
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); | |
// 用输出绑定键进行绑定 | |
System.out.println("输出绑定键, 用空格隔开:"); | |
String s = new Scanner(System.in).nextLine(); | |
String[] a = s.split(""); //["aaa","bbb","ccc"] | |
for (String key:a){c.queueBind(queue, "direct_logs", key); | |
} | |
DeliverCallback deliverCallback = new DeliverCallback() { | |
@Override | |
public void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody()); | |
String key = message.getEnvelope().getRoutingKey(); | |
System.out.println("收到:"+msg+"-"+key); | |
} | |
}; | |
CancelCallback cancelCallback = new CancelCallback() { | |
@Override | |
public void handle(String consumerTag) throws IOException {}}; | |
// 失常的生产数据 | |
c.basicConsume(queue, | |
true, deliverCallback, | |
cancelCallback); | |
} | |
} |