关于异步:以两种异步模型应用案例深度解析Future接口

3次阅读

共计 16919 个字符,预计需要花费 43 分钟才能阅读完成。

摘要:本文以理论案例的模式剖析了两种异步模型,并从源码角度深度解析 Future 接口和 FutureTask 类。

本文分享自华为云社区《【精通高并发系列】两种异步模型与深度解析 Future 接口(一)!》,作者:冰 河。

本文以理论案例的模式剖析了两种异步模型,并从源码角度深度解析 Future 接口和 FutureTask 类,心愿大家踏下心来,关上你的 IDE,跟着文章看源码,置信你肯定播种不小!

一、两种异步模型

在 Java 的并发编程中,大体上会分为两种异步编程模型,一类是间接以异步的模式来并行运行其余的工作,不须要返回工作的后果数据。一类是以异步的模式运行其余工作,须要返回后果。

1. 无返回后果的异步模型

无返回后果的异步工作,能够间接将工作丢进线程或线程池中运行,此时,无奈间接取得工作的执行后果数据,一种形式是能够应用回调办法来获取工作的运行后果。

具体的计划是:定义一个回调接口,并在接口中定义接管工作后果数据的办法,具体逻辑在回调接口的实现类中实现。将回调接口与工作参数一起放进线程或线程池中运行,工作运行后调用接口办法,执行回调接口实现类中的逻辑来处理结果数据。这里,给出一个简略的示例供参考。

  • 定义回调接口
package io.binghe.concurrent.lab04;

/**
 * @author binghe
 * @version 1.0.0
 * @description 定义回调接口
 */
public interface TaskCallable<T> {T callable(T t);
}

便于接口的通用型,这里为回调接口定义了泛型。

  • 定义工作后果数据的封装类
package io.binghe.concurrent.lab04;

import java.io.Serializable;

/**
 * @author binghe
 * @version 1.0.0
 * @description 工作执行后果
 */
public class TaskResult implements Serializable {
    private static final long serialVersionUID = 8678277072402730062L;
    /**
     * 工作状态
     */
    private Integer taskStatus;

    /**
     * 工作音讯
     */
    private String taskMessage;

    /**
     * 工作后果数据
     */
    private String taskResult;
 
    // 省略 getter 和 setter 办法
    @Override
    public String toString() {
        return "TaskResult{" +
                "taskStatus=" + taskStatus +
                ", taskMessage='" + taskMessage + '\'' +
                ", taskResult='" + taskResult + '\'' +
                '}';
    }
}
  • 创立回调接口的实现类

回调接口的实现类次要用来对工作的返回后果进行相应的业务解决,这里,为了不便演示,只是将后果数据返回。大家须要依据具体的业务场景来做相应的剖析和解决。

package io.binghe.concurrent.lab04;

/**
 * @author binghe
 * @version 1.0.0
 * @description 回调函数的实现类
 */
public class TaskHandler implements TaskCallable<TaskResult> {
    @Override
public TaskResult callable(TaskResult taskResult) {
//TODO 拿到后果数据后进一步解决
    System.out.println(taskResult.toString());
        return taskResult;
    }
}
  • 创立工作的执行类

工作的执行类是具体执行工作的类,实现 Runnable 接口,在此类中定义一个回调接口类型的成员变量和一个 String 类型的工作参数(模仿工作的参数),并在构造方法中注入回调接口和工作参数。在 run 办法中执行工作,工作实现后将工作的后果数据封装成 TaskResult 对象,调用回调接口的办法将 TaskResult 对象传递到回调办法中。

package io.binghe.concurrent.lab04;

/**
 * @author binghe
 * @version 1.0.0
 * @description 工作执行类
 */
public class TaskExecutor implements Runnable{
    private TaskCallable<TaskResult> taskCallable;
    private String taskParameter;

    public TaskExecutor(TaskCallable<TaskResult> taskCallable, String taskParameter){
        this.taskCallable = taskCallable;
        this.taskParameter = taskParameter;
    }

    @Override
    public void run() {
        //TODO 一系列业务逻辑, 将后果数据封装成 TaskResult 对象并返回
        TaskResult result = new TaskResult();
        result.setTaskStatus(1);
        result.setTaskMessage(this.taskParameter);
        result.setTaskResult("异步回调胜利");
        taskCallable.callable(result);
    }
}

