关于android:领导谁再用定时任务实现关闭订单立马滚蛋

35次阅读

共计 11947 个字符,预计需要花费 30 分钟才能阅读完成。

在电商、领取等畛域,往往会有这样的场景,用户下单后放弃领取了,那这笔订单会在指定的时间段后进行敞开操作,仔细的你肯定发现了像某宝、某东都有这样的逻辑,而且工夫很精确,误差在 1s 内;那他们是怎么实现的呢?

个别的做法有如下几种

  • 定时工作敞开订单
  • rocketmq 提早队列
  • rabbitmq 死信队列
  • 工夫轮算法
  • redis 过期监听

一、定时工作敞开订单(最 low)

个别状况下,最不举荐的形式就是关单形式就是定时工作形式,起因咱们能够看上面的图来阐明

咱们假如,关单工夫为下单后 10 分钟,定时工作距离也是 10 分钟;通过上图咱们看出,如果在第 1 分钟下单,在第 20 分钟的时候能力被扫描到执行关单操作,这样误差达到 10 分钟,这在很多场景下是不可承受的,另外须要频繁扫描主订单号造成网络 IO 和磁盘 IO 的耗费,对实时交易造成肯定的冲击,所以 PASS

二、rocketmq 提早队列形式

提早音讯 生产者把音讯发送到音讯服务器后,并不心愿被立刻生产,而是期待指定工夫后才能够被消费者生产,这类音讯通常被称为提早音讯。在 RocketMQ 开源版本中,反对提早音讯,然而不反对任意工夫精度的提早音讯,只反对特定级别的提早音讯。音讯提早级别别离为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共 18 个级别。

发送提早音讯(生产者)

/**
     * 推送提早音讯
     * @param topic 
     * @param body 
     * @param producerGroup 
     * @return boolean
     */
    public boolean sendMessage(String topic, String body, String producerGroup)
    {
        try
        {Message recordMsg = new Message(topic, body.getBytes());
            producer.setProducerGroup(producerGroup);

            // 设置音讯提早级别,我这里设置 14,对应就是延时 10 分钟
            // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            recordMsg.setDelayTimeLevel(14);
            // 发送音讯到一个 Broker
            SendResult sendResult = producer.send(recordMsg);
            // 通过 sendResult 返回音讯是否胜利送达
            log.info("发送提早音讯后果:======sendResult:{}", sendResult);
            DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            log.info("发送工夫:{}", format.format(new Date()));

            return true;
        }
        catch (Exception e)
        {e.printStackTrace();
            log.error("提早音讯队列推送音讯异样:{}, 推送内容:{}", e.getMessage(), body);
        }
        return false;
    }

生产提早音讯(消费者)

/**
     * 接管提早音讯
     * 
     * @param topic
     * @param consumerGroup
     * @param messageHandler
     */
    public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler)
{ThreadPoolUtil.execute(() ->
        {
            try
            {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
                consumer.setConsumerGroup(consumerGroup);
                consumer.setVipChannelEnabled(false);
                consumer.setNamesrvAddr(address);
                // 设置消费者拉取音讯的策略,* 示意生产该 topic 下的所有音讯,也能够指定 tag 进行音讯过滤
                consumer.subscribe(topic, "*");
                // 消费者端启动音讯监听,一旦生产者发送音讯被监听到,就打印消息,和 rabbitmq 中的 handlerDelivery 相似
                consumer.registerMessageListener(messageHandler);
                consumer.start();
                log.info("启动提早音讯队列监听胜利:" + topic);
            }
            catch (MQClientException e)
            {log.error("启动提早音讯队列监听失败:{}", e.getErrorMessage());
                System.exit(1);
            }
        });
    }

实现监听类,解决具体逻辑

/**
 * 提早音讯监听
 * 
 */
@Component
public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent>
{

    @Resource
    private MQUtil mqUtil;

    @Resource
    private CourseOrderTimeoutHandler courseOrderTimeoutHandler;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent)
{
        // 订单超时监听
        mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler);
    }
}
/**
 *  实现监听
 */
