rabbitmq六种工作模式

3.公布订阅模式

即向多个消费者传递同一条信息

1).Exchanges 交换机

RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。

相同,生产者只能向交换机(Exchange)发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗?它应该增加到多个队列中吗?或者它应该被抛弃。这些规定由exchange的类型定义。

有几种可用的替换类型:direct、topic、header和fanout
创立fanout交换机logs: c.exchangeDeclare("logs", "fanout");
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

fanout交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。

2).绑定 Bindings

创立了一个fanout交换机和一个队列。当初咱们须要通知exchange向指定队列发送音讯。exchange和队列之间的关系称为绑定。

//指定的队列,与指定的交换机关联起来//称为绑定 -- binding//第三个参数时 routingKey, 因为是fanout交换机, 这里疏忽 routingKeych.queueBind(queueName, "logs", "");
3).整体代码

1.生产者
最重要的更改是,咱们当初心愿将音讯公布到logs交换机,而不是无名的日志交换机。咱们须要在发送时提供一个routingKey,然而对于fanout交换机类型,该值会被疏忽。

public class Producer {    public static void main(String[] args) throws Exception {        //建设连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定义fanout类型交换机:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //向交换机发送信息 while (true){            System.out.println("输出音讯:"); String msg = new Scanner(System.in).nextLine(); c.basicPublish("logs", "", null, msg.getBytes()); }    }}

2.消费者
如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题;如果还没有消费者在听,咱们能够平安地抛弃这些信息。

public class Consumer {    public static void main(String[] args) throws Exception {        //建设连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //1.定义随机队列 2.定义交换机 3.绑定 //随机命名,非长久,独占,主动删除 String queue = UUID.randomUUID().toString(); c.queueDeclare(queue, false, true, true, null); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //第三个参数对公布订阅模式fanout交换机有效 c.queueBind(queue, "logs", ""); DeliverCallback deliverCallback = new DeliverCallback() {            @Override public void handle(String consumerTag, Delivery message) throws IOException {                String msg = new String(message.getBody()); System.out.println("收到:"+msg); }        }; CancelCallback cancelCallback = new CancelCallback() {            @Override public void handle(String consumerTag) throws IOException {            }        }; //失常的生产数据 c.basicConsume(queue, true, deliverCallback, cancelCallback); }}

4.路由模式

路由模式与订阅模式不同之处在于,咱们将向其增加一个个性—咱们将只订阅所有音讯中的一部分.本文中已增加err/info/warning等报错提醒来示范.

1).绑定 Bindings

绑定是交换机和队列之间的关系。这能够简略地了解为:队列对来自此替换的音讯感兴趣。

绑定能够应用额定的routingKey参数。为了防止与basic_publish参数混同,咱们将其称为bindingKey。这是咱们如何创立一个键绑定:

ch.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey的含意取决于交换机类型。咱们后面应用的fanout交换机齐全疏忽它。

2).直连交换机 Direct exchange

上一节中的日志零碎向所有消费者播送所有音讯。咱们心愿扩大它,容许依据音讯的严重性过滤音讯。

后面咱们应用的是fanout交换机,这并没有给咱们太多的灵活性——它只能进行简略的播送。

咱们将用直连交换机(Direct exchange)代替。它背地的路由算法很简略——消息传递到bindingKey与routingKey齐全匹配的队列。

3).多重绑定 Multiple bindings

应用雷同的bindingKey绑定多个队列是齐全容许的。能够应用binding key "black"将X与Q1和Q2绑定。在这种状况下,直连交换机的行为相似于fanout,并将音讯播送给所有匹配的队列。一条路由键为black的音讯将同时发送到Q1和Q2。

4).更改

1.发送音讯
咱们将提供日志级别作为routingKey,这样,接管程序将可能抉择它心愿接管的级别

//参数1: 交换机名//参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning"//参数3: 其余配置属性//参数4: 公布的音讯数据 ch.basicPublish("direct_logs", "error", null, message.getBytes());

2.接管音讯
咱们将为感兴趣的每个日志级别创立一个新的绑定

ch.queueBind(queueName, "logs", "info");ch.queueBind(queueName, "logs", "warning");
5).残缺代码

1.生产者

public class Producer {    public static void main(String[] args) throws Exception {        //建设连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定义fanout类型交换机:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //向交换机发送信息 while (true){            System.out.println("输出音讯:"); String msg = new Scanner(System.in).nextLine(); System.out.println("输出路由键:"); String key = new Scanner(System.in).nextLine(); c.basicPublish("direct_logs", key, //路由键关键词 null, msg.getBytes()); }    }}

2.消费者

public class Consumer {    public static void main(String[] args) throws Exception {        //建设连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //1.定义随机队列 2.定义交换机 3.绑定 //随机命名,非长久,独占,主动删除 String queue = UUID.randomUUID().toString(); c.queueDeclare(queue, false, true, true, null); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //用输出绑定键进行绑定 System.out.println("输出绑定键,用空格隔开:"); String s = new Scanner(System.in).nextLine(); String[] a = s.split(" "); //["aaa","bbb","ccc"] for (String key:a){            c.queueBind(queue, "direct_logs", key); }        DeliverCallback deliverCallback = new DeliverCallback() {            @Override public void handle(String consumerTag, Delivery message) throws IOException {                String msg = new String(message.getBody()); String key = message.getEnvelope().getRoutingKey(); System.out.println("收到:"+msg+" - "+key); }        }; CancelCallback cancelCallback = new CancelCallback() {            @Override public void handle(String consumerTag) throws IOException {            }        }; //失常的生产数据 c.basicConsume(queue, true, deliverCallback, cancelCallback); }}