共计 6427 个字符,预计需要花费 17 分钟才能阅读完成。
1. 生成订单 30 分钟未领取,则主动勾销
2. 30 分钟未回复,则完结会话
对上述的工作,咱们给一个业余的名字来形容,那就是
延时工作
一、延时工作是什么
延时工作
不同于个别的定时工作,延时工作是在某事件触发
后的将来某个时刻执行,没有反复的执行周期。
二、延时工作和定时工作的区别是什么
- 定时工作有明确的触发工夫,延时工作没有
- 定时工作有执行周期,而延时工作在某事件触发后一段时间内执行,没有执行周期
-
定时工作个别执行的是批处理多个工作,而延时工作个别是单任务解决
三、技术比照
本文次要解说
Redis 的 Zset
实现延时工作, 其余计划只做介绍
1. 数据库轮询
通过 定时组件
的去扫描数据库, 通过工夫来判断是否有超时的订单,而后进行 update 或 delete 等操作
长处
: 简单易行毛病
:
- 对服务器内存耗费大
- 工夫距离小, 数据库损耗极大
- 数据内存态,不牢靠
- 如果工作量过大,对数据库造成的压力很大。频繁查询数据库带来性能影响
2.JDK 的提早队列
利用 JDK 自带的 DelayQueue
来实现,这是一个无界阻塞队列,该队列只有在提早期满的时候能力从中获取元素,放入 DelayQueue
中,是必须实现 Delayed 接口
的。
长处
: 实现简略, 效率高, 工作触发时间延迟低。毛病
:
- 服务器重启后,数据全副隐没,怕宕机
- 因为内存条件限度的起因,比方下单未付款的订单数太多,那么很容易就呈现 OOM 异样
- 数据内存态,不牢靠
3. 工夫轮算法
工夫轮 TimingWheel 是一种高效、低提早的调度数据结构,底层采纳数组实现存储工作列表的环形队列,示意图如下:工夫轮
工夫轮算法能够类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样能够看出定时轮由个 3 个重要的属数,ticksPerWheel(一轮的 tick 数),tickDuration(一个 tick 的持续时间)以及 timeUnit(工夫单位),例如当 ticksPerWheel=60,tickDuration=1,timeUnit= 秒,这就和事实中的始终的秒针走动齐全相似了。
如果以后指针指在 1 下面,我有一个工作须要 4 秒当前执行,那么这个执行的线程回调或者音讯将会被放在 5 上。那如果须要在 20 秒之后执行怎么办,因为这个环形构造槽数只到 8,如果要 20 秒,指针须要多转 2 圈。地位是在 2 圈之后的 5 下面(20 % 8 + 1)
长处
: 效率高, 工作触发时间延迟工夫比 delayQueue 低毛病
:
- 服务器重启后,数据全副隐没,怕宕机
- 容易就呈现 OOM 异样
- 数据内存态,不牢靠
4. 应用音讯队列
应用 RabbitMQ 死信队列依赖于 RabbitMQ 的两个个性:TTL 和 DLX。
TTL:Time To Live,音讯存活工夫,包含两个维度:队列音讯存活工夫和音讯自身的存活工夫。
DLX:Dead Letter Exchange,死信交换器。
长处
: 异步交互能够削峰, 高效, 能够利用 rabbitmq 的分布式个性轻易的进行横向扩大, 音讯反对长久化减少了可靠性。毛病
:
1. 自身的易用度要依赖于 rabbitMq 的运维. 因为要援用 rabbitMq, 所以复杂度和老本变高
2.RabbitMq 是一个消息中间件; 提早队列只是其中一个小性能, 如果团队技术栈中原本就是应用 RabbitMq 那还好, 如果不是, 那为了应用提早队列而去部署一套 RabbitMq 老本有点大;
5.Redis 的 Zset 实现延时工作
为什么采纳 Redis 的 ZSet 实现提早工作?
zset 数据类型的去重有序(分数排序)特点进行提早。例如:工夫戳作为 score 进行排序
5.1 思路剖析
- 我的项目启动时启用
一条线程
,线程用于距离肯定工夫去查问 redis 的待执行工作。其工作 jobId 为业务 id,值为要执行的工夫。 - 查问到执行的工作时,将其从 redis 的信息中进行删除。(
删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行
。) - 删除 redis 中的记录之后,执行工作。将执行 jobId 也就是业务 id 对应的工作。
理论场景中,还会波及延时工作批改,删除等,这些场景能够指定标记, 批改标识即可,当然也能够在业务逻辑中做补充条件的判断。
5.2 Redis 中 Zset 的简略介绍及应用
Redis 有序汇合是 string 类型元素的汇合, 且不容许反复的成员。每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为汇合中的成员进行从小到大的排序。有序汇合的成员是惟一的, 但分数 (score) 却能够反复。
常用命令
- ZADD 命令 : 将一个或多个成员元素及其分数值退出到有序集当中,或者更新已存在成员的分数
- ZCARD 命令 : 获取有序汇合的成员数
- ZRANGEBYSCORE: 通过分数返回有序汇合指定区间内的成员
- ZREM : 移除有序汇合中的一个或多个成员
java 中操作简略介绍
1.add(K key, V value, double score)
增加元素到变量中同时指定元素的分值。redisTemplate.opsForZSet().add("zSetValue","A",1);
2.rangeByScore(K key, double min, double max)
依据设置的 score 获取区间值。zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2);
3.rangeByScore(K key, double min, double max,long offset, long count)
依据设置的 score 获取区间值从给定下标和给定长度获取最终值。zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3);
4.rangeWithScores(K key, long start, long end)
获取 RedisZSetCommands.Tuples 的区间值。Set<ZSetOperations.TypedTuple<Object>> typedTupleSet = redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3);
Iterator<ZSetOperations.TypedTuple<Object>> iterator = typedTupleSet.iterator();
while (iterator.hasNext()){ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();
Object value = typedTuple.getValue();
double score = typedTuple.getScore();}
5. 删除成员
redisTemplate.opsForZSet().remove("myZset","a","b");
以下代码能够间接应用 - 基于 Spring Boot
我的项目
5.3 延时队列工厂
代码中
正文
有具体介绍
/**
* 延时队列工厂
*
**/
@Slf4j
public abstract class AbstractDelayQueueMachineFactory {
@Autowired
private RedisUtil redisUtil;
@Autowired
private ThreadPoolTaskExecutor asyncTaskExecutor;
/**
* 插入工作 id
*
* @param jobId 工作 id(队列内惟一)
* @param time 延时工夫(单位 : 毫秒)
* @return 是否插入胜利
*/
public boolean addJob(String jobId, Integer time) {Calendar instance = Calendar.getInstance();
// 减少延时工夫, 获取最终触发工夫
instance.add(Calendar.MILLISECOND, time);
long delayMillisecond = instance.getTimeInMillis();
log.info("延时队列增加问题{}",jobId);
return redisUtil.zAdd(setDelayQueueName(), delayMillisecond, jobId);
}
/**
* 删除工作 id
*
* @param jobId 工作 id(队列内惟一)
*/
public boolean removeJob(String jobId) {Long num = redisUtil.zRemove(setDelayQueueName(), jobId);
if (num > 0) return true;
return false;
}
/**
* 延时队列机器开始运作
*/
private void startDelayQueueMachine() {log.info("延时队列 {} 开始启动", setDelayQueueName());
// 监听 redis 队列
while (true) {
try {
// 获取以后工夫前的工作列表
Set<ZSetOperations.TypedTuple<Object>> tuples = redisUtil.zRangeByScore(setDelayQueueName(), 0, System.currentTimeMillis());
// 如果工作不为空
if (!CollectionUtils.isEmpty(tuples)) {log.info("延时工作开始执行:{}", JSONUtil.toJsonStr(tuples));
Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator();
while (iterator.hasNext()){ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();
String questionId = Convert.toStr(typedTuple.getValue());
// 移除缓存,如果移除胜利则示意以后线程解决了延时工作,则执行延时工作
// 删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行
Long num = redisUtil.zRemove(setDelayQueueName(), questionId);
// 如果移除胜利, 则执行
if (num > 0) {asyncTaskExecutor.execute(() -> invoke(questionId));
}
}
}
} catch (Exception e) {log.error("解决延时工作产生异样, 异样起因为{}", e.getMessage(), e);
} finally {// 距离 () 分钟执行一次
// 依据业务场景设置对应工夫
try {TimeUnit.MINUTES.sleep(5);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
/**
* 最终执行的工作办法
*
* @param jobId 工作 id
*/
public abstract void invoke(String jobId);
/**
* 要实现延时队列的名字
*/
public abstract String setDelayQueueName();
//Spring Boot 初始化时开启一条线程运行
@PostConstruct
public void init() {new Thread(this::startDelayQueueMachine).start();}
}
addJob
办法是增加工作 id 和延时工夫 (单位毫秒)redisUtil.zRangeByScore :
: 依据设置的 score 获取区间值@PostConstruct
注解: 是针对 Bean 的初始化实现之后做一些事件,比方注册一些监听器..(初始化实现计划有很多可自行抉择)为什么先删除后执行业务逻辑?
删除胜利才执行延时工作,否则不执行,这样能够防止分布式系统延时工作屡次执行
5.4 RedisUtil 工具类
@Component
@Slf4j
public class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 向 Zset 里增加成员
*
* @param key key 值
* @param score 分数,通常用于排序
* @param value 值
* @return 减少状态
*/
public boolean zAdd(String key, long score, String value) {Boolean result = redisTemplate.opsForZSet().add(key, value, score);
return result;
}
/**
* 获取 某 key 下 某一分值区间的队列
*
* @param key 缓存 key
* @param from 开始工夫
* @param to 完结工夫
* @return 数据
*/
public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);
return set;
}
/**
* 移除 Zset 队列值
*
* @param key key 值
* @param value 删除的汇合
* @return 删除数量
*/
public Long zRemove(String key, String... value) {return redisTemplate.opsForZSet().remove(key, value);
}
}
5.5 测试延时队列
继承上文中的延时队列工厂重写
invoke(解决业务)
和setDelayQueueName-- 延时队列名称也就是 Zset 中的 key 值
/**
* 测试延时队列
*
*/
@Slf4j
@Component
public class DelayQueue extends AbstractDelayQueueMachineFactory {
@Autowired
private ZnjExpertConsultQuestionRecordMapper questionRecordMapper;
/**
* 解决业务逻辑
*/
@Override
public void invoke(String jobId) {Integer questionId = Convert.toInt(jobId);
ZnjExpertConsultQuestionRecordEntity questionRecordEntity = questionRecordMapper.selectById(questionId);
Boolean flag = znjExpertConsultService.whetherEnd(questionRecordEntity);
/**
* 延时队列名对立设定
*/
@Override
public String setDelayQueueName() {return "expert_consult:delay_queue";}
}
运行胜利, 当 Redis 中有工作时, 则执行工作即可
四、总结
应用 redis zset 来实现延时工作, 总体类说是可行的
- 实时性: 容许存在肯定工夫内的误差(能够通过工夫设定)
- 高可用性: 反对单机, 反对集群
- 音讯可靠性: 保障至多被生产一次
- 音讯长久化: 基于 Redis 本身的长久化个性, 下面的音讯可靠性基于 Redis 的长久化, 所以如果 redis 数据失落, 意味着提早音讯的失落, 不过能够做主备和集群保障