概述
最近我的项目上反馈某个重要的定时工作忽然不执行了,很头疼,开发环境和测试环境都没有呈现过这个问题。定时工作采纳的是ScheduledThreadPoolExecutor
,起初一看代码发现踩了一个大坑....
还原"大坑"
这个坑就是如果ScheduledThreadPoolExecutor
中执行的工作出错抛出异样后,不仅不会打印异样堆栈信息,同时还会勾销前面的调度, 间接看例子。
@Testpublic void testException() throws InterruptedException { // 创立1个线程的调度工作线程池 ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); // 创立一个工作 Runnable runnable = new Runnable() { volatile int num = 0; @Override public void run() { num ++; // 模仿执行报错 if(num > 5) { throw new RuntimeException("执行谬误"); } log.info("exec num: [{}].....", num); } }; // 每隔1秒钟执行一次工作 scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS); Thread.sleep(10000);}
运行后果:
- 只执行了5次后,就不打印,不执行了,因为报错了
- 工作报错,也没有打印一次堆栈,更导致调度工作勾销,结果非常重大。
解决方案
解决办法也非常简单,只有通过try catch捕捉异样即可。
运行后果:
看到不仅打印了异样堆栈,而且也会进行周期性的调度。
更举荐的做法
更好的倡议能够在本人的我的项目中封装一个包装类,要求所有的调度都提交通过咱们对立的包装类, 如下代码:
@Slf4jpublic class RunnableWrapper implements Runnable { // 理论要执行的线程工作 private Runnable task; // 线程工作被创立进去的工夫 private long createTime; // 线程工作被线程池运行的开始工夫 private long startTime; // 线程工作被线程池运行的完结工夫 private long endTime; // 线程信息 private String taskInfo; private boolean showWaitLog; /** * 执行间隔时间多久,打印日志 */ private long durMs = 1000L; // 当这个工作被创立进去的时候,就会设置他的创立工夫 // 然而接下来有可能这个工作提交到线程池后,会进入线程池的队列排队 public RunnableWrapper(Runnable task, String taskInfo) { this.task = task; this.taskInfo = taskInfo; this.createTime = System.currentTimeMillis(); } public void setShowWaitLog(boolean showWaitLog) { this.showWaitLog = showWaitLog; } public void setDurMs(long durMs) { this.durMs = durMs; } // 当工作在线程池排队的时候,这个run办法是不会被运行的 // 然而当工作完结了排队,失去线程池运行机会的时候,这个办法会被调用 // 此时就能够设置线程工作的开始运行工夫 @Override public void run() { this.startTime = System.currentTimeMillis(); // 此处能够通过调用监控零碎的API,实现监控指标上报 // 用线程工作的startTime-createTime,其实就是工作排队工夫 // 这边打印日志输入,也能够输入到监控零碎中 if(showWaitLog) { log.info("工作信息: [{}], 工作排队工夫: [{}]ms", taskInfo, startTime - createTime); } // 接着能够调用包装的理论工作的run办法 try { task.run(); } catch (Exception e) { log.error("run task error", e); throw e; } // 工作运行结束当前,会设置工作运行完结的工夫 this.endTime = System.currentTimeMillis(); // 此处能够通过调用监控零碎的API,实现监控指标上报 // 用线程工作的endTime - startTime,其实就是工作运行工夫 // 这边打印工作执行工夫,也能够输入到监控零碎中 if(endTime - startTime > durMs) { log.info("工作信息: [{}], 工作执行工夫: [{}]ms", taskInfo, endTime - startTime); } }}
应用:
咱们还能够在包装类外面封装各种监控行为,如本例打印日志执行工夫等。
原理探索
那大家有没有想过为什么工作出错会导致异样无奈打印,甚至调度都勾销了呢?让咱们从源码登程,一探到底。
- 上面是调度工作的入口办法。
// ScheduledThreadPoolExecutor#scheduleAtFixedRatepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 将执行工作和参数包装成ScheduledFutureTask对象 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 提早执行 delayedExecute(t); return t;}
这个办法次要做了两个事件:
- 将执行工作和参数包装成ScheduledFutureTask对象
- 调用
delayedExecute
办法提早执行工作
- 提早或周期性工作的次要执行办法, 次要是将工作丢到队列中,后续由工作线程获取执行。
// ScheduledThreadPoolExecutor#delayedExecuteprivate void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { // 将工作丢到阻塞队列中 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 开启工作线程,去执行工作,或者从队列中获取工作执行 ensurePrestart(); } }
- 当初工作曾经在队列中了,咱们看下工作执行的内容是什么,还记得后面的包装对象
ScheduledFutureTask
类,它的实现类是ScheduledFutureTask
,继承了Runnable类。
// ScheduledFutureTask#run办法public void run() { // 是不是周期性工作 boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); // 不是周期性工作的话, 间接调用一次上面的run else if (!periodic) ScheduledFutureTask.super.run(); // 如果是周期性工作,则调用runAndReset办法,如果返回true,继续执行 else if (ScheduledFutureTask.super.runAndReset()) { // 设置下次调度工夫 setNextRunTime(); // 从新执行调度工作 reExecutePeriodic(outerTask); }}
- 这里的要害就是看
ScheduledFutureTask.super.runAndReset()
办法是否返回true,如果是true的话持续调度。
- runAndReset办法也很简略,要害就是看报异样如何解决。
// FutureTask#runAndResetprotected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; // 是否持续下次调度,默认false boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { // 执行工作 c.call(); // 执行胜利的话,设置为true ran = true; // 异样解决,关键点 } catch (Throwable ex) { // 不会批改ran的值,最终是false,同时也不打印异样堆栈 setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 返回后果 return ran && s == NEW;}
- 关键点ran变量,最终返回是不是下次持续调度执行
- 如果抛出异样的话,能够看到不会批改ran为true。
总结
Java的ScheduledThreadPoolExecutor定时工作线程池所调度的工作中如果抛出了异样,并且异样没有捕捉间接抛到框架中,会导致ScheduledThreadPoolExecutor定时工作不调度了。这个论断心愿大家肯定要记住,不然十分坑,要害是有时候测试环境、开发环境还无奈复现,有肯定的随机性,真的到了生产就完蛋了。
对于这些知识点,咱们不仅要知其然,还要知其所以然,这样才会记忆粗浅,不然很容易忘记。