前言

上一篇察看ThreadPoolExecutor的submit办法的时候,发现了它是靠FutureTask实现后果回调的:

public <T> Future<T> submit(Callable<T> task) {    if (task == null) throw new NullPointerException();    // ## 申明一个可回调工作,实质是一个FutureTask    RunnableFuture<T> ftask = newTaskFor(task);    // 线程池篇剖析过    execute(ftask);    return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {    return new FutureTask<T>(callable);}

一、FutureTask应用样例

// 1.申明一个可回调工作FutureTask<String> task = new FutureTask(()->"hello world");Thread threadA = new Thread(task);threadA.start();// 2.阻塞形式获取工作执行后果:threadA未执行完,以后线程TreadB会阻塞于此System.out.println(task.get());

执行过程

FutureTask实现了RunnableFuture接口,而RunnableFuture=Runnable接口+Future接口

  1. 线程A执行start办法时,会调用FutureTask的run()办法(Runnable接口)
    run()办法会触发FutureTask外部的state状态变更,并调用Callable的call()办法
  2. 此时线程B以及其它线程调用FutureTask的get()办法(Future接口),这些线程会阻塞期待run()办法实现
    源码层面会构建一个名为waiters的单项链表,以LockSupport.part的模式将线程阻塞在节点上
  3. call()办法执行实现,state状态最终变更为NORMAL,同时开释阻塞线程

要害属性

// ## 状态private volatile int state;private static final int NEW          = 0;private static final int COMPLETING   = 1;private static final int NORMAL       = 2;private static final int EXCEPTIONAL  = 3;private static final int CANCELLED    = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED  = 6;// 后果返回接口private Callable<V> callable;// 线程执行办法的返回后果private Object outcome; // non-volatile, protected by state reads/writes// 正在执行callable接口发的线程private volatile Thread runner;// 期待节点private volatile WaitNode waiters;

二、run()

public void run() {    if (state != NEW         // runner变量赋值        || !UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))        return;            try {        Callable<V> c = callable;        // NEW状态下执行        if (c != null && state == NEW) {            V result;            boolean ran;            try {                // 调用Callable的call办法,获取返回值                result = c.call();                ran = true;            } catch (Throwable ex) {                result = null;                ran = false;                setException(ex);            }            if (ran)                // == call办法执行胜利,设置后果                set(result);        }    } }
protected void set(V v) {    // 状态变更NEW->COMPLETING    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {        // 执行后果赋值给outcome        outcome = v;        // 状态变更COMPLETING->NORMAL,示意执行实现        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state        // == 开释期待队列中的阻塞线程        finishCompletion();    }}
private void finishCompletion() {    // assert state > COMPLETING;    for (WaitNode q; (q = waiters) != null;) {        // cas形式将waiters变量设置为null        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {            // ## 遍历队列(单向链表)中的WaitNode节点,开释全副的期待线程            for (;;) {                Thread t = q.thread;                if (t != null) {                    q.thread = null;                    LockSupport.unpark(t);                }                WaitNode next = q.next;                if (next == null)                    break;                q.next = null; // unlink to help gc                q = next;            }            break;        }    }    // 提供的监听办法,需用户自定义实现    done();    callable = null;        // to reduce footprint}

三、get()

public V get() throws InterruptedException, ExecutionException {    int s = state;    // NEW和COMPLETING状态触发期待    if (s <= COMPLETING)        // == 期待实现        s = awaitDone(false, 0L);    return report(s);}private int awaitDone(boolean timed, long nanos)    throws InterruptedException {    final long deadline = timed ? System.nanoTime() + nanos : 0L;    WaitNode q = null;    boolean queued = false;    for (;;) {        if (Thread.interrupted()) {            removeWaiter(q);            throw new InterruptedException();        }        int s = state;        // -- 查看状态,如果此时已变成NORMAL则无需期待        if (s > COMPLETING) {            if (q != null)                q.thread = null;            return s;        }        // -- 查看状态,如果此时是COMPLETING,切换其它线程执行        else if (s == COMPLETING) // cannot time out yet            Thread.yield();        // -- 新建期待节点        else if (q == null)            q = new WaitNode();        // -- waiters变量赋值        else if (!queued)            // ## 头插            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                 q.next = waiters, q);        // -- 有超时设置状况        else if (timed) {            nanos = deadline - System.nanoTime();            // ## 如果已超时,移除节点            if (nanos <= 0L) {                removeWaiter(q);                return state;            }            // ## 如果未超时,阻塞指定工夫            LockSupport.parkNanos(this, nanos);        }        // -- 线程阻塞        else            LockSupport.park(this);    }}