关于后端:一文搞懂RabbitMQ

45次阅读

共计 9197 个字符,预计需要花费 23 分钟才能阅读完成。

本文曾经收录到 github 仓库,此仓库用于分享 Java 相干常识总结,包含 Java 根底、MySQL、Spring Boot、MyBatis、Redis、RabbitMQ、计算机网络、数据结构与算法等等,欢送大家提 pr 和 star!

github 地址:https://github.com/Tyson0314/…

如果 github 拜访不了,能够拜访 gitee 仓库。

gitee 地址:https://gitee.com/tysondai/Ja…

文章目录:

简介

RabbitMQ 是一个由 erlang 开发的音讯队列。音讯队列用于利用间的异步合作。

基本概念

Message:由音讯头和音讯体组成。音讯体是不通明的,而音讯头则由一系列的可选属性组成,这些属性包含 routing-key、priority、delivery-mode(是否持久性存储)等。

Publisher:音讯的生产者。

Exchange:接管音讯并将音讯路由到一个或多个 Queue。default exchange 是默认的直连交换机,名字为空字符串,每个新建队列都会主动绑定到默认交换机上,绑定的路由键名称与队列名称雷同。

Binding:通过 Binding 将 Exchange 和 Queue 关联,这样 Exchange 就晓得将音讯路由到哪个 Queue 中。

Queue:存储音讯,队列的个性是先进先出。一个音讯可散发到一个或多个队列。

Virtual host:每个 vhost 实质上就是一个 mini 版的 RabbitMQ 服务器,领有本人的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的根底,必须在连贯时指定,RabbitMQ 默认的 vhost 是 /。当多个不同的用户应用同一个 RabbitMQ server 提供的服务时,能够划分出多个 vhost,每个用户在本人的 vhost 创立 exchange 和 queue。

Broker:音讯队列服务器实体。

什么时候应用 MQ

对于一些不须要立刻失效的操作,能够拆分进去,异步执行,应用音讯队列实现。

以常见的订单零碎为例,用户点击下单按钮之后的业务逻辑可能包含:扣减库存、生成相应单据、发短信告诉。这种场景下就能够用 MQ。将短信告诉放到 MQ 异步执行,在下单的主流程(比方扣减库存、生成相应单据)实现之后发送一条音讯到 MQ,让主流程疾速完结,而由另外的线程生产 MQ 的音讯。

优缺点

毛病:应用 erlang 实现,不利于二次开发和保护;性能较 kafka 差,长久化音讯和 ACK 确认的状况下生产和生产音讯单机吞吐量大概在 1 - 2 万左右,kafka 单机吞吐量在十万级别。

长处:有治理界面,方便使用;可靠性高;功能丰富,反对音讯长久化、音讯确认机制、多种音讯散发机制。

Exchange 类型

Exchange 散发音讯时依据类型的不同散发策略不同,目前共四种类型:direct、fanout、topic、headers。headers 模式依据音讯的 headers 进行路由,此外 headers 交换器和 direct 交换器完全一致,但性能差很多。

Exchange 规定。

类型名称 类型形容
fanout 把所有发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中
direct Routing Key==Binding Key
topic 含糊匹配
headers Exchange 不依赖于 routing key 与 binding key 的匹配规定来路由音讯,而是依据发送的音讯内容中的 header 属性进行匹配。

direct

direct 替换机会将音讯路由到 binding key 和 routing key 齐全匹配的队列中。它是齐全匹配、单播的模式。

fanout

所有发到 fanout 类型交换机的音讯都会路由到所有与该交换机绑定的队列下来。fanout 类型转发音讯是最快的。

topic

topic 交换机应用 routing key 和 binding key 进行含糊匹配,匹配胜利则将音讯发送到相应的队列。routing key 和 binding key 都是句点号“.”分隔的字符串,binding key 中能够存在两种特殊字符“*”与“#”,用于做含糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词。

headers

