关于java:RabbitMQ-如何实现延迟队列

3次阅读

共计 3638 个字符,预计需要花费 10 分钟才能阅读完成。

提早队列是指当音讯被发送当前,并不是立刻执行,而是期待特定的工夫后,消费者才会执行该音讯。
提早队列的应用场景有以下几种:

  1. 未按时领取的订单,30 分钟过期之后勾销订单。
  2. 给活跃度比拟低的用户距离 N 天之后推送音讯,进步活跃度。
  3. 新注册会员的用户,期待几分钟之后发送欢送邮件等。

    1. 如何实现提早队列?

    提早队列有以下两种实现形式:

  4. 通过音讯过期后进入死信交换器,再由交换器转发到提早生产队列,实现提早性能;
  5. 应用官网提供的提早插件实现提早性能。

晚期,大部分公司都会采纳第一种形式,而随着 RabbitMQ 3.5.7(2015 年底公布)的提早插件的公布,因为其应用更简略、更不便,所以它当初才是大家一般会采纳的,实现提早队列的形式,所以本文也只讲第二种形式。

2. 实现提早队列

2.1 装置并启动提早队列

2.1.1 下载提早插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

留神:须要依据你本人的 RabbitMQ 服务器端版本抉择雷同版本的提早插件,能够在 RabbitMQ 控制台查看:

2.1.2 将插件放到插件目录

接下来,将上一步下载的插件放到 RabbitMQ 服务器装置目录,如果是 docker,应用一下命令复制:

docker cp 宿主机文件 容器名称或 ID: 容器目录

如下图所示:

之后,进入 docker 容器,查看插件中是否蕴含提早队列:

docker exec -it 容器名称或 ID /bin/bash
rabbitmq-plugins list

如下图所示:

2.1.3 启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下图所示:

2.1.4 重启 RabbitMQ 服务

装置完 RabbitMQ 插件之后,须要重启 RabbitMQ 服务能力失效。
如果应用的是 Docker,只须要重启 Docker 容器即可:

docker restart 容器名称或 ID

如下图所示:

2.1.5 验收后果

在 RabbitMQ 控制台查看,新建交换机时是否有提早音讯选项,如果有就阐明提早音讯插件曾经失常运行了,如下图所示:

2.1.6 手动创立提早交换器(可选)

此步骤可选(非必须),因为某些版本下通过程序创立提早交换器可能会出错,如果出错了,手动创立提早队列即可,如下图所示:

2.2 编写提早音讯实现代码

2.2.1 配置交换器和队列

import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * 提早交换器和队列
 */
@Configuration
public class DelayedExchangeConfig {
    public static final String EXCHANGE_NAME = "myDelayedExchange";
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String ROUTING_KEY = "delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        return new CustomExchange(EXCHANGE_NAME,
                "x-delayed-message", // 音讯类型
                true, // 是否长久化
                false); // 是否主动删除
    }

    @Bean
    public Queue delayedQueue() {return QueueBuilder.durable(QUEUE_NAME)
                .withArgument("x-delayed-type", "direct")
                .build();}

    @Bean
    public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();}
}

2.1.2 定义音讯发送办法

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 5000)
    public void sendDelayedMessage(String message) {
        rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
                DelayedExchangeConfig.ROUTING_KEY,
                message,
                messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(10000); // 设置延迟时间,单位毫秒
                    return messagePostProcessor;
                });
    }
}

2.1.3 发送提早音讯

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/send")
    public String sendDirectMessage(@RequestParam String message) {delayedMessageProducer.sendDelayedMessage(message);
        return "Delayed message sent to Exchange:" + message;
    }
}

2.1.4 接管提早音讯

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class DelayedMessageConsumer {@RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
    public void receiveDelayedMessage(String message) {System.out.println("Received delayed message:" + message);
    }
}

PS:获取本文提早队列的实现 Demo,请加我:GG_Stone【备注:提早队列】

小结

实现 RabbitMQ 提早队列目前支流的实现形式,是采纳官网提供的提早插件来实现。而提早插件须要先下载插件、而后配置并重启 RabbitMQ 服务,之后就能够通过编写代码的形式实现提早队列了。

本文已收录到我的面试小站 www.javacn.site,其中蕴含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、音讯队列等模块。

正文完
 0