乐趣区

关于rabbitmq:RabbitMQ的开发应用

1. 介绍

RabbitMQ 是一个由 erlang 语言编写的、开源的、在 AMQP 根底上残缺的、可复用的企业音讯零碎。反对多种语言,包含 java、Python、ruby、PHP、C/C++ 等。

1.1.AMQP 模型

AMQP:advanced message queuing protocol,一个提供对立音讯服务的应用层规范高级音讯队列协定,是应用层协定的一个凋谢规范,为面向音讯的中间件设计。基于此协定的客户端与消息中间件可传递音讯并不受客户端 / 中间件不同产品、不同开发语言等条件的限度。

AMQP 模型图

1.1.1. 工作过程

发布者 (Publisher)公布 音讯 (Message),经由 交换机(Exchange)。

交换机依据路由规定将收到的音讯分发给与该交换机绑定的 队列(Queue)。

最初 AMQP 代理会将音讯投递给订阅了此队列的消费者,或者消费者依照需要自行获取。

1、发布者、交换机、队列、消费者都能够有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,音讯代理 能够别离存在于不同的设施上。

2、发布者公布音讯时能够给音讯指定各种音讯属性(Message Meta-data)。有些属性有可能会被音讯代理(Brokers)应用,然而其余的属性则是齐全不通明的,它们只能被接管音讯的利用所应用。

3、从平安角度思考,网络是不牢靠的,又或是消费者在解决音讯的过程中意外挂掉,这样没有解决胜利的音讯就会失落。基于此起因,AMQP 模块蕴含了一个音讯确认(Message Acknowledgements)机制:当一个音讯从队列中投递给消费者后,不会立刻从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才齐全从队列中删除。

4、在某些状况下,例如当一个音讯无奈被胜利路由时(无奈从交换机散发到队列),音讯或者会被返回给发布者并被抛弃。或者,如果音讯代理执行了延期操作,音讯会被放入一个所谓的死信队列中。此时,音讯发布者能够抉择某些参数来解决这些非凡状况。

1.1.2.Exchange 交换机

交换机是用来发送音讯的 AMQP 实体。交换机拿到一个音讯之后将它路由给一个或零个队列。它应用哪种路由算法是由交换机类型和绑定(Bindings)规定所决定的。常见的交换机有如下几种:

  1. direct 直连交换机:Routing Key==Binding Key,严格匹配。
  2. fanout 扇形交换机:把发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中。
  3. topic 主题交换机:Routing Key==Binding Key,含糊匹配。
  4. headers 头交换机:依据发送的音讯内容中的 headers 属性进行匹配。
    具体无关这五种交换机的阐明和用法,后续会有章节具体介绍。

1.1.3.Queue 队列

AMQP 中的队列(queue)跟其余音讯队列或工作队列中的队列是很类似的:它们存储着行将被利用生产掉的音讯。队列跟交换机共享某些属性,然而队列也有一些另外的属性。

  • Durable(音讯代理重启后,队列仍旧存在)
  • Exclusive(只被一个连贯(connection)应用,而且当连贯敞开后队列即被删除)
  • Auto-delete(当最初一个消费者退订后即被删除)
  • Arguments(一些音讯代理用他来实现相似与 TTL 的某些额定性能)

1.2.rabbitmq 和 kafka 比照

rabbitmq 遵循 AMQP 协定,用在实时的对可靠性要求比拟高的消息传递上。kafka 次要用于解决沉闷的流式数据, 大数据量的数据处理上。次要体现在:

1.2.1. 架构

  1. rabbitmq:RabbitMQ 遵循 AMQP 协定RabbitMQ 的 broker 由 Exchange,Binding,queue 组成,其中 exchange 和 binding 组成了音讯的路由键;客户端 Producer 通过连贯 channel 和 server 进行通信,Consumer 从 queue 获取音讯进行生产(长连贯,queue 有音讯会推送到 consumer 端,consumer 循环从输出流读取数据)。rabbitMQ 以 broker 为核心。
  2. kafka:kafka 听从 个别的 MQ 构造,producer,broker,consumer,以 consumer 为核心,音讯的生产信息保留的客户端 consumer 上,consumer 依据生产的点,从 broker 上批量 pull 数据。

