人生终将是场单人旅途,孤单之前是迷茫,孤单过后是成长。
楔子
本篇是音讯队列RabbitMQ
的第三弹。
RabbitMQ的入门和RabbitMQ+SpringBoot的整合能够点此链接进去回顾,明天要讲的是RabbitMQ
的交换机。
本篇是了解RabbitMQ
很重要的一篇,交换机是音讯的第一站,只有了解了交换机的散发模式,咱们能力晓得不同交换机依据什么规定散发音讯,能力明确在面对不同业务需要的时候应采纳哪种交换机。
祝有好播种,先赞后看,高兴有限。
本文代码: 码云地址 GitHub地址
1. ????Exchange
先来放上简直每篇都要呈现一遍的我画了良久的RabbitMQ
架构图。
前两篇文中咱们始终没有显式的去应用Exchange
,都是应用的默认Exchange
,其实Exchange
是一个十分要害的组件,有了它才有了各种音讯散发模式。
我先简略说说Exchange
有哪几种类型:
- fanout:
Fanout-Exchange
会将它接管到的音讯发往所有与他绑定的Queue中。 - direct:
Direct-Exchange
会把它接管到的音讯发往与它有绑定关系且Routingkey
齐全匹配的Queue中(默认)。 - topic:
Topic-Exchange
与Direct-Exchange类似,不过Topic-Exchange不须要全匹配,能够局部匹配,它约定:Routingkey
为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。 - header:
Header-Exchange
不依赖于RoutingKey或绑定关系来散发音讯,而是依据发送的音讯内容中的headers属性进行匹配。此模式曾经不再应用,本文中也不会去讲,大家晓得即可。
本文中咱们次要讲前三种Exchange
形式,置信凭借着我简练的文字和灵魂的画技给大家好好讲讲,争取老妪能解。
Tip:本文的代码演示间接应用SpringBoot+RabbitMQ的模式。
2. ????Fanout-Exchange
先来看看Fanout-Exchange
,Fanout-Exchange
又称扇形交换机,这个交换机应该是最容易了解的。
Exchange
和Queue
建设一个绑定关系,Exchange
会分发给所有和它有绑定关系的Queue
中,绑定了十个Queue
就把音讯复制十份进行散发。
这种绑定关系为了效率必定都会保护一张表,从算法效率上来说个别是O(1),所以Fanout-Exchange
是这几个交换机中查找须要被散发队列最快的交换机。
上面是一段代码演示:
@Bean public Queue fanout1() { return new Queue("fanout1"); } @Bean public Queue fanout2() { return new Queue("fanout2"); } @Bean public FanoutExchange fanoutExchange() { // 三个结构参数:name durable autoDelete return new FanoutExchange("fanoutExchange", false, false); } @Bean public Binding binding1() { return BindingBuilder.bind(fanout1()).to(fanoutExchange()); } @Bean public Binding binding2() { return BindingBuilder.bind(fanout2()).to(fanoutExchange()); }
为了清晰明了,我新建了两个演示用的队列,而后建了一个FanoutExchange
,最初给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算实现了。
紧接着编写一下生产者和消费者:
public void sendFanout() { Client client = new Client(); // 应读者要求,当前代码打印的中央都会改成log形式,这是一种良好的编程习惯,用System.out.println个别是不举荐的。 log.info("Message content : " + client); rabbitTemplate.convertAndSend("fanoutExchange",null,client); System.out.println("音讯发送结束。"); } @Test public void sendFanoutMessage() { rabbitProduce.sendFanout(); }
@Slf4j@Component("rabbitFanoutConsumer")public class RabbitFanoutConsumer { @RabbitListener(queues = "fanout1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("音讯已确认"); } @RabbitListener(queues = "fanout2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("音讯已确认"); }}
这两段代码都很好了解,不再赘述,有忘记的能够去看RabbitMQ第一弹的内容。
其中发送音讯的代码有三个参数,第一个参数是Exchange
的名称,第二个参数是routingKey
的名称,这个参数在扇形交换机外面用不到,在其余两个交换机类型外面会用到。
代码的筹备到此结束,咱们能够运行发送办法之后run一下了~
我的项目启动后,咱们能够先来察看一下队列与交换机的绑定关系有没有失效,咱们在RabbitMQ控制台应用rabbitmqctl list_bindings
命令查看绑定关系。
要害局部我用红框标记了起来,这就代表着名叫fanoutExchange
的交换机绑定着两个队列,一个叫fanout1
,另一个叫fanout2
。
紧接着,咱们来看控制台的打印状况:
能够看到,一条信息发送进来之后,两个队列都接管到了这条音讯,紧接着由咱们的两个消费者生产。
Tip: 如果你的演示利用启动之后没有生产信息,能够尝试从新运行一次生产者的办法发送音讯。
3. ????Direct-Exchange
Direct-Exchange
是一种精准匹配的交换机,咱们之前始终应用默认的交换机,其实默认的交换机就是Direct类型。
如果将Direct交换机都比作一所公寓的管理员,那么队列就是外面的住户。(绑定关系)
管理员每天都会收到各种各样的函件(音讯),这些函件的地址不光要表明地址(ExchangeKey)还须要表明要送往哪一户(routingKey),不然音讯无奈投递。
以上图为例,筹备一条音讯发往名为SendService
的间接交换机中去,这个交换机次要是用来做发送服务,所以其绑定了两个队列,SMS队列和MAIL队列,用于发送短信和邮件。
咱们的音讯除了指定ExchangeKey
还须要指定routingKey
,routingKey
对应着最终要发送的是哪个队列,咱们的示例中的routingKey
是sms,这里这条音讯就会交给SMS队列。
听了下面这段,可能大家对routingKey
还不是很了解,咱们上段代码实际一下,大家应该就明确了。
筹备工作:
@Bean public Queue directQueue1() { return new Queue("directQueue1"); } @Bean public Queue directQueue2() { return new Queue("directQueue2"); } @Bean public DirectExchange directExchange() { // 三个结构参数:name durable autoDelete return new DirectExchange("directExchange", false, false); } @Bean public Binding directBinding1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms"); } @Bean public Binding directBinding2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail"); }
新建两个队列,新建了一个间接交换机,并设置了绑定关系。
这里的示例代码和下面扇形交换机的代码很像,惟一能够说不同的就是绑定的时候多调用了一个with
将routingKey
设置了下来。
所以是交换机和队列建设绑定关系的时候设置的routingKey
,一个音讯达到交换机之后,交换机通过音讯上带来的routingKey
找到本人与队列建设绑定关系时设置的routingKey
,而后将音讯散发到这个队列去。
生产者:
public void sendDirect() { Client client = new Client(); log.info("Message content : " + client); rabbitTemplate.convertAndSend("directExchange","sms",client); System.out.println("音讯发送结束。"); }
消费者:
@Slf4j@Component("rabbitDirectConsumer")public class RabbitDirectConsumer { @RabbitListener(queues = "directQueue1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("音讯已确认"); } @RabbitListener(queues = "directQueue2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("音讯已确认"); }}
效果图如下:
只有一个消费者进行了音讯,合乎咱们的预期。
4. ????Topic-Exchange
Topic-Exchange
是间接交换机的含糊匹配版本,Topic类型的交换器,反对应用"*"和"#"通配符定义含糊bindingKey,而后依照routingKey
进行含糊匹配队列进行散发。
*
:可能含糊匹配一个单词。#
:可能含糊匹配零个或多个单词。
因为退出了两个通配定义符,所以Topic交换机的routingKey
也有些变动,routingKey
能够应用.
将单词离开。
这里咱们间接来用一个例子说明会更加的清晰:
筹备工作:
// 主题交换机示例 @Bean public Queue topicQueue1() { return new Queue("topicQueue1"); } @Bean public Queue topicQueue2() { return new Queue("topicQueue2"); } @Bean public TopicExchange topicExchange() { // 三个结构参数:name durable autoDelete return new TopicExchange("topicExchange", false, false); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#"); }
新建两个队列,新建了一个Topic交换机,并设置了绑定关系。
这里的示例代码咱们次要看设置routingKey
,这里的routingKey
用上了通配符,且两头用.
隔开,这就代表topicQueue1
生产sms
结尾的音讯,topicQueue2
生产mail
结尾的音讯,具体不同往下看。
生产者:
public void sendTopic() { Client client = new Client(); log.info("Message content : " + client); rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client); System.out.println("音讯发送结束。"); }
消费者:
@Slf4j@Component("rabbitTopicConsumer")public class RabbitTopicConsumer { @RabbitListener(queues = "topicQueue1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("音讯已确认"); } @RabbitListener(queues = "topicQueue2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("音讯已确认"); }}
这里咱们的生产者发送的音讯routingKey
是sms.liantong
,它就会被发到topicQueue1
队列中去,这里音讯的routingKey
也须要用.
隔离开,用其余符号无奈正确辨认。
如果咱们的routingKey
是sms.123.liantong
,那么它将无奈找到对应的队列,因为topicQueue1
的含糊匹配用的通配符是*
而不是#
,只有#
是能够匹配多个单词的。
Topic-Exchange
和Direct-Exchange
很类似,我就不再赘述了,通配符*
和#
的区别也很简略,大家能够本人试一下。
后记
周一没更文切实羞愧,去医院抽血了,抽了三管~,吃多少能力补回来~
RabbitMQ曾经更新了三篇了,这三篇的内容有些偏根底,下一篇将会更新高级局部内容:包含避免音讯失落,避免音讯反复生产等等内容,心愿大家继续关注。
最近这段时间压力挺大,优狐令我八月底之前降级到三级,所以各位读者的赞对我很重要,心愿大家可能高抬贵手,帮我一哈~
好了,以上就是本期的全部内容,感激你能看到这里,欢送对本文点赞珍藏与评论,????你们的每个点赞都是我创作的最大能源。
我是耳朵,一个始终想做常识输入的伪文艺程序员,咱们下期见。
本文代码:码云地址 GitHub地址