乐趣区

关于人工智能:马士兵AI-人工智能工程师完结无密内置文档资料

download:马士兵 -AI 人工智能工程师 | 完结无密内置文档资料

FutureTask 源码深度剖析
在 JDK 的 FutureTask 当中会使用到一个工具 LockSupport,在正式介绍 FutureTask 之前咱们先熟悉一下这个工具。
LockSupport 次要是用于阻塞和唤醒线程的,它次要是通过包装 UnSafe 类,通过 UnSafe 类当中的方法进行实现的,他底层的方法是通过依赖 JVM 实现的。在 LockSupport 当中次要有以下三个方法:

unpark(Thread thread))方法,这个方法可能给线程 thread 发放一个许可证,你可能通过多次调用这个方法给线程发放许可证,每次调用都会给线程发放一个许可证,然而这个许可证不能够进行累计,也就是说一个线程能够具备的最大的许可证的个数是 1 一个。

park()方法,这个线程会生产调用这个方法的线程一个许可证,因为线程的默认许可证的个数是 0,如果调用一次那么许可证的数目就变成 -1,当许可证的数目小于 0 的时候线程就会阻塞,因此如果线程从来没用调用 unpark 方法的话,那么在调用这个方法的时候会阻塞,如果线程在调用 park 方法之前,有线程调用 unpark(thread)方法,给这个线程发放一个许可证的话,那么调用 park 方法就不会阻塞。

parkNanos(long nanos)方法,同 park 方法一样,nanos 示意最长阻塞超时工夫,超时后 park 方法将主动返回,如果调用这个方法的线程有许可证的话也不会阻塞。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class Demo {

public static void main(String[] args) throws InterruptedException {

Thread thread = new Thread(() -> {LockSupport.park(); // 没有许可证 阻塞住这个线程
  try {TimeUnit.SECONDS.sleep(1);
  } catch (InterruptedException e) {e.printStackTrace();
  }
  System.out.println("阻塞实现");
});
thread.start();
TimeUnit.SECONDS.sleep(2);
LockSupport.unpark(thread); // 给线程 thread 发放一个许可证
System.out.println("线程启动");

}
}

复制代码
下面代码的执行后果
线程启动
阻塞实现
复制代码
从下面代码咱们可能知道 LockSupport.park()可能阻塞一个线程,因为如果没有阻塞的话必定会先打印阻塞实现,因为打印这句话的线程只休眠一秒,主线程休眠两秒。
在源代码当中你可能会遇到 UNSAFE.compareAndSwapXXX 的代码,这行代码次要是进行原子交换操作 CAS,比如:
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)))
复制代码
下面的代码次要是将 this 对象当中的内存偏移地址为 stateOffset 的对象拿进去与 NEW 进行比较,如果等于 NEW 那就将这个值设置为 CANCELLED,这整个操作是原子的(因为可能多个线程同时调用这个函数,因此需要保障操作是原子的),如果操作胜利返回 true 反之返回 false。如果你目前不是很理解也没关系,只需要知道它是将对象 this 的内存偏移为 stateOffset 的值替换为 CANCELLED 就行,如果这个操作胜利返回 true,不胜利返回 false。

FutureTask 回顾
咱们首先来回顾一下 FutureTask 的编程步骤:

写一个类实现 Callable 接口。

@FunctionalInterface
public interface Callable<V> {

/**
 * Computes a result, or throws an exception if unable to do so.
 *
 * @return computed result
 * @throws Exception if unable to compute a result
 */
V call() throws Exception;

}
复制代码
实现接口就实现 call 即可,可能看到这个函数是有返回值的,而 FutureTask 返回给咱们的值就是这个函数的返回值。

new 一个 FutureTask 对象,并且 new 一个第一步写的类,new FutureTask<>(callable 实现类)。
最初将刚刚失去的 FutureTask 对象传入 Thread 类当中,而后启动线程即可 new Thread(futureTask).start();。
而后咱们可能调用 FutureTask 的 get 方法失去返回的后果 futureTask.get();。

可能你会对 FutureTask 的使用形式感觉困惑,或者不是很明显,现在咱们来认真捋一下思路。

首先启动一个线程要么是继承自 Thread 类,而后重写 Thread 类的 run 方法,要么是给 Thread 类传送一个实现了 Runnable 的类对象,当然可能用匿名外部类实现。
既然咱们的 FutureTask 对象可能传送给 Thread 类,说明 FutureTask 必定是实现了 Runnable 接口,事实上也的确如此

可能发现的是 FutureTask 确实实现了 Runnable 接口,同时还实现了 Future 接口,这个 Future 接口次要提供了前面咱们使用 FutureTask 的一系列函数比如 get。

看到这里你应该能够大抵想到在 FutureTask 中的 run 方法会调用 Callable 当中实现的 call 方法,而后将后果保存下来,当调用 get 方法的时候再将这个后果返回。

状态示意
首先咱们先了解一下 FutureTask 的几种状态:

NEW,刚刚新建一个 FutureTask 对象。
COMPLETING,FutureTask 正在执行。
NORMAL,FutureTask 失常结束。
EXCEPTIONAL,如果 FutureTask 对象在执行 Callable 实现类对象的 call 方法的时候出现的异样,那么 FutureTask 的状态就变成这个状态了。
CANCELLED,示意 FutureTask 的执行过程被勾销了。
INTERRUPTING,示意正在终止 FutureTask 对象的执行过程。
INTERRUPTED,示意 FutureTask 对象在执行的过程当中被中断了。

这些状态之间的可能的转移情况如下所示:

NEW -> COMPLETING -> NORMAL。
NEW -> COMPLETING -> EXCEPTIONAL。
NEW -> CANCELLED。
NEW -> INTERRUPTING -> INTERRUPTED。

在 FutureTask 当中用数字去示意这几个状态:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
复制代码
核心函数和字段

FutureTask 类当中的核心字段
private Callable<V> callable; // 用于保存传入 FutureTask 对象的 Callable 对象
复制代码
private Object outcome; // 用于保存 Callable 当中 call 函数的返回后果
复制代码
private volatile Thread runner; // 示意正在执行 call 函数的线程
复制代码
private volatile WaitNode waiters;// 被 get 函数挂起的线程 是一个单向链表 waiters 示意单向链表的头节点
static final class WaitNode {
volatile Thread thread; // 示意被挂起来的线程
volatile WaitNode next; // 示意下一个节点
WaitNode() { thread = Thread.currentThread(); }
}
复制代码

结构函数:

public FutureTask(Callable<V> callable) {
if (callable == null)

throw new NullPointerException();

this.callable = callable; // 保存传入来的 Callable 接口的实现类对象
this.state = NEW; // 这个就是用来保存 FutureTask 的状态 初识时 是新建状态
}
复制代码

run 方法,这个函数是实现 Runnable 接口的方法,也就是传入 Thread 类之后,Thread 启动时执行的方法。

public void run() {
// 如果 futuretask 的状态 state 不是 NEW
// 或者不能够设置 runner 为以后的线程的话间接返回
// 不在执行上面的代码 因为 state 已经不是 NEW
// 说明勾销了 futuretask 的执行
if (state != NEW ||

  !UNSAFE.compareAndSwapObject(this, runnerOffset,
                               null, Thread.currentThread()))
return;

try {

Callable<V> c = callable;
if (c != null && state == NEW) {
  V result;
  boolean ran; // 这个值次要用于示意 call 函数是否失常执行实现 如果失常执行实现就为 true
  try {result = c.call(); // 执行 call 函数失去咱们需要的返回值并且柏村在
    ran = true;
  } catch (Throwable ex) {
    result = null;
    ran = false;
    setException(ex); // call 函数异样执行 设置 state 为异样状态 并且唤醒由 get 函数阻塞的线程
  }
  if (ran)
    set(result); // call 函数失常执行实现 将失去的后果 result 保存到 outcome 当中 并且唤醒被 get 函数阻塞的线程
}

} finally {

// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果这个 if 语句条件满足的话就示意执行过程被中断了
if (s >= INTERRUPTING)
  // 这个次要是后续处理中断的过程不是很重要
  handlePossibleCancellationInterrupt(s);

}
}

复制代码

set 方法,次要是用于设置 state 的状态,并且唤醒由 get 函数阻塞的线程。

protected void set(V v) {// call 方法失常执行实现执行上面的方法 v 是 call 方法返回的后果
// 这个是原子交换 state 从 NEW 状态变成 COMPLETING 状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = v; // 将 call 函数的返回后果保存到 outcome 当中 而后会在 get 函数当中使用 outcome 
                            // 因为 get 函数需要失去 call 函数的后果 因此咱们需要在 call 函数当中返回 outcome
// 上面代码是将 state 的状态变成 NORMAL 示意程序执行实现
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 因为其余线程可能在调用 get 函数的时候 call 函数还没有执行实现 因此这些线程会被阻塞 上面的这个方法次要是将这些线程唤醒
finishCompletion();

}
}

protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = t; // 将异样作为后果返回
// 将最初的状态设置为 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();

}
}
复制代码

get 方法,这个方法次要是从 FutureTask 当中取出数据,然而这个时候可能 call 函数还没有执行实现,因此这个方法可能会阻塞调用这个方法的线程。

public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果以后的线程还没有执行实现就需要将以后线程挂起
if (s <= COMPLETING)

// 调用 awaitDone 函数将以后的线程挂起
s = awaitDone(false, 0L);

// 如果 state 大于 COMPLETING 也就说是实现状态 可能间接调用这个函数返回
// 当然也可能是从 awaitDone 函数当中复原执行才返回
return report(s);// report 方法的次要作用是将后果 call 函数的返回后果 返回进来 也就是将 outcome 返回

}

private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL) // 如果程序失常执行实现则间接返回后果

return (V)x;

if (s >= CANCELLED) // 如果 s 大于 CANCELLED 说明程序要么是被勾销要么是被中断了 抛出异样