1.2.2. 音讯确认

  1. rabbitmq:有音讯确认机制。
  2. kafka:无音讯确认机制。

1.2.3. 吞吐量

  1. rabbitmq:rabbitMQ 在吞吐量方面稍逊于 kafka,他们的出发点不一样,rabbitMQ 反对对音讯的牢靠的传递,反对事务,不反对批量的操作;基于存储的可靠性的要求存储能够采纳内存或者硬盘。
  2. kafka:kafka 具备高的吞吐量,外部采纳音讯的批量解决,zero-copy 机制,数据的存储和获取是本地磁盘程序批量操作,具备 O(1)的复杂度,音讯解决的效率很高。
    (备注:kafka 零拷贝 ,通过sendfile 形式。(1)一般数据读取:磁盘 -> 内核缓冲区(页缓存 PageCache)-> 用户缓冲区 -> 内核缓冲区 -> 网卡输入;(2)kafka 的数据读取:磁盘 -> 内核缓冲区(页缓存 PageCache)-> 网卡输入。

1.2.4. 可用性

  1. rabbitmq(1)一般集群:在多台机器上启动多个 rabbitmq 实例,每个机器启动一个。然而你创立的 queue,只会放在一个 rabbtimq 实例上,然而每个实例都同步 queue 的元数据。完了你生产的时候,实际上如果连贯到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过去。(2)镜像集群:跟一般集群模式不一样的是,你创立的 queue,无论元数据还是 queue 里的音讯都会存在于多个实例上,而后每次你写音讯到 queue 的时候,都会主动把音讯到多个实例的 queue 里进行音讯同步。这样的话,益处在于,一个机器宕机了,没事儿,别的机器都能够用。害处在于,第一,这个性能开销太大了,音讯同步所有机器,导致网络带宽压力和耗费很重。第二,这么玩儿,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也蕴含了这个 queue 的所有数据,并没有方法线性扩大你的 queue
  2. kafka:kafka 是由多个 broker 组成,每个 broker 是一个节点;每创立一个 topic,这个 topic 能够划分为 多个 partition,每个 partition 能够存在于不同的 broker 上,每个 partition 就放一部分数据。这就是人造的分布式音讯队列,就是说一个 topic 的数据,是扩散放在多个机器上的,每个机器就放一部分数据。每个 partition 的数据都会同步到其余机器上,造成本人的 多个 replica 正本,而后所有 replica 会选举一个 leader 进去,主从构造。

1.2.5. 集群负载平衡

  1. rabbitmq:rabbitMQ 的负载平衡须要独自的 loadbalancer 进行反对,如 HAProxy 和 Keepalived 等。
  2. kafka:kafka 采纳 zookeeper 对集群中的 broker、consumer 进行治理,能够注册 topic 到 zookeeper 上;通过 zookeeper 的协调机制,producer 保留对应 topic 的 broker 信息,能够随机或者轮询发送到 broker 上;并且 producer 能够基于语义指定分片,音讯发送到 broker 的某分片上。

2. 构造

2.1. 交换机模式

RabbitMQ 罕用的 Exchange Type 有 fanout、direct、topic、headers 这四种。

2.1.1.Direct Exchange

direct 类型的 Exchange 路由规定很简略,它会把音讯路由到那些 binding key 与 routing key 齐全匹配的 Queue 中。

2.1.2.Topic Exchange

后面讲到 direct 类型的 Exchange 路由规定是齐全匹配 binding key 与 routing key,但这种严格的匹配形式在很多状况下不能满足理论业务需要。topic 类型的 Exchange 与 direct 类型的 Exchage 类似,也是将音讯路由到 binding key 与 routing key 相匹配的 Queue 中,但反对含糊匹配:

  • routing key 为一个句点号“.”分隔的字符串(咱们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key 与 routing key 一样也是句点号“.”分隔的字符串
  • binding key 中能够存在两种特殊字符 ”*” 与“#”,用于做含糊匹配,其中 ” * “ 用于匹配一个单词,“#”用于匹配多个单词(能够是零个)

2.1.3.Fanout Exchange

fanout 类型的 Exchange 路由规定非常简单,它会把所有发送到 fanout Exchange 的音讯都会被转发到与该 Exchange 绑定 (Binding) 的所有 Queue 上。
Fanout Exchange 不须要解决 RouteKey。只须要简略的将队列绑定到 exchange 上。这样发送到 exchange 的音讯都会被转发到与该交换机绑定的所有队列上。相似子网播送,每台子网内的主机都取得了一份复制的音讯。所以,Fanout Exchange 转发音讯是最快的。

2.1.4.Headers Exchange

headers 类型的 Exchange 也不依赖于 routing key 与 binding key 的匹配规定来路由音讯,而是依据发送的音讯内容中的 headers 属性进行匹配。
在绑定 Queue 与 Exchange 时指定一组键值对;当音讯发送到 Exchange 时,RabbitMQ 会取到该音讯的 headers(也是一个键值对的模式),比照其中的键值对是否齐全匹配 Queue 与 Exchange 绑定时指定的键值对;如果齐全匹配则音讯会路由到该 Queue,否则不会路由到该 Queue。

2.1.5.Default Exchange 默认

严格来说,Default Exchange 并不应该和下面四个交换机在一起,因为它不属于独立的一种交换机类型,而是属于 Direct Exchange 直连交换机。

默认交换机(default exchange)实际上是一个由音讯代理事后申明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。

它有一个非凡的属性使得它对于简略利用特地有用途:那就是每个新建队列(queue)都会主动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称雷同。

举个例子:当你申明了一个名为“search-indexing-online”的队列,AMQP 代理会主动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为“search-indexing-online”。所以当你心愿将音讯投递给“search-indexing-online”的队列时,指定投递信息包含:交换机名称为空字符串,路由键为“search-indexing-online”即可。

因而 direct exchange 中的 default exchange 用法,体现出了音讯队列的 point to point,感觉像是间接投递音讯给指定名字的队列。

2.2. 长久化

尽管咱们要防止零碎宕机,然而这种“不可抗力”总会有可能产生。rabbitmq 如果宕机了,再启动便是了,大不了有短暂工夫不可用。但如果你启动起来后,发现这个 rabbitmq 服务器像是被重置了,以前的 exchange,queue 和 message 数据都没了,那就太令人解体了。不光业务零碎因为无对应 exchange 和 queue 受影响,失落的很多 message 数据更是致命的。所以如何保障 rabbitmq 的长久化,在服务应用前必须得思考到位。

长久化能够进步 RabbitMQ 的可靠性,以防在异常情况(重启、敞开、宕机等)下的数据失落。RabbitMQ 的长久化分为三个局部:交换器的长久化、队列的长久化和音讯的长久化。

2.2.1.exchange 长久化

exchange 交换器的长久化是在申明交换器的时候,将durable 设置为 true

如果交换器不设置长久化,那么在 RabbitMQ 交换器服务重启之后,相干的 交换器信息会失落,不过音讯不会失落,然而不能将音讯发送到这个交换器

spring 中创立 exchange 时,构造方法默认设置为长久化。

2.2.2.queue 长久化

队列的长久化在申明队列的时候,将durable 设置为 true

如果队列不设置长久化,那么 RabbitMQ 交换器服务重启之后,相干的 队列信息会失落,同时队列中的音讯也会失落

exchange 和 queue,如果一个是非长久化,另一个是长久化,中 bind 时会报错。

spring 中创立 exchange 时,构造方法默认设置为长久化。

2.2.3.message 长久化

要确保音讯不会失落,除了设置队列的长久化,还须要将音讯设置为长久化。通过 将音讯的投递模式(BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现音讯的长久化

  • 长久化的音讯在达到队列时就被写入到磁盘,并且如果能够,长久化的音讯也会在内存中保留一份备份,这样能够进步肯定的性能,只有在内存吃紧的时候才会从内存中革除。
  • 非长久化的音讯个别只保留在内存中,在内存吃紧的时候会被换入到磁盘中,以节俭内存空间。

如果将所有的音讯都进行长久化操作,这样会影响 RabbitMQ 的性能。写入磁盘的速度比写入内存的速度慢很,所以要在可靠性和吞吐量之间做衡量。

在 spring 中,BasicProperties 中的 deliveryMode 属性,对应的是 MessageProperties 中的 deliveryMode。平时应用的 RabbitTemplate.convertAndSend()办法默认设置为长久化,deliveryMode=2。如果须要设置非长久化发送音讯,须要手动设置:

messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

2.2.4. 残缺计划

这里解说实现音讯长久化的残缺计划。

一、exchange、queue、message

要保障音讯的长久化,在 rabbitmq 自身的构造上须要实现上面这些:

  • exchange 交换机的 durable 设置为 true。
  • queue 队列的 durable 设置为 true。
  • message 音讯的投递模式 deliveryMode 设置为 2。

二、公布确认
后面是保障了音讯在投递到 rabbitmq 中,如何保障 rabbit 中音讯的长久化。
那么还须要保障生产者能胜利公布音讯,如交换机名字写错了等等。能够在公布音讯时设置投递胜利的回调,确定音讯能胜利投递到指标队列中。

三、接管确认
对于消费者来说,如果在订阅音讯的时候,将 autoAck 设置为 true,那么消费者接管到音讯后,还没有解决,就呈现了异样挂掉了,此时,队列中曾经将音讯删除,消费者不可能在收到音讯。

这种状况能够将 autoAck 设置为 false,进行手动确认。

四、镜像队列集群
在长久化后的音讯存入 RabbitMQ 之后,还须要一段时间能力存入磁盘。RabbitMQ 并不会为每条音讯都进行同步存盘,可能仅仅是保留到操作系统缓存之中而不是物理磁盘。如果在这段时间,服务器宕机或者重启,音讯还没来得及保留到磁盘当中,从而失落。对于这种状况,能够引入 RabiitMQ 镜像队列机制。

这里强调 是镜像队列集群,而非一般集群。因为出于同步效率思考,一般集群只会同步队列的元数据,而不会同步队列中的音讯。只有升级成镜像队列集群后,能力也同步音讯。

每个镜像队列由一个 master 和一个或多个 mirrors 组成。主节点位于一个通常称为 master 的节点上。每个队列都有本人的主节点。给定队列的所有操作首先利用于队列的主节点,而后流传到镜像。这包含队列公布(enqueueing publishes)、向消费者传递音讯、跟踪消费者的确认等等。

公布到队列的音讯将复制到所有镜像。不论消费者连贯到哪个节点,都会连贯到 master,镜像会删除在 master 上已确认的音讯。因而,队列镜像进步了可用性,但不会在节点之间调配负载。如果承载队列 master 的节点呈现故障,则最旧的镜像将降级为新的 master,只有它已同步。依据队列镜像参数,也能够降级未同步的镜像。

3. 开发

java 开发上,这里以 spring-boot-starter-amqp 为例,记录在 springboot 中应用 rabbitmq 的一些关注点。pom.xml 中援用为:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.1. 简略示例

一个简略的示例,仅限于文本音讯的公布和接管。

3.1.1. 生产者

ProducerController.java

@RestController
public class ProducerController {
    private static final String HEADER_KEY_UID="uid";
    @Autowired
    private ProducerService producerService;

    @PostMapping("/sendText")
    public void sendText(@RequestParam("uid")String uid,@RequestParam("msg")String msg){MessageProperties messageProperties=new MessageProperties();
        messageProperties.setHeader(HEADER_KEY_UID,uid);
        producerService.sendText(msg,messageProperties);
    }
}

ProducerService.java

@Service
public class ProducerService {
    private static final String EXCHANGE_NAME="direct.exchange.a";
    private static final String ROUTING_KEY_NAME="direct.routingKey.a";
    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
     * 发送 音讯文本
     * @param data 文本音讯
     * @param messageProperties 音讯属性
     */
    public void sendText(String data, MessageProperties messageProperties) {Message message = rabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

音讯发送的罕用办法:

  • rabbitTemplate.send(message); // 发消息,参数类型为 org.springframework.amqp.core.Message
  • rabbitTemplate.convertAndSend(object); // 转换并发送音讯。将参数对象转换为 org.springframework.amqp.core.Message 后发送
  • rabbitTemplate.convertSendAndReceive(message) // 转换并发送音讯, 且期待音讯者返回响应音讯。

3.1.2. 消费者

MessageListener.java

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "true"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        String msg = (String) messageConverter.fromMessage(message);
        log.info("生产端 Body:" + msg);
    }
}
  • @RabbitListener 能够标注在类下面,需配合 @RabbitHandler 注解一起应用
  • @RabbitListener 标注在类下面示意当有收到音讯的时候,就交给 @RabbitHandler 的办法解决,具体应用哪个办法解决,依据 MessageConverter 转换后的参数类型

3.2. 音讯序列化

rabbitmq 中音讯的序列化依赖于 MessageConvert,这是一个接口,用于音讯内容的序列化。

  • Message 分为 body 和 MessageProperties 两局部。RabbitMQ 的序列化是指 Message 的 body 属性,即咱们真正须要传输的内容,RabbitMQ 形象出一个 MessageConvert 接口解决音讯的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等。
  • 当调用了 convertAndSend 办法时,办法外部会应用 MessageConvert 进行音讯的序列化。
  • MessageConvert 是在 RabbitTemplate 中定义的属性,如果我的项目中须要应用多种 MessageConvert。因为 Spring 中 RabbitTemplate 是单例模式注入,倡议每种 MessageConvert 独自定义一种 RabbitTemplate。

3.2.1. 生产者

RabbitConfig.java

public class RabbitConfig {@Bean("jsonRabbitTemplate")
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("defaultRabbitTemplate")
    public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

ProducerService.java

@Service
public class ProducerService {
    private static final String EXCHANGE_NAME="direct.exchange.a";
    private static final String ROUTING_KEY_NAME="direct.routingKey.a";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;
    @Resource(name = "jsonRabbitTemplate")
    private RabbitTemplate jsonRabbitTemplate;

    /**
     * 发送 音讯对象 json
     *
     * @param data
     * @param messageProperties
     */
    public void sendObject(Object data, MessageProperties messageProperties) {Message message = jsonRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        jsonRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }

    /**
     * 发送 音讯文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

3.2.2. 消费者

MessageListener.java

@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            // 字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json 对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("生产端 Payload:" + bodyText);
    }  
}

生产者发送对象音讯时,咱们应用 Jackson2JsonMessageConverter,并用其 toMessage 办法封装。然而在消费者接管对象音讯时,咱们却没有用 Jackson2JsonMessageConverter 的 fromMessage 办法,而是应用 ObjectMapper 来反序列化 Json 对象。是因为 rabbitmq 在发送 Jackson2JsonMessageConverter 的序列化对象时,会在蕴含类的包名信息,消费者在应用 fromMessage 反序列化时,必须创立一个和生产者中包名等截然不同的类。显著不太事实。

3.3. 公布确认(生产者)

3.3.1.ConfirmCallback

ConfirmCallback 接口用于实现音讯发送到 RabbitMQ 交换器后接管 ack 回调。

  • 投递对象:exchange
  • 回调触发:无论胜利或失败,都会触发回调。
  • 投递胜利:ack=true
  • 投递失败:ack=false

应用形式在于:

  • 设置 publisher-confirm-type 为 correlated。
  • 实现 RabbitTemplate.ReturnCallback 的函数式接口,并应用。

ProducerService.java

@Slf4j
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME = "direct.exchange.a";
    private static final String ROUTING_KEY_NAME = "direct.routingKey.ab";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;

    /**
     * ConfirmCallback
     *
     * 投递对象:exchange
     * 回调触发:无论胜利或失败,都会触发回调。* 投递胜利:ack=true
     * 投递失败:ack=false
     */
    RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {log.info("ack:" + ack);
        if (!ack) {log.info("投递 exchange 失败!.... 能够进行日志记录、异样解决、弥补解决等");
        } else {log.info("投递 exchange 胜利!");
        }
    };

 
    /**
     * 发送 音讯文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        
        //confirmCallback
        defaultRabbitTemplate.setConfirmCallback(confirmCallback);

        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

配置文件须要设置:

spring.rabbitmq.publisher-confirm-type = correlated

3.3.2.ReturnCallback

ReturnCallback 接口用于实现音讯发送到 RabbitMQ 交换器,但无相应队列与交换器绑定时的回调。

  • 投递对象:queue
  • 回调触发:只有投递失败,才会触发回调。

应用形式在于:

  • 设置 publisher-returns 为 true。
  • 设置 mandatory 为 true。
  • 实现 RabbitTemplate.ReturnCallback 的函数式接口,并应用。

ProducerService.java

@Slf4j
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME = "direct.exchange.a";
    private static final String ROUTING_KEY_NAME = "direct.routingKey.ab";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;

    /**
     * ReturnCallback
     *
     * 投递对象:queue
     * 回调触发:只有投递失败,才会触发回调。*/
    RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText,
                                                    String exchange, String routingKey) -> {
        log.info("投递到 queue 失败!exchange:" + exchange + ", routingKey:"
                + routingKey + ", replyCode:" + replyCode + ", replyText:" + replyText);
    };

    /**
     * 发送 音讯文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        //returnCallback
        defaultRabbitTemplate.setMandatory(true);
        defaultRabbitTemplate.setReturnCallback(returnCallback);

        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

须要在配置文件中配置:

spring.rabbitmq.publisher-returns = true

3.4. 接管确认(消费者)

上一节解说的是,如何在生产者公布音讯时,确认音讯公布到 rabbitmq 的交换机和队列中。那么这一节解说的是,如何保障消费者能齐全“生产”了音讯。

通常状况下,rabbitmq 作为消息中间件,它把 message 推送给消费者就实现了它的使命,该 message 就主动被“签收”了。而消费者在接管到 message 后,再去实现对于该 message 的业务逻辑。可如果在实现该业务逻辑过程中产生了谬误,须要从新执行,那就难办了。因为 message 一旦被“签收”后,就从 rabbitmq 中被删除,不可能从新再发送。

如果消费者能手动管制 message 的“签收”操作,只有当对于 message 的业务逻辑执行实现后再“签收”,message 再从 rabbitmq 中删除,否则能够让 message 重发就好了。这一节就讲这个。

3.4.1.AcknowledgeMode

Acknowledge 意思是“确认”,音讯通过 ACK 确认是否被正确接管,每个 Message 都要被确认(acknowledged),能够手动去 ACK 或主动 ACK。

应用手动应答音讯,有一点须要特地留神,那就是不能遗记应答音讯,因为对于 RabbitMQ 来说解决音讯没有超时,只有不应答音讯,他就会认为仍在失常解决音讯,导致音讯队列呈现阻塞,影响业务执行。如果不想解决,能够 reject 抛弃该音讯。

音讯确认模式有:

  • AcknowledgeMode.NONE:主动确认
  • AcknowledgeMode.AUTO:依据状况确认
  • AcknowledgeMode.MANUAL:手动确认

默认是主动确认,能够通过 RabbitListenerContainerFactory 中进行开启手动 ack, 或者中配置文件中开启:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

MessageListener.java

@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            // 字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json 对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("生产端 Payload:" + bodyText);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

3.4.2.Ack/Nack/Reject

设置为手动确认后,有 3 种确认操作:

  • Ack:确认收到音讯,而后音讯从队列中删除。
  • Nack:确认没有收到音讯,音讯从新回到队列中发送。
  • Reject:回绝该音讯,间接抛弃该音讯,不会回到队列中。

如示例代码中的 basicAck 办法,须要留神的是,要传递两个参数:

  • deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会建设起一个 Channel,RabbitMQ 会用 basic.deliver 办法向消费者推送音讯,这个办法携带了一个 delivery tag,它代表了 RabbitMQ 向该 Channel 投递的这条音讯的惟一标识 ID,是一个枯燥递增的正整数,delivery tag 的范畴仅限于 Channel
  • multiple:为了缩小网络流量,手动确认能够被批处理,当该参数为 true 时,则能够一次性确认 delivery_tag 小于等于传入值的所有音讯

3.4.3. 异样重试

除了上述手动确认的形式,还有一种不太罕用的形式,能够实现反复发送音讯。在开启异样重试的前提下,在消费者代码中抛出异样,会主动重发消息。

application.properties

spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试
spring.rabbitmq.listener.simple.retry.max-attempts=5  最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒)spring.rabbitmq.listener.simple.default-requeue-rejected=false 重试次数超过下面的设置之后是否抛弃

MessageListener.java

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            // 字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json 对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("生产端 Payload:" + bodyText);
        throw new RuntimeException("重试啦");
    }

3.5. 生产模式

在 RabbitMQ 中消费者有 2 种形式获取队列中的音讯:

  • push:basic.consume 命令订阅某一个队列中的音讯,channel 会主动在解决完上一条音讯之后,接管下一条音讯。(同一个 channel 音讯解决是串行的)。除非敞开 channel 或者勾销订阅,否则客户端将会始终接管队列的音讯。
  • pull:basic.get 命令被动获取队列中的音讯,然而相对不能够通过循环调用 basic.get 来代替 basic.consume,这是因为 basic.get RabbitMQ 在理论执行的时候,是首先 consume 某一个队列,而后检索第一条音讯,而后再勾销订阅。如果是高吞吐率的消费者,最好还是倡议应用 basic.consume。

比照来说,如果有继续生产的需要,倡议用 push 的形式,通过监听器来订阅。如果只是特定时刻须要从队列中,一次性取些数据,能够用 pull 形式。

4. 名词概念

4.1.channel

咱们晓得无论是生产者还是消费者,都须要和 RabbitMQ Broker 建设连贯,这个连贯就是一条 TCP 连贯,也就是 Connection。一旦 TCP 连贯建设起来,客户端紧接着能够创立一个 AMQP 信道(Channel),每个信道都会被指派一个惟一的 ID。

信道是建设在 Connection 之上的虚构连贯,RabbitMQ 解决的每条 AMQP 指令都是通过信道实现的。

咱们齐全能够应用 Connection 就能实现信道的工作,为什么还要引入信道呢?试想这样一个场景,一个应用程序中有很多个线程须要从 RabbitMQ 中生产音讯,或者生产音讯,那么必然须要建设很多个 Connection,也就是多个 TCP 连贯。然而对于操作系统而言,建设和销毁 TCP 连贯是十分低廉的开销,如果遇到应用顶峰,性能瓶颈也随之浮现。

RabbitMQ 采纳相似 NIO(Non-blocking I/O)的做法,抉择 TCP 连贯复用,不仅能够缩小性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连贯。同时 RabbitMQ 能够确保每个线程的私密性,就像领有独立的连贯一样。当每个信道的流量不是很大时,复用繁多的 Connection 能够在产生性能瓶颈的状况下无效地节俭 TCP 连贯资源。然而信道自身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限度了。此时就须要开拓多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相干的调优策略须要依据业务本身的理论状况进行调节。

信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面开展的。比方 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等办法。RabbitMQ 相干的 API 与 AMQP 严密相连,比方 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

4.2.QoS

针对 push 形式,RabbitMQ 能够设置 basicQoS(Consumer Prefetch)来对 consumer 进行流控,从而限度未 Ack 的音讯数量。

前提包含,音讯确认模式必须是手动确认。

basicQos(int var1, boolean var2)
  • 第一个参数是限度未 Ack 音讯的最大数量。
  • 第二个参数是布尔值,(1)为 true 时,阐明是针对 channel 做的流控限度;(2)为 false 时,阐明是针对整个消费者做的流控限度。
退出移动版