1. 前言

在《RabbitMQ(1)-根底开发利用》中,咱们曾经介绍了RabbitMQ的根底开发利用。本文基于这些根底再做一些扩大,延长出一些高级的用法,如:死信队列、提早队列和优先队列。不过还是以死信队列为主,因为提早队列是死信队列的衍生概念,而且优先队列也比较简单,所以先还是在代码层面上,把死信队列搞透。

1.1. 创立队列、交换机

咱们在应用RabbitMQ之前,须要先创立好相干的队列和交换机,并且设置一些绑定关系。因为几篇文章都是联合springboot来开发,上面就联合springboot介绍几种创立形式:

  1. 间接拜访 RabbitMQ Management 治理页面,在页面上创立;或者应用 RabbitMQ其余的客户端来创立治理。
  2. 在springboot上基于生产端开发时,@RabbitListener 注解的 bindings属性,能够简略实现相似性能。
@RabbitListener(bindings = @QueueBinding(            value = @Queue(value = "direct.queue.d",                    durable = "true"),            exchange = @Exchange(value = "direct.exchange.a",                    durable = "true",                    type = ExchangeTypes.DIRECT,                    ignoreDeclarationExceptions = "true"),            key = "direct.routingKey.a"    )    )
  1. 在配置类下定义@Bean,即向Ioc容器中注册Queue、Exchange、Binding的实例。
package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @description: * @date: 2020/7/12 11:26 下午 * @author: kerry */@Configurationpublic class RabbitConfig {       public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a";    public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a";    public static final String NORMAL_QUEUE_A="demo.direct.queue.a";    /**     * NORMAL 交换机     * @return     */    @Bean    public Exchange ExchangeA(){        return ExchangeBuilder                .directExchange(NORMAL_EXCHANGE_A)                .durable(true)                .build();    }    /**     * NORMAL 队列     * @return     */    @Bean    public Queue QueueA(){        return QueueBuilder                .durable(NORMAL_QUEUE_A)                .build();    }    /**     * 绑定 NORMAL队列 和 NORMAL交换机     * @return     */    @Bean    public Binding normalBinding(){        return new Binding(NORMAL_QUEUE_A,                Binding.DestinationType.QUEUE,                NORMAL_EXCHANGE_A,                NORMAL_ROUTING_KEY_A,                null);    }}

我集体举荐第三种,而且倡议是在生产者端定义,消费者应该更关注生产的逻辑。然而如果用代码来创立,有一个很大的毛病,就是不能删除和批改,至多我目前还没找到方法。

因而要联合第一种和第三种来应用,当然都用第一种也是能够的。只是开发人员,更心愿队列、交换机的创立、绑定的逻辑,都体现在代码外面,通过代码能够更好的浏览架构设计。

2. 死信队列

2.1. 简介

死信队列这个名字听起来很特地,但它解决的是日常开发中最常见的问题:不能失常生产的音讯,该如何解决。咱们在第一篇文章中有应用到手动Ack,对于须要nack并且无需重回队列的音讯,冀望有对立的异样解决;包含有些音讯是有时效性的,如果领取订单个别都最大领取时常,超时后就应该勾销订单;等等。

死信队列就是应答这些状况的,它呈现的条件如下:

  1. 音讯被否定确认,应用basicNackbasicReject ,并且此时requeue 属性被设置为false。
  2. 音讯在队列的存活工夫超过设置的TTL工夫。
  3. 音讯队列的音讯数量曾经超过最大队列长度。

死信队列的架构如下:

生产者 --> 音讯 --> 业务交换机 --> 业务队列 --> 音讯变成死信 --> 死信交换机 -->死信队列 --> 消费者

2.2. 配置

如何配置死信队列呢?其实很简略,大略能够分为以下步骤:

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

留神,并不是间接申明一个公共的死信队列,而后所以死信音讯就本人跑到死信队列里去了。而是为每个须要应用死信的业务队列配置一个死信交换机,这里同一个我的项目的死信交换机能够共用一个,而后为每个业务队列调配一个独自的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,而后绑定在死信交换机上。也就是说,死信队列并不是什么非凡的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么非凡的交换机,只不过是用来承受死信的交换机,所以能够为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列调配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,个别会为每个重要的业务队列配置一个死信队列。

2.3. 代码(生产者)

依照简介中死信队列的架构,咱们在配置文件中定义了NORMAL的业务队列和业务交换机,以及DLX的死信队列和死信交换机,并在业务队列中设置了死信交换机。

RabbitConfig.java

