关于java:FutureTask源码解析

35次阅读

共计 6998 个字符,预计需要花费 18 分钟才能阅读完成。

简介

FutureTask 可用于异步获取执行后果或者勾销工作。通过实现 Runnable 或者 Callable 接口定义工作给 FutureTask,而后间接调用其 run()办法或者把工作放入线程池执行。在内部能够通过 FutureTask 的 get()办法异步获取执行后果。因而 FutureTask 非常适合用于耗时的计算,主线程能够实现本人的工作之后再去获取后果。另外,FutureTask 能够确定即便屡次调用了 run()办法,它只执行一次 Runnable 或者 Callable 工作,获取通过 cancel 勾销 FutureTask 工作。

应用场景

利用 FutureTask 和 ExecutorService,能够用多线程的形式提交计算工作,主线程继续执行其余工作,当主线程须要子线程的计算结果是,再异步获取现成的执行后果
FutureTask 在高并发环境下确保工作只执行一次

应用案例

public class FutureTaskTest {public static void main(String[] args) {FutureTaskTest futureTaskTest = new FutureTaskTest();
        ArrayList<FutureTask<Integer>> taskList = new ArrayList<>();
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            // 传入 Callable 对象创立 FutureTask 对象
            FutureTask<Integer> ft = new FutureTask<Integer>(futureTaskTest.new ComputeTask(i, "" + i));
            taskList.add(ft);
            // 提交给线程池执行工作,也能够通过 exec.invokeAll(taskList)一次性提交所有工作;
            service.submit(ft);
        }
        System.out.println("所有计算工作提交结束, 主线程接着干其余事件!");

        // 开始统计各计算线程计算结果
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                //FutureTask 的 get 办法会主动阻塞, 直到获取计算结果为止
                totalResult = totalResult + ft.get();} catch (InterruptedException e) {e.printStackTrace();
            } catch (ExecutionException e) {e.printStackTrace();
            }
        }

        // 敞开线程池
        service.shutdown();
        System.out.println("多任务计算后的总后果是:" + totalResult);
    }
    private class ComputeTask implements Callable<Integer>{
        private Integer result = 0;
        private String taskName = "";
        public ComputeTask(Integer iniResult, String taskName) {
            result = iniResult;
            this.taskName = taskName;
            System.out.println("生成子线程计算工作:" + taskName);
        }
        @Override
        public Integer call() throws Exception {for (int i = 0; i < 100; i++) {result = +i;}
            // 休眠 5 秒钟,察看主线程行为,预期的后果是主线程会继续执行,到要获得 FutureTask 的后果是期待直至实现。Thread.sleep(5000);
            System.out.println("子线程计算工作:" + taskName + "执行实现!");
            return result;
        }
    }
}

在下面的案例中,咱们自定义 ComputeTask 实现了 Callable 接口,自定义工作内容。并创立了一个定长为 5 的线程池和一个 ArrayList 类型的 FutureTask 汇合,而后通过线程池的 subbmit 办法把自定义工作提交给线程池。
而 submit 办法如下

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
咱们能够看到 submit 有三个重载办法,案例里用的是传参为 Callable 的 submit 办法,最终通过 newTaskFor 办法创立了一个 FutureTask 办法提交给了线程池

开启了一个 for 循环,在 for 循环外面向线程池提交了 10 次 task 工作。
最初遍历 list 汇合外面的 FutureTask,通过 get()获取执行后果,对所有后果进行累加。
因为咱们通过 newFixedThreadPool(5)创立了一个定长为 5 的线程池,最大并发数为 5,多余的工作会在队列外面进行期待。所以会先执行五个 task 工作,而后再去执行队列外面的五个 task 工作。最初在输入后果的时候两头会有一个短暂的进展,这个进展是因为开始提交的五个 task 工作执行实现,get()办法获取到了执行后果,然而前面的五个还在队列外面没有执行,所以 get 办法阻塞导致呈现进展。

在很多高并发的环境下,往往咱们只须要某些工作只执行一次。这种应用情景 FutureTask 的个性恰能胜任。

源码解析

数据结构和常量剖析

WaitNode 节点用来存储在前面 get()办法获取工作执行后果呈现阻塞时寄存线程应用。

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
state 示意当前任务执行状态
private volatile int state;
    // 当前任务尚未执行
    private static final int NEW          = 0;
    // 当前任务正在完结,然而没有齐全完结
    private static final int COMPLETING   = 1;
    // 当前任务失常完结
    private static final int NORMAL       = 2;
    // 当前任务执行过程中产生了异样 外部封装的 Callable.run()向上抛出异样
    private static final int EXCEPTIONAL  = 3;
    // 当前任务被勾销
    private static final int CANCELLED    = 4;
    // 当前任务中断中
    private static final int INTERRUPTING = 5;
    // 当前任务曾经中断 中断不是线程进行了。中断只是线程的一个标记,如果 submit 提交的工作齐全没有响应中断
    private static final int INTERRUPTED  = 6;
    
    // submit(runnable/callable) runnable 应用装璜者模式伪装成 callable
    private Callable<V> callable;
    /**
     * 失常状况下工作失常执行完结,outcome 保留执行后果
     * 非正常状况,outcome 保留异样 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /**
     * 当前任务被线程执行期间,保留以后执行工作的线程对象援用 */
    private volatile Thread runner;
    /**
     * 因为会有很多线程去 get 当前任务的后果,这里是用栈保留来多个线程 */
    private volatile WaitNode waiters;

次要办法

run()办法