到这里,整个大的框架算是实现了,接下来,就是测试看是否获取到异步工作的后果了。

  • 异步工作测试类
package io.binghe.concurrent.lab04;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试回调
 */
public class TaskCallableTest {public static void main(String[] args){TaskCallable<TaskResult> taskCallable = new TaskHandler();
        TaskExecutor taskExecutor = new TaskExecutor(taskCallable, "测试回调工作");
        new Thread(taskExecutor).start();}
}

在测试类中,应用 Thread 类创立一个新的线程,并启动线程运行工作。运行程序最终的接口数据如下所示。

TaskResult{taskStatus=1, taskMessage='测试回调工作', taskResult='异步回调胜利'}

大家能够细细品味下这种获取异步后果的形式。这里,只是简略的应用了 Thread 类来创立并启动线程,也能够应用线程池的形式实现。大家可自行实现以线程池的形式通过回调接口获取异步后果。

2. 有返回后果的异步模型

只管应用回调接口可能获取异步工作的后果,然而这种形式应用起来略显简单。在 JDK 中提供了能够间接返回异步后果的解决计划。最罕用的就是应用 Future 接口或者其实现类 FutureTask 来接管工作的返回后果。

  • 应用 Future 接口获取异步后果

应用 Future 接口往往配合线程池来获取异步执行后果,如下所示。

package io.binghe.concurrent.lab04;

import java.util.concurrent.*;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试 Future 获取异步后果
 */
public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {return "测试 Future 获取异步后果";}
        });
        System.out.println(future.get());
        executorService.shutdown();}
}

运行后果如下所示。

测试 Future 获取异步后果

  • 应用 FutureTask 类获取异步后果

FutureTask 类既能够联合 Thread 类应用也能够联合线程池应用,接下来,就看下这两种应用形式。

联合 Thread 类的应用示例如下所示。

package io.binghe.concurrent.lab04;

import java.util.concurrent.*;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试 FutureTask 获取异步后果
 */
public class FutureTaskTest {public static void main(String[] args)throws ExecutionException, InterruptedException{FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {return "测试 FutureTask 获取异步后果";}
        });
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}

运行后果如下所示。

测试 FutureTask 获取异步后果
联合线程池的应用示例如下。

package io.binghe.concurrent.lab04;

import java.util.concurrent.*;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试 FutureTask 获取异步后果
 */
public class FutureTaskTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newSingleThreadExecutor();
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {return "测试 FutureTask 获取异步后果";}
        });
        executorService.execute(futureTask);
        System.out.println(futureTask.get());
        executorService.shutdown();}
}

运行后果如下所示。

测试 FutureTask 获取异步后果
能够看到应用 Future 接口或者 FutureTask 类来获取异步后果比应用回调接口获取异步后果简略多了。留神:实现异步的形式很多,这里只是用多线程举例。

二、深度解析 Future 接口

1.Future 接口

Future 是 JDK1.5 新增的异步编程接口,其源代码如下所示。

package java.util.concurrent;

public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

能够看到,在 Future 接口中,总共定义了 5 个形象办法。接下来,就别离介绍下这 5 个办法的含意。

cancel(boolean)
勾销工作的执行,接管一个 boolean 类型的参数,胜利勾销工作,则返回 true,否则返回 false。当工作曾经实现,曾经完结或者因其余起因不能取消时,办法会返回 false,示意工作勾销失败。当工作未启动调用了此办法,并且后果返回 true(勾销胜利),则当前任务不再运行。如果工作曾经启动,会依据以后传递的 boolean 类型的参数来决定是否中断以后运行的线程来勾销以后运行的工作。

  • isCancelled()

判断工作在实现之前是否被勾销,如果在工作实现之前被勾销,则返回 true;否则,返回 false。

这里须要留神一个细节:只有工作未启动,或者在实现之前被勾销,才会返回 true,示意工作曾经被胜利勾销。其余状况都会返回 false。

  • isDone()

判断工作是否曾经实现,如果工作失常完结、抛出异样退出、被勾销,都会返回 true,示意工作曾经实现。

