乐趣区

关于rabbitmq:系统学习消息队列RabbitMQ的延时队列

1. 延时队列的概念

2. 延时队列实用场景

3.RabbitMQ 中的两种 TTL

4. 队列的 TTL

5. 音讯的 TTL

6.RabbitMQ 插件实现延时队列

1. 延时队列的概念
延时队列 ,顾名思义,就是 音讯等一段时间再发送进来 ,最重要的属性是 延时属性 ,它侧重于延时队列中的 音讯在指定的工夫到了之后再取出解决

2. 延时队列实用场景

2.1)用户给本人设置了一个揭示,到点推送音讯。
2.2)订单在 完结之后主动 把钱打到买家账户。
2.3)订单在 几分钟内 未领取主动勾销。
2.4)用户发动退款,一天之内 没解决就 主动推送 给相干人员。

以上这些场景都有一个特点,那就是在某个业务逻辑实现后,在指定的特定工夫实现某一项业务逻辑的解决 。如:用户给本人 设置了一个揭示,到点推送音讯 。其实如果简略地来说,是不是应用 定时工作解决 每分钟甚至每秒钟轮询所有数据 ,符合条件的推送就行了?如果在 数据量少 的状况,且 对于工夫解决不是特地严格 ,能够 领有肯定的延时的状况 ,是能够这么解决的,然而 一旦数据量变得十分大 ,并且 时效性十分强 的业务场景,定时工作就会解决不过去,可能无奈在几秒钟或者一分钟大量解决数据,同时也会给数据库很大压力,既不满足业务要求,而且性能也很低下。

3.RabbitMQ 中的两种 TTL
TTL 是 RabbitMQ 中,一个 音讯 或者一个 队列 的属性,表明一条音讯或者队列中所有的音讯的 最大存活工夫 ,单位是 毫秒

如果一条音讯设置了 TTL 属性或者音讯进入了设置 TTL 属性的队列,在这个工夫到了的状况下还没被生产,那么就会成为死信 。如果同时配置了两个 TTL 属性,那么 较小 的值将会被应用。

4. 队列的 TTL

pom:

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--rabbitmq 依赖客户端 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

yml:

server:
  port: 11000
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

在写队列之前,咱们先画一张队列与交换机的关系图,就能够更顺利地编写代码,咱们这里要写两个有过期工夫的队列,过期工夫不同。

这次咱们不应用原生 API,咱们应用 springBoot 整合好的 API:

申明队列和交换机:

@Configuration
public class QueueConfig {
    
    private String xExchange = "x";
    private String queueA = "QA";
    private String queueB = "QB";
    
    private String yDeadLetterExchange = "Y";
    private String deadLetterQueue = "QD";

     @Bean("xExchange")
    public DirectExchange xExchange(){return new DirectExchange(xExchange);
    }

    @Bean("yExchange")
    public DirectExchange yExchange(){return new DirectExchange(yDeadLetterExchange);
    }

    @Bean("queueA")
    public Queue queueA(){Map<String, Object> args = new HashMap<>(3);
        // 申明以后队列绑定的死信交换机
        args.put("x-dead-letter-exchange", yDeadLetterExchange);
        // 申明以后队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        // 申明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(queueA).withArguments(args).build();}

    @Bean
    public Binding queueABindX(@Qualifier("queueA") Queue queueA,
                               @Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean("queueB")
    public Queue queueB(){Map<String, Object> args = new HashMap<>(3);
        // 申明以后队列绑定的死信交换机
        args.put("x-dead-letter-exchange", yDeadLetterExchange);
        // 申明以后队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        // 申明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(queueB).withArguments(args).build();}

 @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    // 申明死信队列 QD
    @Bean("queueD")
    public Queue queueD(){return new Queue(deadLetterQueue);
    }

    // 申明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

音讯生产者代码:

@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {log.info("以后工夫:{}, 发送一条信息给两个 TTL 队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("x", "XA", "音讯来自 ttl 为 10S 的队列:" + message);
        rabbitTemplate.convertAndSend("x", "XB", "音讯来自 ttl 为 40S 的队列:" + message);
    }

}

音讯消费者代码:

@Slf4j
@Component
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());
        log.info("以后工夫:{}, 收到死信队列信息{}", new Date().toString(), msg);
    }

}

咱们发送两条音讯:
http://localhost:11000/ttl/se…

咱们发现,第一条音讯在 10s 之后变成了死信音讯,第二条音讯在 40s 之后变成了死信音讯,而后被生产,一个延时队列就实现了。

然而这种架构,扩展性不强 。如果 每次减少一个新的需要 ,有 不同的延时工夫要求 ,那么就要 减少一个队列

5. 音讯的 TTL
为了解决下面的问题,咱们就须要 给音讯 设置一个 过期工夫,这样就能够避免队列的无序扩张,音讯到期后主动收回去就能够了。

申明队列:

    private String queueC = "QC";

    // 申明队列 C 死信交换机
    @Bean("queueC")
    public Queue queueC(){Map<String, Object> args = new HashMap<>(3);
        // 申明以后队列绑定的死信交换机
        args.put("x-dead-letter-exchange", yDeadLetterExchange);
        // 申明以后队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        // 没有申明 TTL 属性
        return QueueBuilder.durable(queueC).withArguments(args).build();}

    
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

生产者:

    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        rabbitTemplate.convertAndSend("x", "XC", message,
                correlationData -> {correlationData.getMessageProperties().setExpiration(ttlTime);
                    return correlationData;
                });
        log.info("以后工夫:{}, 发送一条时长 {} 毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
    }

发送申请:

http://localhost:11000/ttl/se… 能够啊 10000/10000

http://localhost:11000/ttl/se… 能够啊 20000/20000

能够看出,期待音讯过期后,就会主动把音讯收回,不过这么做有一个bug

RabbitMQ只会查看第一个音讯是否过期 ,如果过期则转发至死信交换机,如果 第一个音讯的过期工夫很长很长 ,而 第二个音讯的过期工夫很短很短 ,那么在 第一个音讯发送胜利之前,第二个音讯不会先失去执行。

6.RabbitMQ 插件实现延时队列

下面提到的问题,如果咱们的队列受困于第一条音讯的过期工夫,那么这个音讯并不是一个残缺的音讯队列,那咱们该如何解决呢?

咱们须要装置一个延时队列的插件:

https://github.com/rabbitmq/r…

到这个网站去下载,要留神一下 rabbitMQ 的版本,我的是 3.8.3,所以要下载对应版本,放在 rabbitMQ 的 plugIn 文件夹下

在该文件夹下执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange,或者重启服务,就能够领有插件延时队列了。

架构图:

申明队列:

 public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";


    @Bean
    public Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();
        // 自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange
                                               delayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}

生产者:

    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";


    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                correlationData -> {correlationData.getMessageProperties().setDelay(delayTime);
                    return correlationData;
                });
        log.info("当 前 时 间:{}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
    }

消费者:

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());
        log.info("以后工夫:{}, 收到延时队列的音讯:{}", new Date().toString(), msg);
    }

咱们先发送一个过期工夫比拟长的音讯,再发送一条过期工夫比拟短的音讯:

http://localhost:11000/ttl/se… 能够啊 20000/20000

http://localhost:11000/ttl/se… 能够啊 10000/10000

咱们发现音讯的生产时合乎咱们的预期的,应用插件完满解决了音讯发送的问题。

退出移动版