run()办法是工作执行的入口

 public void run() {
        /**
         * state 不为 new,阐明以后 task 曾经被其余线程解决过,以后线程不解决
         * 通过 CAS 把 task 的 runner 设置为以后线程
         */
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        // 执行到这里,以后 task 肯定是 NEW 状态,而且以后线程抢占 task 胜利
        try {
            // callable 就是程序员本人封装的逻辑,就是下面案例中自定义的 ComputeTask 类
            Callable<V> c = callable;
            // 判断避免提交 task 为 null,内部线程把以后 task 给 cancel 掉
            if (c != null && state == NEW) {
                V result;
                // 工作执行状态
                boolean ran;
                try {
                // 执行自定义工作内容,就是下面 ComputeTask 中的 for 循环对 result 累加操作
                    result = c.call();
                // 设置 task 工作执行状态为 true,示意当前任务曾经执行实现,然而 FutureTask 不是执行实现的状态
                    ran = true;
                } catch (Throwable ex) {
                // 如果执行自定义工作过程中出现异常,就把 result 设置为 null,同时把 FutureTask 返回后果设置为执行过程中抛出的异样
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 设置以后执行线程为 nul
            runner = null;
            // 获取以后 FutureTask 状态
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
get()办法

get 办法就是内部线程获取 task 工作执行后果的办法。如果当前任务状态为 COMPLETING 和 NEW,则把以后线程封装成一个 waitNode 节点进入队列期待,否则就通过 report 办法获取执行后果或者执行后果中的异样,变量 outcome 就是保留工作执行后果或者执行过程中对外抛出的异样。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 以后 task 未执行或者正在完结,调用 get 办法的内部线程会被阻塞
    if (s <= COMPLETING)

        s = awaitDone(false, 0L);
    return report(s);
}
awaitDone() 办法

首先把 waitNode 置为 null,而后进入 for 循环的自旋
CASE 1:
第一次自旋,因为以后 waitNode,且以后内部线程第一次获取执行后果,没有属于本人的 node 节点,所以命中 else if (q == null)这个判断条件,创立属于以后内部线程的 node 节点。
CASE 2:
因为以后线程有了本人的 node 节点,如果当前任务还是没有实现,就会命中 else if (!queued)这个判断条件,通过 CAS 以头插法的模式把以后线程的 node 节点进入期待队列。入队胜利,queued,开始下轮自旋,查看工作状态,如果工作状态不为 NEW 和 COMPLETING,则返回工作执行后果或者异样。如果入队失败,依然获取最新的工作状态,不是 NEW 和 COMPLETING 返回后果或者异样,如果是 NEW 和 COMPLETING 则开始新一次的 CAS 把以后线程的 node 节点入队。如果入队胜利并且当前任务还是 NEW 和 COMPLETING,则以后线程会被 park,晓得被唤醒或者被中断。

 private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        /**
         * 因为 get 办法中,传入的 timed 为 false,如果这个中央 deadline 为 0
         * 
         */
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 因为以后获取后果的线程第一次进入,所以线程对应的 waitNode 节点为 null
        WaitNode q = null;
        // 是否入栈标记
        boolean queued = false;
        // 自旋
        for (;;) {// 如果以后线程是被中断唤醒,interrupted()会返回 true,同时革除中断标记,前面 interrupted()返回的都是 false
            if (Thread.interrupted()) {
                // 栈顶元素出栈
                removeWaiter(q);
                // 向外抛出中断异样
                throw new InterruptedException();}
            // 获取当前任务的最新状态
            int s = state;
            // 当前任务曾经解决并有后果,可能是失常后果,也可能是异样后果
            if (s > COMPLETING) {
                // 阐明以后线程曾经创立了 node 对象,此时将 q.thread 设置为 null,帮忙 GC
                if (q != null)
                    q.thread = null;
                // 返回执行后果
                return s;
            }
            // 阐明当前任务曾经进入实现中,让出 CPU 执行权,开始下一轮的抢占
            else if (s == COMPLETING) // cannot time out yet
            /**
             * java 通过 start 启动线程,让线程变成就绪状态期待 CPU 调度执行,而 yield()办法就是让线程由执行状态变成就绪状态,让出 CPU 工夫片
             * 在下一个线程执行的时候,以后线程可能会被执行,也可能不会被执行
             */
                Thread.yield();
            // 第一次自旋,因为后面 q 曾经为 null,所以这里为以后线程创立一个 waitNode
            else if (q == null)
                q = new WaitNode();
            // 第二次自旋,以后线程曾经有了 WaitNode 对象,然而 node 对象还没有入队
            else if (!queued)
            /**
             * q.next = waiters 把以后节点的 next 节点设置为原来的头结点 waiter,而后把 waiter 指向 q
             * waiter 始终指向队列头结点, 把以后线程 node 节点的 next 指向队列头结点,* 听过 CAS 设置 waiter 指向以后线程 node,设置失败可能其余线程领先设置胜利
             */
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {removeWaiter(q);
                    return state;9
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
            /**
             * 以后线程就会被 park,变为 WAITING
             * 除非有其余线程将你唤醒或者以后线程被中断
             */
                LockSupport.park(this);
        }
    }

这里,FutureTask 的次要办法的源码根本解析实现,咱们能够看到,在 FutrueTask 外面,在并发多线程的场景下,更改值都是通过 unsafe 类的 CAS 形式实现,并且每次操作之前都会获取最新的 state 值来获取当前任务的状态,因为在并发场景下,每次更改之前不获取最新值就间接进行操作的话就可能笼罩掉最新的执行后果。这个思维咱们在当前的高并发场景下能够采纳。

正文完
 0