SpringBoot23整合RabbitMQ实现延迟消费消息

28次阅读

共计 3954 个字符,预计需要花费 10 分钟才能阅读完成。

1. 源码获取地址

文章末尾有源代码地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章节主要实现消息的延迟消费,在学习延迟消费之前必须先了解 RabbitMQ 两个基本概念,消息的 TTL 和死信 Exchange,通过这两者的组合来实现消息的延迟消费。
不想看原理讲解的,直接通过标题 6 看代码实现

2. 消息的 TTL(Time To Live)

消息的 TTL 就是消息的存活时间。RabbitMQ 可以对队列和消息分别设置 TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

3. 死信交换器 Dead Letter Exchanges

  • 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
  • 一个消息被 Consumer 拒收了,并且 reject 方法的参数里 requeue 是 false。也就是说不会被再次放在队列里,被其他消费者使用。
  • 上面的消息的 TTL 到了,消息过期了
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。死信交换器 (Dead Letter Exchange) 其实就是一种普通的 exchange,和创建其他 exchange 没有两样。只是在某一个设置 Dead Letter Exchange 的队列中有消息过期了,会自动触发消息的转发,发送到 Dead Letter Exchange 中去。

4. 实现延迟消费原理

  • 大概原理: 首先发送消息到死信队列,死信队列设置 ttl 过期时间,到期之后会自动将消息发送到一般队列实现消息的消费
  • 实现步骤如下
  • 创建死信交换器
  • 创建死信队列
  • 将死信队列与死信交换机绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由 key)消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息.
  • 创建正常交换器
  • 创建正常队列
  • 将正常队列绑定到正常交换器

5. 基于案例实现消息的延迟消费

这里我们以最熟悉的 12306 购票为例进行案例场景的分析,12306 购票步骤如下:

  • 首先登录 12306 根据日期和起点站等条件进行抢票下订单
  • 抢到票订单处于未支付状态, 并提示支付时间 30 分钟内

  • 这里就可以使用到延时队列,在下订单完成的时候将订单号发送到 MQ 的死信队列,并设置 30 分钟过期,30 分钟以后死信队列的数据会转发到正常队列,从正常队列中获取到下订单的订单号,然后我们根据订单号查询订单的支付状态,如果已经支付我们不做任何操作,如果未支付取消订单,关闭支付状态,将票回滚到票池供其他用户购买

6. 代码实现

  • 在 RabbitMQConfig 中创建队列、交换机以及绑定关系

    @Configuration
    public class RabbitMQConfig {

        /**
         * 测试发送消息到 MQ
         * @return
             */
            @Bean
            public Queue testHello() {return new Queue(SysConstant.QUEUE_TEST_HELLO);
            }


            /**
             * 死信交换机
             * @return
             */
            @Bean
            public DirectExchange sysOrderDelayExchange() {return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE);
            }

            /**
             * 死信队列
             * @return
             */
            @Bean
            public Queue sysOrderDelayQueue() {Map<String, Object> map = new HashMap<String, Object>(16);
                    map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); // 指定死信送往的交换机
                    map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); // 指定死信的 routingkey
                    return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map);
            }

            /**
             * 给死信队列绑定死信交换机
             * @return
             */
            @Bean
            public Binding sysOrderDelayBinding() {return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY);
            }

            /**
             * 死信接收交换机, 用于接收死信队列的消息
             * @return
             */
            @Bean
            public DirectExchange sysOrderReceiveExchange() {return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE);
            }

            /**
             * 死信接收队列
             * @return
             */
            @Bean
            public Queue sysOrderReceiveQueue() {return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE);
            }

            /**
             * 死信接收交换机绑定接收死信队列消费队列
             * @return
             */
            @Bean
            public Binding sysOrdeReceiveBinding() {return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY);
            }
    }
    
  • 发送延时消息到死信交换器方法

    @Service
    public class MsgService {

        @Autowired
        private RabbitTemplate rabbitTemplate;
        /**
         * 发送延时消息到 mq
         * @param exchange 死信交换机
         * @param routeKey 路由 key
         * @param data 发送数据
         * @param delayTime 过期时间,单位毫秒
                     */
                    public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) {
                            rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> {message.getMessageProperties().setExpiration(delayTime + "");
                                    return message;
                            });
                    }
            }
  • 监听队列消息 ReceiveMsgListener 类

    /**

        * 获取到的延时消息
        * 这里接收到消息进行对应的业务处理(例如: 取消订单,关闭支付,回滚库存等 ...)
        * @param msg
             */
            @RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE)
            @RabbitHandler
            public void getdelayMsg(String msg) {log.info("MQ 接收消息时间:{}, 消息内容:{}", DateUtil.formatDateTime(DateUtil.date()),msg);
                    log.info("-------> 这里实现订单关闭、支付关闭、回滚库存业务逻辑...");
            }
  • 创建 Controller 向队列发送消息,设置过期时间 10 秒

    @RestController
    @RequestMapping(“mq”)
    @Slf4j
    public class MQController {

        @Autowired
        private MsgService msgService;
    
        @GetMapping("sendMsg")
        public String sendMsg() {log.info("发送延时消息时间:" + DateUtil.formatDateTime(DateUtil.date()));
    
                OrderInfo orderInfo = new OrderInfo();
                orderInfo.setOrderId(IdUtil.fastSimpleUUID());
                orderInfo.setOrderState("待支付");
                orderInfo.setPayMoney(999.88);
                msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);// 1 分钟
                return JSONUtil.toJsonStr("发送延时消息成功");
        }

    }

  • 启动服务,可以看到 MQ 中创建对应的队列和交换器


  • 控制台日志可以看到发送消息与消费消息间隔时间是 10s

7. 更多 MQ 技术文档获取

https://www.sunnyblog.top/index.html?tagId=1264009609236971520

详细开发技术文档尽在 点击这里查看技术文档;更多技术文章:https://www.sunnyblog.top;任何疑问加 QQ 群咨询:534073451

正文完
 0