- 你有一个思维,我有一个思维,咱们替换后,一个人就有两个思维
- If you can NOT explain it simply, you do NOT understand it well enough
前言
创立线程有几种形式?这个问题的答案应该是能够脱口而出的吧
- 继承 Thread 类
- 实现 Runnable 接口
但这两种形式创立的线程是属于”三wu产品“:
- 没有参数
- 没有返回值
- 没方法抛出异样
class MyThread implements Runnable{ @Override public void run() { log.info("my thread"); }}
Runnable 接口是 JDK1.0 的外围产物
/** * @since JDK1.0 */@FunctionalInterfacepublic interface Runnable { public abstract void run();}
用着 “三wu产品” 总是有一些弊病,其中没方法拿到返回值是最让人不能忍的,于是 Callable 就诞生了
Callable
又是 Doug Lea 巨匠,又是 Java 1.5 这个神奇的版本
/** * @see Executor * @since 1.5 * @author Doug Lea * @param <V> the result type of method {@code call} */@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}
Callable 是一个泛型接口,外面只有一个 call()
办法,该办法能够返回泛型值 V ,应用起来就像这样:
Callable<String> callable = () -> { // Perform some computation Thread.sleep(2000); return "Return some result";};
二者都是函数式接口,外面都仅有一个办法,应用上又是如此类似,除了有无返回值,Runnable 与 Callable 就点差异吗?
Runnable VS Callable
两个接口都是用于多线程执行工作的,但他们还是有很显著的差异的
执行机制
先从执行机制上来看,Runnable 你太分明了,它既能够用在 Thread 类中,也能够用在 ExecutorService 类中配合线程池的应用;Bu~~~~t, Callable 只能在 ExecutorService 中应用,你翻遍 Thread 类,也找不到Callable 的身影
异样解决
Runnable 接口中的 run 办法签名上没有 throws ,天然也就没方法向上流传受检异样;而 Callable 的 call() 办法签名却有 throws,所以它能够解决受检异样;
所以归纳起来看次要有这几处不同点:
整体差异尽管不大,然而这点差异,却具备重大意义
返回值和解决异样很好了解,另外,在理论工作中,咱们通常要应用线程池来治理线程(起因曾经在 为什么要应用线程池? 中明确阐明),所以咱们就来看看 ExecutorService 中是如何应用二者的
ExecutorService
先来看一下 ExecutorService 类图
我将上图标记的办法独自放在此处
void execute(Runnable command);<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);
能够看到,应用ExecutorService 的 execute()
办法仍旧得不到返回值,而 submit()
办法清一色的返回 Future
类型的返回值
仔细的敌人可能曾经发现, submit() 办法曾经在 CountDownLatch 和 CyclicBarrier 傻傻的分不清楚? 文章中屡次应用了,只不过咱们没有获取其返回值罢了,那么
- Future 到底是什么呢?
- 怎么通过它获取返回值呢?
咱们带着这些疑难一点点来看
Future
Future 又是一个接口,外面只有五个办法:
从办法名称上置信你曾经能看出这些办法的作用
// 勾销工作boolean cancel(boolean mayInterruptIfRunning);// 获取工作执行后果V get() throws InterruptedException, ExecutionException;// 获取工作执行后果,带有超时工夫限度V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;// 判断工作是否曾经勾销boolean isCancelled();// 判断工作是否曾经完结boolean isDone();
铺垫了这么多,看到这你兴许有些乱了,咱们连忙看一个例子,演示一下几个办法的作用
@Slf4jpublic class FutureAndCallableExample { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newSingleThreadExecutor(); // 应用 Callable ,能够获取返回值 Callable<String> callable = () -> { log.info("进入 Callable 的 call 办法"); // 模仿子线程工作,在此睡眠 2s, // 小细节:因为 call 办法会抛出 Exception,这里不必像应用 Runnable 的run 办法那样 try/catch 了 Thread.sleep(5000); return "Hello from Callable"; }; log.info("提交 Callable 到线程池"); Future<String> future = executorService.submit(callable); log.info("主线程继续执行"); log.info("主线程期待获取 Future 后果"); // Future.get() blocks until the result is available String result = future.get(); log.info("主线程获取到 Future 后果: {}", result); executorService.shutdown(); }}
程序运行后果如下:
如果你运行上述示例代码,主线程调用 future.get() 办法会阻塞本人,直到子工作实现。咱们也能够应用 Future 办法提供的 isDone
办法,它能够用来查看 task 是否曾经实现了,咱们将下面程序做点小批改:
// 如果子线程没有完结,则睡眠 1s 从新查看while(!future.isDone()) { System.out.println("Task is still not done..."); Thread.sleep(1000);}
来看运行后果:
如果子程序运行工夫过长,或者其余起因,咱们想 cancel 子程序的运行,则咱们能够应用 Future 提供的 cancel 办法,持续对程序做一些批改
while(!future.isDone()) { System.out.println("子线程工作还没有完结..."); Thread.sleep(1000); double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0; // 如果程序运行工夫大于 1s,则勾销子线程的运行 if(elapsedTimeInSec > 1) { future.cancel(true); }}
来看运行后果:
为什么调用 cancel 办法程序会呈现 CancellationException 呢? 是因为调用 get() 办法时,明确阐明了:
调用 get() 办法时,如果计算结果被勾销了,则抛出 CancellationException (具体起因,你会在上面的源码剖析中看到)
有异样不解决是十分不业余的,所以咱们须要进一步批改程序,以更敌对的形式解决异样
// 通过 isCancelled 办法判断程序是否被勾销,如果被勾销,则打印日志,如果没被勾销,则失常调用 get() 办法if (!future.isCancelled()){ log.info("子线程工作已实现"); String result = future.get(); log.info("主线程获取到 Future 后果: {}", result);}else { log.warn("子线程工作被勾销");}
查看程序运行后果:
置信到这里你曾经对 Future
的几个办法有了根本的应用印象,但 Future
是接口,其实应用 ExecutorService.submit()
办法返回的始终都是 Future
的实现类 FutureTask
接下来咱们就进入这个外围实现类一探到底
FutureTask
同样先来看类构造
public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}
很神奇的一个接口,FutureTask
实现了 RunnableFuture
接口,而 RunnableFuture
接口又别离实现了 Runnable
和 Future
接口,所以能够推断出 FutureTask
具备这两种接口的个性:
- 有
Runnable
个性,所以能够用在ExecutorService
中配合线程池应用 - 有
Future
个性,所以能够从中获取到执行后果
FutureTask源码剖析
如果你残缺的看过 AQS 相干剖析的文章,你兴许会发现,浏览 Java 并发工具类源码,咱们无非就是要关注以下这三点:
- 状态 (代码逻辑的次要管制)- 队列 (期待排队队列)- CAS (平安的set 值)
脑海中牢记这三点,咱们开始看 FutureTask 源码,看一下它是如何围绕这三点实现相应的逻辑的
文章结尾曾经提到,实现 Runnable 接口模式创立的线程并不能获取到返回值,而实现 Callable 的才能够,所以 FutureTask 想要获取返回值,必然是和 Callable 有分割的,这个推断一点都没错,从构造方法中就可以看进去:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable}
即使在 FutureTask 构造方法中传入的是 Runnable 模式的线程,该构造方法也会通过 Executors.callable
工厂办法将其转换为 Callable 类型:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable}
然而 FutureTask 实现的是 Runnable 接口,也就是只能重写 run() 办法,run() 办法又没有返回值,那问题来了:
- FutureTask 是怎么在 run() 办法中获取返回值的?
- 它将返回值放到哪里了?
- get() 办法又是怎么拿到这个返回值的呢?
咱们来看一下 run() 办法(要害代码都已标记正文)
public void run() { // 如果状态不是 NEW,阐明工作曾经执行过或者曾经被勾销,间接返回 // 如果状态是 NEW,则尝试把执行线程保留在 runnerOffset(runner字段),如果赋值失败,则间接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 获取构造函数传入的 Callable 值 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 失常调用 Callable 的 call 办法就能够获取到返回值 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 保留 call 办法抛出的异样 setException(ex); } if (ran) // 保留 call 办法的执行后果 set(result); } } finally { runner = null; int s = state; // 如果工作被中断,则执行中断解决 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
run()
办法没有返回值,至于 run()
办法是如何将 call()
办法的返回后果和异样都保存起来的呢?其实非常简单, 就是通过 set(result) 保留失常程序运行后果,或通过 setException(ex) 保留程序异样信息
/** The result to return or exception to throw from get() */private Object outcome; // non-volatile, protected by state reads/writes// 保留异样后果protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}// 保留失常后果protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); }}
setException
和 set
办法十分类似,都是将异样或者后果保留在 Object
类型的 outcome
变量中,outcome
是成员变量,就要思考线程平安,所以他们要通过 CAS形式设置 outcome 变量的值,既然是在 CAS 胜利后 更改 outcome 的值,这也就是 outcome 没有被 volatile
润饰的起因所在。
保留失常后果值(set办法)与保留异样后果值(setException办法)两个办法代码逻辑,惟一的不同就是 CAS 传入的 state 不同。咱们下面提到,state 少数用于控制代码逻辑,FutureTask 也是这样,所以要搞清代码逻辑,咱们须要先对 state 的状态变动有所理解
/* * * Possible state transitions: * NEW -> COMPLETING -> NORMAL //执行过程顺利完成 * NEW -> COMPLETING -> EXCEPTIONAL //执行过程出现异常 * NEW -> CANCELLED // 执行过程中被勾销 * NEW -> INTERRUPTING -> INTERRUPTED //执行过程中,线程被中断 */private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;
7种状态,千万别慌,整个状态流转其实只有四种线路
FutureTask 对象被创立进去,state 的状态就是 NEW 状态,从下面的构造函数中你应该曾经发现了,四个最终状态 NORMAL ,EXCEPTIONAL , CANCELLED , INTERRUPTED 也都很好了解,两个中间状态稍稍有点让人困惑:
- COMPLETING: outcome 正在被set 值的时候
- INTERRUPTING:通过 cancel(true) 办法正在中断线程的时候
总的来说,这两个中间状态都示意一种刹时状态,咱们将几种状态图形化展现一下:
咱们晓得了 run() 办法是如何保留后果的,以及晓得了将失常后果/异样后果保留到了 outcome 变量里,那就须要看一下 FutureTask 是如何通过 get() 办法获取后果的:
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果 state 还没到 set outcome 后果的时候,则调用 awaitDone() 办法阻塞本人 if (s <= COMPLETING) s = awaitDone(false, 0L); // 返回后果 return report(s);}
awaitDone 办法是 FutureTask 最外围的一个办法
// get 办法反对超时限度,如果没有传入超时工夫,则承受的参数是 false 和 0L// 有期待就会有队列排队或者可响应中断,从办法签名上看有 InterruptedException,阐明该办法这是能够被中断的private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 计算期待截止工夫 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果以后线程被中断,如果是,则在期待对抗中删除该节点,并抛出 InterruptedException if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 状态大于 COMPLETING 阐明曾经达到某个最终状态(失常完结/异样完结/勾销) // 把 thread 只为空,并返回后果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果是COMPLETING 状态(中间状态),示意工作已完结,但 outcome 赋值还没完结,这时被动让出执行权,让其余线程优先执行(只是收回这个信号,至于是否别的线程执行肯定会执行可是不肯定的) else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 期待节点为空 else if (q == null) // 将以后线程结构节点 q = new WaitNode(); // 如果还没有入队列,则把以后节点退出waiters首节点并替换原来waiters else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果设置超时工夫 else if (timed) { nanos = deadline - System.nanoTime(); // 工夫到,则不再期待后果 if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞期待特定工夫 LockSupport.parkNanos(this, nanos); } else // 挂起以后线程,晓得被其余线程唤醒 LockSupport.park(this); }}
总的来说,进入这个办法,通常会经验三轮循环
- 第一轮for循环,执行的逻辑是
q == null
, 这时候会新建一个节点 q, 第一轮循环完结。 - 第二轮for循环,执行的逻辑是
!queue
,这个时候会把第一轮循环中生成的节点的 next 指针指向waiters,而后CAS的把节点q 替换waiters, 也就是把新生成的节点增加到waiters 中的首节点。如果替换胜利,queued=true。第二轮循环完结。 - 第三轮for循环,进行阻塞期待。要么阻塞特定工夫,要么始终阻塞晓得被其余线程唤醒。
对于第二轮循环,大家可能稍稍有点迷糊,咱们后面说过,有阻塞,就会排队,有排队天然就有队列,FutureTask 外部同样保护了一个队列
/** Treiber stack of waiting threads */private volatile WaitNode waiters;
说是期待队列,其实就是一个 Treiber 类型 stack,既然是 stack, 那就像手枪的弹夹一样(脑补一下子弹放入弹夹的情景),后进先出,所以刚刚说的第二轮循环,会把新生成的节点增加到 waiters stack 的首节点
如果程序运行失常,通常调用 get() 办法,会将以后线程挂起,那谁来唤醒呢?天然是 run() 办法运行完会唤醒,设置返回后果(set办法)/异样的办法(setException办法) 两个办法中都会调用 finishCompletion 办法,该办法就会唤醒期待队列中的线程
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // 唤醒期待队列中的线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint}
将一个工作的状态设置成终止态只有三种办法:
- set
- setException
- cancel
前两种办法曾经剖析完,接下来咱们就看一下 cancel
办法
查看 Future cancel(),该办法正文上明确阐明三种 cancel 操作肯定失败的情景
- 工作曾经执行实现了
- 工作曾经被勾销过了
- 工作因为某种原因不能被勾销
其它状况下,cancel操作将返回true。值得注意的是,cancel操作返回 true 并不代表工作真的就是被勾销, 这取决于动员cancel状态时,工作所处的状态
- 如果发动cancel时工作还没有开始运行,则随后工作就不会被执行;
如果发动cancel时工作曾经在运行了,则这时就须要看
mayInterruptIfRunning
参数了:- 如果mayInterruptIfRunning 为true, 则以后在执行的工作会被中断
- 如果mayInterruptIfRunning 为false, 则能够容许正在执行的工作持续运行,直到它执行完
有了这些铺垫,看一下 cancel 代码的逻辑就秒懂了
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 须要中断工作执行线程 if (mayInterruptIfRunning) { try { Thread t = runner; // 中断线程 if (t != null) t.interrupt(); } finally { // final state // 批改为最终状态 INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 唤醒期待中的线程 finishCompletion(); } return true;}
外围办法终于剖析完了,到这咱们喝口茶劳动一下吧
我是想说,应用 FutureTask 来演练烧水泡茶经典程序
如上图:
- 洗水壶 1 分钟
- 烧开水 15 分钟
- 洗茶壶 1 分钟
- 洗茶杯 1 分钟
- 拿茶叶 2 分钟
最终泡茶
让我心算一下,如果串行总共须要 20 分钟,但很显然在烧开水期间,咱们能够洗茶壶/洗茶杯/拿茶叶
这样总共须要 16 分钟,节约了 4分钟工夫,烧水泡茶尚且如此,在当初高并发的时代,4分钟能够做的事太多了,学会应用 Future 优化程序是必然(其实优化程序就是寻找要害门路,要害门路找到了,非关键门路的工作通常就能够和要害门路的内容并行执行了)
@Slf4jpublic class MakeTeaExample { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); // 创立线程1的FutureTask FutureTask<String> ft1 = new FutureTask<String>(new T1Task()); // 创立线程2的FutureTask FutureTask<String> ft2 = new FutureTask<String>(new T2Task()); executorService.submit(ft1); executorService.submit(ft2); log.info(ft1.get() + ft2.get()); log.info("开始泡茶"); executorService.shutdown(); } static class T1Task implements Callable<String> { @Override public String call() throws Exception { log.info("T1:洗水壶..."); TimeUnit.SECONDS.sleep(1); log.info("T1:烧开水..."); TimeUnit.SECONDS.sleep(15); return "T1:开水已备好"; } } static class T2Task implements Callable<String> { @Override public String call() throws Exception { log.info("T2:洗茶壶..."); TimeUnit.SECONDS.sleep(1); log.info("T2:洗茶杯..."); TimeUnit.SECONDS.sleep(2); log.info("T2:拿茶叶..."); TimeUnit.SECONDS.sleep(1); return "T2:福鼎白茶拿到了"; } }}
下面的程序是主线程期待两个 FutureTask 的执行后果,线程1 烧开水工夫更长,线程1心愿在水烧开的那一刹那就能够拿到茶叶间接泡茶,怎么半呢?
那只须要在线程 1 的FutureTask 中获取 线程 2 FutureTask 的返回后果就能够了,咱们稍稍批改一下程序:
@Slf4jpublic class MakeTeaExample1 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); // 创立线程2的FutureTask FutureTask<String> ft2 = new FutureTask<String>(new T2Task()); // 创立线程1的FutureTask FutureTask<String> ft1 = new FutureTask<String>(new T1Task(ft2)); executorService.submit(ft1); executorService.submit(ft2); executorService.shutdown(); } static class T1Task implements Callable<String> { private FutureTask<String> ft2; public T1Task(FutureTask<String> ft2) { this.ft2 = ft2; } @Override public String call() throws Exception { log.info("T1:洗水壶..."); TimeUnit.SECONDS.sleep(1); log.info("T1:烧开水..."); TimeUnit.SECONDS.sleep(15); String t2Result = ft2.get(); log.info("T1 拿到T2的 {}, 开始泡茶", t2Result); return "T1: 上茶!!!"; } } static class T2Task implements Callable<String> { @Override public String call() throws Exception { log.info("T2:洗茶壶..."); TimeUnit.SECONDS.sleep(1); log.info("T2:洗茶杯..."); TimeUnit.SECONDS.sleep(2); log.info("T2:拿茶叶..."); TimeUnit.SECONDS.sleep(1); return "福鼎白茶"; } }}
来看程序运行后果:
晓得这个变动后咱们再回头看 ExecutorService 的三个 submit 办法:
<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> Future<T> submit(Callable<T> task);
第一种办法,逐层代码查看到这里:
你会发现,和咱们革新烧水泡茶的程序思维是类似的,能够传进去一个 result,result 相当于主线程和子线程之间的桥梁,通过它奴才线程能够共享数据
第二个办法参数是 Runnable 类型参数,即使调用 get() 办法也是返回 null,所以仅是能够用来断言工作曾经完结了,相似 Thread.join()
第三个办法参数是 Callable 类型参数,通过get() 办法能够明确获取 call() 办法的返回值
到这里,对于 Future 的整块解说就完结了,还是须要简略消化一下的
总结
如果相熟 Javascript 的敌人,Future 的个性和 Javascript 的 Promise 是相似的,私下开玩笑通常将其比喻成男朋友的承诺
回归到Java,咱们从 JDK 的演变历史,谈及 Callable 的诞生,它补救了 Runnable 没有返回值的空缺,通过简略的 demo 理解 Callable 与 Future 的应用。 FutureTask 又是 Future接口的外围实现类,通过浏览源码理解了整个实现逻辑,最初联合FutureTask 和线程池演示烧水泡茶程序,置信到这里,你曾经能够轻松获取线程后果了
烧水泡茶是非常简单的,如果更简单业务逻辑,以这种形式应用 Future 必定会带来很大的会乱(程序完结没方法被动告诉,Future 的链接和整合都须要手动操作)为了解决这个短板,没错,又是那个男人 Doug Lea, CompletableFuture
工具类在 Java1.8 的版本呈现了,搭配 Lambda 的应用,让咱们编写异步程序也像写串行代码那样简略,纵享丝滑
接下来咱们就理解一下 CompletableFuture
的应用
灵魂诘问
- 你在日常开发工作中是怎么将整块工作做到分工与合作的呢?有什么基本准则吗?
- 如何批量的执行异步工作呢?
参考
- Java 并发编程实战
- Java 并发编程的艺术
- Java 并发编程之美
日拱一兵 | 原创