关于java:ScheduledThreadPoolExecutor踩过最痛的坑

概述

最近我的项目上反馈某个重要的定时工作忽然不执行了,很头疼,开发环境和测试环境都没有呈现过这个问题。定时工作采纳的是ScheduledThreadPoolExecutor,起初一看代码发现踩了一个大坑….

还原”大坑”

这个坑就是如果ScheduledThreadPoolExecutor中执行的工作出错抛出异样后,不仅不会打印异样堆栈信息,同时还会勾销前面的调度, 间接看例子。

@Test
public void testException() throws InterruptedException {
    // 创立1个线程的调度工作线程池
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    // 创立一个工作
    Runnable runnable = new Runnable() {

        volatile int num = 0;

        @Override
        public void run() {
            num ++;
            // 模仿执行报错
            if(num > 5) {
                throw new RuntimeException("执行谬误");
            }
            log.info("exec num: [{}].....", num);
        }
    };

    // 每隔1秒钟执行一次工作
    scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(10000);
}

运行后果:

  • 只执行了5次后,就不打印,不执行了,因为报错了
  • 工作报错,也没有打印一次堆栈,更导致调度工作勾销,结果非常重大。

解决方案

解决办法也非常简单,只有通过try catch捕捉异样即可。

运行后果:

看到不仅打印了异样堆栈,而且也会进行周期性的调度。

更举荐的做法

更好的倡议能够在本人的我的项目中封装一个包装类,要求所有的调度都提交通过咱们对立的包装类, 如下代码:

@Slf4j
public class RunnableWrapper implements Runnable {
    // 理论要执行的线程工作
    private Runnable task;
    // 线程工作被创立进去的工夫
    private long createTime;
    // 线程工作被线程池运行的开始工夫
    private long startTime;
    // 线程工作被线程池运行的完结工夫
    private long endTime;
    // 线程信息
    private String taskInfo;

    private boolean showWaitLog;

    /**
     * 执行间隔时间多久,打印日志
     */
    private long durMs = 1000L;

    // 当这个工作被创立进去的时候,就会设置他的创立工夫
    // 然而接下来有可能这个工作提交到线程池后,会进入线程池的队列排队
    public RunnableWrapper(Runnable task, String taskInfo) {
        this.task = task;
        this.taskInfo = taskInfo;
        this.createTime = System.currentTimeMillis();
    }

    public void setShowWaitLog(boolean showWaitLog) {
        this.showWaitLog = showWaitLog;
    }

    public void setDurMs(long durMs) {
        this.durMs = durMs;
    }

    // 当工作在线程池排队的时候,这个run办法是不会被运行的
    // 然而当工作完结了排队,失去线程池运行机会的时候,这个办法会被调用
    // 此时就能够设置线程工作的开始运行工夫
    @Override
    public void run() {
        this.startTime = System.currentTimeMillis();

        // 此处能够通过调用监控零碎的API,实现监控指标上报
        // 用线程工作的startTime-createTime,其实就是工作排队工夫
        // 这边打印日志输入,也能够输入到监控零碎中
        if(showWaitLog) {
            log.info("工作信息: [{}], 工作排队工夫: [{}]ms", taskInfo, startTime - createTime);
        }

        // 接着能够调用包装的理论工作的run办法
        try {
            task.run();
        } catch (Exception e) {
            log.error("run task error", e);
            throw e;
        }

        // 工作运行结束当前,会设置工作运行完结的工夫
        this.endTime = System.currentTimeMillis();

        // 此处能够通过调用监控零碎的API,实现监控指标上报
        // 用线程工作的endTime - startTime,其实就是工作运行工夫
        // 这边打印工作执行工夫,也能够输入到监控零碎中
        if(endTime - startTime > durMs) {
            log.info("工作信息: [{}], 工作执行工夫: [{}]ms", taskInfo, endTime - startTime);
        }

    }
}

应用:

咱们还能够在包装类外面封装各种监控行为,如本例打印日志执行工夫等。

原理探索

那大家有没有想过为什么工作出错会导致异样无奈打印,甚至调度都勾销了呢?让咱们从源码登程,一探到底。

  1. 上面是调度工作的入口办法。
// ScheduledThreadPoolExecutor#scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 将执行工作和参数包装成ScheduledFutureTask对象
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 提早执行
    delayedExecute(t);
    return t;
}

这个办法次要做了两个事件:

  • 将执行工作和参数包装成ScheduledFutureTask对象
  • 调用delayedExecute办法提早执行工作
  1. 提早或周期性工作的次要执行办法, 次要是将工作丢到队列中,后续由工作线程获取执行。
// ScheduledThreadPoolExecutor#delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // 将工作丢到阻塞队列中
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 开启工作线程,去执行工作,或者从队列中获取工作执行
                ensurePrestart();
        }
    }
  1. 当初工作曾经在队列中了,咱们看下工作执行的内容是什么,还记得后面的包装对象ScheduledFutureTask类,它的实现类是ScheduledFutureTask,继承了Runnable类。
// ScheduledFutureTask#run办法
public void run() {
    // 是不是周期性工作
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 不是周期性工作的话, 间接调用一次上面的run    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性工作,则调用runAndReset办法,如果返回true,继续执行
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次调度工夫
        setNextRunTime();
        // 从新执行调度工作
        reExecutePeriodic(outerTask);
    }
}
  • 这里的要害就是看ScheduledFutureTask.super.runAndReset()办法是否返回true,如果是true的话持续调度。
  1. runAndReset办法也很简略,要害就是看报异样如何解决。
// FutureTask#runAndReset
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    // 是否持续下次调度,默认false
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 执行工作
                c.call(); 
                // 执行胜利的话,设置为true
                ran = true;

                // 异样解决,关键点
            } catch (Throwable ex) {
                // 不会批改ran的值,最终是false,同时也不打印异样堆栈
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 返回后果
    return ran && s == NEW;
}
  • 关键点ran变量,最终返回是不是下次持续调度执行
  • 如果抛出异样的话,能够看到不会批改ran为true。

总结

Java的ScheduledThreadPoolExecutor定时工作线程池所调度的工作中如果抛出了异样,并且异样没有捕捉间接抛到框架中,会导致ScheduledThreadPoolExecutor定时工作不调度了。这个论断心愿大家肯定要记住,不然十分坑,要害是有时候测试环境、开发环境还无奈复现,有肯定的随机性,真的到了生产就完蛋了。

对于这些知识点,咱们不仅要知其然,还要知其所以然,这样才会记忆粗浅,不然很容易忘记。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理