在电商、领取等畛域,往往会有这样的场景,用户下单后放弃领取了,那这笔订单会在指定的时间段后进行敞开操作,仔细的你肯定发现了像某宝、某东都有这样的逻辑,而且工夫很精确,误差在1s内;那他们是怎么实现的呢?
个别的做法有如下几种
- 定时工作敞开订单
- rocketmq提早队列
- rabbitmq死信队列
- 工夫轮算法
- redis过期监听
一、定时工作敞开订单(最low)
个别状况下,最不举荐的形式就是关单形式就是定时工作形式,起因咱们能够看上面的图来阐明
咱们假如,关单工夫为下单后10分钟,定时工作距离也是10分钟;通过上图咱们看出,如果在第1分钟下单,在第20分钟的时候能力被扫描到执行关单操作,这样误差达到10分钟,这在很多场景下是不可承受的,另外须要频繁扫描主订单号造成网络IO和磁盘IO的耗费,对实时交易造成肯定的冲击,所以PASS
二、rocketmq提早队列形式
提早音讯 生产者把音讯发送到音讯服务器后,并不心愿被立刻生产,而是期待指定工夫后才能够被消费者生产,这类音讯通常被称为提早音讯。 在RocketMQ开源版本中,反对提早音讯,然而不反对任意工夫精度的提早音讯,只反对特定级别的提早音讯。 音讯提早级别别离为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。
发送提早音讯(生产者)
/** * 推送提早音讯 * @param topic * @param body * @param producerGroup * @return boolean */ public boolean sendMessage(String topic, String body, String producerGroup) { try { Message recordMsg = new Message(topic, body.getBytes()); producer.setProducerGroup(producerGroup); //设置音讯提早级别,我这里设置14,对应就是延时10分钟 // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" recordMsg.setDelayTimeLevel(14); // 发送音讯到一个Broker SendResult sendResult = producer.send(recordMsg); // 通过sendResult返回音讯是否胜利送达 log.info("发送提早音讯后果:======sendResult:{}", sendResult); DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("发送工夫:{}", format.format(new Date())); return true; } catch (Exception e) { e.printStackTrace(); log.error("提早音讯队列推送音讯异样:{},推送内容:{}", e.getMessage(), body); } return false; }
生产提早音讯(消费者)
/** * 接管提早音讯 * * @param topic * @param consumerGroup * @param messageHandler */ public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler){ ThreadPoolUtil.execute(() -> { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup(consumerGroup); consumer.setVipChannelEnabled(false); consumer.setNamesrvAddr(address); //设置消费者拉取音讯的策略,*示意生产该topic下的所有音讯,也能够指定tag进行音讯过滤 consumer.subscribe(topic, "*"); //消费者端启动音讯监听,一旦生产者发送音讯被监听到,就打印消息,和rabbitmq中的handlerDelivery相似 consumer.registerMessageListener(messageHandler); consumer.start(); log.info("启动提早音讯队列监听胜利:" + topic); } catch (MQClientException e) { log.error("启动提早音讯队列监听失败:{}", e.getErrorMessage()); System.exit(1); } }); }
实现监听类,解决具体逻辑
/** * 提早音讯监听 * */@Componentpublic class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent>{ @Resource private MQUtil mqUtil; @Resource private CourseOrderTimeoutHandler courseOrderTimeoutHandler; @Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent){ // 订单超时监听 mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler); }}
/** * 实现监听 */@Slf4j@Componentpublic class CourseOrderTimeoutHandler implements MessageListenerConcurrently{ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { // 失去音讯体 String body = new String(msg.getBody()); JSONObject userJson = JSONObject.parseObject(body); TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class); // 解决具体的业务逻辑,,,,, DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("生产工夫:{}", format.format(new Date())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
这种形式相比定时工作好了很多,然而有一个致命的毛病,就是提早等级只有18种(商业版本反对自定义工夫),如果咱们想把敞开订单工夫设置在15分钟该如何解决呢?显然不够灵便。
三、rabbitmq死信队列的形式
Rabbitmq自身是没有提早队列的,只能通过Rabbitmq自身队列的个性来实现,想要Rabbitmq实现提早队列,须要应用Rabbitmq的死信交换机(Exchange)和音讯的存活工夫TTL(Time To Live)
死信交换机 一个音讯在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机能够对应很多队列。
一个音讯被Consumer拒收了,并且reject办法的参数里requeue是false。也就是说不会被再次放在队列里,被其余消费者应用。 下面的音讯的TTL到了,音讯过期了。
队列的长度限度满了。排在后面的音讯会被抛弃或者扔到死信路由上。 死信交换机就是一般的交换机,只是因为咱们把过期的音讯扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机
音讯TTL(音讯存活工夫) 音讯的TTL就是音讯的存活工夫。RabbitMQ能够对队列和音讯别离设置TTL。对队列设置就是队列没有消费者连着的保留工夫,也能够对每一个独自的音讯做独自的设置。超过了这个工夫,咱们认为这个音讯就死了,称之为死信。如果队列设置了,音讯也设置了,那么会取值较小的。所以一个音讯如果被路由到不同的队列中,这个音讯死亡的工夫有可能不一样(不同的队列设置)。这里单讲单个音讯的TTL,因为它才是实现提早工作的要害。
byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
能够通过设置音讯的expiration字段或者x-message-ttl属性来设置工夫,两者是一样的成果。只是expiration字段是字符串参数,所以要写个int类型的字符串:当下面的音讯扔到队列中后,过了60秒,如果没有被生产,它就死了。不会被消费者生产到。这个音讯前面的,没有“死掉”的音讯对顶上来,被消费者生产。死信在队列中并不会被删除和开释,它会被统计到队列的音讯数中去
解决流程图
创立交换机(Exchanges)和队列(Queues)
创立死信交换机
如图所示,就是创立一个一般的交换机,这里为了不便辨别,把交换机的名字取为:delay
创立主动过期音讯队列 这个队列的次要作用是让音讯定时过期的,比方咱们须要2小时候敞开订单,咱们就须要把音讯放进这个队列外面,把音讯过期工夫设置为2小时
创立一个一个名为delay\_queue1的主动过期的队列,当然图片下面的参数并不会让音讯主动过期,因为咱们并没有设置x-message-ttl参数,如果整个队列的音讯有音讯都是雷同的,能够设置,这里为了灵便,所以并没有设置,另外两个参数x-dead-letter-exchange代表音讯过期后,音讯要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置音讯过期后,进入死信交换机的routing-key,跟发送音讯的routing-key一个情理,依据这个key将音讯放入不同的队列
创立音讯解决队列 这个队列才是真正解决音讯的队列,所有进入这个队列的音讯都会被解决
音讯队列的名字为delay\_queue2 音讯队列绑定到交换机 进入交换机详情页面,将创立的2个队列(delayqueue1和delayqueue2)绑定到交换机下面
主动过期音讯队列的routing key 设置为delay 绑定delayqueue2
delayqueue2 的key要设置为创立主动过期的队列的x-dead-letter-routing-key参数,这样当音讯过期的时候就能够主动把音讯放入delay\_queue2这个队列中了 绑定后的治理页面如下图:
当然这个绑定也能够应用代码来实现,只是为了直观体现,所以本文应用的治理平台来操作 发送音讯
String msg = "hello word"; MessageProperties messageProperties = newMessageProperties(); messageProperties.setExpiration("6000");messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());Message message = newMessage(msg.getBytes(), messageProperties);rabbitTemplate.convertAndSend("delay", "delay",message);
设置了让音讯6秒后过期 留神:因为要让音讯主动过期,所以肯定不能设置delay\_queue1的监听,不能让这个队列外面的音讯被承受到,否则音讯一旦被生产,就不存在过期了
接管音讯 接管音讯配置好delay\_queue2的监听就好了
package wang.raye.rabbitmq.demo1;import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@ConfigurationpublicclassDelayQueue{ /** 音讯交换机的名字*/ publicstaticfinalString EXCHANGE = "delay"; /** 队列key1*/ publicstaticfinalString ROUTINGKEY1 = "delay"; /** 队列key2*/ publicstaticfinalString ROUTINGKEY2 = "delay_key"; /** * 配置链接信息 * @return */ @Bean publicConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672); connectionFactory.setUsername("kberp"); connectionFactory.setPassword("kberp"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); // 必须要设置 return connectionFactory; } /** * 配置音讯交换机 * 针对消费者配置 FanoutExchange: 将音讯散发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过增加属性key-value匹配 DirectExchange:依照routingkey散发到指定队列 TopicExchange:多关键字匹配 */ @Bean publicDirectExchange defaultExchange() { returnnewDirectExchange(EXCHANGE, true, false); } /** * 配置音讯队列2 * 针对消费者配置 * @return */ @Bean publicQueue queue() { returnnewQueue("delay_queue2", true); //队列长久 } /** * 将音讯队列2与交换机绑定 * 针对消费者配置 * @return */ @Bean @Autowired publicBinding binding() { returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); } /** * 承受音讯的监听,这个监听会承受音讯队列1的音讯 * 针对消费者配置 * @return */ @Bean @Autowired publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(newChannelAwareMessageListener() { publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{ byte[] body = message.getBody(); System.out.println("delay_queue2 收到音讯 : "+ newString(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认音讯胜利生产 } }); return container; } }
这种形式能够自定义进入死信队列的工夫;是不是很完满,然而有的小伙伴的状况是消息中间件就是rocketmq,公司也不可能会用商业版,怎么办?那就进入下一节
四、工夫轮算法
(1)创立环形队列,例如能够创立一个蕴含3600个slot的环形队列(实质是个数组)
(2)工作汇合,环上每一个slot是一个Set 同时,启动一个timer,这个timer每隔1s,在上述环形队列中挪动一格,有一个Current Index指针来标识正在检测的slot。
Task构造中有两个很重要的属性: (1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行工作 (2)订单号,要敞开的订单号(也能够是其余信息,比方:是一个基于某个订单号的工作)
假如以后Current Index指向第0格,例如在3610秒之后,有一个订单须要敞开,只需: (1)计算这个订单应该放在哪一个slot,当咱们计算的时候当初指向1,3610秒之后,应该是第10格,所以这个Task应该放在第10个slot的Set中 (2)计算这个Task的Cycle-Num,因为环形队列是3600格(每秒挪动一格,正好1小时),这个工作是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1
Current Index不停的挪动,每秒挪动到一个新slot,这个slot中对应的Set,每个Task看Cycle-Num是不是0: (1)如果不是0,阐明还须要多挪动几圈,将Cycle-Num减1 (2)如果是0,阐明马上要执行这个关单Task了,取出订单号执行关单(能够用独自的线程来执行Task),并把这个订单信息从Set中删除即可。 (1)无需再轮询全副订单,效率高 (2)一个订单,工作只执行一次 (3)时效性好,准确到秒(管制timer挪动频率能够控制精度)
五、redis过期监听
1.批改redis.windows.conf配置文件中notify-keyspace-events的值 默认配置notify-keyspace-events的值为 "" 批改为 notify-keyspace-events Ex 这样便开启了过期事件
2. 创立配置类RedisListenerConfig(配置RedisMessageListenerContainer这个Bean)
package com.zjt.shop.config;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer; @Configurationpublic class RedisListenerConfig { @Autowired private RedisTemplate redisTemplate; /** * @return */ @Bean public RedisTemplate redisTemplateInit() { // key序列化 redisTemplate.setKeySerializer(new StringRedisSerializer()); //val实例化 redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); return redisTemplate; } @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
3.继承KeyExpirationEventMessageListener创立redis过期事件的监听类
package com.zjt.shop.common.util;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;import com.zjt.shop.modules.order.service.OrderInfoService;import com.zjt.shop.modules.product.entity.OrderInfoEntity;import com.zjt.shop.modules.product.mapper.OrderInfoMapper;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.stereotype.Component; @Slf4j@Componentpublic class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Autowired private OrderInfoMapper orderInfoMapper; /** * 针对redis数据生效事件,进行数据处理 * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern) { try { String key = message.toString(); //从生效key中筛选代表订单生效的key if (key != null && key.startsWith("order_")) { //截取订单号,查问订单,如果是未领取状态则为-勾销订单 String orderNo = key.substring(6); QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("order_no",orderNo); OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper); if (orderInfo != null) { if (orderInfo.getOrderState() == 0) { //待领取 orderInfo.setOrderState(4); //已勾销 orderInfoMapper.updateById(orderInfo); log.info("订单号为【" + orderNo + "】超时未领取-主动批改为已勾销状态"); } } } } catch (Exception e) { e.printStackTrace(); log.error("【批改领取订单过期状态异样】:" + e.getMessage()); } }}
4:测试 通过redis客户端存一个无效工夫为3s的订单:
后果:
总结: 以上办法只是集体对于关单的一些想法,可能有些中央有疏漏,请在公众号间接留言进行指出,当然如果你有更好的关单形式也能够随时沟通交流
本文转自 https://juejin.cn/post/6987233263660040206,如有侵权,请分割删除。
相干视频:
【2021最新版】Android studio装置教程+Android(安卓)零基础教程视频(适宜Android 0根底,Android初学入门)含音视频_哔哩哔哩_bilibili
【Android进阶课程】——colin Compose的绘制原理解说(一)_哔哩哔哩_bilibili
https://www.bilibili.com/video/BV1g34y1U73v?spm_id_from=333.999.0.0_哔哩哔哩_bilibili