共计 4402 个字符,预计需要花费 12 分钟才能阅读完成。
前言
接入微信支付的时候,看到微信支付的回调是按照某种频率去回调的,
像 15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h
这样,其中有一次成功就不会再回调。
于是在想怎么用 Java
做这个事情。
有定时任务这类功能的框架像 Spring
和Quartz
貌似都没有直接提供以上的功能。
也是出于想练手的目的,决定自己写一写。
最终的实现效果
// 具体的业务 | |
BaseJob task = new BaseJob() { | |
// 任务执行的次数(模拟真实业务上的退出)int runTime = 1; | |
@Override | |
public void run() { | |
// 业务逻辑 | |
System.out.println("hello world"); | |
// 这里模拟了微信回调成功,任务完成 | |
if (runTime++ > 3) {this.setExit(true); | |
} | |
} | |
}; |
/** | |
* 测试按照指定时间隔执行某个任务 | |
* @throws IOException | |
*/ | |
@Test | |
public void test1() throws IOException { | |
// 新建一个产生指定时间的延迟时间生成器,内部就是个队列 | |
DesignatDTGenerator designatDTGenerator = new DesignatDTGenerator(); | |
// 设置时间间隔 | |
designatDTGenerator.addDelayTime(1_000) // 1 秒后执行 | |
.addDelayTime(4_000) // 距离上次执行 4 秒后执行 | |
.addDelayTime(15_000) // 距离上次执行 15 秒后执行 | |
.addDelayTime(180_000) // 距离上次执行 3 分钟后执行 | |
.addDelayTime(180_000) // 距离上次执行 3 分钟后执行 | |
.addDelayTime(360_000) // 距离上次执行 6 分钟后执行 | |
.addDelayTime(3_600_000); // 距离上次执行 1 小时后执行 | |
// 构造一个提交的任务,传入具体的业务对象 task,传入延迟时间生成器 designatDTGenerator | |
DelayTimeJob delayTimeJob = new DelayTimeJob(task, designatDTGenerator); | |
// 新建一个执行器,执行器可以重复使用,每次提交新的任务即可 | |
JobActuator actuator = new JobActuator(); | |
// 提交任务,开始执行任务 | |
actuator.addJob(delayTimeJob); | |
// 阻塞主线程,方便查看运行结果 | |
System.in.read();} |
/** | |
* 测试按照固定时间间隔执行某个任务 | |
* 只是延迟时间生成器不同而已,可以达到不同的调用效果 | |
* @throws IOException | |
*/ | |
@Test | |
public void test2() throws IOException { | |
// 新建一个执行器 | |
JobActuator actuator = new JobActuator(); | |
// 新建一个产生固定时间的延迟时间生成器,每 3s 执行一次 | |
FixedRateDTGenerator fixedRateDTGenerator = new FixedRateDTGenerator(3000); | |
// 新建一个任务 | |
DelayTimeJob delayTimeJob = new DelayTimeJob(task, fixedRateDTGenerator); | |
// 提交任务,开始执行任务 | |
actuator.addJob(delayTimeJob); | |
// 阻塞主线程,方便查看运行结果 | |
System.in.read();} |
类图
各个类的作用
项目地址
JobActuator
任务执行器,本身继承了Thread
,职责是在run
方法中不断从延迟任务队列DelayQueue
中获取延迟到期的任务,
再交由线程池ExecutorService
执行。延迟效果的都是依靠DelayQueue
实现。
public class JobActuator extends Thread { | |
/** 线程池 */ | |
ExecutorService es = Executors.newFixedThreadPool(2); | |
/** 任务队列 */ | |
DelayQueue<DelayTimeJob> jobs = new DelayQueue<>(); | |
/** 构造方法,实例化时启动线程 */ | |
public JobActuator() {this.start(); | |
} | |
public void addJob(DelayTimeJob job) { | |
// 设置任务队列,用于任务重新入队 | |
job.setJobs(jobs); | |
// 任务入队 | |
jobs.offer(job); | |
} | |
@Override | |
public void run() {while (true) { | |
try { | |
// 从延迟队列中获取任务 | |
DelayTimeJob job = jobs.take(); | |
// 利用线程池执行任务 | |
es.submit(job); | |
} catch (InterruptedException e) {e.printStackTrace(); | |
} | |
} | |
} | |
} |
DelayTimeJob
实现了Delayed
接口,执行实际的业务并决定任务是否重新进入延迟队列。
public class DelayTimeJob implements Runnable, Delayed { | |
/** 执行器的任务队列,用于任务重新入队 */ | |
@Setter | |
private DelayQueue<DelayTimeJob> jobs; | |
/** 延迟时间生成器 */ | |
IDelayTimeGenerator delayTimeGenerator; | |
/** 具体要执行的任务 */ | |
private BaseJob realJob; | |
private long time = 0L; | |
public DelayTimeJob(BaseJob baseJob, IDelayTimeGenerator delayTimeGenerator) { | |
this.realJob = baseJob; | |
this.delayTimeGenerator = delayTimeGenerator; | |
Integer delayTime = delayTimeGenerator.getDelayTime(); | |
if (delayTime == null) {return ;} | |
this.time = delayTime + System.currentTimeMillis();} | |
@Override | |
public void run() { | |
// 执行业务 | |
realJob.run(); | |
// 任务不再需要执行,主动退出 | |
if (realJob.isExit) {return ;} | |
// 获取延迟 | |
Integer delayTime = delayTimeGenerator.getDelayTime(); | |
// 无延迟时间,则任务不再执行 | |
if (delayTime == null) {return ;} | |
// 重新入队 | |
time += delayTime; | |
jobs.offer(this); | |
return ; | |
} | |
@Override | |
public long getDelay(TimeUnit unit) {return unit.convert(this.time - System.currentTimeMillis(), TimeUnit.MILLISECONDS); | |
} | |
@Override | |
public int compareTo(Delayed o) {DelayTimeJob other = (DelayTimeJob) o; | |
long diff = time - other.time; | |
if (diff > 0) {return 1;} | |
if (diff < 0) {return -1;} | |
return 0; | |
} | |
} |
BaseJob
用户继承此抽象类,在run
方法中编写业务代码,通过控制isExit
变量控制任务是否执行。
public abstract class BaseJob implements Runnable { | |
/** 用于控制任务是否退出 */ | |
@Setter | |
boolean isExit = false; | |
} |
IDelayTimeGenerator
延迟时间生成器接口,返回一个延迟时间。可以实现不同的策略,达到不同的延迟效果。
如DesignatDTGenerator
是定义每一次执行的时间间隔,FixedRateDTGenerator
是按照某一个固定频率执行。
public interface IDelayTimeGenerator { | |
/** 返回延迟的时间,单位:毫秒 */ | |
Integer getDelayTime();} |
/** | |
* 指定时间的时间生成器 | |
* @author cck | |
*/ | |
public class DesignatDTGenerator implements IDelayTimeGenerator {private final Deque<Integer> delayTimeQueue = new ArrayDeque<>(); | |
/** | |
* 添加延迟时间 | |
* @param delayTime | |
*/ | |
public DesignatDTGenerator addDelayTime(Integer delayTime) {delayTimeQueue.offer(delayTime); | |
return this; | |
} | |
@Override | |
public Integer getDelayTime() {return delayTimeQueue.poll(); | |
} | |
} |
/** | |
* 固定间隔的时间生成器 | |
* @author cck | |
*/ | |
public class FixedRateDTGenerator implements IDelayTimeGenerator { | |
private Integer delayTime; | |
public FixedRateDTGenerator(Integer delayTime) {this.delayTime = delayTime;} | |
@Override | |
public Integer getDelayTime() {return delayTime;} | |
} |
关键类 DelayQueue
和Delayed
DelayQueue
是 Java
提供的延迟队列,该队列只允许实现了 Delayed
接口的对象入队。
调用队列的 take
方法时,队列会阻塞,直到有延迟到期的元素才会返回。
总结
这个方式是可以实现一开始想要的按照 15s/15s/30s/3m/10m/..
指定的间隔执行任务的效果的。
定制延迟的效果只需要给出不同的 IDelayTimeGenerator
接口实现即可。
在和 spring
一起使用时,任务执行器 JobActuator
应该是单例的,
不过提交任务的整个操作相比于 spring
的一个注解,还是显得麻烦囧,使用时再封装一层会更好。
现在的实现方式是和 Java
的延迟队列绑定了的,但是延迟队列有多种实现方式,
例如 redis
,rabbitMQ
等,如果能够做出更高级的抽象,合入不同的延迟队列那会更好。
此外这种实现方式性能方面也有待验证。