乐趣区

关于java:FutureTask源码深度剖析

FutureTask 源码深度分析

前言

在后面的文章本人入手写 FutureTask 当中咱们曾经仔细分析了 FutureTask 给咱们提供的性能,并且深入分析了咱们该如何实现它的性能,并且给出了应用 ReentrantLock 和条件变量实现 FutureTask 的具体代码。而在本篇文章当中咱们将认真介绍 JDK 外部是如何实现 FutureTask 的。(如果对 FutureTask 的外部大抵过程还不是很理解的话,能够先浏览本人入手写 FutureTask)。

工具筹备

在 JDK 的 FutureTask 当中会应用到一个工具 LockSupport,在正式介绍FutureTask 之前咱们先相熟一下这个工具。

LockSupport次要是用于阻塞和唤醒线程的,它次要是通过包装 UnSafe 类,通过 UnSafe 类当中的办法进行实现的,他底层的办法是通过依赖 JVM 实现的。在 LockSupport 当中次要有以下三个办法:

  • unpark(Thread thread))办法,这个办法能够给线程 thread 发放一个 许可证 ,你能够通过屡次调用这个办法给线程发放 许可证 ,每次调用都会给线程发放一个 许可证 ,然而这个 许可证 不可能进行累计,也就是说一个线程可能领有的最大的许可证的个数是 1 一个。
  • park()办法,这个线程会生产调用这个办法的线程一个许可证,因为线程的默认许可证的个数是 0,如果调用一次那么许可证的数目就变成 -1,当许可证的数目小于 0 的时候线程就会阻塞,因而如果线程素来没用调用 unpark 办法的话,那么在调用这个办法的时候会阻塞,如果线程在调用 park 办法之前,有线程调用 unpark(thread) 办法,给这个线程发放一个许可证的话,那么调用 park 办法就不会阻塞。
  • parkNanos(long nanos)办法,同 park 办法一样,nanos 示意最长阻塞超时工夫,超时后 park 办法将主动返回,如果调用这个办法的线程有许可证的话也不会阻塞。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class Demo {public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(() -> {LockSupport.park(); // 没有许可证 阻塞住这个线程
      try {TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {e.printStackTrace();
      }
      System.out.println("阻塞实现");
    });
    thread.start();
    TimeUnit.SECONDS.sleep(2);
    LockSupport.unpark(thread); // 给线程 thread 发放一个许可证
    System.out.println("线程启动");

  }
}

下面代码的执行后果

线程启动
阻塞实现

从下面代码咱们能够晓得 LockSupport.park() 能够阻塞一个线程,因为如果没有阻塞的话必定会先打印 阻塞实现,因为打印这句话的线程只休眠一秒,主线程休眠两秒。

在源代码当中你能够会遇到 UNSAFE.compareAndSwapXXX 的代码,这行代码次要是进行原子替换操作CAS,比方:

UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)))

下面的代码次要是将 this 对象当中的内存偏移地址为 stateOffset 的对象拿进去与 NEW 进行比拟,如果等于 NEW 那就将这个值设置为 CANCELLED,这整个操作是原子的(因为可能多个线程同时调用这个函数,因而须要保障操作是原子的),如果操作胜利返回true 反之返回 false。如果你目前不是很了解也没关系,只须要晓得它是将对象this 的内存偏移为 stateOffset 的值替换为 CANCELLED 就行,如果这个操作胜利返回true,不胜利返回false

深刻 FutureTask 外部

FutureTask 回顾

咱们首先来回顾一下 FutureTask 的编程步骤:

  • 写一个类实现 Callable 接口。
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;}

实现接口就实现 call 即可,能够看到这个函数是有返回值的,而 FutureTask 返回给咱们的值就是这个函数的返回值。

  • new一个 FutureTask 对象,并且 new 一个第一步写的类,new FutureTask<>(callable 实现类)
  • 最初将刚刚失去的 FutureTask 对象传入 Thread 类当中,而后启动线程即可new Thread(futureTask).start();
  • 而后咱们能够调用 FutureTaskget办法失去返回的后果futureTask.get();

