@[toc]
明天这篇文章比较简单,来和小伙伴们分享一下 RabbitMQ 的七种消息传递模式。一起来看看。

大部分状况下,咱们可能都是在 Spring Boot 或者 Spring Cloud 环境下应用 RabbitMQ,因而本文我也次要从这两个方面来和大家分享 RabbitMQ 的用法。

1. RabbitMQ 架构简介

一图胜千言,如下:

这张图中波及到如下一些概念:

  1. 生产者(Publisher):公布音讯到 RabbitMQ 中的交换机(Exchange)上。
  2. 交换机(Exchange):和生产者建设连贯并接管生产者的音讯。
  3. 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的音讯。
  4. 队列(Queue):Exchange 将音讯散发到指定的 Queue,Queue 和消费者进行交互。
  5. 路由(Routes):交换机转发音讯到队列的规定。

2. 筹备工作

大家晓得,RabbitMQ 是 AMQP 营垒里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因而首先创立 Spring Boot 我的项目并增加该依赖,如下:

我的项目创立胜利后,在 application.properties 中配置 RabbitMQ 的根本连贯信息,如下:

spring.rabbitmq.host=localhostspring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.port=5672

接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的音讯生产者提交的音讯都会交由 Exchange 进行再调配,Exchange 会依据不同的策略将音讯散发到不同的 Queue 中。

RabbitMQ 官网介绍了如下几种音讯散发的模式:



这里给出了七种,其中第七种是音讯确认,音讯确认这块松哥之前发过相干的文章,传送门:

  • 四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?
  • RabbitMQ 高可用之如何确保音讯胜利生产

所以这里我次要和大家介绍前六种音讯收发形式。

3. 音讯收发

3.1 Hello World

咦?这个咋没有交换机?这个其实是默认的交换机,咱们须要提供一个生产者一个队列以及一个消费者。音讯流传图如下:

来看看代码实现:

先来看看队列的定义:

@Configurationpublic class HelloWorldConfig {    public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";    @Bean    Queue queue1() {        return new Queue(HELLO_WORLD_QUEUE_NAME);    }}

再来看看音讯消费者的定义:

@Componentpublic class HelloWorldConsumer {    @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)    public void receive(String msg) {        System.out.println("msg = " + msg);    }}

音讯发送:

