共计 5942 个字符,预计需要花费 15 分钟才能阅读完成。
Java 中一个计算量很大的任务执行,如何中途取消呢?这个问题好像不难回答,书籍上提到的中断机制、循环状态的方式可以做到。其中有一个 Future.cancel 的方法引了我的注意。Cancel 是怎么取消的呢?所以查看了 FutureTask 的 cancle 源码。
首先要了解 FutureTask 定义的其各种状态,代码如下:
/**
* 这个任务运行的状态,初始化时时 NEW 状态。运行状态只有在这几个方法中能转移到最终状态:*setException、cancel、set。在运行期间,运行状态可能会存在短暂状态:COMPLETING(当结果被设置中)和
*INTERRUPTING。从这些中间状态到最终状态的转换使用便宜的有序 / 惰性写入,因为值是唯一的,无法进一步修改。* 可能的状态转移:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
再来看 cancel 的源码:如果状态不是 NEW,直接返回取消失败。如果运行中断,将运行任务的线程中断,finishCompletion 则是在中断设置完成后,
通知唤醒调用 get 的线程。
// 参数 mayInterruptIfRunning 是运行的时候 是否能中断
public boolean cancel(boolean mayInterruptIfRunning) {//!( 任务运行的状态是 NEW 且可以 cas 修改为 INTERRUPTING、CANCELLED) 直接返回
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果参数是 true,将运行任务的线程直接中断,并且将 future 的状态设置为 INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//
finishCompletion();}
return true;
}
从源码开看,是将运行线程设置了中断。如果运行任务的线程没有中断处理策略,任务能被取消吗? 试验一下:首先定义一个需要一定时间的任务。
public class BigCalculationTask implements Callable<BigDecimal> {
long CAL_NUMBER=0L;
public BigCalculationTask(long calNumber){CAL_NUMBER=calNumber;}
@Override
public BigDecimal call() throws Exception {System.out.println("执行 call 方法");
BigDecimal rs=BigDecimal.ZERO;
for (int i=0;i<CAL_NUMBER;i++){BigDecimal divideValue=new BigDecimal("8965426.32").divide(new BigDecimal("1.236984524").add(new BigDecimal(i)),10,BigDecimal.ROUND_HALF_UP);
System.out.println("计算次数"+i);
rs=rs.add(divideValue);
}
return rs;
}
}```
测试代码:
public class CancelTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService= Executors.newFixedThreadPool(1);
Future task=executorService.submit(new BigCalculationTask(1000000));
task.cancel(true);
executorService.shutdown();
System.out.println(task.get());
}
}
运行结果如下,抛出 CancellationException。
Exception in thread “main” java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
但是 call 方法没有打印,我猜测是否是 futureTask 还没启动。从输出的日志先来看 report 方法,report 是被 get 方法调用的。再看 cancle 方法,我们传入的参数是 true,多以当 cancel 方法被调用且当其状态时 NEW,那么状态先转为:INTERRUPTING,在转为 INTERRUPTED。report 方法检测到状态 >CANCELLED, 直接抛出了 CancellationException。为了验证我的猜测,我在 System.out.println(task.get()); 加入断点,debug 截图为:最终状态为 6(INTERRUPTED)。如果我们将 cancle 参数改为 false, 会发生什么呢?控制台输出结果跟 true 相同,同样设置断点,task 最终状态为:CANCELLED. 截图如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 等待任务状态终结(NOMAL、EXCEPTIONAL、CANCELLED、INTERUPTED)s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
我们改动测试代码:
public class BigCalculationTask implements Callable<BigDecimal> {
CountDownLatch latch;
long CAL_NUMBER=0L;
public BigCalculationTask(long calNumber,CountDownLatch latch){
CAL_NUMBER=calNumber;
this.latch=latch;
}
@Override
public BigDecimal call() throws Exception {latch.countDown();
System.out.println("执行 call 方法");
BigDecimal rs=BigDecimal.ZERO;
for (int i=0;i<CAL_NUMBER;i++){BigDecimal divideValue=new BigDecimal("8965426.32").divide(new BigDecimal("1.236984524").add(new BigDecimal(i)),10,BigDecimal.ROUND_HALF_UP);
System.out.println("计算次数"+i);
rs=rs.add(divideValue);
}
return rs;
}
}
public class CancelTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService= Executors.newFixedThreadPool(1);
CountDownLatch latch=new CountDownLatch(1);
Future<BigDecimal> task=executorService.submit(new BigCalculationTask(1000000,latch));
latch.await();
task.cancel(true);
executorService.shutdown();
executorService.shutdown();
BigDecimal result=task.get();}
}
代码我们进行了改造,使用闭锁确保计算任务先启动,在取消。从控制台 get 方法抛出 CancellationException,但是计算任务并没有停止, 即 cancel 方法没有成功取消任务。通过 cancel 源码,我们参数时 true,cancel 将执行计算任务的线程标志为中断,因为我们没有加入中断策略,所以没有通过中断取消成功。改造我们的计算任务代码,再合适的位置加入处理中断策略即可。代码如下:
public BigDecimal call() throws Exception {
latch.countDown();
System.out.println("执行 call 方法");
BigDecimal rs=BigDecimal.ZERO;
for (int i=0;i<CAL_NUMBER;i++){BigDecimal divideValue=new BigDecimal("8965426.32").divide(new BigDecimal("1.236984524").add(new BigDecimal(i)),10,BigDecimal.ROUND_HALF_UP);
System.out.println("计算次数"+i);
rs=rs.add(divideValue);
if(Thread.interrupted()){break;}
}
return rs;
}
还有一个疑问,get 方法时如何做到一直等待到任务执行成功返回结果的呢?get 方法有个 awaitDone 方法,奥秘就在这个方法里。在看 awaitDone 方法前,先看一个 FutureTask 的一个内部类,WaitNode.WaitNode 很简单,是个链表结构,构造函数将 thread 属性赋值为当前的线程。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
再看 awaitDone 的源码。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 如果本线程已经被中断了,抛出异常
if (Thread.interrupted()) {
// 尝试取消链接超时或中断的等待节点,以避免积累垃圾
removeWaiter(q);
throw new InterruptedException();}
int s = state;
// 如果 futureTask 的状态时最终的状态之一,则直接返回。if (s > COMPLETING) {if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 如果时执行完成 赋值阶段,将调用 get 方法的线程挂起
Thread.yield();
else if (q == null)
q = new WaitNode();// 如果 q ==null, 创建一个 waitNode
else if (!queued)
// 如果已经创建了 waitNode 还没有加入 waiters 列表,则用 cas 方式将 q.next 设置成原来的 waiters,将 futureTask 的 waiters 指向 q
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {// 如果有堵塞的时间限制
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {// 如果达到堵塞的时间限制
removeWaiter(q);
return state;
}
// 将当前线程挂起一定的时间
LockSupport.parkNanos(this, nanos);
}
else
// 将当前线程挂起
LockSupport.park(this);
}
}
总结:1、如果开发可以取消的任务,最好采用提供中断策略的方式。2、FutureTask 任务在开始之前调用 cancle,任务将不执行。3、FutureTask 任务在开始之前调用 cancle,如果没有提供中断策略,任务不会正常取消。参考:《Java 并发编程实战》http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/