前言
上一篇察看ThreadPoolExecutor的submit办法的时候,发现了它是靠FutureTask实现后果回调的:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // ## 申明一个可回调工作,实质是一个FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 线程池篇剖析过 execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}
一、FutureTask应用样例
// 1.申明一个可回调工作FutureTask<String> task = new FutureTask(()->"hello world");Thread threadA = new Thread(task);threadA.start();// 2.阻塞形式获取工作执行后果:threadA未执行完,以后线程TreadB会阻塞于此System.out.println(task.get());
执行过程
FutureTask实现了RunnableFuture接口,而RunnableFuture=Runnable接口+Future接口
- 线程A执行start办法时,会调用FutureTask的run()办法(Runnable接口)
run()办法会触发FutureTask外部的state状态变更,并调用Callable的call()办法 - 此时线程B以及其它线程调用FutureTask的get()办法(Future接口),这些线程会阻塞期待run()办法实现
源码层面会构建一个名为waiters的单项链表,以LockSupport.part的模式将线程阻塞在节点上 - call()办法执行实现,state状态最终变更为NORMAL,同时开释阻塞线程
要害属性
// ## 状态private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;// 后果返回接口private Callable<V> callable;// 线程执行办法的返回后果private Object outcome; // non-volatile, protected by state reads/writes// 正在执行callable接口发的线程private volatile Thread runner;// 期待节点private volatile WaitNode waiters;
二、run()
public void run() { if (state != NEW // runner变量赋值 || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // NEW状态下执行 if (c != null && state == NEW) { V result; boolean ran; try { // 调用Callable的call办法,获取返回值 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // == call办法执行胜利,设置后果 set(result); } } }
protected void set(V v) { // 状态变更NEW->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 执行后果赋值给outcome outcome = v; // 状态变更COMPLETING->NORMAL,示意执行实现 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // == 开释期待队列中的阻塞线程 finishCompletion(); }}
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // cas形式将waiters变量设置为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // ## 遍历队列(单向链表)中的WaitNode节点,开释全副的期待线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 提供的监听办法,需用户自定义实现 done(); callable = null; // to reduce footprint}
三、get()
public V get() throws InterruptedException, ExecutionException { int s = state; // NEW和COMPLETING状态触发期待 if (s <= COMPLETING) // == 期待实现 s = awaitDone(false, 0L); return report(s);}private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // -- 查看状态,如果此时已变成NORMAL则无需期待 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // -- 查看状态,如果此时是COMPLETING,切换其它线程执行 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // -- 新建期待节点 else if (q == null) q = new WaitNode(); // -- waiters变量赋值 else if (!queued) // ## 头插 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // -- 有超时设置状况 else if (timed) { nanos = deadline - System.nanoTime(); // ## 如果已超时,移除节点 if (nanos <= 0L) { removeWaiter(q); return state; } // ## 如果未超时,阻塞指定工夫 LockSupport.parkNanos(this, nanos); } // -- 线程阻塞 else LockSupport.park(this); }}