Java中一个计算量很大的任务执行,如何中途取消呢?这个问题好像不难回答,书籍上提到的中断机制、循环状态的方式可以做到。其中有一个Future.cancel 的方法引了我的注意。Cancel是怎么取消的呢?所以查看了FutureTask的cancle源码。

首先要了解FutureTask 定义的其各种状态,代码如下:

   /**     *这个任务运行的状态,初始化时时NEW状态。运行状态只有在这几个方法中能转移到最终状态:     *setException、cancel、set 。在运行期间,运行状态可能会存在短暂状态:COMPLETING(当结果被设置中) 和     *INTERRUPTING。从这些中间状态到最终状态的转换使用便宜的有序/惰性写入,因为值是唯一的,无法进一步修改。     * 可能的状态转移:     * NEW -> COMPLETING -> NORMAL     * NEW -> COMPLETING -> EXCEPTIONAL     * NEW -> CANCELLED     * NEW -> INTERRUPTING -> INTERRUPTED     */    private volatile int state;    private static final int NEW          = 0;    private static final int COMPLETING   = 1;    private static final int NORMAL       = 2;    private static final int EXCEPTIONAL  = 3;    private static final int CANCELLED    = 4;    private static final int INTERRUPTING = 5;    private static final int INTERRUPTED  = 6;

再来看cancel的源码:如果状态不是NEW,直接返回取消失败。如果运行中断,将运行任务的线程中断,finishCompletion 则是在中断设置完成后,
通知唤醒调用get的线程。

//参数mayInterruptIfRunning 是运行的时候 是否能中断 public boolean cancel(boolean mayInterruptIfRunning) {        //!(任务运行的状态是NEW且可以cas修改为INTERRUPTING、CANCELLED)直接返回        if (!(state == NEW &&              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))            return false;        try {                 // 如果参数是true,将运行任务的线程直接中断,并且将future的状态设置为INTERRUPTED            if (mayInterruptIfRunning) {                try {                    Thread t = runner;                    if (t != null)                        t.interrupt();                } finally { // final state                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);                }            }        } finally {            //            finishCompletion();        }        return true;    }

