共计 6761 个字符,预计需要花费 17 分钟才能阅读完成。
人生终将是场单人旅途,孤单之前是迷茫,孤单过后是成长。
楔子
本篇是音讯队列 RabbitMQ
的第三弹。
RabbitMQ 的入门和 RabbitMQ+SpringBoot 的整合能够点此链接进去回顾,明天要讲的是 RabbitMQ
的交换机。
本篇是了解 RabbitMQ
很重要的一篇,交换机是音讯的第一站,只有了解了交换机的散发模式,咱们能力晓得不同交换机依据什么规定散发音讯,能力明确在面对不同业务需要的时候应采纳哪种交换机。
祝有好播种,先赞后看,高兴有限。
本文代码: 码云地址 GitHub 地址
1. ????Exchange
先来放上简直每篇都要呈现一遍的我画了良久的 RabbitMQ
架构图。
前两篇文中咱们始终没有显式的去应用 Exchange
,都是应用的默认Exchange
,其实Exchange
是一个十分要害的组件,有了它才有了各种音讯散发模式。
我先简略说说 Exchange
有哪几种类型:
- fanout:
Fanout-Exchange
会将它接管到的音讯发往所有与他绑定的 Queue 中。 - direct:
Direct-Exchange
会把它接管到的音讯发往与它有绑定关系且Routingkey
齐全匹配的 Queue 中(默认)。 - topic:
Topic-Exchange
与 Direct-Exchange 类似,不过 Topic-Exchange 不须要全匹配,能够局部匹配,它约定:Routingkey
为一个句点号“.”分隔的字符串(咱们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)。 - header:
Header-Exchange
不依赖于 RoutingKey 或绑定关系来散发音讯,而是依据发送的音讯内容中的 headers 属性进行匹配。此模式曾经不再应用,本文中也不会去讲,大家晓得即可。
本文中咱们次要讲前三种 Exchange
形式,置信凭借着我简练的文字和灵魂的画技给大家好好讲讲,争取老妪能解。
Tip:本文的代码演示间接应用 SpringBoot+RabbitMQ 的模式。
2. ????Fanout-Exchange
先来看看 Fanout-Exchange
,Fanout-Exchange
又称扇形交换机,这个交换机应该是最容易了解的。
Exchange
和 Queue
建设一个绑定关系,Exchange
会分发给所有和它有绑定关系的 Queue
中,绑定了十个 Queue
就把音讯复制十份进行散发。
这种绑定关系为了效率必定都会保护一张表,从算法效率上来说个别是 O(1),所以 Fanout-Exchange
是这几个交换机中 查找须要被散发队列 最快的交换机。
上面是一段代码演示:
@Bean
public Queue fanout1() {return new Queue("fanout1");
}
@Bean
public Queue fanout2() {return new Queue("fanout2");
}
@Bean
public FanoutExchange fanoutExchange() {
// 三个结构参数:name durable autoDelete
return new FanoutExchange("fanoutExchange", false, false);
}
@Bean
public Binding binding1() {return BindingBuilder.bind(fanout1()).to(fanoutExchange());
}
@Bean
public Binding binding2() {return BindingBuilder.bind(fanout2()).to(fanoutExchange());
}
为了清晰明了,我新建了两个演示用的队列,而后建了一个FanoutExchange
,最初给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算实现了。
紧接着编写一下生产者和消费者:
public void sendFanout() {Client client = new Client();
// 应读者要求,当前代码打印的中央都会改成 log 形式,这是一种良好的编程习惯,用 System.out.println 个别是不举荐的。log.info("Message content :" + client);
rabbitTemplate.convertAndSend("fanoutExchange",null,client);
System.out.println("音讯发送结束。");
}
@Test
public void sendFanoutMessage() {rabbitProduce.sendFanout();
}
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {@RabbitListener(queues = "fanout1")
public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("音讯已确认");
}
@RabbitListener(queues = "fanout2")
public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("音讯已确认");
}
}
这两段代码都很好了解,不再赘述,有忘记的能够去看 RabbitMQ 第一弹的内容。
其中发送音讯的代码有三个参数,第一个参数是 Exchange
的名称,第二个参数是 routingKey
的名称,这个参数在扇形交换机外面用不到,在其余两个交换机类型外面会用到。
代码的筹备到此结束,咱们能够运行发送办法之后 run 一下了~
我的项目启动后,咱们能够先来察看一下队列与交换机的绑定关系有没有失效,咱们在 RabbitMQ 控制台应用 rabbitmqctl list_bindings
命令查看绑定关系。
要害局部我用红框标记了起来,这就代表着名叫 fanoutExchange
的交换机绑定着两个队列,一个叫fanout1
,另一个叫fanout2
。
紧接着,咱们来看控制台的打印状况:
能够看到,一条信息发送进来之后,两个队列都接管到了这条音讯,紧接着由咱们的两个消费者生产。
Tip: 如果你的演示利用启动之后没有生产信息,能够尝试从新运行一次生产者的办法发送音讯。
3. ????Direct-Exchange
Direct-Exchange
是一种精准匹配的交换机,咱们之前始终应用默认的交换机,其实默认的交换机就是 Direct 类型。
如果将 Direct 交换机都比作一所公寓的管理员,那么队列就是外面的住户。(绑定关系)
管理员每天都会收到各种各样的函件 (音讯),这些函件的地址不光要表明地址(ExchangeKey) 还须要表明要送往哪一户(routingKey),不然音讯无奈投递。
以上图为例,筹备一条音讯发往名为 SendService
的间接交换机中去,这个交换机次要是用来做发送服务,所以其绑定了两个队列,SMS 队列和 MAIL 队列,用于发送短信和邮件。
咱们的音讯除了指定 ExchangeKey
还须要指定 routingKey
,routingKey
对应着最终要发送的是哪个队列,咱们的示例中的 routingKey
是 sms,这里这条音讯就会交给 SMS 队列。
听了下面这段,可能大家对 routingKey
还不是很了解,咱们上段代码实际一下,大家应该就明确了。
筹备工作:
@Bean
public Queue directQueue1() {return new Queue("directQueue1");
}
@Bean
public Queue directQueue2() {return new Queue("directQueue2");
}
@Bean
public DirectExchange directExchange() {
// 三个结构参数:name durable autoDelete
return new DirectExchange("directExchange", false, false);
}
@Bean
public Binding directBinding1() {return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
}
@Bean
public Binding directBinding2() {return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
}
新建两个队列,新建了一个间接交换机,并设置了绑定关系。
这里的示例代码和下面扇形交换机的代码很像,惟一能够说不同的就是绑定的时候多调用了一个 with
将routingKey
设置了下来。
所以是交换机和队列建设绑定关系的时候设置的 routingKey
,一个音讯达到交换机之后,交换机通过音讯上带来的routingKey
找到本人与队列建设绑定关系时设置的routingKey
,而后将音讯散发到这个队列去。
生产者:
public void sendDirect() {Client client = new Client();
log.info("Message content :" + client);
rabbitTemplate.convertAndSend("directExchange","sms",client);
System.out.println("音讯发送结束。");
}
消费者:
@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {@RabbitListener(queues = "directQueue1")
public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("音讯已确认");
}
@RabbitListener(queues = "directQueue2")
public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("音讯已确认");
}
}
效果图如下:
只有一个消费者进行了音讯,合乎咱们的预期。
4. ????Topic-Exchange
Topic-Exchange
是间接交换机的含糊匹配版本,Topic 类型的交换器,反对应用 ”*” 和 ”#” 通配符定义含糊 bindingKey,而后依照 routingKey
进行含糊匹配队列进行散发。
*
:可能含糊匹配一个单词。#
:可能含糊匹配零个或多个单词。
因为退出了两个通配定义符,所以 Topic 交换机的 routingKey
也有些变动,routingKey
能够应用 .
将单词离开。
这里咱们间接来用一个例子说明会更加的清晰:
筹备工作:
// 主题交换机示例
@Bean
public Queue topicQueue1() {return new Queue("topicQueue1");
}
@Bean
public Queue topicQueue2() {return new Queue("topicQueue2");
}
@Bean
public TopicExchange topicExchange() {
// 三个结构参数:name durable autoDelete
return new TopicExchange("topicExchange", false, false);
}
@Bean
public Binding topicBinding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
}
@Bean
public Binding topicBinding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
}
新建两个队列,新建了一个 Topic 交换机,并设置了绑定关系。
这里的示例代码咱们次要看设置 routingKey
,这里的routingKey
用上了通配符,且两头用 .
隔开,这就代表 topicQueue1
生产 sms
结尾的音讯,topicQueue2
生产 mail
结尾的音讯,具体不同往下看。
生产者:
public void sendTopic() {Client client = new Client();
log.info("Message content :" + client);
rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
System.out.println("音讯发送结束。");
}
消费者:
@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {@RabbitListener(queues = "topicQueue1")
public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("音讯已确认");
}
@RabbitListener(queues = "topicQueue2")
public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("音讯已确认");
}
}
这里咱们的生产者发送的音讯 routingKey
是sms.liantong
,它就会被发到 topicQueue1
队列中去,这里音讯的 routingKey
也须要用 .
隔离开,用其余符号无奈正确辨认。
如果咱们的 routingKey
是sms.123.liantong
,那么它将无奈找到对应的队列,因为 topicQueue1
的含糊匹配用的通配符是 *
而不是 #
,只有#
是能够匹配多个单词的。
Topic-Exchange
和 Direct-Exchange
很类似,我就不再赘述了,通配符 *
和#
的区别也很简略,大家能够本人试一下。
后记
周一没更文切实羞愧,去医院抽血了,抽了三管~,吃多少能力补回来~
RabbitMQ 曾经更新了三篇了,这三篇的内容有些偏根底,下一篇将会更新高级局部内容:包含避免音讯失落,避免音讯反复生产等等内容,心愿大家继续关注。
最近这段时间压力挺大,优狐令我八月底之前降级到三级,所以各位读者的赞对我很重要,心愿大家可能高抬贵手,帮我一哈~
好了,以上就是本期的全部内容,感激你能看到这里,欢送对本文点赞珍藏与评论,???? 你们的每个点赞都是我创作的最大能源。
我是耳朵,一个始终想做常识输入的伪文艺程序员,咱们下期见。
本文代码:码云地址 GitHub 地址