@Slf4j
@Component
public class CourseOrderTimeoutHandler implements MessageListenerConcurrently
{

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list)
        {
            // 失去音讯体
            String body = new String(msg.getBody());
            JSONObject userJson = JSONObject.parseObject(body);
            TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class);

            // 解决具体的业务逻辑,,,,,DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
           log.info("生产工夫:{}", format.format(new Date()));
           
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

这种形式相比定时工作好了很多,然而有一个致命的毛病,就是提早等级只有 18 种(商业版本反对自定义工夫),如果咱们想把敞开订单工夫设置在 15 分钟该如何解决呢?显然不够灵便。

三、rabbitmq 死信队列的形式

Rabbitmq 自身是没有提早队列的,只能通过 Rabbitmq 自身队列的个性来实现,想要 Rabbitmq 实现提早队列,须要应用 Rabbitmq 的死信交换机(Exchange)和音讯的存活工夫 TTL(Time To Live)

死信交换机 一个音讯在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机能够对应很多队列。

一个音讯被 Consumer 拒收了,并且 reject 办法的参数里 requeue 是 false。也就是说不会被再次放在队列里,被其余消费者应用。下面的音讯的 TTL 到了,音讯过期了。

队列的长度限度满了。排在后面的音讯会被抛弃或者扔到死信路由上。死信交换机就是一般的交换机,只是因为咱们把过期的音讯扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

音讯 TTL(音讯存活工夫) 音讯的 TTL 就是音讯的存活工夫。RabbitMQ 能够对队列和音讯别离设置 TTL。对队列设置就是队列没有消费者连着的保留工夫,也能够对每一个独自的音讯做独自的设置。超过了这个工夫,咱们认为这个音讯就死了,称之为死信。如果队列设置了,音讯也设置了,那么会取值较小的。所以一个音讯如果被路由到不同的队列中,这个音讯死亡的工夫有可能不一样(不同的队列设置)。这里单讲单个音讯的 TTL,因为它才是实现提早工作的要害。

byte[] messageBodyBytes = "Hello, world!".getBytes();  
AMQP.BasicProperties properties = new AMQP.BasicProperties();  
properties.setExpiration("60000");  
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);  

能够通过设置音讯的 expiration 字段或者 x -message-ttl 属性来设置工夫,两者是一样的成果。只是 expiration 字段是字符串参数,所以要写个 int 类型的字符串:当下面的音讯扔到队列中后,过了 60 秒,如果没有被生产,它就死了。不会被消费者生产到。这个音讯前面的,没有“死掉”的音讯对顶上来,被消费者生产。死信在队列中并不会被删除和开释,它会被统计到队列的音讯数中去

解决流程图

创立交换机(Exchanges)和队列(Queues)

创立死信交换机

如图所示,就是创立一个一般的交换机,这里为了不便辨别,把交换机的名字取为:delay

创立主动过期音讯队列 这个队列的次要作用是让音讯定时过期的,比方咱们须要 2 小时候敞开订单,咱们就须要把音讯放进这个队列外面,把音讯过期工夫设置为 2 小时

创立一个一个名为 delay\_queue1 的主动过期的队列,当然图片下面的参数并不会让音讯主动过期,因为咱们并没有设置 x -message-ttl 参数,如果整个队列的音讯有音讯都是雷同的,能够设置,这里为了灵便,所以并没有设置,另外两个参数 x -dead-letter-exchange 代表音讯过期后,音讯要进入的交换机,这里配置的是 delay,也就是死信交换机,x-dead-letter-routing-key 是配置音讯过期后,进入死信交换机的 routing-key, 跟发送音讯的 routing-key 一个情理,依据这个 key 将音讯放入不同的队列

创立音讯解决队列 这个队列才是真正解决音讯的队列,所有进入这个队列的音讯都会被解决

音讯队列的名字为 delay\_queue2 音讯队列绑定到交换机 进入交换机详情页面,将创立的 2 个队列(delayqueue1 和 delayqueue2)绑定到交换机下面

