关于spring:Redis实现延时任务

55次阅读

共计 6427 个字符,预计需要花费 17 分钟才能阅读完成。

1. 生成订单 30 分钟未领取,则主动勾销
2. 30 分钟未回复,则完结会话

对上述的工作,咱们给一个业余的名字来形容,那就是 延时工作


一、延时工作是什么

延时工作 不同于个别的定时工作,延时工作是在 某事件触发 后的将来某个时刻执行,没有反复的执行周期。

二、延时工作和定时工作的区别是什么

  1. 定时工作有明确的触发工夫,延时工作没有
  2. 定时工作有执行周期,而延时工作在某事件触发后一段时间内执行,没有执行周期
  3. 定时工作个别执行的是批处理多个工作,而延时工作个别是单任务解决

    三、技术比照

    本文次要解说 Redis 的 Zset 实现延时工作, 其余计划只做介绍

1. 数据库轮询

通过 定时组件 的去扫描数据库, 通过工夫来判断是否有超时的订单,而后进行 update 或 delete 等操作

长处 : 简单易行
毛病:

  1. 对服务器内存耗费大
  2. 工夫距离小, 数据库损耗极大
  3. 数据内存态,不牢靠
  4. 如果工作量过大,对数据库造成的压力很大。频繁查询数据库带来性能影响

2.JDK 的提早队列

利用 JDK 自带的 DelayQueue 来实现,这是一个无界阻塞队列,该队列只有在提早期满的时候能力从中获取元素,放入 DelayQueue 中,是必须实现 Delayed 接口 的。

长处 : 实现简略, 效率高, 工作触发时间延迟低。
毛病:

  1. 服务器重启后,数据全副隐没,怕宕机
  2. 因为内存条件限度的起因,比方下单未付款的订单数太多,那么很容易就呈现 OOM 异样
  3. 数据内存态,不牢靠

3. 工夫轮算法

工夫轮 TimingWheel 是一种高效、低提早的调度数据结构,底层采纳数组实现存储工作列表的环形队列,示意图如下:工夫轮

工夫轮算法能够类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样能够看出定时轮由个 3 个重要的属数,ticksPerWheel(一轮的 tick 数),tickDuration(一个 tick 的持续时间)以及 timeUnit(工夫单位),例如当 ticksPerWheel=60,tickDuration=1,timeUnit= 秒,这就和事实中的始终的秒针走动齐全相似了。
如果以后指针指在 1 下面,我有一个工作须要 4 秒当前执行,那么这个执行的线程回调或者音讯将会被放在 5 上。那如果须要在 20 秒之后执行怎么办,因为这个环形构造槽数只到 8,如果要 20 秒,指针须要多转 2 圈。地位是在 2 圈之后的 5 下面(20 % 8 + 1)

长处 : 效率高, 工作触发时间延迟工夫比 delayQueue 低
毛病:

  1. 服务器重启后,数据全副隐没,怕宕机
  2. 容易就呈现 OOM 异样
  3. 数据内存态,不牢靠

4. 应用音讯队列

应用 RabbitMQ 死信队列依赖于 RabbitMQ 的两个个性:TTL 和 DLX。
     TTL:Time To Live,音讯存活工夫,包含两个维度:队列音讯存活工夫和音讯自身的存活工夫。
     DLX:Dead Letter Exchange,死信交换器。

长处 : 异步交互能够削峰, 高效, 能够利用 rabbitmq 的分布式个性轻易的进行横向扩大, 音讯反对长久化减少了可靠性。
毛病:
1. 自身的易用度要依赖于 rabbitMq 的运维. 因为要援用 rabbitMq, 所以复杂度和老本变高
2.RabbitMq 是一个消息中间件; 提早队列只是其中一个小性能, 如果团队技术栈中原本就是应用 RabbitMq 那还好, 如果不是, 那为了应用提早队列而去部署一套 RabbitMq 老本有点大;

5.Redis 的 Zset 实现延时工作

为什么采纳 Redis 的 ZSet 实现提早工作?

