追踪解析 FutureTask 源码

35次阅读

共计 4821 个字符,预计需要花费 13 分钟才能阅读完成。

零 前期准备
0 FBI WARNING
文章异常啰嗦且绕弯。
1 版本
JDK 版本 : OpenJDK 11.0.1
IDE : idea 2018.3
2 ThreadLocal 简介
FutureTask 是 jdk 中默认的 Future 实现类,常与 Callable 结合进行多线程并发操作。
3 Demo
import java.util.concurrent.*;

public class FutureTaskDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {

// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(1);
try{
// 创建一个要执行的 Callable 对象
// 此处其实 Runnable 对象也可以,但是通常不会那样做
Callable<String> task = () -> {
// 休眠三秒
TimeUnit.SECONDS.sleep(3);
// 返回一个字符串
return “hello”;
};

// 用 FutureTask 对象去包装 Callable
FutureTask<String> futureTask = new FutureTask<>(task);

// 此处将 FutureTask 对象丢进线程池里
pool.submit(futureTask);

// 注意,此处的 futureTask 本质上是作为 Runnable 被丢进池子里的
// 所以也可以用线程池的 execute(…) 方法
//pool.execute(futureTask)

// 还有一种更常见的执行方式是直接使用 Thread
//new Thread(futureTask).start();

// 获取结果
// 注意,如果没有获取到的话此处会阻塞线程直到获取到为止
String result = futureTask.get();

// 还有一种限时策略的结果获取
// 超时的情况下会抛出异常
//String result = futureTask.get(1,TimeUnit.SECONDS);

System.out.println(result);
}finally {
// 关闭连接池
pool.shutdown();
}

}
}
一 FutureTask 的创建
回到 Demo 中的创建代码:
FutureTask<String> futureTask = new FutureTask<>(task);
追踪 FutureTask 的构造器:
//FutureTask.class
public FutureTask(Callable<V> callable) {
// 有效性判断,不能为空
if (callable == null)
throw new NullPointerException();
// 记录下 callable 对象
this.callable = callable;
//state 是一个 int 类型的对象,是一个
//NEW = 0
this.state = NEW;
}
二 run
FutureTask 本身是 Runnable 的子类,其在被 ThreadPoolExecutor 或者 Thread 对象消费的时候也是被当做 Runnable 的实现类的。
所以其本身的核心逻辑就必然在 run() 方法中:
//FutureTask.class
public void run() {

// 先判断状态,如果状态不是 NEW 就会直接返回
//RUNNER 是一个 VarHandler 类型的变量,指向了 FutureTask 中的 thread 变量,用于储存当前的线程
// 但是如果 thread 已经不为 null,此处也会直接返回
// 这两种返回条件都意味着此 FutureTask 的 run() 方法已经执行过了
if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;

try {
// 获取 callable
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行 callable 的业务逻辑
result = c.call();
//ran 为成功标识
ran = true;
} catch (Throwable ex) {
// 出错的情况下
result = null;
ran = false;
// 不成功的情况下存入 exception
setException(ex);
}
// 如果成功的话会在此处进行操作
if (ran)
set(result);
}
} finally {
// 置空
runner = null;
int s = state;
if (s >= INTERRUPTING)
// 如果此 FutreTask 的状态是中断状态,会在此处不断调用 Thread.yield() 空转
handlePossibleCancellationInterrupt(s);
}
}
此处有两个关键方法,即为 setException(…) 和 set(…):
//FutureTask.class
protected void setException(Throwable t) {
// 用 CAS 操作比较并更新状态值
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
//outcome 是一个 Object 对象,用于存储 callable 的返回值
// 此处由于报错了,所以储存的是错误对象
outcome = t;
//EXCEPTIONAL = 3
STATE.setRelease(this, EXCEPTIONAL);
// 最后清理工作,主要用于唤醒等待线程和执行 callable
finishCompletion();
}
}

//FutureTask.class
protected void set(V v) {
// 基本逻辑和 setException(…) 方法雷同,只是 STATE 和 outcome 的储存值不同
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL);
finishCompletion();
}
}
再来看 finishCompletion() 方法:
//FutureTask.class
private void finishCompletion() {
//WaitNode 是 FutureTask 的静态内部类
// 其本质上是单向链表的节点表示类,用于存放想要获取 Callable 的返回值但是被阻塞的线程的线程对象
for (WaitNode q; (q = waiters) != null;) {
// 此处使用 CAS 将 q 从 WAITERS 里去除
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
// 此处置空线程对象,帮助 GC
q.thread = null;
// 唤醒线程
LockSupport.unpark(t);
}
// 接着往下遍历
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
// 此方法是空的
done();
// 置空 callable
callable = null;
}
之前提到过在 FutureTask 的 get(…) 方法中会阻塞线程,直到 Callable 执行完毕并能够获取返回值的时候才会结束阻塞。
所以 finishCompletion() 方法的主体其实就是去唤醒被阻塞的线程。
三 get
回到 Demo 中的创建代码:
String result = futureTask.get();
追踪 get() 方法:
//step 1
//FutureTask.class
public V get() throws InterruptedException, ExecutionException {
// 此处先判断状态值,如果非 COMPLETING,即为还没完成,就会调用 awaitDone(…) 方法阻塞线程
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回结果
return report(s);
}

//step 2
//FutureTask.class
private V report(int s) throws ExecutionException {
// 获取需要返回的对象
Object x = outcome;
// 如果是正常结束的就直接返回对象即可
if (s == NORMAL)
return (V)x;
// 出错的情况下,抛异常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
再来看一下阻塞线程的 awaitDone(…) 方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException {

// 循环的次数
long startTime = 0L;
// 节点对象
WaitNode q = null;
// 链表队列标识,代表该线程是否被加入链表中,初始为 false 代表未被加入
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {// 如果 Callable 的执行已经完成
if (q != null)
q.thread = null;
return s;
}else if (s == COMPLETING) //Callable 的执行刚刚完成,后续工作还没做
Thread.yield();
else if (Thread.interrupted()) {
// 线程被中断了,会抛出错误
removeWaiter(q);
throw new InterruptedException();
} else if (q == null) {// 进入此处的判断证明 Callable 还未完成,所以会创建等待节点
// 此处的 timed 传入为 false,不会在此返回
if (timed && nanos <= 0L)
return s;
q = new WaitNode(); // 新建节点
}else if (!queued)
//queued 初始为 false,进入此处的时候会将上一个判断条件中新建的 q 加入到链表的首节点中
// 并且 queued 变成 true
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
// 如果此操作是限时的,那么这里需要判断时间
final long parkNanos;
if (startTime == 0L) {
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() – startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos – elapsed;
}
if (state < COMPLETING)
// 此处挂起线程,时间为 parkNanos
// 本例中传入为 0L,所以是永久挂起
LockSupport.parkNanos(this, parkNanos);
}else
// 永久挂起线程
LockSupport.park(this);
}
}
四 一点唠叨
FutureTask 和 ThreadLocal 一样,都是 java.util.current 包中的小工具,封装不复杂,理解即可。

本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

正文完
 0