这里须要留神一个细节:只有工作未启动,或者在实现之前被勾销,才会返回 true,示意工作曾经被胜利勾销。其余状况都会返回 false。

  • isDone()

判断工作是否曾经实现,如果工作失常完结、抛出异样退出、被勾销,都会返回 true,示意工作曾经实现。

  • get()

当工作实现时,间接返回工作的后果数据;当工作未实现时,期待工作实现并返回工作的后果数据。

  • get(long, TimeUnit)

当工作实现时,间接返回工作的后果数据;当工作未实现时,期待工作实现,并设置了超时等待时间。在超时工夫内工作实现,则返回后果;否则,抛出 TimeoutException 异样。

2.RunnableFuture 接口

Future 接口有一个重要的子接口,那就是 RunnableFuture 接口,RunnableFuture 接口岂但继承了 Future 接口,而且继承了 java.lang.Runnable 接口,其源代码如下所示。

package java.util.concurrent;

public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}

这里,问一下,RunnableFuture 接口中有几个形象办法?想好了再说!哈哈哈。。。

这个接口比较简单 run()办法就是运行工作时调用的办法。

3.FutureTask 类

FutureTask 类是 RunnableFuture 接口的一个十分重要的实现类,它实现了 RunnableFuture 接口、Future 接口和 Runnable 接口的所有办法。FutureTask 类的源代码比拟多,这个就不粘贴了,大家自行到 java.util.concurrent 下查看。

(1)FutureTask 类中的变量与常量

在 FutureTask 类中首先定义了一个状态变量 state,这个变量应用了 volatile 关键字润饰,这里,大家只须要晓得 volatile 关键字通过内存屏障和禁止重排序优化来实现线程平安,后续会独自深度剖析 volatile 关键字是如何保障线程平安的。紧接着,定义了几个工作运行时的状态常量,如下所示。

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

其中,代码正文中给出了几个可能的状态变更流程,如下所示。

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

接下来,定义了其余几个成员变量,如下所示。

private Callable<V> callable;
private Object outcome; 
private volatile Thread runner;
private volatile WaitNode waiters;

又看到咱们所相熟的 Callable 接口了,Callable 接口那必定就是用来调用 call()办法执行具体任务了。

  • outcome:Object 类型,示意通过 get()办法获取到的后果数据或者异样信息。
  • runner:运行 Callable 的线程,运行期间会应用 CAS 保障线程平安,这里大家只须要晓得 CAS 是 Java 保障线程平安的一种形式,后续文章中会深度剖析 CAS 如何保障线程平安。
  • waiters:WaitNode 类型的变量,示意期待线程的堆栈,在 FutureTask 的实现中,会通过 CAS 联合此堆栈替换工作的运行状态。

看一下 WaitNode 类的定义,如下所示。

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

能够看到,WaitNode 类是 FutureTask 类的动态外部类,类中定义了一个 Thread 成员变量和指向下一个 WaitNode 节点的援用。其中通过构造方法将 thread 变量设置为以后线程。

(2)构造方法

接下来,是 FutureTask 的两个构造方法,比较简单,如下所示。

public FutureTask(Callable<V> callable) {if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);
    this.state = NEW;
}

(3)是否勾销与实现办法

持续向下看源码,看到一个工作是否勾销的办法,和一个工作是否实现的办法,如下所示。

public boolean isCancelled() {return state >= CANCELLED;}

public boolean isDone() {return state != NEW;}

这两办法中,都是通过判断工作的状态来断定工作是否已勾销和已实现的。为啥会这样判断呢?再次查看 FutureTask 类中定义的状态常量发现,其常量的定义是有法则的,并不是随便定义的。其中,大于或者等于 CANCELLED 的常量为 CANCELLED、INTERRUPTING 和 INTERRUPTED,这三个状态均能够示意线程曾经被勾销。当状态不等于 NEW 时,能够示意工作曾经实现。

通过这里,大家能够学到一点:当前在编码过程中,要依照法则来定义本人应用的状态,尤其是波及到业务中有频繁的状态变更的操作,有法则的状态可使业务解决变得事倍功半,这也是通过看他人的源码设计可能学到的,这里,倡议大家还是多看他人写的优良的开源框架的源码。

(4)勾销办法

