RabbitMQ实现延迟消息居然如此简单整个插件就完事了

40次阅读

共计 4646 个字符,预计需要花费 12 分钟才能阅读完成。

SpringBoot 实战电商我的项目 mall(35k+star)地址:https://github.com/macrozheng/mall

摘要

RabbitMQ 实现提早音讯的形式有两种,一种是应用 死信队列 实现,另一种是应用 提早插件 实现。死信队列 实现咱们以前已经讲过,具体参考《mall 整合 RabbitMQ 实现提早音讯》,这次咱们讲个更简略的,应用 提早插件 实现。

学前筹备

学习本文须要对 RabbitMQ 有所理解,还不理解的敌人能够看下:《花了 3 天总结的 RabbitMQ 实用技巧,有点货色!》

插件装置

首先咱们须要下载并装置 RabbitMQ 的提早插件。

  • 去 RabbitMQ 的官网下载插件,插件地址:https://www.rabbitmq.com/comm…
  • 间接搜寻 rabbitmq_delayed_message_exchange 即可找到咱们须要下载的插件,下载和 RabbitMQ 配套的版本,不要弄错;

  • 将插件文件复制到 RabbitMQ 装置目录的 plugins 目录下;

  • 进入 RabbitMQ 装置目录的 sbin 目录下,应用如下命令启用提早插件;
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 启用插件胜利后就能够看到如下信息,之后重新启动 RabbitMQ 服务即可。

实现提早音讯

接下来咱们须要在 SpringBoot 中实现提早音讯性能,这次仍然沿用商品下单的场景。比如说有个用户下单了,他 60 分钟不领取订单,订单就会被勾销,这就是一个典型的提早音讯应用场景。

  • 首先咱们须要在 pom.xml 文件中增加 AMQP 相干依赖;
<!-- 音讯队列相干依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 之后在 application.yml 增加 RabbitMQ 的相干配置;
spring:
  rabbitmq:
    host: localhost # rabbitmq 的连贯地址
    port: 5672 # rabbitmq 的连贯端口号
    virtual-host: /mall # rabbitmq 的虚构 host
    username: mall # rabbitmq 的用户名
    password: mall # rabbitmq 的明码
    publisher-confirms: true #如果对异步音讯须要回调必须设置为 true
  • 接下来创立 RabbitMQ 的 Java 配置,次要用于配置交换机、队列和绑定关系;
/**
 * 音讯队列配置
 * Created by macro on 2018/9/14.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 订单提早插件音讯队列所绑定的交换机
     */
    @Bean
    CustomExchange  orderPluginDirect() {
        // 创立一个自定义交换机,能够发送提早音讯
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args);
    }

    /**
     * 订单提早插件队列
     */
    @Bean
    public Queue orderPluginQueue() {return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());
    }

    /**
     * 将订单提早插件队列绑定到交换机
     */
    @Bean
    public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) {
        return BindingBuilder
                .bind(orderPluginQueue)
                .to(orderPluginDirect)
                .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey())
                .noargs();}

}
  • 创立一个勾销订单音讯的收回者,通过给音讯设置 x-delay 头来设置音讯从交换机发送到队列的延迟时间;
/**
 * 勾销订单音讯的收回者
 * Created by macro on 2018/9/14.
 */
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        // 给提早队列发送音讯
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 给音讯设置提早毫秒值
                message.getMessageProperties().setHeader("x-delay",delayTimes);
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}
  • 创立一个勾销订单音讯的接收者,用于解决订单提早插件队列中的音讯。
/**
 * 勾销订单音讯的解决者
 * Created by macro on 2018/9/14.
 */
@Component
@RabbitListener(queues = "mall.order.cancel.plugin")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}
  • 而后在咱们的订单业务实现类中增加如下逻辑,当下单胜利之前,往音讯队列中发送一个勾销订单的提早音讯,这样如果订单没有被领取的话,就能勾销订单了;
/**
 * 前台订单治理 Service
 * Created by macro on 2018/8/30.
 */
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
    @Autowired
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
        //todo 执行一系类下单操作,具体参考 mall 我的项目
        LOGGER.info("process generateOrder");
        // 下单实现后开启一个提早音讯,用于当用户没有付款时勾销订单(orderId 应该在下单后生成)sendDelayMessageCancelOrder(11L);
        return CommonResult.success(null, "下单胜利");
    }

    @Override
    public void cancelOrder(Long orderId) {
        //todo 执行一系类勾销订单操作,具体参考 mall 我的项目
        LOGGER.info("process cancelOrder orderId:{}",orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {// 获取订单超时工夫,假如为 60 分钟(测试用的 30 秒)
        long delayTimes = 30 * 1000;
        // 发送提早音讯
        cancelOrderSender.sendMessage(orderId, delayTimes);
    }

}
  • 启动我的项目后,在 Swagger 中调用下单接口;

  • 调用实现后查看控制台日志能够发现,从音讯发送和音讯接管解决正好相差了30s,咱们设置的延迟时间。
2020-06-08 13:46:01.474  INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process generateOrder
2020-06-08 13:46:01.482  INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender   : send delay message orderId:11
2020-06-08 13:46:31.517  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver    : receive delay message orderId:11
2020-06-08 13:46:31.520  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process cancelOrder orderId:11

两种实现形式比照

咱们之前应用过死信队列的形式,这里咱们把两种形式做个比照,先来聊下这两种形式的实现原理。

死信队列

死信队列是这样一个队列,如果音讯发送到该队列并超过了设置的工夫,就会被转发到设置好的解决超时音讯的队列当中去,利用该个性能够实现提早音讯。

提早插件

通过装置插件,自定义交换机,让交换机领有提早发送音讯的能力,从而实现提早音讯。

论断

因为死信队列形式须要创立两个交换机(死信队列交换机 + 解决队列交换机)、两个队列(死信队列 + 解决队列),而提早插件形式只需创立一个交换机和一个队列,所以后者应用起来更简略。

我的项目源码地址

https://github.com/macrozheng…

公众号

mall 我的项目全套学习教程连载中,关注公众号 第一工夫获取。

正文完
 0