前言
最近遇到了一个业务场景,波及到多数据源之间的申请的流程编排,正好看到了一篇某团介绍CompletableFuture原理和应用的技术文章,次要还是波及应用层面。网上很多文章波及原理的局部讲的不是特地具体且比拟形象。因为波及到多线程的工具必须要了解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
背景
咱们把Runnable了解为最根本的线程工作,只具备在线程下执行一段逻辑的能力。为了获取执行的返回值,发明了Callable和与其配合应用的Future。为了将工作之间进行逻辑编排,就诞生了CompletableFuture。对于如何了解工作的逻辑编排,举一个简略的例子:
关上电脑-更新零碎这两个操作是有先后顺序的,然而泡茶和这两个操作没有先后顺序,是能够并行的,而开始办公必须要期待其余操作完结之后能力进行,这就造成了工作编排的执行链。
在IO密集型零碎中,相似的场景有很多。因为不同数据集的查问依赖主键不同,A数据集的查问主键是B数据集的一个字段这种状况很常见,通常还须要并发查问多个数据集的数据,所以对于多线程的执行编排是有需要的。
一种解决办法是CountDownLatch,让线程执行到某个中央后进行期待,直到依赖的工作执行完结。对于一些简略的执行链是能够满足的,然而当编排逻辑简单起来,CountDownLatch会导致代码难以保护和调试。所以诞生了CompletableFuture用来形容和保护工作之间的依赖关系以进行工作编排。在理论利用中,有以下两类场景是适宜应用工作编排的:
多数据源申请的流程编排
非阻塞化网关等NIO场景
应用形式
创立与执行
同步办法
和FutureTask相似,CompletableFuture也通过get()办法获取执行后果。然而不同的是,CompletableFuture自身能够不承载可执行的工作(相比FutureTask则必须承载一个可执行的工作Callable),通过一个用于标记执行胜利并设置返回值的函数,在应用上也更为灵便,如下:
CompletableFuture<String> demo = new CompletableFuture<>();demo.complete("success");System.out.println(demo.get());
复制代码
执行后果:success
和Future相似,get()函数也是同步阻塞的,调用get函数后线程会阻塞直到调用complete办法标记工作曾经执行胜利。
除了手动触发工作的实现,也能够让创建对象的同时就标记工作实现:
CompletableFuture<String> demo = CompletableFuture.completedFuture("success");System.out.println(demo.get());
复制代码
执行后果:success
异步办法
相比于同步办法,异步执行更为常见。比方上面这个例子:
CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> { System.out.println("do something by thread" + Thread.currentThread().getName()); return "success"; }); System.out.println(demo.get());
复制代码
执行后果:
do something by threadForkJoinPool.commonPool-worker-9
success
supplyAsync办法接管一个Supplier对象,逻辑函数交给线程池中的线程异步执行
默认会应用ForkJoinPool的公共线程池来执行代码(不举荐),当然也能够指定线程池,如下:
ExecutorService executor = Executors.newFixedThreadPool(4);CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> { System.out.println("do something by thread" + Thread.currentThread().getName()); return "success";}, executor);System.out.println(demo.get());
复制代码
执行后果:
do something by threadpool-1-thread-1
success
如果不须要执行后果,也能够用runAsync办法:
CompletableFuture.runAsync(() -> { System.out.println("do something by thread" + Thread.currentThread().getName());});
复制代码
执行后果:
do something by threadForkJoinPool.commonPool-worker-9
多任务编排
多任务编排是CompletableFuture的外围,这里列举不同的场景来进行阐明
一元依赖
步骤2须要依赖步骤1执行结束能力执行,相似主线程的程序执行,能够通过以下形式实现:
ExecutorService executor = Executors.newFixedThreadPool(4); CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> { System.out.println("执行【步骤1】"); return "【步骤1的执行后果】"; }, executor); CompletableFuture<String> step2 = step1.thenApply(result -> { System.out.println("上一步操作后果为:" + result); return "【步骤2的执行后果】"; }); System.out.println("步骤2的执行后果:" + step2.get());
复制代码
执行后果:
执行【步骤1】
上一步操作后果为:【步骤1的执行后果】
步骤2的执行后果:【步骤2的执行后果】
通过thenApply办法,接管上一个CompletableFuture对象的返回值,其中隐含的逻辑是,该处逻辑只有等上一个CompletableFuture对象执行完后才会执行
二元依赖
相比于一元依赖的程序执行链,二元依赖更为常见,比方上面这个场景:
步骤1和2是并行的,而步骤3须要等步骤1和2执行完之后能力执行,通过CompletableFuture是这么实现的:
ExecutorService executor = Executors.newFixedThreadPool(4); CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> { System.out.println("执行【步骤1】"); return "【步骤1的执行后果】"; }, executor); CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> { System.out.println("执行【步骤2】"); return "【步骤2的执行后果】"; }, executor); CompletableFuture<String> step3 = step1.thenCombine(step2, (result1, result2) -> { System.out.println("前两步操作后果别离为:" + result1 + result2); return "【步骤3的执行后果】"; }); System.out.println("步骤3的执行后果:" + step3.get());
复制代码
执行后果:
执行【步骤1】
执行【步骤2】
前两步操作后果别离为:【步骤1的执行后果】【步骤2的执行后果】
步骤3的执行后果:【步骤3的执行后果】
通过thenCombine办法,期待step1和step2都执行结束后,获取其返回后果并执行一段新的逻辑
多元依赖
当然还可能有上面这种场景,步骤M须要依赖1-N的N个前置节点:
这种状况会更为简单,实现形式如下:
ExecutorService executor = Executors.newFixedThreadPool(4); CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> { System.out.println("执行【步骤1】"); return "【步骤1的执行后果】"; }, executor); CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> { System.out.println("执行【步骤2】"); return "【步骤2的执行后果】"; }, executor); CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> { System.out.println("执行【步骤3】"); return "【步骤3的执行后果】"; }, executor); CompletableFuture<Void> stepM = CompletableFuture.allOf(step1, step2, step3); CompletableFuture<String> stepMResult = stepM.thenApply(res -> { // 通过join函数获取返回值 String result1 = step1.join(); String result2 = step2.join(); String result3 = step3.join(); return result1 + result2 + result3; }); System.out.println("步骤M的后果:" + stepMResult.get());
复制代码
执行后果:
执行【步骤1】
执行【步骤2】
执行【步骤3】
步骤M的后果:【步骤1的执行后果】【步骤2的执行后果】【步骤3的执行后果】
通过allOf函数申明当参数中的所有工作执行结束后,才会执行下一步操作,然而要留神,allOf自身只是定义节点,真正阻塞的地位是thenApply函数。
和之前的形式不同,因为采纳了不定变量,所以要通过CompletableFuture#join来获取每个工作的返回值。
除了allOf之外,如果咱们须要任意一个工作实现后就执行下一步操作,能够应用anyOf办法,如下:
// step1/2/3的定义雷同 // ... CompletableFuture<Object> stepM = CompletableFuture.anyOf(step1, step2, step3);System.out.println("步骤M的后果:" + stepM.get());
复制代码
执行后果:
步骤M的后果:【步骤1的执行后果】
与allOf不同,anyOf的返回值即为第一个执行结束的工作
工作原理
概念
在讲原理之前,先来理解一下CompletableFuture的定义。在实现上,CompletableFuture继承了Future和CompletionStage
Future毋庸置疑,CompletableFuture最根本的能力就是获取异步计算的后果。CompletionStage则是申明了编排节点的能力,每一个CompletionStage都申明了流程树上的一个节点(见下图)
CompletionStage申明的接口thenXXX,包含thenApply、thenCompose等,定义了节点之间的连贯形式(理论状况更为简单,具体原理参考下节函数剖析),通过这种形式,最终定义出一颗流程树,进而实现了多线程的工作编排。CompletionStage的办法返回值通常是另一个CompletionStage,进而形成了链式调用。
构造剖析
CompletableFuture里蕴含两个变量,result和stack
result很好了解,就是以后节点的执行后果。stack就比较复杂,是一个无锁并发栈,申明了以后节点执行结束后要触发的节点列表,接下来咱们具体讲一下
CompletableFuture中的栈设计
Completion是一个无锁并发栈,申明了以后节点执行结束后要触发的节点列表。在结构上是一个链式节点,其中只蕴含了一个指向下一个节点的next对象
咱们能够看到Completion有繁多的实现类,示意不同的依赖形式。
咱们晓得,在CompletableFuture中的流程编排是通过thenApply、thenAccept、thenCombine等形式来实现的,
thenApply:接管上一步的处理结果,进行下一步生产,并返回后果
thenAccept:和thenApply相似,不过无后果返回
thenCombine:同时接管两个流程节点,等其都执行结束后一起处理结果
每个函数理论别离对应了一种Completion实现类,以方才的三种函数为例,别离对应了UniApply、UniAccept、UniCombine三个对象。所以Completion能够认为是流程编排逻辑的形象对象,能够了解为流程节点,或者工作节点。
以UniCompletion为例,构造如下:
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // 线程池CompletableFuture<V> dep; // 实现工作依赖的cfCompletableFuture<T> src; // 实现工作所需资源所在的cf/** * 如果工作能够被执行则返回true,通过FJ标记位保障只有一个线程判断胜利。 * 如果工作是异步的,则在工作启动后通过tryFire来执行工作 */final boolean claim() { Executor e = executor; if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { if (e == null) return true; executor = null; // disable e.execute(this); } return false;}/** * 如果dep不为空返回true,用以判断当前任务节点是否已被激活 */final boolean isLive() { return dep != null;}
}
复制代码
先来看claim函数,这个比拟容易解释,该函数用于判断工作是否可被执行。通过compareAndSetForkJoinTaskTag函数的CAS操作保障只有一个线程执行胜利,次要作用就是在多线程状况下确保工作的正确执行。
接下来就是重头戏,源工作与依赖工作,这两个概念是CompletableFuture的外围,贯通了所有逻辑的执行,只有了解了这两个概念,能力对执行原理有比拟透彻的了解
源工作与依赖工作
源工作和依赖工作在UniCompletion中别离为src和dep属性,举个具体的例子,比方上面这段代码:
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
return "A";
});
CompletableFuture<String> b = a.thenApply(res -> {
return "B " + res;
});
复制代码
调用a.thenApply(Function fn)时,能够认为是生成了一个UniApply的流程节点(具体怎么生成的下文会提到),其中源工作就是a,而依赖工作则是thenApply的返回值。
换个简略的说法,在下面的代码中,咱们有a、b两个工作,b工作的实现须要依赖于a工作的实现,所以a会生成一个流程节点(UniApply对象),其中蕴含了b想要执行实现的全副资源(a的执行后果等),这时a工作就叫做源工作(因为a工作中有工作资源)。而b工作须要依赖a工作来实现,所以b工作叫做依赖工作。
源工作的实现会触发依赖工作的执行,这个就是工作编排的基本原理
函数剖析
在本节中,CompletableFuture因为名字太长,会以cf来代指
因为thenAccept、thenCombine函数等逻辑比拟相似,咱们以最根底的thenApply函数为例进行剖析
外围函数
咱们先不要间接从thenApply、complete等函数动手,咱们先来看这几个外围函数,不明确做什么的不要紧,先了解这几个函数的原理就好
uniApply
CompletableFuture的逻辑在于“只有当X条件满足时,再执行Y逻辑”,uniApply函数就是负责这样的逻辑判断,首先看源码:
final <S> boolean uniApply(CompletableFuture<S> a,
Consumer<? super S> f, UniApply<S> c) {Object r; Throwable x;// 1if (a == null || (r = a.result) == null || f == null) return false;tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } try { // 2 if (c != null && !c.claim()) return false; // 3 S s = (S) r; completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); }}return true;
}
复制代码
整个办法能够分为三段(已在代码中标出),咱们离开来说。
第一段,判断所给的工作节点是否曾经执行结束,如果曾经执行结束则进入下一步
第二段,如果有关联的流程节点,则通过claim函数判断当前任务是否可被执行,如果可执行则进入下一步(确保多线程状况下工作的正确执行)
第三段,执行传入的函数并把值设置到以后对象中。
整个逻辑是这样的,首先咱们传入了一个cf对象、一个函数,和一个流程节点。只有当传入的cf对象执行实现(result不为空),再执行给定的函数,并把执行后果设置到以后对象中。如果不思考非凡状况,uniApply办法用一句话解释就是:如果给定的工作曾经执行结束,就执行传入的函数并把执行后果设置到以后对象中
tryFire
uniApply函数仅仅是一个有条件的函数执行器,真正想要达到工作编排还须要其余函数的参加,咱们先来看tryFire办法:
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) return null;dep = null; src = null; fn = null;return d.postFire(a, mode);
}
复制代码
tryFire依据关联关系的不同有多种实现,理论执行流程相差不大,这里以罕用的UniApply的实现来举例。
首先这个办法接管了一个mode参数,有以下几种取值:
-1:流传模式,或者叫嵌套模式。示意工作理论曾经执行结束,只是在传递状态
0:同步模式。工作由以后线程调用解决
1:异步模式。工作须要提交到指定线程池解决
依据mode的不同,理论tryFire执行的流程也会产生很大区别。不过归根到底,tryFire办法的实质是调用了uniApply执行一次工作,如果执行胜利,则会清空dep、src等本身属性(清空之后isLive办法会返回false,示意工作曾经执行结束),同时通过postFire办法执行该工作下的其余依赖工作,实现工作的流传执行。
postFire办法因为和tryFire办法关联比拟亲密,这里放在一起阐明:
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) { if (mode < 0 || a.result == null) a.cleanStack(); else a.postComplete(); } if (result != null && stack != null) { if (mode < 0) return this; else postComplete(); } return null;
}
复制代码
这里简略概括一下执行原理,如果是嵌套模式,则清理栈内有效工作,并返回对象自身(能够认为什么都没做);否则通过postComplete办法执行栈内依赖此工作的其余工作项
postComplete
当一个CompletionStage执行实现之后,会触发依赖它的其余CompletionStage的执行,这些Stage的执行又会触发新一批的Stage执行,这就是工作的程序编排
如果说uniApply是根底性能,是负责线程平安且恪守依赖程序地执行一个函数,那么postComplete就是外围逻辑,负责当一个工作执行结束后触发依赖该工作的其余工作项,先来看源码:
final void postComplete() {
CompletableFuture<?> f = this; Completion h;// 1while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; // 2 if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } // 3 f = (d = h.tryFire(NESTED)) == null ? this : d; }}
}
复制代码
在源码上标记了三个地位,别离代表三层构造,首先是第一层while循环,只有以后对象栈中还有流程节点,那么就循环执行外部逻辑。
第二层,因为continue的存在,和第一层联合起来看就是一个批量压栈的操作,将所有须要触发的依赖树按程序压入以后对象栈中。
第三层,通过tryFire按程序触发栈中所有的依赖工作。上节咱们能够看到tryFire函数内依据mode的不同会触发不同的逻辑,这里mode指定为NESTED就是为了防止循环调用postComplete
执行函数
几个外围函数介绍完了,接下来咱们回到最外层,来看看工作是如何执行的,首先咱们以thenApply为例剖析外围执行函数
supplyAsync(理论调用为asyncSupplyStage)
该办法用于提交一个工作到线程池中执行,并将该工作打包为一个CompletableFuture节点
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;
}
复制代码
其中AsyncSupply实现了Runnable接口,所以了解为一种非凡的工作即可。这种工作在执行实现后会调用completeValue将函数执行的后果设置以后对象中。
所以整体逻辑为,首先创立一个cf对象,并立刻将工作增加到线程池执行,在执行结束后会将工作执行的后果保留到所创立的cf对象中。
complete
public boolean complete(T value) {
boolean triggered = completeValue(value);postComplete();return triggered;
}
复制代码
该办法间接调用completeValue办法设置值,设置完值之后调用postComplete办法来顺次执行后续工作。当调用该办法时,会实现工作的依赖扩散执行
thenApply(理论调用为uniApplyStage)
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();CompletableFuture<V> d = new CompletableFuture<V>();if (e != null || !d.uniApply(this, f, null)) { UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC);}return d;
}
复制代码
联合上节剖析的外围函数,咱们很容易能够剖析该函数的流程:执行function函数,如果条件不满足则执行失败,会生成一个流程节点并压入栈,同时再通过tryFire再尝试执行一次,如果条件仍然不满足,那么只能期待所依赖的工作执行实现后通过postComplete触发执行。
get
public T get() throws InterruptedException, ExecutionException {
Object r;return reportGet((r = result) == null ? waitingGet(true) : r);
}
复制代码
办法外围在于waitingGet,外部应用了ForkJoinPool.managedBlock来阻塞线程直到执行结束
流程剖析
在函数剖析中,咱们理论曾经阐明了工作依赖执行的基本原理,这里为了更为具体地阐明,咱们以一个简略的例子来剖析。
首先咱们抛开所有简单的因素,以最根本的同步串行代码来讲,咱们当初有这样一个对象:
CompletableFuture<String> A = new CompletableFuture<>();
复制代码
而后咱们这时候给其加上了工作编排,减少了一个thenApply依赖
AtomicInteger seq = new AtomicInteger(0);Function<String, String> func = s -> s + " | " + seq.incrementAndGet();CompletableFuture<String> a = new CompletableFuture<>();CompletableFuture<String> b = a.thenApply(func);
复制代码
于是咱们就有了这样一个构造,A的stack中压入了一个Completion节点,该节点的源工作指向A自身,而依赖工作指向了B,示意B工作依赖A工作的执行。
接下来咱们再加一条依赖
AtomicInteger seq = new AtomicInteger(0);Function<String, String> func = s -> s + " | " + seq.incrementAndGet();CompletableFuture<String> a = new CompletableFuture<>();CompletableFuture<String> b = a.thenApply(func); CompletableFuture<String> c = a.thenApply(func);
复制代码
咱们会发现两个特点:
和栈的性质一样,越晚增加的编排逻辑越早被执行
基于同一个对象衍生进去的流程节点的源工作是统一的
以此类推,thenXXX的其余逻辑也是相似的原理,当a调用complete函数时(无论是同步还是异步),都会顺次触发A工作的stack下挂接的其余依赖工作。而只有a没有调用complete函数,那么thenApply中挂接的依赖工作无论如何都无奈执行(因为a对象的result属性为空)
注意事项
防止主工作和子工作向同一个线程池中申请线程,因为存在依赖关系,当通过join来获取子工作的值时,一旦子工作因为线程队列已满进入阻塞队列,那么将会造成死