从源码开看,是将运行线程设置了中断。如果运行任务的线程没有中断处理策略,任务能被取消吗?试验一下:首先定义一个需要一定时间的任务。

    public class BigCalculationTask implements Callable<BigDecimal> {    long CAL_NUMBER=0L;    public  BigCalculationTask(long calNumber){        CAL_NUMBER=calNumber;    }    @Override    public BigDecimal call() throws Exception {        System.out.println("执行call方法");        BigDecimal rs=BigDecimal.ZERO;        for (int i=0;i<CAL_NUMBER;i++){            BigDecimal divideValue=new BigDecimal("8965426.32").divide(new BigDecimal("1.236984524").add(new BigDecimal(i)),10,BigDecimal.ROUND_HALF_UP);           System.out.println("计算次数"+i);            rs=rs.add(divideValue);        }        return rs;    }}```测试代码:

public class CancelTaskTest {

public static void main(String[] args) throws ExecutionException, InterruptedException {    ExecutorService executorService= Executors.newFixedThreadPool(1);    Future task=executorService.submit(new BigCalculationTask(1000000));    task.cancel(true);     executorService.shutdown();    System.out.println(task.get());}

}

 运行结果如下,抛出CancellationException。

Exception in thread "main" java.util.concurrent.CancellationException

at java.util.concurrent.FutureTask.report(FutureTask.java:121)at java.util.concurrent.FutureTask.get(FutureTask.java:192)
 但是call方法没有打印,我猜测是否是futureTask还没启动。从输出的日志先来看report方法,report是被get方法调用的。再看cancle方法,我们传入的参数是true,多以当cancel方法被调用且当其状态时NEW,那么状态先转为:INTERRUPTING,在转为INTERRUPTED。report方法检测到状态>CANCELLED,直接抛出了CancellationException。为了验证我的猜测,我在System.out.println(task.get()); 加入断点,debug截图为:最终状态为6(INTERRUPTED)。如果我们将cancle参数改为false,会发生什么呢?控制台输出结果跟true相同,同样设置断点,task最终状态为:CANCELLED.截图如下:

public V get() throws InterruptedException, ExecutionException {

    int s = state;    if (s <= COMPLETING)        //等待任务状态终结(NOMAL、EXCEPTIONAL、CANCELLED、INTERUPTED)        s = awaitDone(false, 0L);    return report(s);}

private V report(int s) throws ExecutionException {

    Object x = outcome;    if (s == NORMAL)        return (V)x;    if (s >= CANCELLED)        throw new CancellationException();    throw new ExecutionException((Throwable)x);}  
我们改动测试代码:

public class BigCalculationTask implements Callable<BigDecimal> {

CountDownLatch latch;long CAL_NUMBER=0L;public  BigCalculationTask(long calNumber,CountDownLatch latch){    CAL_NUMBER=calNumber;    this.latch=latch;}@Overridepublic BigDecimal call() throws Exception {    latch.countDown();    System.out.println("执行call方法");    BigDecimal rs=BigDecimal.ZERO;    for (int i=0;i<CAL_NUMBER;i++){        BigDecimal divideValue=new BigDecimal("8965426.32").divide(new BigDecimal("1.236984524").add(new BigDecimal(i)),10,BigDecimal.ROUND_HALF_UP);       System.out.println("计算次数"+i);        rs=rs.add(divideValue);    }    return rs;}

}
public class CancelTaskTest {

public static void main(String[] args) throws ExecutionException, InterruptedException {    ExecutorService executorService= Executors.newFixedThreadPool(1);    CountDownLatch latch=new CountDownLatch(1);    Future<BigDecimal> task=executorService.submit(new BigCalculationTask(1000000,latch));     latch.await();    task.cancel(true);    executorService.shutdown();    executorService.shutdown();    BigDecimal result=task.get();}

}

代码我们进行了改造,使用闭锁确保计算任务先启动,在取消 。从控制台get方法抛出CancellationException,但是计算任务并没有停止,即cancel方法没有成功取消任务。通过cancel源码,我们参数时true,cancel将执行计算任务的线程标志为中断,因为我们没有加入中断策略,所以没有通过中断取消成功。改造我们的计算任务代码,再合适的位置加入处理中断策略即可。代码如下:

public BigDecimal call() throws Exception {

    latch.countDown();    System.out.println("执行call方法");    BigDecimal rs=BigDecimal.ZERO;    for (int i=0;i<CAL_NUMBER;i++){        BigDecimal divideValue=new BigDecimal("8965426.32").divide(new BigDecimal("1.236984524").add(new BigDecimal(i)),10,BigDecimal.ROUND_HALF_UP);       System.out.println("计算次数"+i);        rs=rs.add(divideValue);        if(Thread.interrupted()){            break;        }    }    return rs;}
还有一个疑问,get方法时如何做到一直等待到任务执行成功返回结果的呢?get方法有个awaitDone方法,奥秘就在这个方法里。在看awaitDone 方法前,先看一个FutureTask的一个内部类,WaitNode.WaitNode很简单,是个链表结构,构造函数将thread属性赋值为当前的线程。

static final class WaitNode {

    volatile Thread thread;    volatile WaitNode next;    WaitNode() { thread = Thread.currentThread(); }

}

再看awaitDone的源码。

private int awaitDone(boolean timed, long nanos)

    throws InterruptedException {    final long deadline = timed ? System.nanoTime() + nanos : 0L;    WaitNode q = null;    boolean queued = false;    for (;;) {        //如果本线程已经被中断了,抛出异常        if (Thread.interrupted()) {        //尝试取消链接超时或中断的等待节点,以避免积累垃圾            removeWaiter(q);            throw new InterruptedException();        }                int s = state;        //如果futureTask的状态时最终的状态之一,则直接返回。        if (s > COMPLETING) {            if (q != null)                q.thread = null;            return s;        }        else if (s == COMPLETING) // 如果时执行完成 赋值阶段,将调用get方法的线程挂起            Thread.yield();        else if (q == null)            q = new WaitNode();//如果q==null,创建一个waitNode        else if (!queued)            //如果已经创建了waitNode还没有加入waiters列表,则用cas方式将q.next设置成原来的waiters,将futureTask的waiters指向q            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                 q.next = waiters, q);                                                         else if (timed) {//如果有堵塞的时间限制            nanos = deadline - System.nanoTime();            if (nanos <= 0L) {//如果达到堵塞的时间限制                removeWaiter(q);                return state;            }            //将当前线程挂起一定的时间            LockSupport.parkNanos(this, nanos);        }        else            //将当前线程挂起            LockSupport.park(this);    }}
总结:1、如果开发可以取消的任务,最好采用提供中断策略的方式。      2、FutureTask 任务在开始之前调用cancle,任务将不执行。      3、FutureTask 任务在开始之前调用cancle,如果没有提供中断策略,任务不会正常取消。参考:《Java并发编程实战》       http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/