Java简易定时任务实现

37次阅读

共计 4402 个字符,预计需要花费 12 分钟才能阅读完成。

前言

接入微信支付的时候,看到微信支付的回调是按照某种频率去回调的,
15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h 这样,其中有一次成功就不会再回调。
于是在想怎么用 Java 做这个事情。
有定时任务这类功能的框架像 SpringQuartz貌似都没有直接提供以上的功能。
也是出于想练手的目的,决定自己写一写。

最终的实现效果

// 具体的业务
BaseJob task = new BaseJob() {
    
    // 任务执行的次数(模拟真实业务上的退出)int runTime = 1;
    
    @Override
    public void run() {
        
        // 业务逻辑
        System.out.println("hello world");

        // 这里模拟了微信回调成功,任务完成
        if (runTime++ > 3) {this.setExit(true);
        }
    }
};
/**
 * 测试按照指定时间隔执行某个任务
 * @throws IOException
 */
@Test
public void test1() throws IOException {
    
    // 新建一个产生指定时间的延迟时间生成器,内部就是个队列
    DesignatDTGenerator designatDTGenerator = new DesignatDTGenerator();
    
    // 设置时间间隔
    designatDTGenerator.addDelayTime(1_000) // 1 秒后执行
                       .addDelayTime(4_000) // 距离上次执行 4 秒后执行
                       .addDelayTime(15_000) // 距离上次执行 15 秒后执行
                       .addDelayTime(180_000) // 距离上次执行 3 分钟后执行
                       .addDelayTime(180_000) // 距离上次执行 3 分钟后执行
                       .addDelayTime(360_000) // 距离上次执行 6 分钟后执行
                       .addDelayTime(3_600_000); // 距离上次执行 1 小时后执行
        
    // 构造一个提交的任务,传入具体的业务对象 task,传入延迟时间生成器 designatDTGenerator
    DelayTimeJob delayTimeJob = new DelayTimeJob(task, designatDTGenerator);
    
    // 新建一个执行器,执行器可以重复使用,每次提交新的任务即可
    JobActuator actuator = new JobActuator(); 
    
    // 提交任务,开始执行任务
    actuator.addJob(delayTimeJob);
    
    // 阻塞主线程,方便查看运行结果
    System.in.read();}
/**
 * 测试按照固定时间间隔执行某个任务
 * 只是延迟时间生成器不同而已,可以达到不同的调用效果
 * @throws IOException
 */
@Test
public void test2() throws IOException {
    
    // 新建一个执行器
    JobActuator actuator = new JobActuator(); 
    
    // 新建一个产生固定时间的延迟时间生成器,每 3s 执行一次
    FixedRateDTGenerator fixedRateDTGenerator = new FixedRateDTGenerator(3000);
    
    // 新建一个任务
    DelayTimeJob delayTimeJob = new DelayTimeJob(task, fixedRateDTGenerator);

    // 提交任务,开始执行任务
    actuator.addJob(delayTimeJob);
    
    // 阻塞主线程,方便查看运行结果
    System.in.read();}

类图

各个类的作用

项目地址

JobActuator
任务执行器,本身继承了 Thread,职责是在run 方法中不断从延迟任务队列 DelayQueue 中获取延迟到期的任务,
再交由线程池 ExecutorService 执行。延迟效果的都是依靠 DelayQueue 实现。

public class JobActuator extends Thread {

    /** 线程池 */
    ExecutorService es = Executors.newFixedThreadPool(2);
    
    /** 任务队列 */
    DelayQueue<DelayTimeJob> jobs = new DelayQueue<>();
    
    /** 构造方法,实例化时启动线程 */
    public JobActuator() {this.start();
    }
    
    public void addJob(DelayTimeJob job) {
        // 设置任务队列,用于任务重新入队
        job.setJobs(jobs);
        // 任务入队
        jobs.offer(job);
    }
    