zset 数据类型的去重有序(分数排序)特点进行提早。例如:工夫戳作为 score 进行排序

5.1 思路剖析

  1. 我的项目启动时启用 一条线程,线程用于距离肯定工夫去查问 redis 的待执行工作。其工作 jobId 为业务 id,值为要执行的工夫。
  2. 查问到执行的工作时,将其从 redis 的信息中进行删除。(删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行。)
  3. 删除 redis 中的记录之后,执行工作。将执行 jobId 也就是业务 id 对应的工作。
    理论场景中,还会波及延时工作批改,删除等,这些场景能够指定标记, 批改标识即可,当然也能够在业务逻辑中做补充条件的判断。

5.2 Redis 中 Zset 的简略介绍及应用

Redis 有序汇合是 string 类型元素的汇合, 且不容许反复的成员。每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为汇合中的成员进行从小到大的排序。有序汇合的成员是惟一的, 但分数 (score) 却能够反复。

常用命令

  • ZADD 命令 : 将一个或多个成员元素及其分数值退出到有序集当中,或者更新已存在成员的分数
  • ZCARD 命令 : 获取有序汇合的成员数
  • ZRANGEBYSCORE: 通过分数返回有序汇合指定区间内的成员
  • ZREM : 移除有序汇合中的一个或多个成员

java 中操作简略介绍

    1.add(K key, V value, double score)
    增加元素到变量中同时指定元素的分值。redisTemplate.opsForZSet().add("zSetValue","A",1);
    
    
    2.rangeByScore(K key, double min, double max)
    依据设置的 score 获取区间值。zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2);
    
    
    3.rangeByScore(K key, double min, double max,long offset, long count)
    依据设置的 score 获取区间值从给定下标和给定长度获取最终值。zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3);
    
    
    4.rangeWithScores(K key, long start, long end)
    获取 RedisZSetCommands.Tuples 的区间值。Set<ZSetOperations.TypedTuple<Object>> typedTupleSet = redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3);
    Iterator<ZSetOperations.TypedTuple<Object>> iterator = typedTupleSet.iterator();
    while (iterator.hasNext()){ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();
        Object value = typedTuple.getValue();
        double score = typedTuple.getScore();}
      
    5. 删除成员
    redisTemplate.opsForZSet().remove("myZset","a","b");

以下代码能够间接应用 - 基于 Spring Boot 我的项目

5.3 延时队列工厂

代码中 正文 有具体介绍


/**
 * 延时队列工厂
 *
 **/
@Slf4j
public abstract class AbstractDelayQueueMachineFactory {


    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private ThreadPoolTaskExecutor asyncTaskExecutor;

    /**
     * 插入工作 id
     *
     * @param jobId 工作 id(队列内惟一)
     * @param time  延时工夫(单位 : 毫秒)
     * @return 是否插入胜利
     */
    public boolean addJob(String jobId, Integer time) {Calendar instance = Calendar.getInstance();
        // 减少延时工夫, 获取最终触发工夫
        instance.add(Calendar.MILLISECOND, time); 
        long delayMillisecond = instance.getTimeInMillis();
        log.info("延时队列增加问题{}",jobId);
        return redisUtil.zAdd(setDelayQueueName(), delayMillisecond, jobId);

    }

    /**
     * 删除工作 id
     *
     * @param jobId 工作 id(队列内惟一)
     */
    public boolean removeJob(String jobId) {Long num = redisUtil.zRemove(setDelayQueueName(), jobId);
        if (num > 0) return true;
        return false;

    }

