概述
最近我的项目上反馈某个重要的定时工作忽然不执行了,很头疼,开发环境和测试环境都没有呈现过这个问题。定时工作采纳的是ScheduledThreadPoolExecutor
,起初一看代码发现踩了一个大坑….
还原”大坑”
这个坑就是如果ScheduledThreadPoolExecutor
中执行的工作出错抛出异样后,不仅不会打印异样堆栈信息,同时还会勾销前面的调度, 间接看例子。
@Test
public void testException() throws InterruptedException {
// 创立1个线程的调度工作线程池
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// 创立一个工作
Runnable runnable = new Runnable() {
volatile int num = 0;
@Override
public void run() {
num ++;
// 模仿执行报错
if(num > 5) {
throw new RuntimeException("执行谬误");
}
log.info("exec num: [{}].....", num);
}
};
// 每隔1秒钟执行一次工作
scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
Thread.sleep(10000);
}
运行后果:
- 只执行了5次后,就不打印,不执行了,因为报错了
- 工作报错,也没有打印一次堆栈,更导致调度工作勾销,结果非常重大。
解决方案
解决办法也非常简单,只有通过try catch捕捉异样即可。
运行后果:
看到不仅打印了异样堆栈,而且也会进行周期性的调度。
更举荐的做法
更好的倡议能够在本人的我的项目中封装一个包装类,要求所有的调度都提交通过咱们对立的包装类, 如下代码:
@Slf4j
public class RunnableWrapper implements Runnable {
// 理论要执行的线程工作
private Runnable task;
// 线程工作被创立进去的工夫
private long createTime;
// 线程工作被线程池运行的开始工夫
private long startTime;
// 线程工作被线程池运行的完结工夫
private long endTime;
// 线程信息
private String taskInfo;
private boolean showWaitLog;
/**
* 执行间隔时间多久,打印日志
*/
private long durMs = 1000L;
// 当这个工作被创立进去的时候,就会设置他的创立工夫
// 然而接下来有可能这个工作提交到线程池后,会进入线程池的队列排队
public RunnableWrapper(Runnable task, String taskInfo) {
this.task = task;
this.taskInfo = taskInfo;
this.createTime = System.currentTimeMillis();
}
public void setShowWaitLog(boolean showWaitLog) {
this.showWaitLog = showWaitLog;
}
public void setDurMs(long durMs) {
this.durMs = durMs;
}
// 当工作在线程池排队的时候,这个run办法是不会被运行的
// 然而当工作完结了排队,失去线程池运行机会的时候,这个办法会被调用
// 此时就能够设置线程工作的开始运行工夫
@Override
public void run() {
this.startTime = System.currentTimeMillis();
// 此处能够通过调用监控零碎的API,实现监控指标上报
// 用线程工作的startTime-createTime,其实就是工作排队工夫
// 这边打印日志输入,也能够输入到监控零碎中
if(showWaitLog) {
log.info("工作信息: [{}], 工作排队工夫: [{}]ms", taskInfo, startTime - createTime);
}
// 接着能够调用包装的理论工作的run办法
try {
task.run();
} catch (Exception e) {
log.error("run task error", e);
throw e;
}
// 工作运行结束当前,会设置工作运行完结的工夫
this.endTime = System.currentTimeMillis();
// 此处能够通过调用监控零碎的API,实现监控指标上报
// 用线程工作的endTime - startTime,其实就是工作运行工夫
// 这边打印工作执行工夫,也能够输入到监控零碎中
if(endTime - startTime > durMs) {
log.info("工作信息: [{}], 工作执行工夫: [{}]ms", taskInfo, endTime - startTime);
}
}
}
应用:
咱们还能够在包装类外面封装各种监控行为,如本例打印日志执行工夫等。
原理探索
那大家有没有想过为什么工作出错会导致异样无奈打印,甚至调度都勾销了呢?让咱们从源码登程,一探到底。
- 上面是调度工作的入口办法。
// ScheduledThreadPoolExecutor#scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 将执行工作和参数包装成ScheduledFutureTask对象
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 提早执行
delayedExecute(t);
return t;
}
这个办法次要做了两个事件:
- 将执行工作和参数包装成ScheduledFutureTask对象
- 调用
delayedExecute
办法提早执行工作
- 提早或周期性工作的次要执行办法, 次要是将工作丢到队列中,后续由工作线程获取执行。
// ScheduledThreadPoolExecutor#delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 将工作丢到阻塞队列中
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 开启工作线程,去执行工作,或者从队列中获取工作执行
ensurePrestart();
}
}
- 当初工作曾经在队列中了,咱们看下工作执行的内容是什么,还记得后面的包装对象
ScheduledFutureTask
类,它的实现类是ScheduledFutureTask
,继承了Runnable类。
// ScheduledFutureTask#run办法
public void run() {
// 是不是周期性工作
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 不是周期性工作的话, 间接调用一次上面的run
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期性工作,则调用runAndReset办法,如果返回true,继续执行
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次调度工夫
setNextRunTime();
// 从新执行调度工作
reExecutePeriodic(outerTask);
}
}
- 这里的要害就是看
ScheduledFutureTask.super.runAndReset()
办法是否返回true,如果是true的话持续调度。
- runAndReset办法也很简略,要害就是看报异样如何解决。
// FutureTask#runAndReset
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
// 是否持续下次调度,默认false
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 执行工作
c.call();
// 执行胜利的话,设置为true
ran = true;
// 异样解决,关键点
} catch (Throwable ex) {
// 不会批改ran的值,最终是false,同时也不打印异样堆栈
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 返回后果
return ran && s == NEW;
}
- 关键点ran变量,最终返回是不是下次持续调度执行
- 如果抛出异样的话,能够看到不会批改ran为true。
总结
Java的ScheduledThreadPoolExecutor定时工作线程池所调度的工作中如果抛出了异样,并且异样没有捕捉间接抛到框架中,会导致ScheduledThreadPoolExecutor定时工作不调度了。这个论断心愿大家肯定要记住,不然十分坑,要害是有时候测试环境、开发环境还无奈复现,有肯定的随机性,真的到了生产就完蛋了。
对于这些知识点,咱们不仅要知其然,还要知其所以然,这样才会记忆粗浅,不然很容易忘记。
发表回复