1. 生成订单30分钟未领取,则主动勾销
2. 30分钟未回复,则完结会话
对上述的工作,咱们给一个业余的名字来形容,那就是
延时工作
一、延时工作是什么
延时工作
不同于个别的定时工作,延时工作是在某事件触发
后的将来某个时刻执行,没有反复的执行周期。
二、延时工作和定时工作的区别是什么
- 定时工作有明确的触发工夫,延时工作没有
- 定时工作有执行周期,而延时工作在某事件触发后一段时间内执行,没有执行周期
定时工作个别执行的是批处理多个工作,而延时工作个别是单任务解决
三、技术比照
本文次要解说
Redis的Zset
实现延时工作,其余计划只做介绍
1.数据库轮询
通过定时组件
的去扫描数据库,通过工夫来判断是否有超时的订单,而后进行update或delete等操作
长处
:简单易行毛病
:
- 对服务器内存耗费大
- 工夫距离小,数据库损耗极大
- 数据内存态,不牢靠
- 如果工作量过大,对数据库造成的压力很大 。频繁查询数据库带来性能影响
2.JDK的提早队列
利用JDK自带的DelayQueue
来实现,这是一个无界阻塞队列,该队列只有在提早期满的时候能力从中获取元素,放入DelayQueue
中,是必须实现Delayed接口
的。
长处
:实现简略,效率高,工作触发时间延迟低。毛病
:
- 服务器重启后,数据全副隐没,怕宕机
- 因为内存条件限度的起因,比方下单未付款的订单数太多,那么很容易就呈现OOM异样
- 数据内存态,不牢靠
3.工夫轮算法
工夫轮TimingWheel是一种高效、低提早的调度数据结构,底层采纳数组实现存储工作列表的环形队列,示意图如下:工夫轮
工夫轮算法能够类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样能够看出定时轮由个3个重要的属数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(工夫单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和事实中的始终的秒针走动齐全相似了。
如果以后指针指在1下面,我有一个工作须要4秒当前执行,那么这个执行的线程回调或者音讯将会被放在5上。那如果须要在20秒之后执行怎么办,因为这个环形构造槽数只到8,如果要20秒,指针须要多转2圈。地位是在2圈之后的5下面(20 % 8 + 1)
长处
:效率高,工作触发时间延迟工夫比delayQueue低毛病
:
- 服务器重启后,数据全副隐没,怕宕机
- 容易就呈现OOM异样
- 数据内存态,不牢靠
4.应用音讯队列
应用RabbitMQ死信队列依赖于RabbitMQ的两个个性:TTL和DLX。
TTL:Time To Live,音讯存活工夫,包含两个维度:队列音讯存活工夫和音讯自身的存活工夫。
DLX:Dead Letter Exchange,死信交换器。
长处
:异步交互能够削峰,高效,能够利用rabbitmq的分布式个性轻易的进行横向扩大,音讯反对长久化减少了可靠性。毛病
:
1.自身的易用度要依赖于rabbitMq的运维.因为要援用rabbitMq,所以复杂度和老本变高
2.RabbitMq是一个消息中间件;提早队列只是其中一个小性能,如果团队技术栈中原本就是应用RabbitMq那还好,如果不是,那为了应用提早队列而去部署一套RabbitMq老本有点大;
5.Redis的Zset实现延时工作
为什么采纳Redis的ZSet实现提早工作?
zset数据类型的去重有序(分数排序)特点进行提早。例如:工夫戳作为score进行排序
5.1 思路剖析
- 我的项目启动时启用
一条线程
,线程用于距离肯定工夫去查问redis的待执行工作。其工作jobId为业务id,值为要执行的工夫。 - 查问到执行的工作时,将其从redis的信息中进行删除。(
删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行
。) - 删除redis中的记录之后,执行工作。将执行jobId也就是业务id对应的工作。
理论场景中,还会波及延时工作批改,删除等,这些场景能够指定标记,批改标识即可,当然也能够在业务逻辑中做补充条件的判断。
5.2 Redis中Zset的简略介绍及应用
Redis 有序汇合是 string 类型元素的汇合,且不容许反复的成员。每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为汇合中的成员进行从小到大的排序。有序汇合的成员是惟一的,但分数(score)却能够反复。
常用命令
- ZADD命令 : 将一个或多个成员元素及其分数值退出到有序集当中,或者更新已存在成员的分数
- ZCARD命令 : 获取有序汇合的成员数
- ZRANGEBYSCORE: 通过分数返回有序汇合指定区间内的成员
- ZREM : 移除有序汇合中的一个或多个成员
java中操作简略介绍
1.add(K key, V value, double score) 增加元素到变量中同时指定元素的分值。 redisTemplate.opsForZSet().add("zSetValue","A",1); 2.rangeByScore(K key, double min, double max) 依据设置的score获取区间值。 zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2); 3.rangeByScore(K key, double min, double max,long offset, long count) 依据设置的score获取区间值从给定下标和给定长度获取最终值。 zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3); 4.rangeWithScores(K key, long start, long end) 获取RedisZSetCommands.Tuples的区间值。 Set<ZSetOperations.TypedTuple<Object>> typedTupleSet = redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3); Iterator<ZSetOperations.TypedTuple<Object>> iterator = typedTupleSet.iterator(); while (iterator.hasNext()){ ZSetOperations.TypedTuple<Object> typedTuple = iterator.next(); Object value = typedTuple.getValue(); double score = typedTuple.getScore(); } 5.删除成员 redisTemplate.opsForZSet().remove("myZset","a","b");
以下代码能够间接应用-基于Spring Boot
我的项目
5.3 延时队列工厂
代码中正文
有具体介绍
/** * 延时队列工厂 * **/@Slf4jpublic abstract class AbstractDelayQueueMachineFactory { @Autowired private RedisUtil redisUtil; @Autowired private ThreadPoolTaskExecutor asyncTaskExecutor; /** * 插入工作id * * @param jobId 工作id(队列内惟一) * @param time 延时工夫(单位 :毫秒) * @return 是否插入胜利 */ public boolean addJob(String jobId, Integer time) { Calendar instance = Calendar.getInstance(); //减少延时工夫,获取最终触发工夫 instance.add(Calendar.MILLISECOND, time); long delayMillisecond = instance.getTimeInMillis(); log.info("延时队列增加问题{}",jobId); return redisUtil.zAdd(setDelayQueueName(), delayMillisecond, jobId); } /** * 删除工作id * * @param jobId 工作id(队列内惟一) */ public boolean removeJob(String jobId) { Long num = redisUtil.zRemove(setDelayQueueName(), jobId); if (num > 0) return true; return false; } /** * 延时队列机器开始运作 */ private void startDelayQueueMachine() { log.info("延时队列{}开始启动", setDelayQueueName()); // 监听redis队列 while (true) { try { // 获取以后工夫前的工作列表 Set<ZSetOperations.TypedTuple<Object>> tuples = redisUtil.zRangeByScore(setDelayQueueName(), 0, System.currentTimeMillis() ); // 如果工作不为空 if (!CollectionUtils.isEmpty(tuples)) { log.info("延时工作开始执行:{}", JSONUtil.toJsonStr(tuples)); Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator(); while (iterator.hasNext()){ ZSetOperations.TypedTuple<Object> typedTuple = iterator.next(); String questionId = Convert.toStr(typedTuple.getValue()); // 移除缓存,如果移除胜利则示意以后线程解决了延时工作,则执行延时工作 // 删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行 Long num = redisUtil.zRemove(setDelayQueueName(), questionId); // 如果移除胜利, 则执行 if (num > 0) { asyncTaskExecutor.execute(() -> invoke(questionId)); } } } } catch (Exception e) { log.error("解决延时工作产生异样,异样起因为{}", e.getMessage(), e); } finally { // 距离()分钟执行一次 //依据业务场景设置对应工夫 try { TimeUnit.MINUTES.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 最终执行的工作办法 * * @param jobId 工作id */ public abstract void invoke(String jobId); /** * 要实现延时队列的名字 */ public abstract String setDelayQueueName(); //Spring Boot初始化时开启一条线程运行 @PostConstruct public void init() { new Thread(this::startDelayQueueMachine).start(); }}
addJob
办法是增加工作id和延时工夫(单位毫秒)redisUtil.zRangeByScore :
:依据设置的score获取区间值@PostConstruct
注解:是针对Bean的初始化实现之后做一些事件,比方注册一些监听器..(初始化实现计划有很多可自行抉择)为什么先删除后执行业务逻辑?
删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行
5.4 RedisUtil工具类
@Component@Slf4jpublic class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 向Zset里增加成员 * * @param key key值 * @param score 分数,通常用于排序 * @param value 值 * @return 减少状态 */ public boolean zAdd(String key, long score, String value) { Boolean result = redisTemplate.opsForZSet().add(key, value, score); return result; } /** * 获取 某key 下 某一分值区间的队列 * * @param key 缓存key * @param from 开始工夫 * @param to 完结工夫 * @return 数据 */ public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) { Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to); return set; } /** * 移除 Zset队列值 * * @param key key值 * @param value 删除的汇合 * @return 删除数量 */ public Long zRemove(String key, String... value) { return redisTemplate.opsForZSet().remove(key, value); }}
5.5 测试延时队列
继承上文中的延时队列工厂重写invoke(解决业务)
和setDelayQueueName--延时队列名称也就是Zset中的key值
/** * 测试延时队列 * */@Slf4j@Componentpublic class DelayQueue extends AbstractDelayQueueMachineFactory { @Autowired private ZnjExpertConsultQuestionRecordMapper questionRecordMapper; /** * 解决业务逻辑 */ @Override public void invoke(String jobId) { Integer questionId = Convert.toInt(jobId); ZnjExpertConsultQuestionRecordEntity questionRecordEntity = questionRecordMapper.selectById(questionId); Boolean flag = znjExpertConsultService.whetherEnd(questionRecordEntity); /** * 延时队列名对立设定 */ @Override public String setDelayQueueName() { return "expert_consult:delay_queue"; }}
运行胜利,当Redis中有工作时,则执行工作即可
四、总结
应用redis zset来实现延时工作,总体类说是可行的
- 实时性: 容许存在肯定工夫内的误差(能够通过工夫设定)
- 高可用性:反对单机,反对集群
- 音讯可靠性: 保障至多被生产一次
- 音讯长久化: 基于Redis本身的长久化个性,下面的音讯可靠性基于Redis的长久化,所以如果redis数据失落,意味着提早音讯的失落,不过能够做主备和集群保障