咱们持续向下看源码,接下来,看到的是 cancel(boolean)办法,如下所示。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();} finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {finishCompletion();
    }
    return true;
}

接下来,拆解 cancel(boolean)办法。在 cancel(boolean)办法中,首先判断工作的状态和 CAS 的操作后果,如果工作的状态不等于 NEW 或者 CAS 的操作返回 false,则间接返回 false,示意工作勾销失败。如下所示。

if (!(state == NEW &&
      UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
          mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;

接下来,在 try 代码块中,首先判断是否能够中断当前任务所在的线程来勾销工作的运行。如果能够中断当前任务所在的线程,则以一个 Thread 长期变量来指向运行工作的线程,当指向的变量不为空时,调用线程对象的 interrupt()办法来中断线程的运行,最初将线程标记为被中断的状态。如下所示。

try {if (mayInterruptIfRunning) {
        try {
            Thread t = runner;
            if (t != null)
                t.interrupt();} finally { // final state
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
    }
}

这里,发现变更工作状态应用的是 UNSAFE.putOrderedInt()办法,这个办法是个什么鬼呢?点进去看一下,如下所示。

public native void putOrderedInt(Object var1, long var2, int var4);

能够看到,又是一个本地办法,嘿嘿,这里先不论它,后续文章会详解这些办法的作用。

接下来,cancel(boolean)办法会进入 finally 代码块,如下所示。

finally {finishCompletion();
}

能够看到在 finallly 代码块中调用了 finishCompletion()办法,顾名思义,finishCompletion()办法示意结束任务的运行,接下来看看它是如何实现的。点到 finishCompletion()办法中看一下,如下所示。

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

在 finishCompletion()办法中,首先定义一个 for 循环,循环终止因子为 waiters 为 null,在循环中,判断 CAS 操作是否胜利,如果胜利进行 if 条件中的逻辑。首先,定义一个 for 自旋循环,在自旋循环体中,唤醒 WaitNode 堆栈中的线程,使其运行实现。当 WaitNode 堆栈中的线程运行实现后,通过 break 退出外层 for 循环。接下来调用 done()办法。done()办法又是个什么鬼呢?点进去看一下,如下所示。


protected void done() {}

能够看到,done()办法是一个空的办法体,交由子类来实现具体的业务逻辑。

当咱们的具体业务中,须要在勾销工作时,执行一些额定的业务逻辑,能够在子类中覆写 done()办法的实现。

(5)get()办法

持续向下看 FutureTask 类的代码,FutureTask 类中实现了两个 get()办法,如下所示。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

没参数的 get()办法为当工作未运行实现时,会阻塞,直到返回工作后果。有参数的 get()办法为当工作未运行实现,并且等待时间超出了超时工夫,会 TimeoutException 异样。

两个 get()办法的次要逻辑差不多,一个没有超时设置,一个有超时设置,这里说一下次要逻辑。判断工作的以后状态是否小于或者等于 COMPLETING,也就是说,工作是 NEW 状态或者 COMPLETING,调用 awaitDone()办法,看下 awaitDone()办法的实现,如下所示。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {if (Thread.interrupted()) {removeWaiter(q);
            throw new InterruptedException();}

        int s = state;
        if (s > COMPLETING) {if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

接下来,拆解 awaitDone()办法。在 awaitDone()办法中,最重要的就是 for 自旋循环,在循环中首先判断以后线程是否被中断,如果曾经被中断,则调用 removeWaiter()将以后线程从堆栈中移除,并且抛出 InterruptedException 异样,如下所示。

if (Thread.interrupted()) {removeWaiter(q);
    throw new InterruptedException();}

接下来,判断工作的以后状态是否实现,如果实现,并且堆栈句柄不为空,则将堆栈中的以后线程设置为空,返回当前任务的状态,如下所示。

int s = state;
if (s > COMPLETING) {if (q != null)
        q.thread = null;
    return s;
}

当工作的状态为 COMPLETING 时,使以后线程让出 CPU 资源,如下所示。

else if (s == COMPLETING)
    Thread.yield();

如果堆栈为空,则创立堆栈对象,如下所示。

else if (q == null)
    q = new WaitNode();

如果 queued 变量为 false,通过 CAS 操作为 queued 赋值,如果 awaitDone()办法传递的 timed 参数为 true,则计算超时工夫,当工夫已超时,则在堆栈中移除以后线程并返回工作状态,如下所示。如果未超时,则重置超时工夫,如下所示。

else if (!queued)
    queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {removeWaiter(q);
        return state;
    }
    LockSupport.parkNanos(this, nanos);
}

如果不满足上述的所有条件,则将以后线程设置为期待状态,如下所示。

else
    LockSupport.park(this);

接下来,回到 get()办法中,当 awaitDone()办法返回后果,或者工作的状态不满足条件时,都会调用 report()办法,并将当前任务的状态传递到 report()办法中,并返回后果,如下所示。

return report(s);
看来,这里还要看下 report()办法啊,点进去看下 report()办法的实现,如下所示。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

能够看到,report()办法的实现比较简单,首先,将 outcome 数据赋值给 x 变量,接下来,次要是判断接管到的工作状态,如果状态为 NORMAL,则将 x 强转为泛型类型返回;当工作的状态大于或者等于 CANCELLED,也就是工作曾经勾销,则抛出 CancellationException 异样,其余状况则抛出 ExecutionException 异样。

至此,get()办法剖析实现。留神:肯定要了解 get()办法的实现,因为 get()办法是咱们应用 Future 接口和 FutureTask 类时,应用的比拟频繁的一个办法。

(6)set()办法与 setException()办法

持续看 FutureTask 类的代码,接下来看到的是 set()办法与 setException()办法,如下所示。

protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();}
}

protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();}
}