throw new CancellationException();

// 如果下面两种转台都不是那么说明在执行 call 函数的时候程序发生异样
// 还记得咱们在 setException 函数当中将异样赋值给了 outcome 吗?
// 在这里将那个异样抛出了
throw new ExecutionException((Throwable)x);
}

复制代码

awaitDone 方法,这个方法次要是将以后线程挂起。

private int awaitDone(boolean timed, long nanos) // timed 示意是否超时阻塞 nanos 示意如果是超时阻塞的话 超时工夫是几
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {// 注意这是个死循环


// 如果线程被中断了 那么需要从“等待队列”当中移进来
if (Thread.interrupted()) {removeWaiter(q);
  throw new InterruptedException();}

int s = state;
// 如果 call 函数执行实现 注意执行实现可能是失常执行实现 也可能是异样 勾销 中断的任何一个状态
if (s > COMPLETING) {if (q != null)
    q.thread = null;
  return s;
}
// 如果是正在执行的话 说明马上就执行实现了 只差将 call 函数的执行后果赋值给 outcome 了
// 因此可能不进行阻塞先让出 CPU 让其它线程执行 可能下次调度到这个线程 state 的状态很可能就
// 变成 实现了
else if (s == COMPLETING) // cannot time out yet
  Thread.yield();
else if (q == null)
  q = new WaitNode();
else if (!queued) // 如果节点 q 还没有入队
  // 上面这行代码稍微有点简单 其中 waiter 示意等待队列的头节点
  // 这行代码的作用是将 q 节点的 next 指向 waiters 而后将 q 节点
  // 赋值给 waiters 也就是说 q 节点变成等待队列的头节点 整个过程可能用
  // 上面代码标识
  // q.next = waiters;
  // waiters = q;
  // 然而将 q 节点赋值给 waiter 这个操作是原子的 可能胜利也可能不胜利
  // 如果不胜利 因为 for 循环是死循环下次喊 还会进行 if 判断
  // 如果 call 函数已经执行实现失去其返回后果那么就可能间接返回
  // 如果还没有结束 那么就会调用下方的两个 if 分支将线程挂起
  queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                       q.next = waiters, q);
else if (timed) {
  // 如果是使用超时挂起 deadline 示意如果工夫超过这个值的话就可能将线程启动了
  nanos = deadline - System.nanoTime();
  if (nanos <= 0L) {
    // 如果线程等待工夫到了就需要从等待队列当中将以后线程对应的节点移除队列
    removeWaiter(q);
    return state;
  }
  LockSupport.parkNanos(this, nanos);
}
else
  // 如果不是超时阻塞的话 间接将这个线程挂起即可
  // 这个函数后面已经提到了就是将以后线程唤醒
  // 就是将调用 park 方法的线程唤醒
  LockSupport.park(this);

}
}

复制代码

finishCompletion 方法,这个方法是用于将所有被 get 函数阻塞的线程唤醒。

private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {

// 如果可能将 waiter 设置为 null 则进入 for 循环 在 for 循环外部将所有线程唤醒
// 这个操作也是原子操作
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {
    Thread t = q.thread;
    // 如果线程不等于空 则需要将这个线程唤醒
    if (t != null) {
      q.thread = null;
      // 这个函数已经提到了 就是将线程 t 唤醒
      LockSupport.unpark(t);
    }
    // 失去下一个节点
    WaitNode next = q.next;
    // 如果节点为空 说明所有的线程都已经被唤醒了 可能返回了
    if (next == null)
      break;
    q.next = null; // unlink to help gc
    q = next; // 唤醒下一个节点
  }
  break;
}

}

done();// 这个函数是空函数没有实现

callable = null; // to reduce footprint
}

复制代码

cancel 方法,这个方法次要是勾销 FutureTask 的执行过程。

public boolean cancel(boolean mayInterruptIfRunning) {
// 参数 mayInterruptIfRunning 示意可能在线程执行的时候中断

// 只有 state == NEW 并且能够将 state 的状态从 NEW 变成 中断或者勾销才能够执行上面的 try 代码块
// 否则间接返回 false
if (!(state == NEW &&

    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                             mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;

try {// in case call to interrupt throws exception

// 如果在线程执行的时候中断代码就执行上面的逻辑
if (mayInterruptIfRunning) {
  try {
    Thread t = runner; // 失去正在执行 call 函数的线程
    if (t != null)
      t.interrupt();} finally { // final state
    // 将 state 设置为 INTERRUPTED 状态
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
  }
}

} finally {

// 唤醒被 get 函数阻塞的线程
finishCompletion();

}
return true;
}

复制代码
下面谈到了 FutureTask 当中最核心的一些函数,这些过程还是十分复杂的,必须理好思路认真分析才能够真正理解。除了下面的这些函数之外,在 FutureTask 当中还有一些其余的函数没有谈到,因为这些函数不会影响咱们的理解,如果大家感兴趣可能自行去看 FutureTask 源代码!

退出移动版