headers 交换机是依据发送的音讯内容中的 headers 属性进行路由的。在绑定 Queue 与 Exchange 时指定一组键值对;当音讯发送到 Exchange 时,RabbitMQ 会取到该音讯的 headers(也是一个键值对的模式),比照其中的键值对是否齐全匹配 Queue 与 Exchange 绑定时指定的键值对;如果齐全匹配则音讯会路由到该 Queue,否则不会路由到该 Queue。

音讯失落

音讯失落场景:生产者生产音讯到 RabbitMQ Server 音讯失落、RabbitMQ Server 存储的音讯失落和 RabbitMQ Server 到消费者音讯失落。

音讯失落从三个方面来解决:生产者确认机制、消费者手动确认音讯和长久化。

生产者确认机制

生产者发送音讯到队列,无奈确保发送的音讯胜利的达到 server。

解决办法:

  1. 事务机制。在一条音讯发送之后会使发送端阻塞,期待 RabbitMQ 的回应,之后能力持续发送下一条音讯。性能差。
  2. 开启生产者确认机制,只有音讯胜利发送到交换机之后,RabbitMQ 就会发送一个 ack 给生产者(即便音讯没有 Queue 接管,也会发送 ack)。如果音讯没有胜利发送到交换机,就会发送一条 nack 音讯,提醒发送失败。

在 Springboot 是通过 publisher-confirms 参数来设置 confirm 模式:

spring:
    rabbitmq:   
        #开启 confirm 确认机制
        publisher-confirms: true

在生产端提供一个回调办法,当服务端确认了一条或者多条音讯后,生产者会回调这个办法,依据具体的后果对音讯进行后续解决,比方从新发送、记录日志等。

// 音讯是否胜利发送到 Exchange
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {log.info("correlationData:" + correlationData);
            log.info("ack:" + ack);
            if(!ack) {log.info("异样解决....");
            }
    };

rabbitTemplate.setConfirmCallback(confirmCallback);

路由不可达音讯

生产者确认机制只确保音讯正确达到交换机,对于从交换机路由到 Queue 失败的音讯,会被抛弃掉,导致音讯失落。

对于不可路由的音讯,有两种解决形式:Return 音讯机制和备份交换机。

Return 音讯机制

Return 音讯机制提供了回调函数 ReturnCallback,当音讯从交换机路由到 Queue 失败才会回调这个办法。须要将 mandatory 设置为 true,能力监听到路由不可达的音讯。

spring:
    rabbitmq:
        #触发 ReturnCallback 必须设置 mandatory=true, 否则 Exchange 没有找到 Queue 就会抛弃掉音讯, 而不会触发 ReturnCallback
        template.mandatory: true

通过 ReturnCallback 监听路由不可达音讯。

    final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
            log.info("return exchange:" + exchange + ", routingKey:"
                    + routingKey + ", replyCode:" + replyCode + ", replyText:" + replyText);
rabbitTemplate.setReturnCallback(returnCallback);

当音讯从交换机路由到 Queue 失败时,会返回 return exchange: , routingKey: MAIL, replyCode: 312, replyText: NO_ROUTE

备份交换机

备份交换机 alternate-exchange 是一个一般的 exchange,当你发送音讯到对应的 exchange 时,没有匹配到 queue,就会主动转移到备份交换机对应的 queue,这样音讯就不会失落。

消费者手动音讯确认

有可能消费者收到音讯还没来得及解决 MQ 服务就宕机了,导致音讯失落。因为音讯者默认采纳主动 ack,一旦消费者收到音讯后会告诉 MQ Server 这条音讯曾经解决好了,MQ 就会移除这条音讯。

解决办法:消费者设置为手动确认音讯。消费者解决完逻辑之后再给 broker 回复 ack,示意音讯曾经胜利生产,能够从 broker 中删除。当音讯者生产失败的时候,给 broker 回复 nack,依据配置决定从新入队还是从 broker 移除,或者进入死信队列。只有没收到消费者的 acknowledgment,broker 就会始终保留着这条音讯,但不会 requeue,也不会调配给其余 消费者。

消费者设置手动 ack:

# 设置生产端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

