1. 生成订单30分钟未领取,则主动勾销
2. 30分钟未回复,则完结会话

对上述的工作,咱们给一个业余的名字来形容,那就是延时工作


一、延时工作是什么

延时工作不同于个别的定时工作,延时工作是在某事件触发后的将来某个时刻执行,没有反复的执行周期。

二、延时工作和定时工作的区别是什么

  1. 定时工作有明确的触发工夫,延时工作没有
  2. 定时工作有执行周期,而延时工作在某事件触发后一段时间内执行,没有执行周期
  3. 定时工作个别执行的是批处理多个工作,而延时工作个别是单任务解决

    三、技术比照

    本文次要解说Redis的Zset实现延时工作,其余计划只做介绍

1.数据库轮询

通过定时组件的去扫描数据库,通过工夫来判断是否有超时的订单,而后进行update或delete等操作

长处:简单易行
毛病:

  1. 对服务器内存耗费大
  2. 工夫距离小,数据库损耗极大
  3. 数据内存态,不牢靠
  4. 如果工作量过大,对数据库造成的压力很大 。频繁查询数据库带来性能影响

2.JDK的提早队列

利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在提早期满的时候能力从中获取元素,放入DelayQueue中,是必须实现Delayed接口的。

长处:实现简略,效率高,工作触发时间延迟低。
毛病:

  1. 服务器重启后,数据全副隐没,怕宕机
  2. 因为内存条件限度的起因,比方下单未付款的订单数太多,那么很容易就呈现OOM异样
  3. 数据内存态,不牢靠

3.工夫轮算法

工夫轮TimingWheel是一种高效、低提早的调度数据结构,底层采纳数组实现存储工作列表的环形队列,示意图如下:工夫轮

工夫轮算法能够类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样能够看出定时轮由个3个重要的属数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(工夫单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和事实中的始终的秒针走动齐全相似了。
如果以后指针指在1下面,我有一个工作须要4秒当前执行,那么这个执行的线程回调或者音讯将会被放在5上。那如果须要在20秒之后执行怎么办,因为这个环形构造槽数只到8,如果要20秒,指针须要多转2圈。地位是在2圈之后的5下面(20 % 8 + 1)

长处:效率高,工作触发时间延迟工夫比delayQueue低
毛病:

  1. 服务器重启后,数据全副隐没,怕宕机
  2. 容易就呈现OOM异样
  3. 数据内存态,不牢靠

4.应用音讯队列

应用RabbitMQ死信队列依赖于RabbitMQ的两个个性:TTL和DLX。
     TTL:Time To Live,音讯存活工夫,包含两个维度:队列音讯存活工夫和音讯自身的存活工夫。
     DLX:Dead Letter Exchange,死信交换器。

长处:异步交互能够削峰,高效,能够利用rabbitmq的分布式个性轻易的进行横向扩大,音讯反对长久化减少了可靠性。
毛病:
1.自身的易用度要依赖于rabbitMq的运维.因为要援用rabbitMq,所以复杂度和老本变高
2.RabbitMq是一个消息中间件;提早队列只是其中一个小性能,如果团队技术栈中原本就是应用RabbitMq那还好,如果不是,那为了应用提早队列而去部署一套RabbitMq老本有点大;

5.Redis的Zset实现延时工作

为什么采纳Redis的ZSet实现提早工作?

zset数据类型的去重有序(分数排序)特点进行提早。例如:工夫戳作为score进行排序

5.1 思路剖析

  1. 我的项目启动时启用一条线程,线程用于距离肯定工夫去查问redis的待执行工作。其工作jobId为业务id,值为要执行的工夫。
  2. 查问到执行的工作时,将其从redis的信息中进行删除。(删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行。)
  3. 删除redis中的记录之后,执行工作。将执行jobId也就是业务id对应的工作。
    理论场景中,还会波及延时工作批改,删除等,这些场景能够指定标记,批改标识即可,当然也能够在业务逻辑中做补充条件的判断。

5.2 Redis中Zset的简略介绍及应用

Redis 有序汇合是 string 类型元素的汇合,且不容许反复的成员。每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为汇合中的成员进行从小到大的排序。有序汇合的成员是惟一的,但分数(score)却能够反复。

常用命令

  • ZADD命令 : 将一个或多个成员元素及其分数值退出到有序集当中,或者更新已存在成员的分数
  • ZCARD命令 : 获取有序汇合的成员数
  • ZRANGEBYSCORE: 通过分数返回有序汇合指定区间内的成员
  • ZREM : 移除有序汇合中的一个或多个成员

java中操作简略介绍

    1.add(K key, V value, double score)    增加元素到变量中同时指定元素的分值。    redisTemplate.opsForZSet().add("zSetValue","A",1);            2.rangeByScore(K key, double min, double max)    依据设置的score获取区间值。    zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2);            3.rangeByScore(K key, double min, double max,long offset, long count)    依据设置的score获取区间值从给定下标和给定长度获取最终值。    zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3);            4.rangeWithScores(K key, long start, long end)    获取RedisZSetCommands.Tuples的区间值。    Set<ZSetOperations.TypedTuple<Object>> typedTupleSet = redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3);    Iterator<ZSetOperations.TypedTuple<Object>> iterator = typedTupleSet.iterator();    while (iterator.hasNext()){        ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();        Object value = typedTuple.getValue();        double score = typedTuple.getScore();      }          5.删除成员    redisTemplate.opsForZSet().remove("myZset","a","b");

