起源:blog.csdn.net/qq330983778/article/details/99341671
业务流程
首先咱们剖析下这个流程
- 用户提交工作。首先将工作推送至提早队列中。
- 提早队列接管到工作后,首先将工作推送至job pool中,而后计算其执行工夫。
- 而后生成提早工作(仅仅蕴含工作id)放入某个桶中
- 工夫组件时刻轮询各个桶,当工夫达到的时候从job pool中取得工作元信息。
- 监测工作的合法性如果曾经删除则pass。持续轮询。如果工作非法则再次计算工夫
- 如果非法则计算工夫,如果工夫非法:依据topic将工作放入对应的ready queue,而后从bucket中移除。如果工夫不非法,则从新计算工夫再次放入bucket,并移除之前的bucket中的内容
- 生产端轮询对应topic的ready queue。获取job后做本人的业务逻辑。与此同时,服务端将曾经被生产端获取的job依照其设定的TTR,从新计算执行工夫,并将其放入bucket。
- 实现生产后,发送finish音讯,服务端依据job id删除对应信息。
对象
咱们当初能够理解到两头存在的几个组件
- 提早队列,为Redis提早队列。实现消息传递
- Job pool 工作池保留job元信息。依据文章形容应用K/V的数据结构,key为ID,value为job
- Delay Bucket 用来保留业务的提早工作。文章中形容应用轮询形式放入某一个Bucket能够晓得其并没有应用topic来辨别,集体这里默认应用程序插入
- Timer 工夫组件,负责扫描各个Bucket。依据文章形容存在多个Timer,然而同一个Timer同一时间只能扫描一个Bucket
- Ready Queue 负责寄存须要被实现的工作,然而依据形容依据Topic的不同存在多个Ready Queue。
其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的汇合。
工作状态
- ready:可执行状态,
- delay:不可执行状态,期待时钟周期。
- reserved:已被消费者读取,但没有实现生产。
- deleted:已被生产实现或者已被删除。
对外提供的接口
额定的内容
- 首先依据状态状态形容,finish和delete操作都是将工作设置成deleted状态。
- 依据文章形容的操作,在执行finish或者delete的操作的时候工作曾经从元数据中移除,此时deleted状态可能只存在极短时间,所以理论实现中就间接删除了。
- 文章中并没有阐明响应超时后如何解决,所以集体当初将其从新投入了待处理队列。
- 文章中因为应用了集群,所以应用redis的setnx锁来保障多个工夫循环解决多个桶的时候不会呈现反复循环。这里因为是简略的实现,所以就很简略的每个桶设置一个工夫队列解决。也是为了不便简略解决。对于分布式锁能够看我之前的文章外面有形容。
实现
当初咱们依据设计内容实现设计。这一块设计咱们分四步实现
工作及相干对象
目前须要两个对象,一个是工作对象(job)一个负责保留工作援用的对象(delay job),Spring Boot 根底就不介绍了,举荐下这个实战教程:
https://github.com/javastacks...
工作对象
@Data@AllArgsConstructor@NoArgsConstructorpublic class Job implements Serializable { /** * 提早工作的惟一标识,用于检索工作 */ @JsonSerialize(using = ToStringSerializer.class) private Long id; /** * 工作类型(具体业务类型) */ private String topic; /** * 工作的延迟时间 */ private long delayTime; /** * 工作的执行超时工夫 */ private long ttrTime; /** * 工作具体的音讯内容,用于解决具体业务逻辑用 */ private String message; /** * 重试次数 */ private int retryCount; /** * 工作状态 */ private JobStatus status;}
工作援用对象
@Data@AllArgsConstructorpublic class DelayJob implements Serializable { /** * 提早工作的惟一标识 */ private long jodId; /** * 工作的执行工夫 */ private long delayDate; /** * 工作类型(具体业务类型) */ private String topic; public DelayJob(Job job) { this.jodId = job.getId(); this.delayDate = System.currentTimeMillis() + job.getDelayTime(); this.topic = job.getTopic(); } public DelayJob(Object value, Double score) { this.jodId = Long.parseLong(String.valueOf(value)); this.delayDate = System.currentTimeMillis() + score.longValue(); }}
容器
目前咱们须要实现三个容器的创立,Job工作池、提早工作容器、待实现工作容器
job工作池,为一般的K/V构造,提供根底的操作
@Component@Slf4jpublic class JobPool { @Autowired private RedisTemplate redisTemplate; private String NAME = "job.pool"; private BoundHashOperations getPool () { BoundHashOperations ops = redisTemplate.boundHashOps(NAME); return ops; } /** * 增加工作 * @param job */ public void addJob (Job job) { log.info("工作池增加工作:{}", JSON.toJSONString(job)); getPool().put(job.getId(),job); return ; } /** * 取得工作 * @param jobId * @return */ public Job getJob(Long jobId) { Object o = getPool().get(jobId); if (o instanceof Job) { return (Job) o; } return null; } /** * 移除工作 * @param jobId */ public void removeDelayJob (Long jobId) { log.info("工作池移除工作:{}",jobId); // 移除工作 getPool().delete(jobId); }}
提早工作,应用可排序的ZSet保留数据,提供取出最小值等操作
@Slf4j@Componentpublic class DelayBucket { @Autowired private RedisTemplate redisTemplate; private static AtomicInteger index = new AtomicInteger(0); @Value("${thread.size}") private int bucketsSize; private List <String> bucketNames = new ArrayList <>(); @Bean public List <String> createBuckets() { for (int i = 0; i < bucketsSize; i++) { bucketNames.add("bucket" + i); } return bucketNames; } /** * 取得桶的名称 * @return */ private String getThisBucketName() { int thisIndex = index.addAndGet(1); int i1 = thisIndex % bucketsSize; return bucketNames.get(i1); } /** * 取得桶汇合 * @param bucketName * @return */ private BoundZSetOperations getBucket(String bucketName) { return redisTemplate.boundZSetOps(bucketName); } /** * 放入延时工作 * @param job */ public void addDelayJob(DelayJob job) { log.info("增加提早工作:{}", JSON.toJSONString(job)); String thisBucketName = getThisBucketName(); BoundZSetOperations bucket = getBucket(thisBucketName); bucket.add(job,job.getDelayDate()); } /** * 取得最新的延期工作 * @return */ public DelayJob getFirstDelayTime(Integer index) { String name = bucketNames.get(index); BoundZSetOperations bucket = getBucket(name); Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1); if (CollectionUtils.isEmpty(set)) { return null; } ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0]; Object value = typedTuple.getValue(); if (value instanceof DelayJob) { return (DelayJob) value; } return null; } /** * 移除延时工作 * @param index * @param delayJob */ public void removeDelayTime(Integer index,DelayJob delayJob) { String name = bucketNames.get(index); BoundZSetOperations bucket = getBucket(name); bucket.remove(delayJob); }}
待实现工作,外部应用topic进行细分,每个topic对应一个list汇合
@Component@Slf4jpublic class ReadyQueue { @Autowired private RedisTemplate redisTemplate; private String NAME = "process.queue"; private String getKey(String topic) { return NAME + topic; } /** * 取得队列 * @param topic * @return */ private BoundListOperations getQueue (String topic) { BoundListOperations ops = redisTemplate.boundListOps(getKey(topic)); return ops; } /** * 设置工作 * @param delayJob */ public void pushJob(DelayJob delayJob) { log.info("执行队列增加工作:{}",delayJob); BoundListOperations listOperations = getQueue(delayJob.getTopic()); listOperations.leftPush(delayJob); } /** * 移除并取得工作 * @param topic * @return */ public DelayJob popJob(String topic) { BoundListOperations listOperations = getQueue(topic); Object o = listOperations.leftPop(); if (o instanceof DelayJob) { log.info("执行队列取出工作:{}", JSON.toJSONString((DelayJob) o)); return (DelayJob) o; } return null; } }
轮询解决
设置了线程池为每个bucket设置一个轮询操作
@Componentpublic class DelayTimer implements ApplicationListener <ContextRefreshedEvent> { @Autowired private DelayBucket delayBucket; @Autowired private JobPool jobPool; @Autowired private ReadyQueue readyQueue; @Value("${thread.size}") private int length; @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { ExecutorService executorService = new ThreadPoolExecutor( length, length, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); for (int i = 0; i < length; i++) { executorService.execute( new DelayJobHandler( delayBucket, jobPool, readyQueue, i)); } }}
测试申请
/** * 测试用申请 * @author daify **/@RestController@RequestMapping("delay")public class DelayController { @Autowired private JobService jobService; /** * 增加 * @param request * @return */ @RequestMapping(value = "add",method = RequestMethod.POST) public String addDefJob(Job request) { DelayJob delayJob = jobService.addDefJob(request); return JSON.toJSONString(delayJob); } /** * 获取 * @return */ @RequestMapping(value = "pop",method = RequestMethod.GET) public String getProcessJob(String topic) { Job process = jobService.getProcessJob(topic); return JSON.toJSONString(process); } /** * 实现一个执行的工作 * @param jobId * @return */ @RequestMapping(value = "finish",method = RequestMethod.DELETE) public String finishJob(Long jobId) { jobService.finishJob(jobId); return "success"; } @RequestMapping(value = "delete",method = RequestMethod.DELETE) public String deleteJob(Long jobId) { jobService.deleteJob(jobId); return "success"; } }
测试
增加提早工作
通过postman申请:localhost:8000/delay/add
此时这条延时工作被增加进了线程池中
2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 工作池增加工作:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}2019-08-12 21:21:36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 增加提早工作:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
依据设置10秒钟之后工作会被增加至ReadyQueue中
2019-08-12 21:21:46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列增加工作:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
取得工作
这时候咱们申请localhost:8000/delay/pop
这个时候工作被响应,批改状态的同时设置其超时工夫,而后搁置在DelayBucket中
2019-08-09 19:36:02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出工作:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 工作池增加工作:{"delayTime":10000,"id":1,"message":"提早10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}2019-08-09 19:36:02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 增加提早工作:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
依照设计在30秒后,工作如果没有被生产将会从新搁置在ReadyQueue中
2019-08-12 21:21:48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出工作:{"delayDate":1565616106609,"jodId":3,"topic":"test"}2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 工作池增加工作:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
工作的删除/生产
当初咱们申请:localhost:8000/delay/delete
此时在Job pool中此工作将会被移除,此时元数据曾经不存在,但工作还在DelayBucket中循环,然而在循环中当检测到元数据曾经不存的话此延时工作会被移除。
2019-08-12 21:21:54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 工作池移除工作:32019-08-12 21:21:59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在工作:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
近期热文举荐:
1.1,000+ 道 Java面试题及答案整顿(2021最新版)
2.别在再满屏的 if/ else 了,试试策略模式,真香!!
3.卧槽!Java 中的 xx ≠ null 是什么新语法?
4.Spring Boot 2.5 重磅公布,光明模式太炸了!
5.《Java开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞+转发哦!