音讯解决完,手动确认:

    @RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE)
    public void onMessage(Message message, Channel channel) throws IOException {

        try {Thread.sleep(5000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 手工 ack;第二个参数是 multiple,设置为 true,示意 deliveryTag 序列号之前(包含本身)的音讯都曾经收到,设为 false 则示意收到一条音讯
        channel.basicAck(deliveryTag, true);
        System.out.println("mail listener receive:" + new String(message.getBody()));
    }

当音讯生产失败时,生产端给 broker 回复 nack,如果 consumer 设置了 requeue 为 false,则 nack 后 broker 会删除音讯或者进入死信队列,否则音讯会从新入队。

长久化

如果 RabbitMQ 服务异样导致重启,将会导致音讯失落。RabbitMQ 提供了长久化的机制,将内存中的音讯长久化到硬盘上,即便重启 RabbitMQ,音讯也不会失落。

音讯长久化须要满足以下条件:

  1. 音讯设置长久化。公布音讯前,设置投递模式 delivery mode 为 2,示意音讯须要长久化。
  2. Queue 设置长久化。
  3. 交换机设置长久化。

当公布一条音讯到交换机上时,Rabbit 会先把音讯写入长久化日志,而后才向生产者发送响应。一旦从队列中生产了一条音讯的话并且做了确认,RabbitMQ 会在长久化日志中移除这条音讯。在生产音讯前,如果 RabbitMQ 重启的话,服务器会主动重建交换机和队列,加载长久化日志中的音讯到相应的队列或者交换机上,保障音讯不会失落。

镜像队列

当 MQ 产生故障时,会导致服务不可用。引入 RabbitMQ 的镜像队列机制,将 queue 镜像到集群中其余的节点之上。如果集群中的一个节点生效了,能主动地切换到镜像中的另一个节点以保障服务的可用性。

通常每一个镜像队列都蕴含一个 master 和多个 slave,别离对应于不同的节点。发送到镜像队列的所有音讯总是被间接发送到 master 和所有的 slave 之上。除了 publish 外所有动作都只会向 master 发送,而后由 master 将命令执行的后果播送给 slave,从镜像队列中的生产操作实际上是在 master 上执行的。

反复生产

音讯反复的起因有两个:1. 生产时音讯反复,2. 生产时音讯反复。

生产者发送音讯给 MQ,在 MQ 确认的时候呈现了网络稳定,生产者没有收到确认,这时候生产者就会从新发送这条音讯,导致 MQ 会接管到反复音讯。

消费者生产胜利后,给 MQ 确认的时候呈现了网络稳定,MQ 没有接管到确认,为了保障音讯不失落,MQ 就会持续给消费者投递之前的音讯。这时候消费者就接管到了两条一样的音讯。因为反复音讯是因为网络起因造成的,无奈防止。

解决办法:发送音讯时让每个音讯携带一个全局的惟一 ID,在生产音讯时先判断音讯是否曾经被生产过,保障音讯生产逻辑的幂等性。具体生产过程为:

  1. 消费者获取到音讯后先依据 id 去查问 redis/db 是否存在该音讯
  2. 如果不存在,则失常生产,生产结束后写入 redis/db
  3. 如果存在,则证实音讯被生产过,间接抛弃

生产端限流

当 RabbitMQ 服务器积压大量音讯时,队列里的音讯会大量涌入生产端,可能导致生产端服务器奔溃。这种状况下须要对生产端限流。

Spring RabbitMQ 提供参数 prefetch 能够设置单个申请解决的音讯个数。如果消费者同时解决的音讯达到最大值的时候,则该消费者会阻塞,不会生产新的音讯,直到有音讯 ack 才会生产新的音讯。

开启生产端限流:

# 在单个申请中解决的音讯个数,unack 的最大数量
spring.rabbitmq.listener.simple.prefetch=2

原生 RabbitMQ 还提供 prefetchSize 和 global 两个参数。Spring RabbitMQ 没有这两个参数。

// 单条音讯大小限度,0 代表不限度
//global:限度限流性能是 channel 级别的还是 consumer 级别。当设置为 false,consumer 级别,限流性能失效,设置为 true 没有了限流性能,因为 channel 级别尚未实现。void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

死信队列

生产失败的音讯寄存的队列。

音讯生产失败的起因:

  • 音讯被回绝并且音讯没有从新入队(requeue=false)
  • 音讯超时未生产
  • 达到最大队列长度

设置死信队列的 exchange 和 queue,而后进行绑定:

    @Bean
    public DirectExchange dlxExchange() {return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE);
    }

    @Bean
    public Queue dlxQueue() {return new Queue(RabbitMqConfig.DLX_QUEUE, true);
    }

    @Bean
    public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange deadExchange) {return BindingBuilder.bind(dlxQueue).to(deadExchange).with(RabbitMqConfig.DLX_QUEUE);
    }

