前言
上一篇察看ThreadPoolExecutor的submit办法的时候,发现了它是靠FutureTask实现后果回调的:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// ## 申明一个可回调工作,实质是一个FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 线程池篇剖析过
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
一、FutureTask应用样例
// 1.申明一个可回调工作
FutureTask<String> task = new FutureTask(()->"hello world");
Thread threadA = new Thread(task);
threadA.start();
// 2.阻塞形式获取工作执行后果:threadA未执行完,以后线程TreadB会阻塞于此
System.out.println(task.get());
执行过程
FutureTask实现了RunnableFuture接口,而RunnableFuture=Runnable接口+Future接口
- 线程A执行start办法时,会调用FutureTask的run()办法(Runnable接口)
run()办法会触发FutureTask外部的state状态变更,并调用Callable的call()办法 - 此时线程B以及其它线程调用FutureTask的get()办法(Future接口),这些线程会阻塞期待run()办法实现
源码层面会构建一个名为waiters的单项链表,以LockSupport.part的模式将线程阻塞在节点上 - call()办法执行实现,state状态最终变更为NORMAL,同时开释阻塞线程
要害属性
// ## 状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 后果返回接口
private Callable<V> callable;
// 线程执行办法的返回后果
private Object outcome; // non-volatile, protected by state reads/writes
// 正在执行callable接口发的线程
private volatile Thread runner;
// 期待节点
private volatile WaitNode waiters;
二、run()
public void run() {
if (state != NEW
// runner变量赋值
|| !UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// NEW状态下执行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用Callable的call办法,获取返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// == call办法执行胜利,设置后果
set(result);
}
}
}
protected void set(V v) {
// 状态变更NEW->COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 执行后果赋值给outcome
outcome = v;
// 状态变更COMPLETING->NORMAL,示意执行实现
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// == 开释期待队列中的阻塞线程
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// cas形式将waiters变量设置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// ## 遍历队列(单向链表)中的WaitNode节点,开释全副的期待线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 提供的监听办法,需用户自定义实现
done();
callable = null; // to reduce footprint
}
三、get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
// NEW和COMPLETING状态触发期待
if (s <= COMPLETING)
// == 期待实现
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// -- 查看状态,如果此时已变成NORMAL则无需期待
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// -- 查看状态,如果此时是COMPLETING,切换其它线程执行
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// -- 新建期待节点
else if (q == null)
q = new WaitNode();
// -- waiters变量赋值
else if (!queued)
// ## 头插
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// -- 有超时设置状况
else if (timed) {
nanos = deadline - System.nanoTime();
// ## 如果已超时,移除节点
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// ## 如果未超时,阻塞指定工夫
LockSupport.parkNanos(this, nanos);
}
// -- 线程阻塞
else
LockSupport.park(this);
}
}
发表回复