    @Override
    public void run() {while (true) {
            
            try {
                // 从延迟队列中获取任务
                DelayTimeJob job = jobs.take();
                // 利用线程池执行任务
                es.submit(job);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }
    
}

DelayTimeJob
实现了 Delayed 接口,执行实际的业务并决定任务是否重新进入延迟队列。

public class DelayTimeJob implements Runnable, Delayed {
    
    /** 执行器的任务队列,用于任务重新入队 */
    @Setter
    private DelayQueue<DelayTimeJob> jobs;

    /** 延迟时间生成器 */
    IDelayTimeGenerator delayTimeGenerator;
    
    /** 具体要执行的任务 */
    private BaseJob realJob;
    
    private long time = 0L;
    
    public DelayTimeJob(BaseJob baseJob, IDelayTimeGenerator delayTimeGenerator) {
        
        this.realJob = baseJob;
        this.delayTimeGenerator = delayTimeGenerator;
        
        Integer delayTime = delayTimeGenerator.getDelayTime();
        if (delayTime == null) {return ;}
        
        this.time = delayTime + System.currentTimeMillis();}
    
    @Override
    public void run() {
        
        // 执行业务
        realJob.run();
        
        // 任务不再需要执行,主动退出
        if (realJob.isExit) {return ;}
        
        // 获取延迟 
        Integer delayTime = delayTimeGenerator.getDelayTime();
        
        // 无延迟时间,则任务不再执行
        if (delayTime == null) {return ;}
        
        // 重新入队
        time += delayTime;
        jobs.offer(this);
        return ;
    }

    @Override
    public long getDelay(TimeUnit unit) {return unit.convert(this.time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {DelayTimeJob other = (DelayTimeJob) o;  
        long diff = time - other.time;  
        
        if (diff > 0) {return 1;} 
        if (diff < 0) {return -1;}
        return 0;
    }
    
}

BaseJob
用户继承此抽象类,在 run 方法中编写业务代码,通过控制 isExit 变量控制任务是否执行。

public abstract class BaseJob implements Runnable {

    /** 用于控制任务是否退出 */
    @Setter
    boolean isExit = false;
    
}

IDelayTimeGenerator
延迟时间生成器接口,返回一个延迟时间。可以实现不同的策略,达到不同的延迟效果。
DesignatDTGenerator 是定义每一次执行的时间间隔,FixedRateDTGenerator是按照某一个固定频率执行。

public interface IDelayTimeGenerator {
    
    /** 返回延迟的时间,单位:毫秒 */
    Integer getDelayTime();}
/**
 * 指定时间的时间生成器
 * @author cck
 */
public class DesignatDTGenerator implements IDelayTimeGenerator {private final Deque<Integer> delayTimeQueue = new ArrayDeque<>();
    
    /**
     * 添加延迟时间
     * @param delayTime
     */
    public DesignatDTGenerator addDelayTime(Integer delayTime) {delayTimeQueue.offer(delayTime);
        return this;
    }
    
    @Override
    public Integer getDelayTime() {return delayTimeQueue.poll();
    }

}
/**
 * 固定间隔的时间生成器
 * @author cck
 */
public class FixedRateDTGenerator implements IDelayTimeGenerator {

    private Integer delayTime;
    
    public FixedRateDTGenerator(Integer delayTime) {this.delayTime = delayTime;}
    
    @Override
    public Integer getDelayTime() {return delayTime;}

}

关键类 DelayQueueDelayed

DelayQueueJava 提供的延迟队列,该队列只允许实现了 Delayed 接口的对象入队。
调用队列的 take 方法时,队列会阻塞,直到有延迟到期的元素才会返回。

总结

这个方式是可以实现一开始想要的按照 15s/15s/30s/3m/10m/.. 指定的间隔执行任务的效果的。
定制延迟的效果只需要给出不同的 IDelayTimeGenerator 接口实现即可。

在和 spring 一起使用时,任务执行器 JobActuator 应该是单例的,
不过提交任务的整个操作相比于 spring 的一个注解,还是显得麻烦囧,使用时再封装一层会更好。

现在的实现方式是和 Java 的延迟队列绑定了的,但是延迟队列有多种实现方式,
例如 redisrabbitMQ 等,如果能够做出更高级的抽象,合入不同的延迟队列那会更好。
此外这种实现方式性能方面也有待验证。

正文完
 0