浅谈RabbitMQ

8次阅读

共计 3253 个字符,预计需要花费 9 分钟才能阅读完成。

简介

RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

核心概念

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

整体模型架构图

Producer(生产者) 和 Consumer(消费者)

  • Producer(生产者): 生产消息的一方(邮件投递者)
  • Consumer(消费者): 消费消息的一方(邮件收件人)

RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)

Exchange(交换器)

在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。

RabbitMQ 的 Exchange(交换器) 有 4 种类型,不同的类型对应着不同的路由策略:direct(默认),fanout, topic, 和 headers,不同类型的 Exchange 转发消息的策略有所区别。

Exchange(交换器) 示意图如下:

生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个RoutingKey 需要与交换器类型和绑定键(BindingKey) 联合使用才能最终生效

RabbitMQ 中通过 Binding(绑定)Exchange(交换器)Queue(消息队列) 关联起来,在绑定的时候一般会指定一个BindingKey(绑定键), 这样 RabbitMQ 就知道如何正确将消息路由到队列了, 如下图所示

生产者将消息发送给交换器时,需要一个 RoutingKey, 当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的** 情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。

Queue(消息队列)

Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免的消息被重复消费。

RabbitMQ不支持队列层面的广播消费, 如果有广播消费的需求,需要在其上进行二次开发, 这样会很麻烦,不建议这样做。

Exchange Types(交换器类型)

RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种。

fanout

fanout 类型的 Exchange 路由规则非常简单,它会把 所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。

direct

direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。

以上图为例,如果发送消息的时候设置路由键为“warning”, 那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为 ”Info”或者 ”debug”,消息只会路由到 Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。

direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

topic

前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

  • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如“com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
  • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
  • BindingKey 中可以存在两种特殊字符串“_”和“#”,用于做模糊匹配,其中“_”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

以上图为例:

  • 路由键为“com.rabbitmq.client”的消息会同时路由到 Queuel 和 Queue2;
  • 路由键为“com.hidden.client”的消息只会路由到 Queue2 中;
  • 路由键为“com.hidden.demo”的消息只会路由到 Queue2 中;
  • 路由键为“java.rabbitmq.demo”的消息只会路由到 Queuel 中;
  • 路由键为“java.util.concurrent”的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。

headers(不推荐)

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式)’ 对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

项目实战

接下来讲讲我在秒杀项目中是如何使用 RabbitMQ 的

  • 绑定队列
@Configuration
public class MQConfig {

    public static final String MIAOSHA_QUEUE = "miaosha.queue";
 
    /**
     * Direct 模式 交换机 Exchange(其它模式感兴趣的自己去了解一下)* */
    @Bean
    public Queue queue() {return new Queue(MIAOSHA_QUEUE, true);//true 表示重启后会重新连接
    }
 }   
  • 生产者(调用 AmqpTemplate 类的 convertAndSend 发送消息到路由器)
@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMiaoshaMessage(MiaoshaMessage mm) {String msg = RedisService.beanToString(mm);
        amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
    }
}
  • 消费者(使用 @RabbitListener 接受消息)
@Service
public class MQReceiver {@RabbitListener(queues = MQConfig.MIAOSHA_QUEUE)
    public void receive(String message) {// 执行秒杀}

参考

RabbitMQ 入门

正文完
 0