1.延时队列的概念

2.延时队列实用场景

3.RabbitMQ中的两种TTL

4.队列的TTL

5.音讯的TTL

6.RabbitMQ插件实现延时队列

1.延时队列的概念
延时队列,顾名思义,就是音讯等一段时间再发送进来,最重要的属性是延时属性,它侧重于延时队列中的音讯在指定的工夫到了之后再取出解决

2.延时队列实用场景

2.1)用户给本人设置了一个揭示,到点推送音讯。
2.2)订单在完结之后主动把钱打到买家账户。
2.3)订单在几分钟内未领取主动勾销。
2.4)用户发动退款,一天之内没解决就主动推送给相干人员。

以上这些场景都有一个特点,那就是在某个业务逻辑实现后,在指定的特定工夫实现某一项业务逻辑的解决。如:用户给本人设置了一个揭示,到点推送音讯。其实如果简略地来说,是不是应用定时工作解决每分钟甚至每秒钟轮询所有数据,符合条件的推送就行了?如果在数据量少的状况,且对于工夫解决不是特地严格,能够领有肯定的延时的状况,是能够这么解决的,然而一旦数据量变得十分大,并且时效性十分强的业务场景,定时工作就会解决不过去,可能无奈在几秒钟或者一分钟大量解决数据,同时也会给数据库很大压力,既不满足业务要求,而且性能也很低下。

3.RabbitMQ中的两种TTL
TTL是RabbitMQ中,一个音讯或者一个队列的属性,表明一条音讯或者队列中所有的音讯的最大存活工夫,单位是毫秒

如果一条音讯设置了TTL属性或者音讯进入了设置TTL属性的队列,在这个工夫到了的状况下还没被生产,那么就会成为死信。如果同时配置了两个TTL属性,那么较小的值将会被应用。

4.队列的TTL

pom:

        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>        </dependency>        <!--rabbitmq 依赖客户端-->        <dependency>            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>5.8.0</version>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>

yml:

server:  port: 11000spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest

在写队列之前,咱们先画一张队列与交换机的关系图,就能够更顺利地编写代码,咱们这里要写两个有过期工夫的队列,过期工夫不同。

这次咱们不应用原生API,咱们应用springBoot整合好的API:

申明队列和交换机:

