共计 7950 个字符,预计需要花费 20 分钟才能阅读完成。
1.Exchange 的概念
2.Exchange 的绑定
3.Exchange 的 Fanout
4.Exchange 的 direct
5.Exchange 的 Topics
1.Exchange 的概念
在咱们之前练习 mq 的根本应用的时候,咱们发送一个信息,看起来是间接把音讯发送给了队列,其实,Rabbitmq 消息传递模型的核心思想是:生产者生产的音讯不会发送给队列,而是发送给交换机。交换机工作的内容也非常简略,一方面承受来自生产者的音讯,另一方面将他们推入队列。交换机晓得如何精确地解决音讯,到底是发送给一个或者多个队列,还是丢掉,都由交换机决定。
所以看看之前咱们写的代码,咱们是把音讯发送给了一个无名的交换机
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
第一个参数就是交换机的名称 ,空字符串示意无名交换机,音讯能路由到哪个队列中是由 routingKey(bindingkey) 绑定 key 指定的。
2.Exchange 的绑定
因为交换机晓得要把音讯发送给哪个队列,然而咱们须要将队列和路由器进行绑定。
绑定的形式有 Fanout(播送),Direct exchange(全匹配),Topics(主题匹配)。
3.Exchange 的 Fanout
Fanout 类型的 exchange 简略粗犷,就是 播送,将受到的所有音讯播送到它晓得的所有队列中。
消费者 1:
public class Consumer1 {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个长期的队列 队列的名称是随机的
* 当消费者断开和该队列的连贯时 队列主动删除
*/
String queueName = channel.queueDeclare().getQueue();
// 交换机和队列绑定,因为是 fanout 类型,所以无需路由键
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("消费者 1 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 1 控制台接管并打印消息:"+message);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
消费者 2:
public class Consumer2 {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个长期的队列 队列的名称是随机的
* 当消费者断开和该队列的连贯时 队列主动删除
*/
String queueName = channel.queueDeclare().getQueue();
// 交换机和队列绑定,因为是 fanout 类型,所以无需路由键
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("消费者 2 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 2 控制台接管并打印消息:"+message);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
生产者:
public class Producer {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("请输出信息");
while (sc.hasNext()) {String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者收回音讯" + message);
}
}
}
}
咱们间断发送十条音讯:
消费者 1 和消费者 2 同时受到了音讯:
胜利进行轮询散发。
4.Exchange 的 direct
咱们在下面演示的是把音讯播送给全副消费者,然而咱们心愿做一些扭转,比方 只发给一些特定的队列,有一些队列不发 。Fanout 这种形式对咱们来说 灵活性不是很高。
所以咱们采取 direct 的形式,绑定 key 和队列一一对应,这样咱们就能够依据绑定 key 来抉择要发送的队列。
然而如果k1 和 k2 雷同,就有点像 fanout 的感觉了,也达到了轮询散发的成果。
消费者 1 代码:
public class Consumer1 {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/**
* 生成一个长期的队列 队列的名称是随机的
* 当消费者断开和该队列的连贯时 队列主动删除
*/
String queueName = channel.queueDeclare().getQueue();
// 交换机和队列绑定,因为是 fanout 类型,所以无需路由键
channel.queueBind(queueName,EXCHANGE_NAME,"k1");
System.out.println("消费者 1 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 1 控制台接管并打印消息:"+message);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
消费者 2 代码:
public class Consumer2 {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/**
* 生成一个长期的队列 队列的名称是随机的
* 当消费者断开和该队列的连贯时 队列主动删除
*/
String queueName = channel.queueDeclare().getQueue();
// 交换机和队列绑定,因为是 fanout 类型,所以无需路由键
channel.queueBind(queueName,EXCHANGE_NAME,"k2");
System.out.println("消费者 2 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 2 控制台接管并打印消息:"+message);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
生产者代码:
public class Producer {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner sc = new Scanner(System.in);
System.out.println("请输出信息");
while (sc.hasNext()) {String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "k1", null, message.getBytes("UTF-8"));
System.out.println("生产者收回音讯" + message);
}
}
}
}
生产者发送音讯:
消费者 1 有音讯,消费者 2 无音讯:
5.Exchange 的 Topics
之前咱们从播送散发的 fanout 交换机,到了可抉择的 direct 交换机,从而 实现有选择性地对音讯进行发送和生产。
然而,新的需要又产生了,假如当初有一个日志零碎,咱们对多台服务器用队列进行发送日志,进行保留。
假如当初有了一个子分类,info.feign,info.service,咱们只想要保留 info.service 的音讯,不想保留 info.feign 的音讯,咱们就必须要 应用 topic 模式。
topic 交换机的音讯绑定 key 不能随便编写,它必须是一个单词列表,用. 离开。这些单词是任意单词,比方 ”info.user , info.store” 等等。
在这个规定中,还有两条规定:
“*”(星号)能够代替一个单词
“#”(井号)能够代替多个单词
例如:
Q1 绑定的是三个单词的字符串(.orange.)
Q2 绑定的是最初一个单词是 rabbit 的字符串(..rabbit)
第二个绑定关系是 lazy 后有任意个单词的字符串(lazy.#)
所以理论会呈现上面这种状况
绑定 key | 会收到的队列 |
---|---|
quick.orange.rabbit | Q1,Q2 |
lazy.orange.elephant | Q2 |
quick.orange.fox | Q1 |
lazy.brown.fox | Q2 |
lazy.pink.rabbit | Q2 |
quick.brown.fox | Q2 |
quick.orange.male.rabbit | 无队列接管 |
lazy.orange.male.rabbit | Q2 |
消费者 1:
public class Consumer1 {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/**
* 生成一个长期的队列 队列的名称是随机的
* 当消费者断开和该队列的连贯时 队列主动删除
*/
String queueName = channel.queueDeclare().getQueue();
// 交换机和队列绑定,因为是 fanout 类型,所以无需路由键
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("消费者 1 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 1 控制台接管并打印消息:"+message);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
消费者 2:
public class Consumer2 {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/**
* 生成一个长期的队列 队列的名称是随机的
* 当消费者断开和该队列的连贯时 队列主动删除
*/
String queueName = channel.queueDeclare().getQueue();
// 交换机和队列绑定,因为是 fanout 类型,所以无需路由键
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("消费者 2 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 2 控制台接管并打印消息:"+message);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
生产者:
public class Producer {
private final static String EXCHANGE_NAME = "test";
public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Scanner sc = new Scanner(System.in);
System.out.println("请输出信息");
while (sc.hasNext()) {String message = sc.nextLine();
String routingKey = null;
// 当发送的音讯不同,路右键也不同
if(message.equals("1")){routingKey = "1.orange.1";}else if(message.equals("2")){routingKey = "lazy.1.1";}else if(message.equals("3")){routingKey = "lazy.orange.1";}else if(message.equals("4")){routingKey = "orange.1";}
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("生产者收回音讯" + message);
}
}
}
}
演示成果:
依照咱们的冀望:
音讯 1 和 2 都会被各自的队列收到
音讯 3 会被两个队列收到
音讯 4 没有队列收到