@SpringBootTestclass RabbitmqdemoApplicationTests {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    void contextLoads() {        rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");    }}

这个时候应用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将音讯队列绑定到一个 DirectExchange 上,当一条音讯达到 DirectExchange 时会被转发到与该条音讯 routing key 雷同的 Queue 上,例如音讯队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的音讯会被该音讯队列接管。

3.2 Work queues

这种状况是这样的:

一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:

一个队列对应了多个消费者,默认状况下,由队列对音讯进行平均分配,音讯会被分到不同的消费者手中。消费者能够配置各自的并发能力,进而进步音讯的生产能力,也能够配置手动 ack,来决定是否要生产某一条音讯。

先来看并发能力的配置,如下:

@Componentpublic class HelloWorldConsumer {    @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)    public void receive(String msg) {        System.out.println("receive = " + msg);    }    @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")    public void receive2(String msg) {        System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());    }}

能够看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去生产音讯。

启动我的项目,在 RabbitMQ 后盾也能够看到一共有 11 个消费者。

此时,如果生产者发送 10 条音讯,就会一下都被生产掉。

音讯发送形式如下:

@SpringBootTestclass RabbitmqdemoApplicationTests {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    void contextLoads() {        for (int i = 0; i < 10; i++) {            rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");        }    }}

音讯生产日志如下:

能够看到,音讯都被第一个消费者生产了。然而小伙伴们须要留神,事件并不总是这样(多试几次就能够看到差别),音讯也有可能被第一个消费者生产(只是因为第二个消费者有十个线程一起开动,所以第二个消费者生产的音讯占比更大)。

当然音讯消费者也能够开启手动 ack,这样能够自行决定是否生产 RabbitMQ 发来的音讯,配置手动 ack 的形式如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

生产代码如下:

@Componentpublic class HelloWorldConsumer {    @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)    public void receive(Message message,Channel channel) throws IOException {        System.out.println("receive="+message.getPayload());        channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);    }    @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")    public void receive2(Message message, Channel channel) throws IOException {        System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());        channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);    }}

此时第二个消费者回绝了所有音讯,第一个消费者生产了所有音讯。

这就是 Work queues 这种状况。

3.3 Publish/Subscrite

再来看公布订阅模式,这种状况是这样:

一个生产者,多个消费者,每一个消费者都有本人的一个队列,生产者没有将音讯间接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的音讯通过交换机,达到队列,实现一个音讯被多个消费者获取的目标。须要留神的是,如果将音讯发送到一个没有队列绑定的 Exchange下面,那么该音讯将会失落,这是因为在 RabbitMQ 中 Exchange 不具备存储音讯的能力,只有队列具备存储音讯的能力,如下图:

这种状况下,咱们有四种交换机可供选择,别离是:

  • Direct
  • Fanout
  • Topic
  • Header

我别离来给大家举一个简略例子看下。

3.3.1 Direct

DirectExchange 的路由策略是将音讯队列绑定到一个 DirectExchange 上,当一条音讯达到 DirectExchange 时会被转发到与该条音讯 routing key 雷同的 Queue 上,例如音讯队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的音讯会被该音讯队列接管。DirectExchange 的配置如下:

@Configurationpublic class RabbitDirectConfig {    public final static String DIRECTNAME = "javaboy-direct";    @Bean    Queue queue() {        return new Queue("hello-queue");    }    @BeanDirectExchange directExchange() {        return new DirectExchange(DIRECTNAME, true, false);    }    @Bean    Binding binding() {        return BindingBuilder.bind(queue())                .to(directExchange()).with("direct");    }}
  • 首先提供一个音讯队列Queue,而后创立一个DirectExchange对象,三个参数别离是名字,重启后是否仍然无效以及长期未用时是否删除。
  • 创立一个Binding对象将Exchange和Queue绑定在一起。
  • DirectExchange和Binding两个Bean的配置能够省略掉,即如果应用DirectExchange,能够只配置一个Queue的实例即可。

再来看看消费者:

@Componentpublic class DirectReceiver {    @RabbitListener(queues = "hello-queue")    public void handler1(String msg) {        System.out.println("DirectReceiver:" + msg);    }}

通过 @RabbitListener 注解指定一个办法是一个音讯生产办法,办法参数就是所接管到的音讯。而后在单元测试类中注入一个 RabbitTemplate 对象来进行音讯发送,如下:

@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    public void directTest() {        rabbitTemplate.convertAndSend("hello-queue", "hello direct!");    }}

最终执行后果如下:

3.3.2 Fanout

FanoutExchange 的数据交换策略是把所有达到 FanoutExchange 的音讯转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置形式如下:

@Configurationpublic class RabbitFanoutConfig {    public final static String FANOUTNAME = "sang-fanout";    @Bean    FanoutExchange fanoutExchange() {        return new FanoutExchange(FANOUTNAME, true, false);    }    @Bean    Queue queueOne() {        return new Queue("queue-one");    }    @Bean    Queue queueTwo() {        return new Queue("queue-two");    }    @Bean    Binding bindingOne() {        return BindingBuilder.bind(queueOne()).to(fanoutExchange());    }    @Bean    Binding bindingTwo() {        return BindingBuilder.bind(queueTwo()).to(fanoutExchange());    }}

在这里首先创立 FanoutExchange,参数含意与创立 DirectExchange 参数含意统一,而后创立两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创立两个消费者,如下:

@Componentpublic class FanoutReceiver {    @RabbitListener(queues = "queue-one")    public void handler1(String message) {        System.out.println("FanoutReceiver:handler1:" + message);    }    @RabbitListener(queues = "queue-two")    public void handler2(String message) {        System.out.println("FanoutReceiver:handler2:" + message);    }}

两个消费者别离生产两个音讯队列中的音讯,而后在单元测试中发送音讯,如下:

@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    public void fanoutTest() {        rabbitTemplate        .convertAndSend(RabbitFanoutConfig.FANOUTNAME,                 null, "hello fanout!");    }}

留神这里发送音讯时不须要 routingkey,指定 exchange 即可,routingkey 能够间接传一个 null

最终执行日志如下:

3.3.3 Topic

TopicExchange 是比较复杂然而也比拟灵便的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当音讯达到 TopicExchange 后,TopicExchange 依据音讯的 routingkey 将音讯路由到一个或者多个 Queue 上。TopicExchange 配置如下:

@Configurationpublic class RabbitTopicConfig {    public final static String TOPICNAME = "sang-topic";    @Bean    TopicExchange topicExchange() {        return new TopicExchange(TOPICNAME, true, false);    }    @Bean    Queue xiaomi() {        return new Queue("xiaomi");    }    @Bean    Queue huawei() {        return new Queue("huawei");    }    @Bean    Queue phone() {        return new Queue("phone");    }    @Bean    Binding xiaomiBinding() {        return BindingBuilder.bind(xiaomi()).to(topicExchange())                .with("xiaomi.#");    }    @Bean    Binding huaweiBinding() {        return BindingBuilder.bind(huawei()).to(topicExchange())                .with("huawei.#");    }    @Bean    Binding phoneBinding() {        return BindingBuilder.bind(phone()).to(topicExchange())                .with("#.phone.#");    }}
  • 首先创立 TopicExchange,参数和后面的统一。而后创立三个 Queue,第一个 Queue 用来存储和 “xiaomi” 无关的音讯,第二个 Queue 用来存储和 “huawei” 无关的音讯,第三个 Queue 用来存储和 “phone” 无关的音讯。
  • 将三个 Queue 别离绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 示意音讯的 routingkey 但凡以 “xiaomi” 结尾的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 示意音讯的 routingkey 但凡以 “huawei” 结尾的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则示意音讯的 routingkey 中但凡蕴含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。

接下来针对三个 Queue 创立三个消费者,如下:

@Componentpublic class TopicReceiver {    @RabbitListener(queues = "phone")    public void handler1(String message) {        System.out.println("PhoneReceiver:" + message);    }    @RabbitListener(queues = "xiaomi")    public void handler2(String message) {        System.out.println("XiaoMiReceiver:"+message);    }    @RabbitListener(queues = "huawei")    public void handler3(String message) {        System.out.println("HuaWeiReceiver:"+message);    }}

而后在单元测试中进行音讯的发送,如下:

@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    public void topicTest() {        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻..");        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻..");        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机..");        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机..");        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻..");    }}

依据 RabbitTopicConfig 中的配置,第一条音讯将被路由到名称为 “xiaomi” 的 Queue 上,第二条音讯将被路由到名为 “huawei” 的 Queue 上,第三条音讯将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,第四条音讯将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,最初一条音讯则将被路由到名为 “phone” 的 Queue 上。

3.3.4 Header

HeadersExchange 是一种应用较少的路由策略,HeadersExchange 会依据音讯的 Header 将音讯路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:

@Configurationpublic class RabbitHeaderConfig {    public final static String HEADERNAME = "javaboy-header";    @Bean    HeadersExchange headersExchange() {        return new HeadersExchange(HEADERNAME, true, false);    }    @Bean    Queue queueName() {        return new Queue("name-queue");    }    @Bean    Queue queueAge() {        return new Queue("age-queue");    }    @Bean    Binding bindingName() {        Map<String, Object> map = new HashMap<>();        map.put("name", "sang");        return BindingBuilder.bind(queueName())                .to(headersExchange()).whereAny(map).match();    }    @Bean    Binding bindingAge() {        return BindingBuilder.bind(queueAge())                .to(headersExchange()).where("age").exists();    }}

这里的配置大部分和后面介绍的一样,差异次要体现的 Binding 的配置上,第一个 bindingName 办法中,whereAny 示意音讯的 Header 中只有有一个 Header 匹配上 map 中的 key/value,就把该音讯路由到名为 “name-queue” 的 Queue 上,这里也能够应用 whereAll 办法,示意音讯的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则示意只有音讯的 Header 中蕴含 age,不论 age 的值是多少,都将音讯路由到名为 “age-queue” 的 Queue 上。

接下来创立两个音讯消费者:

@Componentpublic class HeaderReceiver {    @RabbitListener(queues = "name-queue")    public void handler1(byte[] msg) {        System.out.println("HeaderReceiver:name:"                + new String(msg, 0, msg.length));    }    @RabbitListener(queues = "age-queue")    public void handler2(byte[] msg) {        System.out.println("HeaderReceiver:age:"                + new String(msg, 0, msg.length));    }}

留神这里的参数用 byte 数组接管。而后在单元测试中创立音讯的发送办法,这里音讯的发送也和 routingkey 无关,如下:

@RunWith(SpringRunner.class)@SpringBootTestpublic class RabbitmqApplicationTests {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    public void headerTest() {        Message nameMsg = MessageBuilder                .withBody("hello header! name-queue".getBytes())                .setHeader("name", "sang").build();        Message ageMsg = MessageBuilder                .withBody("hello header! age-queue".getBytes())                .setHeader("age", "99").build();        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);    }}