主动过期音讯队列的 routing key 设置为 delay 绑定 delayqueue2

delayqueue2 的 key 要设置为创立主动过期的队列的 x -dead-letter-routing-key 参数,这样当音讯过期的时候就能够主动把音讯放入 delay\_queue2 这个队列中了 绑定后的治理页面如下图:

当然这个绑定也能够应用代码来实现,只是为了直观体现,所以本文应用的治理平台来操作 发送音讯

String msg = "hello word";  
MessageProperties messageProperties = newMessageProperties();  
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = newMessage(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);

设置了让音讯 6 秒后过期 留神:因为要让音讯主动过期,所以肯定不能设置 delay\_queue1 的监听,不能让这个队列外面的音讯被承受到,否则音讯一旦被生产,就不存在过期了

接管音讯 接管音讯配置好 delay\_queue2 的监听就好了

package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;  
import org.springframework.amqp.core.Binding;  
import org.springframework.amqp.core.BindingBuilder;  
import org.springframework.amqp.core.DirectExchange;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;
@Configuration
publicclassDelayQueue{  
    /** 音讯交换机的名字 */
    publicstaticfinalString EXCHANGE = "delay";
    /** 队列 key1*/
    publicstaticfinalString ROUTINGKEY1 = "delay";
    /** 队列 key2*/
    publicstaticfinalString ROUTINGKEY2 = "delay_key";
    /**
     * 配置链接信息
     * @return
     */
    @Bean
    publicConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);
        connectionFactory.setUsername("kberp");
        connectionFactory.setPassword("kberp");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // 必须要设置
        return connectionFactory;
    }
    /**  
     * 配置音讯交换机
     * 针对消费者配置  
        FanoutExchange: 将音讯散发到所有的绑定队列,无 routingkey 的概念  
        HeadersExchange:通过增加属性 key-value 匹配  
        DirectExchange: 依照 routingkey 散发到指定队列  
        TopicExchange: 多关键字匹配  
     */  
    @Bean  
    publicDirectExchange defaultExchange() {returnnewDirectExchange(EXCHANGE, true, false);
    }
    /**
     * 配置音讯队列 2
     * 针对消费者配置  
     * @return
     */
    @Bean
    publicQueue queue() {returnnewQueue("delay_queue2", true); // 队列长久  
    }
    /**
     * 将音讯队列 2 与交换机绑定
     * 针对消费者配置  
     * @return
     */
    @Bean  
    @Autowired
    publicBinding binding() {returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  
    }
    /**
     * 承受音讯的监听,这个监听会承受音讯队列 1 的音讯
     * 针对消费者配置  
     * @return
     */
    @Bean  
    @Autowired
    publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置确认模式手工确认  
        container.setMessageListener(newChannelAwareMessageListener() {publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{byte[] body = message.getBody();  
                System.out.println("delay_queue2 收到音讯 :"+ newString(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认音讯胜利生产  
            }  
        });  
        return container;  
    }  
}

这种形式能够自定义进入死信队列的工夫;是不是很完满,然而有的小伙伴的状况是消息中间件就是 rocketmq,公司也不可能会用商业版,怎么办?那就进入下一节

四、工夫轮算法

(1)创立环形队列,例如能够创立一个蕴含 3600 个 slot 的环形队列(实质是个数组)

(2)工作汇合,环上每一个 slot 是一个 Set 同时,启动一个 timer,这个 timer 每隔 1s,在上述环形队列中挪动一格,有一个 Current Index 指针来标识正在检测的 slot。

Task 构造中有两个很重要的属性:(1)Cycle-Num:当 Current Index 第几圈扫描到这个 Slot 时,执行工作 (2)订单号,要敞开的订单号(也能够是其余信息,比方:是一个基于某个订单号的工作)