通过源码能够看出,set()办法与 setException()办法整体逻辑简直一样,只是在设置工作状态时一个将状态设置为 NORMAL,一个将状态设置为 EXCEPTIONAL。

至于 finishCompletion()办法,后面曾经剖析过。

(7)run()办法与 runAndReset()办法

接下来,就是 run()办法了,run()办法的源代码如下所示。

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

能够这么说,只有应用了 Future 和 FutureTask,就必然会调用 run()办法来运行工作,把握 run()办法的流程是十分有必要的。在 run()办法中,如果以后状态不是 NEW,或者 CAS 操作返回的后果为 false,则间接返回,不再执行后续逻辑,如下所示。

if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
    return;

接下来,在 try 代码块中,将成员变量 callable 赋值给一个长期变量 c,判断长期变量不等于 null,并且工作状态为 NEW,则调用 Callable 接口的 call()办法,并接管后果数据。并将 ran 变量设置为 true。当程序抛出异样时,将接管后果的变量设置为 null,ran 变量设置为 false,并且调用 setException()办法将工作的状态设置为 EXCEPTIONA。接下来,如果 ran 变量为 true,则调用 set()办法,如下所示。

try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {result = c.call();
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
            setException(ex);
        }
        if (ran)
            set(result);
    }
}

接下来,程序会进入 finally 代码块中,如下所示。

finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
}

这里,将 runner 设置为 null,如果工作的以后状态大于或者等于 INTERRUPTING,也就是线程被中断了。则调用 handlePossibleCancellationInterrupt()办法,接下来,看下 handlePossibleCancellationInterrupt()办法的实现。

private void handlePossibleCancellationInterrupt(int s) {if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield();}

能够看到,handlePossibleCancellationInterrupt()办法的实现比较简单,当工作的状态为 INTERRUPTING 时,应用 while()循环,条件为当前任务状态为 INTERRUPTING,将以后线程占用的 CPU 资源开释,也就是说,当工作运行实现后,开释线程所占用的资源。

runAndReset()办法的逻辑与 run()差不多,只是 runAndReset()办法会在 finally 代码块中将工作状态重置为 NEW。runAndReset()办法的源代码如下所示,就不反复阐明了。

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

(8)removeWaiter()办法

removeWaiter()办法中次要是应用自旋循环的形式来移除 WaitNode 中的线程,比较简单,如下所示。

private void removeWaiter(WaitNode node) {if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

最初,在 FutureTask 类的最初,有如下代码。

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {throw new Error(e);
    }
}

对于这些代码的作用,会在后续深度解析 CAS 文章中具体阐明,这里就不再探讨。

至此,对于 Future 接口和 FutureTask 类的源码就剖析完了。

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0