SpringBoot实战电商我的项目mall(35k+star)地址:https://github.com/macrozheng/mall
摘要
RabbitMQ实现提早音讯的形式有两种,一种是应用死信队列
实现,另一种是应用提早插件
实现。死信队列
实现咱们以前已经讲过,具体参考《mall整合RabbitMQ实现提早音讯》,这次咱们讲个更简略的,应用提早插件
实现。
学前筹备
学习本文须要对RabbitMQ有所理解,还不理解的敌人能够看下:《花了3天总结的RabbitMQ实用技巧,有点货色!》
插件装置
首先咱们须要下载并装置RabbitMQ的提早插件。
- 去RabbitMQ的官网下载插件,插件地址:https://www.rabbitmq.com/comm...
- 间接搜寻
rabbitmq_delayed_message_exchange
即可找到咱们须要下载的插件,下载和RabbitMQ配套的版本,不要弄错;
- 将插件文件复制到RabbitMQ装置目录的
plugins
目录下;
- 进入RabbitMQ装置目录的
sbin
目录下,应用如下命令启用提早插件;
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 启用插件胜利后就能够看到如下信息,之后重新启动RabbitMQ服务即可。
实现提早音讯
接下来咱们须要在SpringBoot中实现提早音讯性能,这次仍然沿用商品下单的场景。比如说有个用户下单了,他60分钟不领取订单,订单就会被勾销,这就是一个典型的提早音讯应用场景。
- 首先咱们须要在
pom.xml
文件中增加AMQP
相干依赖;
<!--音讯队列相干依赖--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 之后在
application.yml
增加RabbitMQ的相干配置;
spring: rabbitmq: host: localhost # rabbitmq的连贯地址 port: 5672 # rabbitmq的连贯端口号 virtual-host: /mall # rabbitmq的虚构host username: mall # rabbitmq的用户名 password: mall # rabbitmq的明码 publisher-confirms: true #如果对异步音讯须要回调必须设置为true
- 接下来创立RabbitMQ的Java配置,次要用于配置交换机、队列和绑定关系;
/** * 音讯队列配置 * Created by macro on 2018/9/14. */@Configurationpublic class RabbitMqConfig { /** * 订单提早插件音讯队列所绑定的交换机 */ @Bean CustomExchange orderPluginDirect() { //创立一个自定义交换机,能够发送提早音讯 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args); } /** * 订单提早插件队列 */ @Bean public Queue orderPluginQueue() { return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName()); } /** * 将订单提早插件队列绑定到交换机 */ @Bean public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) { return BindingBuilder .bind(orderPluginQueue) .to(orderPluginDirect) .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()) .noargs(); }}
- 创立一个勾销订单音讯的收回者,通过给音讯设置
x-delay
头来设置音讯从交换机发送到队列的延迟时间;
/** * 勾销订单音讯的收回者 * Created by macro on 2018/9/14. */@Componentpublic class CancelOrderSender { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId,final long delayTimes){ //给提早队列发送音讯 amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给音讯设置提早毫秒值 message.getMessageProperties().setHeader("x-delay",delayTimes); return message; } }); LOGGER.info("send delay message orderId:{}",orderId); }}
- 创立一个勾销订单音讯的接收者,用于解决订单提早插件队列中的音讯。
/** * 勾销订单音讯的解决者 * Created by macro on 2018/9/14. */@Component@RabbitListener(queues = "mall.order.cancel.plugin")public class CancelOrderReceiver { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); @Autowired private OmsPortalOrderService portalOrderService; @RabbitHandler public void handle(Long orderId){ LOGGER.info("receive delay message orderId:{}",orderId); portalOrderService.cancelOrder(orderId); }}
- 而后在咱们的订单业务实现类中增加如下逻辑,当下单胜利之前,往音讯队列中发送一个勾销订单的提早音讯,这样如果订单没有被领取的话,就能勾销订单了;
/** * 前台订单治理Service * Created by macro on 2018/8/30. */@Servicepublic class OmsPortalOrderServiceImpl implements OmsPortalOrderService { private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class); @Autowired private CancelOrderSender cancelOrderSender; @Override public CommonResult generateOrder(OrderParam orderParam) { //todo 执行一系类下单操作,具体参考mall我的项目 LOGGER.info("process generateOrder"); //下单实现后开启一个提早音讯,用于当用户没有付款时勾销订单(orderId应该在下单后生成) sendDelayMessageCancelOrder(11L); return CommonResult.success(null, "下单胜利"); } @Override public void cancelOrder(Long orderId) { //todo 执行一系类勾销订单操作,具体参考mall我的项目 LOGGER.info("process cancelOrder orderId:{}",orderId); } private void sendDelayMessageCancelOrder(Long orderId) { //获取订单超时工夫,假如为60分钟(测试用的30秒) long delayTimes = 30 * 1000; //发送提早音讯 cancelOrderSender.sendMessage(orderId, delayTimes); }}
- 启动我的项目后,在Swagger中调用下单接口;
- 调用实现后查看控制台日志能够发现,从音讯发送和音讯接管解决正好相差了
30s
,咱们设置的延迟时间。
2020-06-08 13:46:01.474 INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl : process generateOrder2020-06-08 13:46:01.482 INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender : send delay message orderId:112020-06-08 13:46:31.517 INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver : receive delay message orderId:112020-06-08 13:46:31.520 INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl : process cancelOrder orderId:11
两种实现形式比照
咱们之前应用过死信队列的形式,这里咱们把两种形式做个比照,先来聊下这两种形式的实现原理。
死信队列
死信队列是这样一个队列,如果音讯发送到该队列并超过了设置的工夫,就会被转发到设置好的解决超时音讯的队列当中去,利用该个性能够实现提早音讯。
提早插件
通过装置插件,自定义交换机,让交换机领有提早发送音讯的能力,从而实现提早音讯。
论断
因为死信队列形式须要创立两个交换机(死信队列交换机+解决队列交换机)、两个队列(死信队列+解决队列),而提早插件形式只需创立一个交换机和一个队列,所以后者应用起来更简略。
我的项目源码地址
https://github.com/macrozheng...
公众号
mall我的项目全套学习教程连载中,关注公众号第一工夫获取。