    /**
     * 延时队列机器开始运作
     */
    private void startDelayQueueMachine() {log.info("延时队列 {} 开始启动", setDelayQueueName());

        // 监听 redis 队列
        while (true) {
            try {
                // 获取以后工夫前的工作列表
                Set<ZSetOperations.TypedTuple<Object>> tuples = redisUtil.zRangeByScore(setDelayQueueName(), 0, System.currentTimeMillis());

                // 如果工作不为空
                if (!CollectionUtils.isEmpty(tuples)) {log.info("延时工作开始执行:{}", JSONUtil.toJsonStr(tuples));
                    Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator();
                    while (iterator.hasNext()){ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();
                        String questionId = Convert.toStr(typedTuple.getValue());
                        // 移除缓存,如果移除胜利则示意以后线程解决了延时工作,则执行延时工作
                        // 删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行
                        Long num = redisUtil.zRemove(setDelayQueueName(), questionId);
                        // 如果移除胜利, 则执行
                        if (num > 0) {asyncTaskExecutor.execute(() -> invoke(questionId));
                        }
                    }
                }

            } catch (Exception e) {log.error("解决延时工作产生异样, 异样起因为{}", e.getMessage(), e);
            } finally {// 距离 () 分钟执行一次
                // 依据业务场景设置对应工夫
                try {TimeUnit.MINUTES.sleep(5);
                } catch (InterruptedException e) {e.printStackTrace();
                }

            }
        }

    }

    /**
     * 最终执行的工作办法
     *
     * @param jobId 工作 id
     */
    public abstract void invoke(String jobId);


    /**
     * 要实现延时队列的名字
     */
    public abstract String setDelayQueueName();


    //Spring Boot 初始化时开启一条线程运行
    @PostConstruct
    public void init() {new Thread(this::startDelayQueueMachine).start();}

}

addJob办法是增加工作 id 和延时工夫 (单位毫秒)
redisUtil.zRangeByScore :: 依据设置的 score 获取区间值
@PostConstruct 注解: 是针对 Bean 的初始化实现之后做一些事件,比方注册一些监听器..(初始化实现计划有很多可自行抉择)
为什么先删除后执行业务逻辑?
删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行

5.4 RedisUtil 工具类

@Component
@Slf4j
public class RedisUtil {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 向 Zset 里增加成员
     *
     * @param key   key 值
     * @param score 分数,通常用于排序
     * @param value 值
     * @return 减少状态
     */
    public boolean zAdd(String key, long score, String value) {Boolean result = redisTemplate.opsForZSet().add(key, value, score);
        return result;

    }


    /**
     * 获取 某 key 下 某一分值区间的队列
     *
     * @param key  缓存 key
     * @param from 开始工夫
     * @param to   完结工夫
     * @return 数据
     */
    public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);
        return set;
    }

    /**
     * 移除 Zset 队列值
     *
     * @param key   key 值
     * @param value 删除的汇合
     * @return 删除数量
     */
    public Long zRemove(String key, String... value) {return redisTemplate.opsForZSet().remove(key, value);
    }


}

5.5 测试延时队列

继承上文中的延时队列工厂重写 invoke(解决业务)setDelayQueueName-- 延时队列名称也就是 Zset 中的 key 值

/**
 * 测试延时队列
 *
 */
@Slf4j
@Component
public class DelayQueue extends AbstractDelayQueueMachineFactory {

    @Autowired
    private ZnjExpertConsultQuestionRecordMapper questionRecordMapper;

    /** 
     * 解决业务逻辑
     */ 
    @Override
    public void invoke(String jobId) {Integer questionId = Convert.toInt(jobId);
        ZnjExpertConsultQuestionRecordEntity questionRecordEntity = questionRecordMapper.selectById(questionId);
        Boolean flag = znjExpertConsultService.whetherEnd(questionRecordEntity);

   /** 
    * 延时队列名对立设定
    */ 
    @Override
    public String setDelayQueueName() {return "expert_consult:delay_queue";}
}

运行胜利, 当 Redis 中有工作时, 则执行工作即可

四、总结

应用 redis zset 来实现延时工作, 总体类说是可行的

  • 实时性: 容许存在肯定工夫内的误差(能够通过工夫设定)
  • 高可用性: 反对单机, 反对集群
  • 音讯可靠性: 保障至多被生产一次
  • 音讯长久化: 基于 Redis 本身的长久化个性, 下面的音讯可靠性基于 Redis 的长久化, 所以如果 redis 数据失落, 意味着提早音讯的失落, 不过能够做主备和集群保障

正文完
 0