package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @description: * @date: 2020/7/12 11:26 下午 * @author: kerry */@Configurationpublic class RabbitConfig {    /**     * DLX,定义参数     */    public static final String X_DEAD_LETTER_EXCHANGE="x-dead-letter-exchange";    public static final String X_DEAD_LETTER_ROUTING_KEY="x-dead-letter-routing-key";    public static final String X_MESSAGE_TTL="x-message-ttl";    public static final String X_MAX_LENGTH="x-max-length";    /**     * DLX,命名     */    public static final String DEAD_LETTER_EXCHANGE_A="demo.direct.dlx.exchange.a";    public static final String DEAD_LETTER_ROUTING_KEY_A="demo.direct.dlx.routingKey.a";    public static final String DEAD_LETTER_QUEUE_A="demo.direct.dlx.queue.a";    /*     * NORMAL,命名     */    public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a";    public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a";    public static final String NORMAL_QUEUE_A="demo.direct.queue.a";    @Bean("jsonRabbitTemplate")    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());        return rabbitTemplate;    }    @Bean("defaultRabbitTemplate")    public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);        return rabbitTemplate;    }    /**     * DLX 交换机     * @return     */    @Bean    public Exchange dlxExchangeA(){        return ExchangeBuilder                .directExchange(DEAD_LETTER_EXCHANGE_A)                .durable(true)                .build();    }    /**     * DLX 队列     * @return     */    @Bean    public Queue dlxQueueA(){        return QueueBuilder                .durable(DEAD_LETTER_QUEUE_A)                .build();    }    /**     * NORMAL 交换机     * @return     */    @Bean    public Exchange ExchangeA(){        return ExchangeBuilder                .directExchange(NORMAL_EXCHANGE_A)                .durable(true)                .build();    }    /**     * NORMAL 队列     * @return     */    @Bean    public Queue QueueA(){        return QueueBuilder                .durable(NORMAL_QUEUE_A)                //设置 死信交换机                .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A)                .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A)                //设置 队列所有音讯 存活工夫8秒                .withArgument(X_MESSAGE_TTL,8000)                //设置 队列最大长度 10条                .withArgument(X_MAX_LENGTH,10)                .build();    }    /**     * 绑定 DLX队列 和 DLX交换机     * @return     */    @Bean    public Binding dlxBinding(){        return new Binding(DEAD_LETTER_QUEUE_A,                Binding.DestinationType.QUEUE,DEAD_LETTER_EXCHANGE_A,                DEAD_LETTER_ROUTING_KEY_A,                null);    }    /**     * 绑定 NORMAL队列 和 NORMAL交换机     * @return     */    @Bean    public Binding normalBinding(){        return new Binding(NORMAL_QUEUE_A,                Binding.DestinationType.QUEUE,                NORMAL_EXCHANGE_A,                NORMAL_ROUTING_KEY_A,                null);    }}

ProducerService.java

@Slf4j@Servicepublic class ProducerService {    public void sendText(String data, MessageProperties messageProperties) {        /**         * 对单个音讯 设置TTL         */        //messageProperties.setExpiration(String.valueOf(3000));        Message message = defaultRabbitTemplate                .getMessageConverter()                .toMessage(data, messageProperties);       defaultRabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_A,                RabbitConfig.NORMAL_ROUTING_KEY_A,                message);    }}

2.4. 代码(消费者)

消费者的逻辑比较简单,次要是别离监听业务队列和死信队列,这里将两个队列的音讯输入日志。这里模仿的是消费者nack音讯,并且不退回队列的状况。

MessageListener.java

/** * @description: * @date: 2020/7/12 11:07 下午 * @author: kerry */@Component@Slf4jpublic class MessageListener {    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private ObjectMapper objectMapper;    /**     * @param message     * @param channel     * 监听 业务队列     *     * @throws Exception     */    @RabbitListener(queues = RabbitConfig.NORMAL_QUEUE_A)    @RabbitHandler    public void onMessage(Message message, Channel channel) throws Exception {        String contentType = message.getMessageProperties().getContentType();        String bodyText = null;        switch (contentType) {            //字符串            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);                break;            //json对象            case MessageProperties.CONTENT_TYPE_JSON:                User user = objectMapper.readValue(message.getBody(), User.class);                bodyText = user.toString();                break;        }        log.info("业务队列-回绝音讯: " + bodyText);        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);         /**         * 提早队列,被生产的音讯须要重回队列         */        //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);    }    /**     * @param message     * @param channel     * 监听 死信队列     *     * @throws Exception     */    @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE_A)    @RabbitHandler    public void onMessageDLX(Message message, Channel channel) throws Exception {        String contentType = message.getMessageProperties().getContentType();        String bodyText = null;        switch (contentType) {            //字符串            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);                break;            //json对象            case MessageProperties.CONTENT_TYPE_JSON:                User user = objectMapper.readValue(message.getBody(), User.class);                bodyText = user.toString();                break;        }        log.info("死信队列-接管音讯: " + bodyText);        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);     }}

2.5. 剖析

结尾说过,导致音讯转为死信队列的形式有3种,上面就从代码中剖析这3种状况。

咱们回过头来看看消费者这边,定义业务队列的办法:

    /**     * NORMAL 队列     * @return     */    @Bean    public Queue QueueA(){        return QueueBuilder                .durable(NORMAL_QUEUE_A)                //设置 死信交换机                .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A)                .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A)                //设置 队列所有音讯 存活工夫8秒                .withArgument(X_MESSAGE_TTL,8000)                //设置 队列最大长度 10条                .withArgument(X_MAX_LENGTH,10)                .build();    }
1. nack回绝音讯,requeue=false

在死信队列的架构中,只有在业务队列中设置了死信交换机x-dead-letter-exchange。消费者代码中,咱们在业务队列的生产过程中nack音讯,并且requeue=false即可。x-dead-letter-routing-key能够不设置,如果不设置,默认取音讯原来的路由键。
代码如下:

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

channel.basicNack办法的参数如下:

  1. deliveryTag:该音讯的index。
  2. multiple:是否批量.true:将一次性回绝所有小于deliveryTag的音讯。
  3. requeue:被回绝的是否从新入队列。
2. 音讯超时TTL

TTL(Time-To-Live)指音讯的存活工夫,咱们有两种形式设置音讯的TTL:

  1. 设置队列的TTL,即参数x-message-ttl,设置后,进入该队列的所有音讯的TTL都为对应的值。
  2. 设置单个音讯的TTL,在生产者代码中,给音讯的属性中设置过期工夫。如:messageProperties.setExpiration(String.valueOf(3000));,就是设置音讯的TTL为3秒。

不过要留神的是,如果曾经设置了nack的死信逻辑,TTL的死信就不失效了。情理也很简略,因为nack音讯和requeue=false一起用,代表音讯被生产了,并且音讯不会重回队列,间接被抛弃或进入死信队列,又怎么会在队列中超时了呢。

3. 超过队列长度

能够设置队列长度,例如最大接管音讯的数量。当音讯在队列中曾经达到最大数量,那么前面再来的音讯,就会被间接丢进死信队列。
咱们也是中定义业务队列的代码中,有通过x-max-length参数,设置业务队列的长度。

3. 提早队列

在我还不晓得提早队列之前,我就感觉消息中间件应该具备这样的性能。在音讯公布到队列后,我冀望每个音讯提早指定工夫后,再被消费者获取到。例如:在领取模块中,当用户生成订单,再到领取实现订单,是有一段时间的。而咱们个别会给这个订单设置超时工夫,如果超过了这段时间,订单就应该被勾销,无奈再领取。那么将订单作为音讯,就能够利用提早队列来实现勾销订单的逻辑。

RabbitMQ并不间接反对提早队列的性能,而是作为一个概念,你能够利用死信队列来实现“提早队列”。利用TTL超时工夫的死信形式,来实现提早队列。

回顾一下上段中TTL的形式,咱们在业务队列中除了定义死信交换机x-dead-letter-exchange,还能够定义队列的生存工夫x-message-ttl,或者设置音讯的过期工夫。而如果咱们不生产这个业务队列中的音讯,那么音讯在达到TTL后,就会主动转到死信队列中。如果咱们只生产死信队列中的音讯,疏忽掉业务队列这个“中转站”,就相当于音讯在被公布后,通过指定时间延迟,在死信队列中被生产,这就造成了一个“提早队列”。

因为提早队列就是死信队列的一种实现,所以代码层面上能够间接参考上段中TTL的局部。

4. 优先队列

优先队列,顾名思义,领有高优先级的队列具备高的优先权,优先级高的音讯具备优先被生产的势力。因而在优先队列中有两个逻辑点:队列是优先队列音讯有优先级。可参考死信队列章节中,TTL局部的代码,上面对代码改变中央做一下阐明:

  1. 设置队列最大优先级,即音讯可应用的最大优先级数字,可通过x-max-priority参数来设置。
  2. 设置音讯的优先级,在生产者代码中,在音讯的属性中设置优先级,优先级越大,越先被生产。如:messageProperties.setPriority(3),即设置该音讯的优先级为3。

优先队列的应用场景是在:音讯有分优先级的需要,并且并发量较大。要求并发量大,是因为如果所有音讯在公布之后,马上就被生产了,那么分优先级的必要性就不大了。