假如以后 Current Index 指向第 0 格,例如在 3610 秒之后,有一个订单须要敞开,只需:(1)计算这个订单应该放在哪一个 slot,当咱们计算的时候当初指向 1,3610 秒之后,应该是第 10 格,所以这个 Task 应该放在第 10 个 slot 的 Set 中 (2)计算这个 Task 的 Cycle-Num,因为环形队列是 3600 格(每秒挪动一格,正好 1 小时),这个工作是 3610 秒后执行,所以应该绕 3610/3600= 1 圈之后再执行,于是 Cycle-Num=1

Current Index 不停的挪动,每秒挪动到一个新 slot,这个 slot 中对应的 Set,每个 Task 看 Cycle-Num 是不是 0:(1)如果不是 0,阐明还须要多挪动几圈,将 Cycle-Num 减 1 (2)如果是 0,阐明马上要执行这个关单 Task 了,取出订单号执行关单 (能够用独自的线程来执行 Task),并把这个订单信息从 Set 中删除即可。(1) 无需再轮询全副订单,效率高 (2)一个订单,工作只执行一次 (3)时效性好,准确到秒(管制 timer 挪动频率能够控制精度)

五、redis 过期监听

1. 批改 redis.windows.conf 配置文件中 notify-keyspace-events 的值 默认配置 notify-keyspace-events 的值为 “” 批改为 notify-keyspace-events Ex 这样便开启了过期事件

2. 创立配置类 RedisListenerConfig(配置 RedisMessageListenerContainer 这个 Bean)

package com.zjt.shop.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
 
 
@Configuration
public class RedisListenerConfig {
 
    @Autowired
    private RedisTemplate redisTemplate;
 
    /**
     * @return
     */
    @Bean
    public RedisTemplate redisTemplateInit() {
 
        // key 序列化
        redisTemplate.setKeySerializer(new StringRedisSerializer());
 
        //val 实例化
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
 
        return redisTemplate;
    }
 
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
 
}

3. 继承 KeyExpirationEventMessageListener 创立 redis 过期事件的监听类

package com.zjt.shop.common.util;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.zjt.shop.modules.order.service.OrderInfoService;
import com.zjt.shop.modules.product.entity.OrderInfoEntity;
import com.zjt.shop.modules.product.mapper.OrderInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
 

@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);
    }
 
    @Autowired
    private OrderInfoMapper orderInfoMapper;
 
    /**
     * 针对 redis 数据生效事件,进行数据处理
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
      try {String key = message.toString();
          // 从生效 key 中筛选代表订单生效的 key
          if (key != null && key.startsWith("order_")) {
              // 截取订单号,查问订单,如果是未领取状态则为 - 勾销订单
              String orderNo = key.substring(6);
              QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>();
              queryWrapper.eq("order_no",orderNo);
              OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper);
              if (orderInfo != null) {if (orderInfo.getOrderState() == 0) {   // 待领取
                      orderInfo.setOrderState(4);         // 已勾销
                      orderInfoMapper.updateById(orderInfo);
                      log.info("订单号为【" + orderNo + "】超时未领取 - 主动批改为已勾销状态");
                  }
              }
          }
      } catch (Exception e) {e.printStackTrace();
          log.error("【批改领取订单过期状态异样】:" + e.getMessage());
      }
    }
}

4:测试 通过 redis 客户端存一个无效工夫为 3s 的订单:

后果:

总结: 以上办法只是集体对于关单的一些想法,可能有些中央有疏漏,请在公众号间接留言进行指出,当然如果你有更好的关单形式也能够随时沟通交流

本文转自 https://juejin.cn/post/6987233263660040206,如有侵权,请分割删除。

相干视频:

【2021 最新版】Android studio 装置教程 +Android(安卓)零基础教程视频(适宜 Android 0 根底,Android 初学入门)含音视频_哔哩哔哩_bilibili

【Android 进阶课程】——colin Compose 的绘制原理解说(一)_哔哩哔哩_bilibili

https://www.bilibili.com/video/BV1g34y1U73v?spm_id_from=333.999.0.0_哔哩哔哩_bilibili

正文完
 0