关于java:JAVA并发编程Future接口及FutureTask的实现分析

27次阅读

共计 4722 个字符,预计需要花费 12 分钟才能阅读完成。

一、Future 接口

Future 接口示意 异步工作的后果,提供了 cancel 勾销、isDone 是否已实现、get 有限期待或超时期待获取后果等操作,源码如下:

public interface Future<V> {
    // 尝试勾销工作的执行
    // 如果 mayInterruptIfRunning 为 true,则尝试中断该线程
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断是否在实现前勾销
    boolean isCancelled();
    // 判断是否已实现
    boolean isDone();
    // 有限期待获取后果,反对中断
    V get() throws InterruptedException, ExecutionException;
    // 超时期待获取后果,反对中断
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

二、FutureTask

FutureTask 实现了 Runnable 和 Future 接口,所以它既是工作,也是异步工作的后果。如下类图

在 ThreadPoolExecutor 线程池执行器 submit 提交工作 时,会返回一个 Future(如果调用的是 execute,则不会返回 Future),那么这个 Future 其实是一个 FutureTask 对象,它会将传进来的 Runnable 工作封装为 FutureTask,再将封装后的 FutureTask 传入 execute() 办法中,并间接返回该 FutureTask,源码如下:

    public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 调用具体子类的 execute 办法
        execute(ftask);
        return ftask;
    }


    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
    }

当咱们提交工作之后拿到了这个 FutureTask 之后,就能够调用 Future 接口提供的各种操作(如 cancel 勾销工作、isDone 是否已实现、get 有限期待或超时期待获取后果)。那么 FutureTask 是如何来具体实现这些操作的呢?

2.1 要害属性

FutureTask 外部有几个要害属性,如下:

  • Callable<V> callable:对应的工作
  • Object outcome:工作实现的后果或抛出的异样
  • Thread runner:运行该工作的线程
  • WaitNode waiters:期待 FutureTask 实现的线程(能够有多个,以单链表的模式存储)

2.2 状态治理

FutureTask 通过对 工作状态的治理,来实现各种操作,状态有以下几种:

    private static final int NEW          = 0;    // 初始状态
    private static final int COMPLETING   = 1;    // 实现中
    private static final int NORMAL       = 2;      // 失常实现
    private static final int EXCEPTIONAL  = 3;    // 异样实现
    private static final int CANCELLED    = 4;    // 已勾销
    private static final int INTERRUPTING = 5;    // 中断中
    private static final int INTERRUPTED  = 6;    // 已中断

可能存在的状态转换,有以下几种:

  1. NEW -> COMPLETING -> NORMAL
  2. NEW -> COMPLETING -> EXCEPTIONAL
  3. NEW -> CANCELLED
  4. NEW -> INTERRUPTING -> INTERRUPTED

2.3 工作的运行过程

当咱们向 ThreadPoolExecutor 进行 submit 提交工作时,外部执行了 execute(FutureTask)办法,其实质就是工作线程执行 FutureTask 工作的 run 办法,所以这里要看一下 FutureTask 的 run 办法是如何来切换工作状态的?源码如下:

public void run() {
        // 当工作状态不为 NEW 时,间接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {result = c.call();
                    // 如果失常实现,则 ran 为 true
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    // 如果有异样,则 ran 为 false
                    ran = false;
                    // 将 outcome 设置为 ex,批改工作状态为 EXCEPTIONAL,并唤醒期待线程
                    setException(ex);
                }
                // 如果失常实现,将 outcome 设置为 result,批改工作状态为 NORMAL,并唤醒期待线程
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

2.4 勾销工作

通过调用 cancel(mayInterruptIfRunning)办法 尝试勾销工作的执行

  1. 如果工作状态不为初始状态 NEW,间接返回 false,示意勾销失败。
  2. 否则进一步判断 mayInterruptIfRunning,如果为 true,将工作状态批改为 INTERRUPTING,如果是 flase,则将工作状态批改为 CANCELLED。
  3. 判断 mayInterruptIfRunning 是否为 true,如果是,则调用 runner 线程的 interrupt()中断办法,并将工作状态批改为 INTERRUPTED
  4. 最终唤醒正在期待的线程 waiters
  5. 返回 true,示意勾销胜利

源码如下:

public boolean cancel(boolean mayInterruptIfRunning) {
        // 判断如果 state 不为 NEW,则间接返回 false,否则通过 CAS 尝试批改 state 的值
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {// 如果 mayInterruptIfRunning 为 true,调用线程的 interrupt()办法
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();} finally { // final state
                    // 更新 state 为 INTERRUPTED,putOrderedInt 不保障立刻对其余线程可见
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 唤醒正在期待的线程
            finishCompletion();}
        return true;
    }

2.5 有限期待或超时期待工作后果

能够通过 get()来 可中断的有限期待或超时期待 工作的后果,如下:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 如果状态 <= COMPLETING,则有限期待,直到被唤醒或中断
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)
            throw new NullPointerException();
        int s = state;
        // 如果状态 <= COMPLETING,则进行超时期待
        // 如果超时期待后,工作仍然还没有实现,则抛出 TimeoutException 异样
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

那么接下来就来看下 awaitDone()办法的具体实现,源码如下:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 计算最初期限 deadline
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            // 如果线程被中断,从 waiters 中移除,并间接抛出 InterruptedException 异样
            if (Thread.interrupted()) {removeWaiter(q);
                throw new InterruptedException();}

            int s = state;
            // 判断如果工作状态 > COMPLETING,阐明工作曾经实现,间接返回工作状态
            if (s > COMPLETING) {if (q != null)
                    q.thread = null;
                return s;
            }
            // 判断工作状态 == COMPLETING,阐明工作还没有实现,然而行将实现
            // 所以如果工作状态始终为 COMPLETING,则会始终死循环
            else if (s == COMPLETING) 
                Thread.yield();
            else if (q == null)        // 如果走到这里,阐明工作状态为 NEW,则创立一个期待节点
                q = new WaitNode();
            else if (!queued)          // 走到这里,阐明该期待节点还没有入队,则通过 CAS 将该期待节点插到链表头部
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {    // 如果为超时期待
                nanos = deadline - System.nanoTime();
                // 如果 nanos <= 0,阐明曾经超时,从 waiters 中移除,并返回工作状态
                // 否则阐明还没有超时,通过 LockSupport.parkNanos 来限时挂起
                if (nanos <= 0L) {removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                // 将以后线程挂起
                LockSupport.park(this);
        }
    }

能够看到,awaitDone()外部还是通过 LockSupport.park()或 parkNanos()办法来将线程挂起或限时挂起来实现的。

正文完
 0