可能你会对 FutureTask 的应用形式感觉困惑,或者不是很分明,当初咱们来认真捋一下思路。

  1. 首先启动一个线程要么是继承自 Thread 类,而后重写 Thread 类的 run 办法,要么是给 Thread 类传递一个实现了 Runnable 的类对象,当然能够用匿名外部类实现。
  2. 既然咱们的 FutureTask 对象能够传递给 Thread 类,阐明 FutureTask 必定是实现了 Runnable 接口,事实上也的确如此:

​ 能够发现的是 FutureTask 的确实现了 Runnable 接口,同时还实现了 Future 接口,这个 Future 接口次要提供了前面咱们应用 FutureTask 的一系列函数比方get

  1. 看到这里你应该可能大抵想到在 FutureTask 中的 run 办法会调用 Callable 当中实现的 call 办法,而后将后果保留下来,当调用 get 办法的时候再将这个后果返回。

状态示意

首先咱们先理解一下 FutureTask 的几种状态:

  • NEW,刚刚新建一个 FutureTask 对象。
  • COMPLETING,FutureTask 正在执行。
  • NORMAL,FutureTask 失常完结。
  • EXCEPTIONAL,如果 FutureTask 对象在执行 Callable 实现类对象的 call 办法的时候呈现的异样,那么 FutureTask 的状态就变成这个状态了。
  • CANCELLED,示意 FutureTask 的执行过程被勾销了。
  • INTERRUPTING,示意正在终止 FutureTask 对象的执行过程。
  • INTERRUPTED,示意 FutureTask 对象在执行的过程当中被中断了。

这些状态之间的可能的转移状况如下所示:

  • NEW -> COMPLETING -> NORMAL。
  • NEW -> COMPLETING -> EXCEPTIONAL。
  • NEW -> CANCELLED。
  • NEW -> INTERRUPTING -> INTERRUPTED。

FutureTask 当中用数字去示意这几个状态:

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

外围函数和字段

  • FutureTask类当中的外围字段

    private Callable<V> callable; // 用于保留传入 FutureTask 对象的 Callable 对象
    private Object outcome; // 用于保留 Callable 当中 call 函数的返回后果
    private volatile Thread runner; // 示意正在执行 call 函数的线程
    private volatile WaitNode waiters;// 被 get 函数挂起的线程 是一个单向链表 waiters 示意单向链表的头节点
    static final class WaitNode {
      volatile Thread thread; // 示意被挂起来的线程
      volatile WaitNode next; // 示意下一个节点
      WaitNode() { thread = Thread.currentThread(); }
    }
  • 构造函数:
public FutureTask(Callable<V> callable) {if (callable == null)
    throw new NullPointerException();
  this.callable = callable; // 保留传入来的 Callable 接口的实现类对象
  this.state = NEW;       // 这个就是用来保留 FutureTask 的状态 初识时 是新建状态
}
  • run办法,这个函数是实现 Runnable 接口的办法,也就是传入 Thread 类之后,Thread启动时执行的办法。
public void run() {
  // 如果 futuretask 的状态 state 不是 NEW
  // 或者不可能设置 runner 为以后的线程的话间接返回
  // 不在执行上面的代码 因为 state 曾经不是 NEW 
  // 阐明勾销了 futuretask 的执行
  if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                   null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
      V result;
      boolean ran; // 这个值次要用于示意 call 函数是否失常执行实现 如果失常执行实现就为 true
      try {result = c.call(); // 执行 call 函数失去咱们须要的返回值并且柏村在
        ran = true;
      } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex); // call 函数异样执行 设置 state 为异样状态 并且唤醒由 get 函数阻塞的线程
      }
      if (ran)
        set(result); // call 函数失常执行实现 将失去的后果 result 保留到 outcome 当中 并且唤醒被 get 函数阻塞的线程
    }
  } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    // 如果这个 if 语句条件满足的话就示意执行过程被中断了
    if (s >= INTERRUPTING)
      // 这个次要是后续解决中断的过程不是很重要
      handlePossibleCancellationInterrupt(s);
  }
}
  • set办法,次要是用于设置 state 的状态,并且唤醒由 get 函数阻塞的线程。
