关于后端:上手了RabbitMQ再来看看它的交换机Exchange吧

38次阅读

共计 6761 个字符,预计需要花费 17 分钟才能阅读完成。

人生终将是场单人旅途,孤单之前是迷茫,孤单过后是成长。

楔子

本篇是音讯队列 RabbitMQ 的第三弹。

RabbitMQ 的入门和 RabbitMQ+SpringBoot 的整合能够点此链接进去回顾,明天要讲的是 RabbitMQ 的交换机。

本篇是了解 RabbitMQ 很重要的一篇,交换机是音讯的第一站,只有了解了交换机的散发模式,咱们能力晓得不同交换机依据什么规定散发音讯,能力明确在面对不同业务需要的时候应采纳哪种交换机。


祝有好播种,先赞后看,高兴有限。

本文代码: 码云地址 GitHub 地址

1. ????Exchange

先来放上简直每篇都要呈现一遍的我画了良久的 RabbitMQ 架构图。

前两篇文中咱们始终没有显式的去应用 Exchange,都是应用的默认Exchange,其实Exchange 是一个十分要害的组件,有了它才有了各种音讯散发模式。

我先简略说说 Exchange 有哪几种类型:

  1. fanoutFanout-Exchange会将它接管到的音讯发往所有与他绑定的 Queue 中。
  2. directDirect-Exchange会把它接管到的音讯发往与它有绑定关系且 Routingkey 齐全匹配的 Queue 中(默认)。
  3. topicTopic-Exchange与 Direct-Exchange 类似,不过 Topic-Exchange 不须要全匹配,能够局部匹配,它约定:Routingkey为一个句点号“.”分隔的字符串(咱们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)。
  4. headerHeader-Exchange不依赖于 RoutingKey 或绑定关系来散发音讯,而是依据发送的音讯内容中的 headers 属性进行匹配。此模式曾经不再应用,本文中也不会去讲,大家晓得即可。

本文中咱们次要讲前三种 Exchange 形式,置信凭借着我简练的文字和灵魂的画技给大家好好讲讲,争取老妪能解。

Tip:本文的代码演示间接应用 SpringBoot+RabbitMQ 的模式。

2. ????Fanout-Exchange

先来看看 Fanout-ExchangeFanout-Exchange 又称扇形交换机,这个交换机应该是最容易了解的。

ExchangeQueue 建设一个绑定关系,Exchange会分发给所有和它有绑定关系的 Queue 中,绑定了十个 Queue 就把音讯复制十份进行散发。

这种绑定关系为了效率必定都会保护一张表,从算法效率上来说个别是 O(1),所以 Fanout-Exchange 是这几个交换机中 查找须要被散发队列 最快的交换机。


上面是一段代码演示:

    @Bean
    public Queue fanout1() {return new Queue("fanout1");
    }

    @Bean
    public Queue fanout2() {return new Queue("fanout2");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 三个结构参数:name durable autoDelete
        return new FanoutExchange("fanoutExchange", false, false);
    }

    @Bean
    public Binding binding1() {return BindingBuilder.bind(fanout1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {return BindingBuilder.bind(fanout2()).to(fanoutExchange());
    }

为了清晰明了,我新建了两个演示用的队列,而后建了一个FanoutExchange,最初给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算实现了。

紧接着编写一下生产者和消费者:

    public void sendFanout() {Client client = new Client();

        // 应读者要求,当前代码打印的中央都会改成 log 形式,这是一种良好的编程习惯,用 System.out.println 个别是不举荐的。log.info("Message content :" + client);

        rabbitTemplate.convertAndSend("fanoutExchange",null,client);
        System.out.println("音讯发送结束。");
    }

    @Test
    public void sendFanoutMessage() {rabbitProduce.sendFanout();
    }
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {@RabbitListener(queues = "fanout1")
    public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("音讯已确认");
    }

    @RabbitListener(queues = "fanout2")
    public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("音讯已确认");
    }

}

这两段代码都很好了解,不再赘述,有忘记的能够去看 RabbitMQ 第一弹的内容。

其中发送音讯的代码有三个参数,第一个参数是 Exchange 的名称,第二个参数是 routingKey 的名称,这个参数在扇形交换机外面用不到,在其余两个交换机类型外面会用到。

代码的筹备到此结束,咱们能够运行发送办法之后 run 一下了~

我的项目启动后,咱们能够先来察看一下队列与交换机的绑定关系有没有失效,咱们在 RabbitMQ 控制台应用 rabbitmqctl list_bindings 命令查看绑定关系。

要害局部我用红框标记了起来,这就代表着名叫 fanoutExchange 的交换机绑定着两个队列,一个叫fanout1,另一个叫fanout2

紧接着,咱们来看控制台的打印状况:

能够看到,一条信息发送进来之后,两个队列都接管到了这条音讯,紧接着由咱们的两个消费者生产。

Tip: 如果你的演示利用启动之后没有生产信息,能够尝试从新运行一次生产者的办法发送音讯。

3. ????Direct-Exchange

Direct-Exchange是一种精准匹配的交换机,咱们之前始终应用默认的交换机,其实默认的交换机就是 Direct 类型。