在一般队列加上两个参数,绑定一般队列到死信队列。当音讯生产失败时,音讯会被路由到死信队列。

    @Bean
    public Queue sendSmsQueue() {Map<String,Object> arguments = new HashMap<>(2);
        // 绑定该队列到私信交换机
        arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE);
        return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments);
    }

生产者残缺代码:

@Component
@Slf4j
public class MQProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    RandomUtil randomUtil;

    @Autowired
    UserService userService;

    final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {log.info("correlationData:" + correlationData);
            log.info("ack:" + ack);
            if(!ack) {log.info("异样解决....");
            }
    };


    final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
            log.info("return exchange:" + exchange + ", routingKey:"
                    + routingKey + ", replyCode:" + replyCode + ", replyText:" + replyText);

    public void sendMail(String mail) {
        // 貌似线程不平安 范畴 100000 - 999999
        Integer random = randomUtil.nextInt(100000, 999999);
        Map<String, String> map = new HashMap<>(2);
        String code = random.toString();
        map.put("mail", mail);
        map.put("code", code);

        MessageProperties mp = new MessageProperties();
        // 在生产环境中这里不必 Message,而是应用 fastJson 等工具将对象转换为 json 格局发送
        Message msg = new Message("tyson".getBytes(), mp);
        msg.getMessageProperties().setExpiration("3000");
        // 如果生产端要设置为手工 ACK,那么生产端发送音讯的时候肯定发送 correlationData,并且全局惟一,用以惟一标识音讯。CorrelationData correlationData = new CorrelationData("1234567890"+new Date());

        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData);

        // 存入 redis
        userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT);
    }
}

消费者残缺代码:

@Slf4j
@Component
public class DeadListener {@RabbitListener(queues = RabbitMqConfig.DLX_QUEUE)
    public void onMessage(Message message, Channel channel) throws IOException {

        try {Thread.sleep(5000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 手工 ack
        channel.basicAck(deliveryTag,false);
        System.out.println("receive--1:" + new String(message.getBody()));
    }
}

当一般队列中有死信时,RabbitMQ 就会主动的将这个音讯从新公布到设置的死信交换机去,而后被路由到死信队列。能够监听死信队列中的音讯做相应的解决。

其余

pull 模式

pull 模式次要是通过 channel.basicGet 办法来获取音讯,示例代码如下:

GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

音讯过期工夫

在生产端发送音讯的时候能够给音讯设置过期工夫,单位为毫秒 (ms)

Message msg = new Message("tyson".getBytes(), mp);
msg.getMessageProperties().setExpiration("3000");

也能够在创立队列的时候指定队列的 ttl,从音讯入队列开始计算,超过该工夫的音讯将会被移除。

参考链接

RabbitMQ 根底

Springboot 整合 RabbitMQ

RabbitMQ 之音讯长久化

RabbitMQ 发送邮件代码

线上 rabbitmq 问题

最初给大家分享一个 github 仓库,下面放了 200 多本经典的计算机书籍 ,包含 C 语言、C++、Java、Python、前端、数据库、操作系统、计算机网络、数据结构和算法、机器学习、编程人生等,能够 star 一下,下次找书间接在下面搜寻,仓库继续更新中~

github 地址:https://github.com/Tyson0314/…

如果 github 拜访不了,能够拜访 gitee 仓库。

gitee 地址:https://gitee.com/tysondai/ja…

正文完
 0