protected void set(V v) { // call 办法失常执行实现执行上面的办法 v 是 call 办法返回的后果
  // 这个是原子替换 state 从 NEW 状态变成 COMPLETING 状态
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v; // 将 call 函数的返回后果保留到 outcome 当中 而后会在 get 函数当中应用 outcome 
                                // 因为 get 函数须要失去 call 函数的后果 因而咱们须要在 call 函数当中返回 outcome
    // 上面代码是将 state 的状态变成 NORMAL 示意程序执行实现
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    // 因为其余线程可能在调用 get 函数的时候 call 函数还没有执行实现 因而这些线程会被阻塞 上面的这个办法次要是将这些线程唤醒
    finishCompletion();}
}

protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = t; // 将异样作为后果返回
    // 将最初的状态设置为 EXCEPTIONAL
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    finishCompletion();}
}
  • get办法,这个办法次要是从 FutureTask 当中取出数据,然而这个时候可能 call 函数还没有执行实现,因而这个办法可能会阻塞调用这个办法的线程。
public V get() throws InterruptedException, ExecutionException {
  int s = state;
  // 如果以后的线程还没有执行实现就须要将以后线程挂起
  if (s <= COMPLETING)
    // 调用 awaitDone 函数将以后的线程挂起
    s = awaitDone(false, 0L);
  // 如果 state 大于 COMPLETING 也就说是实现状态 能够间接调用这个函数返回 
  // 当然也能够是从 awaitDone 函数当中复原执行才返回
  return report(s);// report 办法的次要作用是将后果 call 函数的返回后果 返回进来 也就是将 outcome 返回
  
}

private V report(int s) throws ExecutionException {
  Object x = outcome;
  if (s == NORMAL) // 如果程序失常执行实现则间接返回后果
    return (V)x;
  if (s >= CANCELLED) // 如果 s 大于 CANCELLED 阐明程序要么是被勾销要么是被中断了 抛出异样
    throw new CancellationException();
  // 如果下面两种转台都不是那么阐明在执行 call 函数的时候程序产生异样
  // 还记得咱们在 setException 函数当中将异样赋值给了 outcome 吗?// 在这里将那个异样抛出了
  throw new ExecutionException((Throwable)x);
}
  • awaitDone办法,这个办法次要是将以后线程挂起。
private int awaitDone(boolean timed, long nanos) // timed 示意是否超时阻塞  nanos 示意如果是超时阻塞的话 超时工夫是多少
  throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;
  WaitNode q = null;
  boolean queued = false;
  for (;;) { // 留神这是个死循环
    
    // 如果线程被中断了 那么须要从“期待队列”当中移出去
    if (Thread.interrupted()) {removeWaiter(q);
      throw new InterruptedException();}

    int s = state;
    // 如果 call 函数执行实现 留神执行实现可能是失常执行实现 也可能是异样 勾销 中断的任何一个状态
    if (s > COMPLETING) {if (q != null)
        q.thread = null;
      return s;
    }
    // 如果是正在执行的话 阐明马上就执行实现了 只差将 call 函数的执行后果赋值给 outcome 了
    // 因而能够不进行阻塞先让出 CPU 让其它线程执行 可能下次调度到这个线程 state 的状态很可能就
    // 变成 实现了
    else if (s == COMPLETING) // cannot time out yet
      Thread.yield();
    else if (q == null)
      q = new WaitNode();
    else if (!queued) // 如果节点 q 还没有入队
      // 上面这行代码略微有点简单 其中 waiter 示意期待队列的头节点
      // 这行代码的作用是将 q 节点的 next 指向 waiters 而后将 q 节点
      // 赋值给 waiters 也就是说 q 节点变成期待队列的头节点 整个过程能够用
      // 上面代码标识
      // q.next = waiters;
      // waiters = q;
      // 然而将 q 节点赋值给 waiter 这个操作是原子的 可能胜利也可能不胜利
      // 如果不胜利 因为 for 循环是死循环下次喊 还会进行 if 判断
      // 如果 call 函数曾经执行实现失去其返回后果那么就能够间接返回
      // 如果还没有完结 那么就会调用下方的两个 if 分支将线程挂起
      queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                           q.next = waiters, q);
    else if (timed) {
      // 如果是应用超时挂起 deadline 示意如果工夫超过这个值的话就能够将线程启动了
      nanos = deadline - System.nanoTime();
      if (nanos <= 0L) {
        // 如果线程等待时间到了就须要从期待队列当中将以后线程对应的节点移除队列
        removeWaiter(q);
        return state;
      }
      LockSupport.parkNanos(this, nanos);
    }
    else
      // 如果不是超时阻塞的话 间接将这个线程挂起即可
      // 这个函数后面曾经提到了就是将以后线程唤醒
      // 就是将调用 park 办法的线程唤醒
      LockSupport.park(this);
  }
}
  • finishCompletion办法,这个办法是用于将所有被 get 函数阻塞的线程唤醒。