如果将 Direct 交换机都比作一所公寓的管理员,那么队列就是外面的住户。(绑定关系)

管理员每天都会收到各种各样的函件 (音讯),这些函件的地址不光要表明地址(ExchangeKey) 还须要表明要送往哪一户(routingKey),不然音讯无奈投递。

以上图为例,筹备一条音讯发往名为 SendService 的间接交换机中去,这个交换机次要是用来做发送服务,所以其绑定了两个队列,SMS 队列和 MAIL 队列,用于发送短信和邮件。

咱们的音讯除了指定 ExchangeKey 还须要指定 routingKeyroutingKey 对应着最终要发送的是哪个队列,咱们的示例中的 routingKey 是 sms,这里这条音讯就会交给 SMS 队列。


听了下面这段,可能大家对 routingKey 还不是很了解,咱们上段代码实际一下,大家应该就明确了。

筹备工作:

    @Bean
    public Queue directQueue1() {return new Queue("directQueue1");
    }

    @Bean
    public Queue directQueue2() {return new Queue("directQueue2");
    }

    @Bean
    public DirectExchange directExchange() {
        // 三个结构参数:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }

    @Bean
    public Binding directBinding1() {return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding directBinding2() {return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
    }

新建两个队列,新建了一个间接交换机,并设置了绑定关系。

这里的示例代码和下面扇形交换机的代码很像,惟一能够说不同的就是绑定的时候多调用了一个 withroutingKey设置了下来。

所以是交换机和队列建设绑定关系的时候设置的 routingKey,一个音讯达到交换机之后,交换机通过音讯上带来的routingKey 找到本人与队列建设绑定关系时设置的routingKey,而后将音讯散发到这个队列去。

生产者:

    public void sendDirect() {Client client = new Client();

        log.info("Message content :" + client);

        rabbitTemplate.convertAndSend("directExchange","sms",client);
        System.out.println("音讯发送结束。");
    }

消费者:

@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {@RabbitListener(queues = "directQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("音讯已确认");
    }

    @RabbitListener(queues = "directQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("音讯已确认");
    }

}

效果图如下:

只有一个消费者进行了音讯,合乎咱们的预期。

4. ????Topic-Exchange

Topic-Exchange是间接交换机的含糊匹配版本,Topic 类型的交换器,反对应用 ”*” 和 ”#” 通配符定义含糊 bindingKey,而后依照 routingKey 进行含糊匹配队列进行散发。

  • *:可能含糊匹配一个单词。
  • #:可能含糊匹配零个或多个单词。

因为退出了两个通配定义符,所以 Topic 交换机的 routingKey 也有些变动,routingKey能够应用 . 将单词离开。


这里咱们间接来用一个例子说明会更加的清晰:

筹备工作:

    // 主题交换机示例
    @Bean
    public Queue topicQueue1() {return new Queue("topicQueue1");
    }

    @Bean
    public Queue topicQueue2() {return new Queue("topicQueue2");
    }

    @Bean
    public TopicExchange topicExchange() {
        // 三个结构参数:name durable autoDelete
        return new TopicExchange("topicExchange", false, false);
    }

    @Bean
    public Binding topicBinding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
    }

    @Bean
    public Binding topicBinding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
    }

新建两个队列,新建了一个 Topic 交换机,并设置了绑定关系。

这里的示例代码咱们次要看设置 routingKey,这里的routingKey 用上了通配符,且两头用 . 隔开,这就代表 topicQueue1 生产 sms 结尾的音讯,topicQueue2生产 mail 结尾的音讯,具体不同往下看。

生产者:

    public void sendTopic() {Client client = new Client();

        log.info("Message content :" + client);

        rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
        System.out.println("音讯发送结束。");
    }

消费者:

@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {@RabbitListener(queues = "topicQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("音讯已确认");
    }

    @RabbitListener(queues = "topicQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content :" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("音讯已确认");
    }

}

这里咱们的生产者发送的音讯 routingKeysms.liantong,它就会被发到 topicQueue1 队列中去,这里音讯的 routingKey 也须要用 . 隔离开,用其余符号无奈正确辨认。

如果咱们的 routingKeysms.123.liantong,那么它将无奈找到对应的队列,因为 topicQueue1 的含糊匹配用的通配符是 * 而不是 #,只有# 是能够匹配多个单词的。

Topic-ExchangeDirect-Exchange 很类似,我就不再赘述了,通配符 *#的区别也很简略,大家能够本人试一下。

后记

周一没更文切实羞愧,去医院抽血了,抽了三管~,吃多少能力补回来~

RabbitMQ 曾经更新了三篇了,这三篇的内容有些偏根底,下一篇将会更新高级局部内容:包含避免音讯失落,避免音讯反复生产等等内容,心愿大家继续关注。


最近这段时间压力挺大,优狐令我八月底之前降级到三级,所以各位读者的赞对我很重要,心愿大家可能高抬贵手,帮我一哈~

好了,以上就是本期的全部内容,感激你能看到这里,欢送对本文点赞珍藏与评论,???? 你们的每个点赞都是我创作的最大能源。

我是耳朵,一个始终想做常识输入的伪文艺程序员,咱们下期见。

本文代码:码云地址 GitHub 地址

正文完
 0