共计 3313 个字符,预计需要花费 9 分钟才能阅读完成。
前言
上一篇察看 ThreadPoolExecutor 的 submit 办法的时候,发现了它是靠 FutureTask 实现后果回调的:
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException(); | |
// ## 申明一个可回调工作,实质是一个 FutureTask | |
RunnableFuture<T> ftask = newTaskFor(task); | |
// 线程池篇剖析过 | |
execute(ftask); | |
return ftask; | |
} | |
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable); | |
} |
一、FutureTask 应用样例
// 1. 申明一个可回调工作 | |
FutureTask<String> task = new FutureTask(()->"hello world"); | |
Thread threadA = new Thread(task); | |
threadA.start(); | |
// 2. 阻塞形式获取工作执行后果:threadA 未执行完,以后线程 TreadB 会阻塞于此 | |
System.out.println(task.get()); |
执行过程
FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture=Runnable 接口 +Future 接口
- 线程 A 执行 start 办法时,会调用 FutureTask 的 run()办法(Runnable 接口)
run()办法会触发 FutureTask 外部的 state 状态变更,并调用 Callable 的 call()办法 - 此时线程 B 以及其它线程调用 FutureTask 的 get()办法(Future 接口),这些线程会阻塞期待 run()办法实现
源码层面会构建一个名为 waiters 的单项链表,以 LockSupport.part 的模式将线程阻塞在节点上 - call()办法执行实现,state 状态最终变更为 NORMAL,同时开释阻塞线程
要害属性
// ## 状态 | |
private volatile int state; | |
private static final int NEW = 0; | |
private static final int COMPLETING = 1; | |
private static final int NORMAL = 2; | |
private static final int EXCEPTIONAL = 3; | |
private static final int CANCELLED = 4; | |
private static final int INTERRUPTING = 5; | |
private static final int INTERRUPTED = 6; | |
// 后果返回接口 | |
private Callable<V> callable; | |
// 线程执行办法的返回后果 | |
private Object outcome; // non-volatile, protected by state reads/writes | |
// 正在执行 callable 接口发的线程 | |
private volatile Thread runner; | |
// 期待节点 | |
private volatile WaitNode waiters; |
二、run()
public void run() { | |
if (state != NEW | |
// runner 变量赋值 | |
|| !UNSAFE.compareAndSwapObject(this, runnerOffset, | |
null, Thread.currentThread())) | |
return; | |
try { | |
Callable<V> c = callable; | |
// NEW 状态下执行 | |
if (c != null && state == NEW) { | |
V result; | |
boolean ran; | |
try { | |
// 调用 Callable 的 call 办法,获取返回值 | |
result = c.call(); | |
ran = true; | |
} catch (Throwable ex) { | |
result = null; | |
ran = false; | |
setException(ex); | |
} | |
if (ran) | |
// == call 办法执行胜利,设置后果 | |
set(result); | |
} | |
} | |
} |
protected void set(V v) { | |
// 状态变更 NEW->COMPLETING | |
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { | |
// 执行后果赋值给 outcome | |
outcome = v; | |
// 状态变更 COMPLETING->NORMAL,示意执行实现 | |
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state | |
// == 开释期待队列中的阻塞线程 | |
finishCompletion();} | |
} |
private void finishCompletion() { | |
// assert state > COMPLETING; | |
for (WaitNode q; (q = waiters) != null;) { | |
// cas 形式将 waiters 变量设置为 null | |
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { | |
// ## 遍历队列(单向链表)中的 WaitNode 节点,开释全副的期待线程 | |
for (;;) { | |
Thread t = q.thread; | |
if (t != null) { | |
q.thread = null; | |
LockSupport.unpark(t); | |
} | |
WaitNode next = q.next; | |
if (next == null) | |
break; | |
q.next = null; // unlink to help gc | |
q = next; | |
} | |
break; | |
} | |
} | |
// 提供的监听办法,需用户自定义实现 | |
done(); | |
callable = null; // to reduce footprint | |
} |
三、get()
public V get() throws InterruptedException, ExecutionException { | |
int s = state; | |
// NEW 和 COMPLETING 状态触发期待 | |
if (s <= COMPLETING) | |
// == 期待实现 | |
s = awaitDone(false, 0L); | |
return report(s); | |
} | |
private int awaitDone(boolean timed, long nanos) | |
throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L; | |
WaitNode q = null; | |
boolean queued = false; | |
for (;;) {if (Thread.interrupted()) {removeWaiter(q); | |
throw new InterruptedException();} | |
int s = state; | |
// -- 查看状态,如果此时已变成 NORMAL 则无需期待 | |
if (s > COMPLETING) {if (q != null) | |
q.thread = null; | |
return s; | |
} | |
// -- 查看状态,如果此时是 COMPLETING,切换其它线程执行 | |
else if (s == COMPLETING) // cannot time out yet | |
Thread.yield(); | |
// -- 新建期待节点 | |
else if (q == null) | |
q = new WaitNode(); | |
// -- waiters 变量赋值 | |
else if (!queued) | |
// ## 头插 | |
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, | |
q.next = waiters, q); | |
// -- 有超时设置状况 | |
else if (timed) {nanos = deadline - System.nanoTime(); | |
// ## 如果已超时,移除节点 | |
if (nanos <= 0L) {removeWaiter(q); | |
return state; | |
} | |
// ## 如果未超时,阻塞指定工夫 | |
LockSupport.parkNanos(this, nanos); | |
} | |
// -- 线程阻塞 | |
else | |
LockSupport.park(this); | |
} | |
} |
正文完