@Configurationpublic class QueueConfig {        private String xExchange = "x";    private String queueA = "QA";    private String queueB = "QB";        private String yDeadLetterExchange = "Y";    private String deadLetterQueue = "QD";     @Bean("xExchange")    public DirectExchange xExchange(){        return new DirectExchange(xExchange);    }    @Bean("yExchange")    public DirectExchange yExchange(){        return new DirectExchange(yDeadLetterExchange);    }    @Bean("queueA")    public Queue queueA(){        Map<String, Object> args = new HashMap<>(3);        //申明以后队列绑定的死信交换机        args.put("x-dead-letter-exchange", yDeadLetterExchange);        //申明以后队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");        //申明队列的 TTL        args.put("x-message-ttl", 10000);        return QueueBuilder.durable(queueA).withArguments(args).build();    }    @Bean    public Binding queueABindX(@Qualifier("queueA") Queue queueA,                               @Qualifier("xExchange") DirectExchange xExchange){        return BindingBuilder.bind(queueA).to(xExchange).with("XA");    }    @Bean("queueB")    public Queue queueB(){        Map<String, Object> args = new HashMap<>(3);        //申明以后队列绑定的死信交换机        args.put("x-dead-letter-exchange", yDeadLetterExchange);        //申明以后队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");        //申明队列的 TTL        args.put("x-message-ttl", 40000);        return QueueBuilder.durable(queueB).withArguments(args).build();    } @Bean    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,                                  @Qualifier("xExchange") DirectExchange xExchange){        return BindingBuilder.bind(queueB).to(xExchange).with("XB");    }    //申明死信队列 QD    @Bean("queueD")    public Queue queueD(){        return new Queue(deadLetterQueue);    }    //申明死信队列 QD 绑定关系    @Bean    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,                                        @Qualifier("yExchange") DirectExchange yExchange){        return BindingBuilder.bind(queueD).to(yExchange).with("YD");    }}

音讯生产者代码:

@Slf4j@RequestMapping("/ttl")@RestControllerpublic class SendMsgController {    @Autowired    private RabbitTemplate rabbitTemplate;    @GetMapping("/sendMsg/{message}")    public void sendMsg(@PathVariable String message) {        log.info("以后工夫:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);        rabbitTemplate.convertAndSend("x", "XA", "音讯来自 ttl 为 10S 的队列: " + message);        rabbitTemplate.convertAndSend("x", "XB", "音讯来自 ttl 为 40S 的队列: " + message);    }}

音讯消费者代码:

@Slf4j@Componentpublic class DeadLetterQueueConsumer {        @RabbitListener(queues = "QD")    public void receiveD(Message message, Channel channel) throws IOException {        String msg = new String(message.getBody());        log.info("以后工夫:{},收到死信队列信息{}", new Date().toString(), msg);    }}

咱们发送两条音讯:
http://localhost:11000/ttl/se...

咱们发现,第一条音讯在10s之后变成了死信音讯,第二条音讯在40s之后变成了死信音讯,而后被生产,一个延时队列就实现了。

然而这种架构,扩展性不强。如果每次减少一个新的需要,有不同的延时工夫要求,那么就要减少一个队列

5.音讯的TTL
为了解决下面的问题,咱们就须要给音讯设置一个过期工夫,这样就能够避免队列的无序扩张,音讯到期后主动收回去就能够了。

申明队列:

    private String queueC = "QC";    //申明队列 C 死信交换机    @Bean("queueC")    public Queue queueC(){        Map<String, Object> args = new HashMap<>(3);        //申明以后队列绑定的死信交换机        args.put("x-dead-letter-exchange", yDeadLetterExchange);        //申明以后队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");        //没有申明 TTL 属性        return QueueBuilder.durable(queueC).withArguments(args).build();    }        @Bean    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,                                  @Qualifier("xExchange") DirectExchange xExchange){        return BindingBuilder.bind(queueC).to(xExchange).with("XC");    }

生产者:

    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {        rabbitTemplate.convertAndSend("x", "XC", message,                correlationData -> {                    correlationData.getMessageProperties().setExpiration(ttlTime);                    return correlationData;                });        log.info("以后工夫:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);    }

发送申请:

http://localhost:11000/ttl/se...能够啊10000/10000

http://localhost:11000/ttl/se...能够啊20000/20000

能够看出,期待音讯过期后,就会主动把音讯收回,不过这么做有一个bug

RabbitMQ只会查看第一个音讯是否过期,如果过期则转发至死信交换机,如果第一个音讯的过期工夫很长很长,而第二个音讯的过期工夫很短很短,那么在第一个音讯发送胜利之前,第二个音讯不会先失去执行。

6.RabbitMQ插件实现延时队列

下面提到的问题,如果咱们的队列受困于第一条音讯的过期工夫,那么这个音讯并不是一个残缺的音讯队列,那咱们该如何解决呢?

咱们须要装置一个延时队列的插件:

https://github.com/rabbitmq/r...

到这个网站去下载,要留神一下rabbitMQ的版本,我的是3.8.3,所以要下载对应版本,放在rabbitMQ的plugIn文件夹下

在该文件夹下执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange,或者重启服务,就能够领有插件延时队列了。

架构图:

申明队列:

 public static final String DELAYED_QUEUE_NAME = "delayed.queue";    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";    @Bean    public Queue delayedQueue() {        return new Queue(DELAYED_QUEUE_NAME);    }    @Bean    public CustomExchange delayedExchange() {        Map<String, Object> args = new HashMap<>();        //自定义交换机的类型        args.put("x-delayed-type", "direct");        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);    }    @Bean    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,                                       @Qualifier("delayedExchange") CustomExchange                                               delayedExchange) {        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();    }

生产者:

    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";    @GetMapping("sendDelayMsg/{message}/{delayTime}")    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,                correlationData -> {                    correlationData.getMessageProperties().setDelay(delayTime);                    return correlationData;                });        log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);    }

消费者:

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";    @RabbitListener(queues = DELAYED_QUEUE_NAME)    public void receiveDelayedQueue(Message message) {        String msg = new String(message.getBody());        log.info("以后工夫:{},收到延时队列的音讯:{}", new Date().toString(), msg);    }

咱们先发送一个过期工夫比拟长的音讯,再发送一条过期工夫比拟短的音讯:

http://localhost:11000/ttl/se...能够啊20000/20000

http://localhost:11000/ttl/se...能够啊10000/10000

咱们发现音讯的生产时合乎咱们的预期的,应用插件完满解决了音讯发送的问题。