关于java:JUC可回调任务FutureTask原理

37次阅读

共计 3313 个字符,预计需要花费 9 分钟才能阅读完成。

前言

上一篇察看 ThreadPoolExecutor 的 submit 办法的时候,发现了它是靠 FutureTask 实现后果回调的:

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
    // ## 申明一个可回调工作,实质是一个 FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 线程池篇剖析过
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}

一、FutureTask 应用样例

// 1. 申明一个可回调工作
FutureTask<String> task = new FutureTask(()->"hello world");
Thread threadA = new Thread(task);
threadA.start();

// 2. 阻塞形式获取工作执行后果:threadA 未执行完,以后线程 TreadB 会阻塞于此
System.out.println(task.get());

执行过程

FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture=Runnable 接口 +Future 接口

  1. 线程 A 执行 start 办法时,会调用 FutureTask 的 run()办法(Runnable 接口)
    run()办法会触发 FutureTask 外部的 state 状态变更,并调用 Callable 的 call()办法
  2. 此时线程 B 以及其它线程调用 FutureTask 的 get()办法(Future 接口),这些线程会阻塞期待 run()办法实现
    源码层面会构建一个名为 waiters 的单项链表,以 LockSupport.part 的模式将线程阻塞在节点上
  3. call()办法执行实现,state 状态最终变更为 NORMAL,同时开释阻塞线程

要害属性

// ## 状态
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

// 后果返回接口
private Callable<V> callable;
// 线程执行办法的返回后果
private Object outcome; // non-volatile, protected by state reads/writes
// 正在执行 callable 接口发的线程
private volatile Thread runner;
// 期待节点
private volatile WaitNode waiters;

二、run()

public void run() {
    if (state != NEW 
        // runner 变量赋值
        || !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
        
    try {
        Callable<V> c = callable;
        // NEW 状态下执行
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 调用 Callable 的 call 办法,获取返回值
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                // == call 办法执行胜利,设置后果
                set(result);
        }
    } 
}
protected void set(V v) {
    // 状态变更 NEW->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 执行后果赋值给 outcome
        outcome = v;
        // 状态变更 COMPLETING->NORMAL,示意执行实现
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // == 开释期待队列中的阻塞线程
        finishCompletion();}
}
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // cas 形式将 waiters 变量设置为 null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // ## 遍历队列(单向链表)中的 WaitNode 节点,开释全副的期待线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    // 提供的监听办法,需用户自定义实现
    done();

    callable = null;        // to reduce footprint
}

三、get()

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // NEW 和 COMPLETING 状态触发期待
    if (s <= COMPLETING)
        // == 期待实现
        s = awaitDone(false, 0L);
    return report(s);
}


private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {if (Thread.interrupted()) {removeWaiter(q);
            throw new InterruptedException();}

        int s = state;
        // -- 查看状态,如果此时已变成 NORMAL 则无需期待
        if (s > COMPLETING) {if (q != null)
                q.thread = null;
            return s;
        }
        // -- 查看状态,如果此时是 COMPLETING,切换其它线程执行
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // -- 新建期待节点
        else if (q == null)
            q = new WaitNode();
        // -- waiters 变量赋值
        else if (!queued)
            // ## 头插
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // -- 有超时设置状况
        else if (timed) {nanos = deadline - System.nanoTime();
            // ## 如果已超时,移除节点
            if (nanos <= 0L) {removeWaiter(q);
                return state;
            }
            // ## 如果未超时,阻塞指定工夫
            LockSupport.parkNanos(this, nanos);
        }
        // -- 线程阻塞
        else
            LockSupport.park(this);
    }
}

正文完
 0