download:大前端2022版全面降级完结无密内置文档资料
FutureTask源码深度剖析
在JDK的FutureTask当中会使用到一个工具LockSupport,在正式介绍FutureTask之前咱们先熟悉一下这个工具。
LockSupport次要是用于阻塞和唤醒线程的,它次要是通过包装UnSafe类,通过UnSafe类当中的方法进行实现的,他底层的方法是通过依赖JVM实现的。在LockSupport当中次要有以下三个方法:
unpark(Thread thread))方法,这个方法可能给线程thread发放一个许可证,你可能通过多次调用这个方法给线程发放许可证,每次调用都会给线程发放一个许可证,然而这个许可证不能够进行累计,也就是说一个线程能够具备的最大的许可证的个数是1一个。
park()方法,这个线程会生产调用这个方法的线程一个许可证,因为线程的默认许可证的个数是0,如果调用一次那么许可证的数目就变成-1,当许可证的数目小于0的时候线程就会阻塞,因此如果线程从来没用调用unpark方法的话,那么在调用这个方法的时候会阻塞,如果线程在调用park方法之前,有线程调用unpark(thread)方法,给这个线程发放一个许可证的话,那么调用park方法就不会阻塞。
parkNanos(long nanos)方法,同park方法一样,nanos示意最长阻塞超时工夫,超时后park方法将主动返回,如果调用这个方法的线程有许可证的话也不会阻塞。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class Demo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> { LockSupport.park(); // 没有许可证 阻塞住这个线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("阻塞实现");});thread.start();TimeUnit.SECONDS.sleep(2);LockSupport.unpark(thread); //给线程 thread 发放一个许可证System.out.println("线程启动");
}
}
复制代码
下面代码的执行后果
线程启动
阻塞实现
复制代码
从下面代码咱们可能知道LockSupport.park()可能阻塞一个线程,因为如果没有阻塞的话必定会先打印阻塞实现,因为打印这句话的线程只休眠一秒,主线程休眠两秒。
在源代码当中你可能会遇到UNSAFE.compareAndSwapXXX的代码,这行代码次要是进行原子交换操作CAS,比如:
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)))
复制代码
下面的代码次要是将this对象当中的内存偏移地址为stateOffset的对象拿进去与NEW进行比较,如果等于NEW那就将这个值设置为CANCELLED,这整个操作是原子的(因为可能多个线程同时调用这个函数,因此需要保障操作是原子的),如果操作胜利返回true反之返回false。如果你目前不是很理解也没关系,只需要知道它是将对象this的内存偏移为stateOffset的值替换为CANCELLED就行,如果这个操作胜利返回true,不胜利返回false。
FutureTask回顾
咱们首先来回顾一下FutureTask的编程步骤:
写一个类实现Callable接口。
@FunctionalInterface
public interface Callable<V> {
/** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */V call() throws Exception;
}
复制代码
实现接口就实现call即可,可能看到这个函数是有返回值的,而FutureTask返回给咱们的值就是这个函数的返回值。
new一个FutureTask对象,并且new一个第一步写的类,new FutureTask<>(callable实现类)。
最初将刚刚失去的FutureTask对象传入Thread类当中,而后启动线程即可new Thread(futureTask).start();。
而后咱们可能调用FutureTask的get方法失去返回的后果futureTask.get();。
可能你会对FutureTask的使用形式感觉困惑,或者不是很明显,现在咱们来认真捋一下思路。
首先启动一个线程要么是继承自Thread类,而后重写Thread类的run方法,要么是给Thread类传送一个实现了Runnable的类对象,当然可能用匿名外部类实现。
既然咱们的FutureTask对象可能传送给Thread类,说明FutureTask必定是实现了Runnable接口,事实上也的确如此
可能发现的是FutureTask确实实现了Runnable接口,同时还实现了Future接口,这个Future接口次要提供了前面咱们使用FutureTask的一系列函数比如get。
看到这里你应该能够大抵想到在FutureTask中的run方法会调用Callable当中实现的call方法,而后将后果保存下来,当调用get方法的时候再将这个后果返回。
状态示意
首先咱们先了解一下FutureTask的几种状态:
NEW,刚刚新建一个FutureTask对象。
COMPLETING,FutureTask正在执行。
NORMAL,FutureTask失常结束。
EXCEPTIONAL,如果FutureTask对象在执行Callable实现类对象的call方法的时候出现的异样,那么FutureTask的状态就变成这个状态了。
CANCELLED,示意FutureTask的执行过程被勾销了。
INTERRUPTING,示意正在终止FutureTask对象的执行过程。
INTERRUPTED,示意FutureTask对象在执行的过程当中被中断了。
这些状态之间的可能的转移情况如下所示:
NEW -> COMPLETING -> NORMAL。
NEW -> COMPLETING -> EXCEPTIONAL。
NEW -> CANCELLED。
NEW -> INTERRUPTING -> INTERRUPTED。
在FutureTask当中用数字去示意这几个状态:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
复制代码
核心函数和字段
FutureTask类当中的核心字段
private Callable<V> callable; // 用于保存传入 FutureTask 对象的 Callable 对象
复制代码
private Object outcome; // 用于保存 Callable 当中 call 函数的返回后果
复制代码
private volatile Thread runner; // 示意正在执行 call 函数的线程
复制代码
private volatile WaitNode waiters;// 被 get 函数挂起的线程 是一个单向链表 waiters 示意单向链表的头节点
static final class WaitNode {
volatile Thread thread; // 示意被挂起来的线程
volatile WaitNode next; // 示意下一个节点
WaitNode() { thread = Thread.currentThread(); }
}
复制代码
结构函数:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; //保存传入来的 Callable 接口的实现类对象
this.state = NEW; // 这个就是用来保存 FutureTask 的状态 初识时 是新建状态
}
复制代码
run方法,这个函数是实现Runnable接口的方法,也就是传入Thread类之后,Thread启动时执行的方法。
public void run() {
// 如果 futuretask 的状态 state 不是 NEW
// 或者不能够设置 runner 为以后的线程的话间接返回
// 不在执行上面的代码 因为 state 已经不是NEW
// 说明勾销了 futuretask 的执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))return;
try {
Callable<V> c = callable;if (c != null && state == NEW) { V result; boolean ran; // 这个值次要用于示意 call 函数是否失常执行实现 如果失常执行实现就为 true try { result = c.call(); // 执行 call 函数失去咱们需要的返回值并且柏村在 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // call 函数异样执行 设置 state 为异样状态 并且唤醒由 get 函数阻塞的线程 } if (ran) set(result); // call 函数失常执行实现 将失去的后果 result 保存到 outcome 当中 并且唤醒被 get 函数阻塞的线程}
} finally {
// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;// 如果这个if语句条件满足的话就示意执行过程被中断了if (s >= INTERRUPTING) // 这个次要是后续处理中断的过程不是很重要 handlePossibleCancellationInterrupt(s);
}
}
复制代码
set方法,次要是用于设置state的状态,并且唤醒由get函数阻塞的线程。
protected void set(V v) { // call 方法失常执行实现执行上面的方法 v 是 call 方法返回的后果
// 这个是原子交换 state 从 NEW 状态变成 COMPLETING 状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 将 call 函数的返回后果保存到 outcome 当中 而后会在 get 函数当中使用 outcome // 因为 get 函数需要失去 call 函数的后果 因此咱们需要在 call 函数当中返回 outcome// 上面代码是将 state 的状态变成 NORMAL 示意程序执行实现UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 因为其余线程可能在调用 get 函数的时候 call 函数还没有执行实现 因此这些线程会被阻塞 上面的这个方法次要是将这些线程唤醒finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t; // 将异样作为后果返回// 将最初的状态设置为 EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();
}
}
复制代码
get方法,这个方法次要是从FutureTask当中取出数据,然而这个时候可能call函数还没有执行实现,因此这个方法可能会阻塞调用这个方法的线程。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果以后的线程还没有执行实现就需要将以后线程挂起
if (s <= COMPLETING)
// 调用 awaitDone 函数将以后的线程挂起s = awaitDone(false, 0L);
// 如果 state 大于 COMPLETING 也就说是实现状态 可能间接调用这个函数返回
// 当然也可能是从 awaitDone 函数当中复原执行才返回
return report(s);// report 方法的次要作用是将后果 call 函数的返回后果 返回进来 也就是将 outcome 返回
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL) // 如果程序失常执行实现则间接返回后果
return (V)x;
if (s >= CANCELLED) // 如果 s 大于 CANCELLED 说明程序要么是被勾销要么是被中断了 抛出异样
throw new CancellationException();
// 如果下面两种转台都不是那么说明在执行 call 函数的时候程序发生异样
// 还记得咱们在 setException 函数当中将异样赋值给了 outcome 吗?
// 在这里将那个异样抛出了
throw new ExecutionException((Throwable)x);
}
复制代码
awaitDone方法,这个方法次要是将以后线程挂起。
private int awaitDone(boolean timed, long nanos) // timed 示意是否超时阻塞 nanos 示意如果是超时阻塞的话 超时工夫是几
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) { // 注意这是个死循环
// 如果线程被中断了 那么需要从 “等待队列” 当中移进来if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException();}int s = state;// 如果 call 函数执行实现 注意执行实现可能是失常执行实现 也可能是异样 勾销 中断的任何一个状态if (s > COMPLETING) { if (q != null) q.thread = null; return s;}// 如果是正在执行的话 说明马上就执行实现了 只差将 call 函数的执行后果赋值给 outcome 了// 因此可能不进行阻塞先让出 CPU 让其它线程执行 可能下次调度到这个线程 state 的状态很可能就// 变成 实现了else if (s == COMPLETING) // cannot time out yet Thread.yield();else if (q == null) q = new WaitNode();else if (!queued) // 如果节点 q 还没有入队 // 上面这行代码稍微有点简单 其中 waiter 示意等待队列的头节点 // 这行代码的作用是将 q 节点的 next 指向 waiters 而后将 q 节点 // 赋值给 waiters 也就是说 q 节点变成等待队列的头节点 整个过程可能用 // 上面代码标识 // q.next = waiters; // waiters = q; // 然而将 q 节点赋值给 waiter这个操作是原子的 可能胜利也可能不胜利 // 如果不胜利 因为 for 循环是死循环下次喊 还会进行 if 判断 // 如果 call 函数已经执行实现失去其返回后果那么就可能间接返回 // 如果还没有结束 那么就会调用下方的两个 if 分支将线程挂起 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);else if (timed) { // 如果是使用超时挂起 deadline 示意如果工夫超过这个值的话就可能将线程启动了 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // 如果线程等待工夫到了就需要从等待队列当中将以后线程对应的节点移除队列 removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos);}else // 如果不是超时阻塞的话 间接将这个线程挂起即可 // 这个函数后面已经提到了就是将以后线程唤醒 // 就是将调用 park 方法的线程唤醒 LockSupport.park(this);
}
}
复制代码
finishCompletion方法,这个方法是用于将所有被get函数阻塞的线程唤醒。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 如果可能将 waiter 设置为 null 则进入 for 循环 在 for 循环外部将所有线程唤醒// 这个操作也是原子操作if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; // 如果线程不等于空 则需要将这个线程唤醒 if (t != null) { q.thread = null; // 这个函数已经提到了 就是将线程 t 唤醒 LockSupport.unpark(t); } // 失去下一个节点 WaitNode next = q.next; // 如果节点为空 说明所有的线程都已经被唤醒了 可能返回了 if (next == null) break; q.next = null; // unlink to help gc q = next; // 唤醒下一个节点 } break;}
}
done();// 这个函数是空函数没有实现
callable = null; // to reduce footprint
}
复制代码
cancel方法,这个方法次要是勾销FutureTask的执行过程。
public boolean cancel(boolean mayInterruptIfRunning) {
// 参数 mayInterruptIfRunning 示意可能在线程执行的时候中断
// 只有 state == NEW 并且能够将 state 的状态从 NEW 变成 中断或者勾销才能够执行上面的 try 代码块
// 否则间接返回 false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;
try { // in case call to interrupt throws exception
// 如果在线程执行的时候中断代码就执行上面的逻辑if (mayInterruptIfRunning) { try { Thread t = runner; // 失去正在执行 call 函数的线程 if (t != null) t.interrupt(); } finally { // final state // 将 state 设置为 INTERRUPTED 状态 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); }}
} finally {
// 唤醒被 get 函数阻塞的线程finishCompletion();
}
return true;
}
复制代码
下面谈到了FutureTask当中最核心的一些函数,这些过程还是十分复杂的,必须理好思路认真分析才能够真正理解。除了下面的这些函数之外,在FutureTask当中还有一些其余的函数没有谈到,因为这些函数不会影响咱们的理解,如果大家感兴趣可能自行去看FutureTask源代码!