以下代码能够间接应用-基于Spring Boot我的项目

5.3 延时队列工厂

代码中正文有具体介绍
/** * 延时队列工厂 * **/@Slf4jpublic abstract class AbstractDelayQueueMachineFactory {    @Autowired    private RedisUtil redisUtil;    @Autowired    private ThreadPoolTaskExecutor asyncTaskExecutor;    /**     * 插入工作id     *     * @param jobId 工作id(队列内惟一)     * @param time  延时工夫(单位 :毫秒)     * @return 是否插入胜利     */    public boolean addJob(String jobId, Integer time) {        Calendar instance = Calendar.getInstance();        //减少延时工夫,获取最终触发工夫        instance.add(Calendar.MILLISECOND, time);         long delayMillisecond = instance.getTimeInMillis();        log.info("延时队列增加问题{}",jobId);        return redisUtil.zAdd(setDelayQueueName(), delayMillisecond, jobId);    }    /**     * 删除工作id     *     * @param jobId 工作id(队列内惟一)     */    public boolean removeJob(String jobId) {        Long num = redisUtil.zRemove(setDelayQueueName(), jobId);        if (num > 0) return true;        return false;    }    /**     * 延时队列机器开始运作     */    private void startDelayQueueMachine() {        log.info("延时队列{}开始启动", setDelayQueueName());        // 监听redis队列        while (true) {            try {                // 获取以后工夫前的工作列表                Set<ZSetOperations.TypedTuple<Object>> tuples = redisUtil.zRangeByScore(setDelayQueueName(), 0, System.currentTimeMillis() );                // 如果工作不为空                if (!CollectionUtils.isEmpty(tuples)) {                    log.info("延时工作开始执行:{}", JSONUtil.toJsonStr(tuples));                    Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator();                    while (iterator.hasNext()){                        ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();                        String questionId = Convert.toStr(typedTuple.getValue());                        // 移除缓存,如果移除胜利则示意以后线程解决了延时工作,则执行延时工作                        // 删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行                        Long num = redisUtil.zRemove(setDelayQueueName(), questionId);                        // 如果移除胜利, 则执行                        if (num > 0) {                            asyncTaskExecutor.execute(() -> invoke(questionId));                        }                    }                }            } catch (Exception e) {                log.error("解决延时工作产生异样,异样起因为{}", e.getMessage(), e);            } finally {                // 距离()分钟执行一次                //依据业务场景设置对应工夫                try {                    TimeUnit.MINUTES.sleep(5);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }    /**     * 最终执行的工作办法     *     * @param jobId 工作id     */    public abstract void invoke(String jobId);    /**     * 要实现延时队列的名字     */    public abstract String setDelayQueueName();    //Spring Boot初始化时开启一条线程运行    @PostConstruct    public void init() {        new Thread(this::startDelayQueueMachine).start();    }}
addJob办法是增加工作id和延时工夫(单位毫秒)
redisUtil.zRangeByScore ::依据设置的score获取区间值
@PostConstruct注解:是针对Bean的初始化实现之后做一些事件,比方注册一些监听器..(初始化实现计划有很多可自行抉择)
为什么先删除后执行业务逻辑?
删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行

5.4 RedisUtil工具类

@Component@Slf4jpublic class RedisUtil {    @Autowired    private RedisTemplate<String, Object> redisTemplate;    /**     * 向Zset里增加成员     *     * @param key   key值     * @param score 分数,通常用于排序     * @param value 值     * @return 减少状态     */    public boolean zAdd(String key, long score, String value) {        Boolean result = redisTemplate.opsForZSet().add(key, value, score);        return result;    }    /**     * 获取 某key 下 某一分值区间的队列     *     * @param key  缓存key     * @param from 开始工夫     * @param to   完结工夫     * @return 数据     */    public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {        Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);        return set;    }    /**     * 移除 Zset队列值     *     * @param key   key值     * @param value 删除的汇合     * @return 删除数量     */    public Long zRemove(String key, String... value) {        return redisTemplate.opsForZSet().remove(key, value);    }}

5.5 测试延时队列

继承上文中的延时队列工厂重写invoke(解决业务)setDelayQueueName--延时队列名称也就是Zset中的key值
/** * 测试延时队列 * */@Slf4j@Componentpublic class DelayQueue extends AbstractDelayQueueMachineFactory {    @Autowired    private ZnjExpertConsultQuestionRecordMapper questionRecordMapper;    /**      * 解决业务逻辑     */     @Override    public void invoke(String jobId) {        Integer questionId = Convert.toInt(jobId);        ZnjExpertConsultQuestionRecordEntity questionRecordEntity = questionRecordMapper.selectById(questionId);        Boolean flag = znjExpertConsultService.whetherEnd(questionRecordEntity);   /**     * 延时队列名对立设定    */     @Override    public String setDelayQueueName() {        return "expert_consult:delay_queue";    }}

运行胜利,当Redis中有工作时,则执行工作即可

四、总结

应用redis zset来实现延时工作,总体类说是可行的

  • 实时性: 容许存在肯定工夫内的误差(能够通过工夫设定)
  • 高可用性:反对单机,反对集群
  • 音讯可靠性: 保障至多被生产一次
  • 音讯长久化: 基于Redis本身的长久化个性,下面的音讯可靠性基于Redis的长久化,所以如果redis数据失落,意味着提早音讯的失落,不过能够做主备和集群保障