这里创立两条音讯,两条音讯具备不同的 header,不同 header 的音讯将被发到不同的 Queue 中去。

最终执行成果如下:

3.4 Routing

这种状况是这样:

一个生产者,一个交换机,两个队列,两个消费者,生产者在创立 Exchange 后,依据 RoutingKey 去绑定相应的队列,并且在发送音讯时,指定音讯的具体 RoutingKey 即可。

如下图:

这个就是依照 routing key 来路由音讯,我这里就不再举例子了,大家能够参考 3.3.1 小结。

3.5 Topics

这种状况是这样:

一个生产者,一个交换机,两个队列,两个消费者,生产者创立 Topic 的 Exchange 并且绑定到队列中,这次绑定能够通过 *# 关键字,对指定 RoutingKey 内容,编写时留神格局 xxx.xxx.xxx 去编写。

如下图:

这个我也就不举例啦,后面 3.3.3 大节曾经举过例子了,不再赘述。

3.6 RPC

RPC 这种音讯收发模式,松哥前两天刚刚写了文章和大家介绍,这里就不多说了,传送门:

  • SpringBoot+RabbitMQ 实现 RPC 调用

3.7 Publisher Confirms

这种发送确认松哥之前有写过相干文章,传送门:

  • 四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?
  • RabbitMQ 高可用之如何确保音讯胜利生产

4. 小结

好啦,明天这篇文章次要是和小伙伴们整顿了 RabbitMQ 中音讯收发的七种模式,感兴趣的小伙伴能够试试哦~

公众号【江南一点雨】后盾回复 rabbitmqdemo,获取本文案例地址~