private void finishCompletion() {
  // assert state > COMPLETING;
  for (WaitNode q; (q = waiters) != null;) {
    // 如果能够将 waiter 设置为 null 则进入 for 循环 在 for 循环外部将所有线程唤醒
    // 这个操作也是原子操作
    if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {
        Thread t = q.thread;
        // 如果线程不等于空 则须要将这个线程唤醒
        if (t != null) {
          q.thread = null;
          // 这个函数曾经提到了 就是将线程 t 唤醒
          LockSupport.unpark(t);
        }
        // 失去下一个节点
        WaitNode next = q.next;
        // 如果节点为空 阐明所有的线程都曾经被唤醒了 能够返回了
        if (next == null)
          break;
        q.next = null; // unlink to help gc
        q = next; // 唤醒下一个节点
      }
      break;
    }
  }

  done();// 这个函数是空函数没有实现

  callable = null;        // to reduce footprint
}
  • cancel办法,这个办法次要是勾销 FutureTask 的执行过程。
public boolean cancel(boolean mayInterruptIfRunning) {
  // 参数 mayInterruptIfRunning 示意可能在线程执行的时候中断
  
  // 只有 state == NEW 并且可能将 state 的状态从 NEW 变成 中断或者勾销才可能执行上面的 try 代码块
  // 否则间接返回 false
  if (!(state == NEW &&
        UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                                 mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;
  try {    // in case call to interrupt throws exception
    // 如果在线程执行的时候中断代码就执行上面的逻辑
    if (mayInterruptIfRunning) {
      try {
        Thread t = runner; // 失去正在执行 call 函数的线程
        if (t != null)
          t.interrupt();} finally { // final state
        // 将 state 设置为 INTERRUPTED 状态
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
      }
    }
  } finally {
    // 唤醒被 get 函数阻塞的线程
    finishCompletion();}
  return true;
}

下面谈到了 FutureTask 当中最外围的一些函数,这些过程还是非常复杂的,必须理好思路仔细分析才可能真正了解。除了下面的这些函数之外,在 FutureTask 当中还有一些其余的函数没有谈到,因为这些函数不会影响咱们的了解,如果大家感兴趣能够自行去看 FutureTask 源代码!

总结

在本篇文章当中次要探讨了以下问题:

  • LockSupportparkunpark办法,次要用于阻塞和唤醒线程,更加确切的说是给线程发放凭证,当凭证的数据小于 0 的时候线程就会阻塞。
  • UNSAFE.compareAndSwapXXX办法,这个办法次要是进行原子替换(CAS 过程),判断对象的某个内存偏移地址的值是否与指定的值相等,如果相等则进行替换,如果以上操作胜利执行则返回 true 否则返回false,同时这个操作是具备原子性的。
  • get 办法当中,如果 state 的状态小于或者等于 COMPLETING,则须要调用函数awaitDone 将线程挂起,否则间接返回后果即可。
  • run办法是整个 FutureTask 最外围的办法,在这个办法当中会调用传入 FutureTask 对象的 Callable 对象当中的 call 办法,而后将其的返回值保留到 outcome 当中,最初会将所有被 get 函数阻塞的线程都唤醒。
  • finishCompletion办法,是将期待队列当中所有的被阻塞的线程全副唤醒。

如果大家是第一次接触这些问题的话,了解起来的难度还是十分大的。如果大家对一些根本的工具还不够相熟能够先去缓缓相熟这些根本工具,在相熟完根本工具之后就不会在细节问题上卡住了。大家也能够先理解整体的过程,而后再去剖析细节,这样大家对大局观的把握有了,去剖析细节问题也能够得心应手。


更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

退出移动版