共计 35663 个字符,预计需要花费 90 分钟才能阅读完成。
前言
近期作者对响应式编程越发感兴趣,在内部分享 ”JAVA9-12″ 新特性过程中,有两处特性让作者深感兴趣:
1.JAVA9 中的 JEP266 对并发编程工具的更新,包含发布订阅框架 Flow 和 CompletableFuture 加强,其中发布订阅框架以 java.base 模块下的 java.util.concurrent.Flow 及其中的几个内部类 / 接口为组成部分,它们的名称和作用如下,摘自 JAVA12 的 Flow api 文档。
2.JAVA9 中孵化,JAVA11 中标准化的 HttpClient,在之前分享的 JAVA9-12 新特性一文中曾引用摘自网络的 HttpClient 代码片段:
片段 1:
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.build();
return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body);
}
片段 2:
HttpClient client = HttpClient.newHttpClient();
List<String> urls = List.of("http://www.baidu.com","http://www.alibaba.com/","http://www.tencent.com");
List<HttpRequest> requests = urls.stream()
.map(url -> HttpRequest.newBuilder(URI.create(url)))
.map(reqBuilder -> reqBuilder.build())
.collect(Collectors.toList());
List<CompletableFuture<HttpResponse<String>>> futures = requests.stream()
.map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString()))
.collect(Collectors.toList());
futures.stream()
.forEach(e -> e.whenComplete((resp,err) -> {if(err != null){err.printStackTrace();
}else{System.out.println(resp.body());
System.out.println(resp.statusCode());
}
}));
CompletableFuture.allOf(futures
.toArray(CompletableFuture<?>[]::new))
.join();}
在片段 1 中,thenApply 方法是 CompletableFuture 的成员,client.sendAsync 返回的是一个 CompletableFuture。这两段代码很好阅读,甚至说猜出其中的意义。片段 2 可以说对于作者目前的书写习惯是一个全面的颠覆,显然我们可以预先定义响应行为,而行为的执行时间则由前一个阶段的实际完成时间决定。片段 2 中的 whenComplete 方法很好理解,最后一行用 allOf 生成一个类似树的依赖结构,在当前方法中等待所有 CompletableFuture 执行完成。
简单看这两段代码,响应式编程的魅力可见一斑,甚至可以说是美妙不可言。
那么,作为 JAVA9 中额外照顾增强,HttpClient 赖以实现的 CompletableFuture,它是何方神圣呢?
CompletionStage 接口
CompletionStage 是什么?不妨卖个关子先。
作者目前使用的 JDK 版本为 8,尽管它不包含 9 之后的增强,万幸 CompletionStage 是从 JDK8 引入,因此足以用以了解这一伟大神器了。近期作者在公司使用的一些开源框架中,发现至处间接对它的使用:
1. 持久化框架 Redission。它内部使用一个 RedissonExecutorService(实现 ScheduledExecutorService)和 PromiseDelegator(实现 CompletionStage,而 CompletableFuture 同样也实现了 CompletionStage)来异步地执行 task。
2.apollo 配置中心。它提供了配置变更的异步通知机制,而这依赖于 spring web-mvc 提供的 DeferredResult,而在异步处理 return value 时,DeferredResult 的 setResult 同样也是相应的 CompletionStage 执行。
// 例:阿波罗 NotificationControllerV2 拉取通知接口
@GetMapping
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;
// 省略无关代码
//DeferredResultWrapper 是 apollo 作者包装的 spring DeferredResult
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
// 省略无关代码
if (!CollectionUtils.isEmpty(newNotifications)) {deferredResultWrapper.setResult(newNotifications);
} else {
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResultWrapper.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {deferredResults.remove(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});
// 省略
return deferredResultWrapper.getResult();}
在 spring 的 CompletionStageReturnValueHandler 的 handleReturnValue()方法中,如下异步地处理响应结果:
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {if (returnValue == null) {mavContainer.setRequestHandled(true);
return;
}
final DeferredResult<Object> deferredResult = new DeferredResult<Object>();
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
@SuppressWarnings("unchecked")
CompletionStage<Object> future = (CompletionStage<Object>) returnValue;
future.thenAccept(new Consumer<Object>() {
@Override
public void accept(Object result) {
// 在一个 CompletionStage 完成后执行此方法,为 defferedResult 设值
deferredResult.setResult(result);
}
});
future.exceptionally(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable ex) {
// 在一个 CompletionStage 执行出错后执行此方法,为 deferredResult 设值
deferredResult.setErrorResult(ex);
return null;
}
});
}
以上代码的 future.thenAccept 与 future.exceptionally 只是规定了两种情况下程序接下来的运行行为,相应的代码不是立即执行,而是等到相应的行为发生了才去执行。很明显,同步式编程写流程,响应式编程似乎就是在写行为。
显然,只要熟悉了 CompletionStage 的 api,以上的代码就绝对简单了,好了,开胃菜已上完,接下来介绍 CompletionStage。
CompletionStage 其实很好理解,按照官方定义,它表示一个可能异步运行的“阶段”,在该阶段内要执行相应的行为,而这些运算会在另一个 CompletionStage 完成后开始,它自身完成后又可触发另一个依赖的 CompletionStage。
在 CompletionStage 中这些方法均可用来定义一个行为,行为的执行方式可参考方法名和入参,这与 java8 中的 stream api 持同样的风格。行为参数可以是 Consumer,Function,Runnable。包含 accept 的方法,参数会有一个 Consumer,它会消费上一或多个 CompletionStage 的结果;包含 run 的方法,参数会有一个 Runnable,它的运行不需要前面 CompletionStage 的执行结果;包含 apply 的方法,参数会包含 Function,该 function 一般以前一或几阶段的返回值为入参,以自身的执行结果作为当前 CompletionStage 的结果。
CompletionStage 和实现类 ComletableFuture 的方法名中也会包含 either/all/any 等简单的单词,和上述的含义相组合,不难理解。
以以下三个接口为例说明:
1.public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
接口会返回一个 CompletionStage,该 stage 仅在当前 stage 和参数中的 other 正常完成后才会执行参数中的 action。
2.public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
接口会返回一个 CompletionStage,该 stage 会在当前 stage 或参数中的 other 正常执行完毕后异步执行参数中的函数 fn,而 fn 的参数就是前面执行完毕的 stage 的结果,fn 的返回值将是被返回的 stage 的结果。
3.public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
接口会返回一个 CompletionStage,它会在当前 stage 和参数中的 other 正常执行完毕后执行,以这两个 stage 的结果作为参数,在参数 executor 线程池中执行 action 函数,因为它是一个消费者,因此没有返回值。
接口的其他方法逻辑类似,不再缀述。
CompletableFuture 源码
上一节简述了 CompletionStage 接口的函数定义,作为官方提供的实现类,CompletableFuture 实现了有关的所有接口,它的作者依旧是我等膜拜的道格大神,下面来具体分析 CompletableFuture 的实现。
类签名:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
从签名信息来看,CompletableFuture 实现了 Future 和 CompletionStage 接口,这意味着它即满足 CompletableStage 的阶段执行,也提供了 Future 中获取该执行结果的方法。
首先来看成员变量和核心函数:
volatile Object result; // 当前对象的结果或者一个异常包装对象 AltResult,关于 AltResult 下面再看
volatile Completion stack; // 任务栈,Completion 后面再述。final boolean internalComplete(Object r) {
// 使用 cas 原子操作,将原本为 null 的 result 置为 r,所有调用者都保证 r 不是 null,因此只有第一次才能返回 true。return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
}
final boolean casStack(Completion cmp, Completion val) {
// 尝试用 cas 原子操作将当前 stack 的值从 cmp 换为 val。return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
}
// 其中 STACK,RESULT 就是上面 stack 和 result 的句柄,这点和其他 juc 中的工具惯例相同
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
try {
final sun.misc.Unsafe u;
UNSAFE = u = sun.misc.Unsafe.getUnsafe();
Class<?> k = CompletableFuture.class;
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
NEXT = u.objectFieldOffset
(Completion.class.getDeclaredField("next"));
} catch (Exception x) {throw new Error(x);
}
}
stack 的类型为 Completion,为了方便理解,在介绍 Completion 类之前,先看几个声明在 CompletableFuture 的常量
static final int SYNC = 0;// 同步
static final int ASYNC = 1;// 异步
static final int NESTED = -1;// 嵌套
再来看 Completion 类的结构
// 继承 ForkJoinTask,实现 Runnable,以及签名接口 AsynchronousCompletionTask
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // 指向下一个 Completion
// 当被触发时,执行 completion 动作,如果存在需要传递的行为,// 返回一个代表该行为的 CompletableFuture
// 参数只能是上面提到的 SYNC,ASYNC,NESTED,后面留意它的正负。abstract CompletableFuture<?> tryFire(int mode);
// 如果当前 completion 依旧是可触发的,则返回 true,这会在清理任务栈时使用.
abstract boolean isLive();
// 继承自 Runnable,直接调用 tryFile, 参数为 1
public final void run() { tryFire(ASYNC); }
// 继承自 ForkJoinTask,直接调用 tryFile, 参数为 1,返回 true
public final boolean exec() { tryFire(ASYNC); return true; }
// 继承自 ForkJoinTask,直接返回 null
public final Void getRawResult() { return null;}
// 继承自 ForkJoinTask,空方法。public final void setRawResult(Void v) {}}
上面列举了内部类 Completion 的全部代码,它继承并实现了 ForkJoinTask 和 Runnable 中的抽象方法,同时声明了 tryFire 这个抽象方法供子类实现。因为继承了 ForkJoinTask,这意味着 Completion 也是一个任务,且它可能在 ForkJoinPool 中执行。关于 Completion 和它的子类后面详述。先来继续看核心函数和成员实现。
/** 尝试将一个任务压栈,成功返回 true */
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);// 把当前的栈设置为 c 的 next
// 尝试把当前栈(h)更新为新值(c)return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
//lazySetNext 定义
static void lazySetNext(Completion c, Completion next) {UNSAFE.putOrderedObject(c, NEXT, next);
}
方法 tryPushStack 的流程很简单,先调用 lazySetNext 将当前栈设置为参数的 next,这样达到了栈的后入为顶层的目的,然后试图将顶部元素设置为新压入栈的 c。
/** 不加锁将任务压栈,使用 cas 加自旋的方式,这也是道格大神的经典. */
final void pushStack(Completion c) {do {} while (!tryPushStack(c));
}
接下来是一些对输出结果编码的代码。
// 内部类,用于对 null 和异常进行包装,从而保证对 result 进行 cas 只有一次成功。static final class AltResult { // See above
final Throwable ex; // null only for NIL
AltResult(Throwable x) {this.ex = x;}
}
/** 空值用一个 ex 为 null 的 AltResult 表示 */
static final AltResult NIL = new AltResult(null);
/** 使用上面的 NIL 完成任务,若任务已经被完成过,返回 false */
final boolean completeNull() {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
NIL);
}
/** 对空值进行编码,使用 NIL */
final Object encodeValue(T t) {return (t == null) ? NIL : t;
}
/** 使用 t 完成当前任务,t 是 null 时使用 NIL 作为结果,否则使用 t */
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t);
}
// 对异常进行编码,返回一个 AltResult,其值 ex 取决于参数 x,// 若 x 为 CompletionException 则直接用 x 赋值 ex,// 否则用 CoimpletionException 包一层。static AltResult encodeThrowable(Throwable x) {return new AltResult((x instanceof CompletionException) ? x :
new CompletionException(x));
}
/** 使用参数提供的异常的编码结果完成任务,若 result 已非空,返回 false */
final boolean completeThrowable(Throwable x) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
encodeThrowable(x));
}
// 如果 x 非 CompletionException,将它包裹成 CompletionException 返回。// 如果不是,则判断,若 r 是 AltResult 且其 ex 就是参数 x 的值,则将 r 返回。// 否则将 x 包裹成 AltResult 返回。static Object encodeThrowable(Throwable x, Object r) {if (!(x instanceof CompletionException))
x = new CompletionException(x);
else if (r instanceof AltResult && x == ((AltResult)r).ex)
return r;
return new AltResult(x);
}
// 给定一个 Throwble x,一个 Object r,使用上面的方法编码的结果来尝试完成。final boolean completeThrowable(Throwable x, Object r) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
encodeThrowable(x, r));
}
// 如果 x 不是 null,使用上面的 encodeThrowable 对 x 编码的结果返回,否则若 t 是空,// 返回 NIL, 否则返回 t。Object encodeOutcome(T t, Throwable x) {return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
}
static Object encodeRelay(Object r) {
Throwable x;
// 对非空参数 r 进行判断。// 若 r 是 AltResult 且具备非空的 ex,且 ex 并不是 CompletionException 类型,// 将 ex 包装成 CompletionException,并包裹成 AltResult 返回。// 其他情况直接返回 r。return (((r instanceof AltResult) &&
(x = ((AltResult)r).ex) != null &&
!(x instanceof CompletionException)) ?
new AltResult(new CompletionException(x)) : r);
}
final boolean completeRelay(Object r) {
// 这段代码的逻辑和上一个方法联合去看,当前未完成的情况下,尝试使用参数 r 完成。// 如果 r 是异常,尝试将它包装成 CompletionException 并外包一层 AltResult。// 用这个 AltResult 完成。return UNSAFE.compareAndSwapObject(this, RESULT, null,
encodeRelay(r));
}
CompletableFuture 本质也是一个 Future, 因此也会支持异步的阻塞的 result 获取。因为在完成这个 future 时,为了便于处理和维护,使用了编码的结果,固在读取结果时,也要对结果进行解码。
/** * 供 future.get() 使用。*/
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException {if (r == null)
// 参数 r 代表一个 CompletableFuture 的 result,因为它会对异常和 null 进行编码。// 故 null 可以视为 get 的中间被扰动的结果。throw new InterruptedException();
if (r instanceof AltResult) {
Throwable x, cause;
// 这一段很简单,是 AltResult,ex 是空返回空。if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
//ex 是取消异常,转换后抛出。throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
// 异常是包装异常 CompletionException,取出被包装的异常抛出。x = cause;
throw new ExecutionException(x);
}
//result 不是 null 也不能体现异常,强转返回。@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
//reportJoin 方法相对简单,因为 join 操作会一直等待,r 能保证非空。// 对于非 AltResult 类型的 r 直接强转返回,AltResult 类型的处理与
//reportGet 类似,但是不解 CompletionException,直接抛出。// 此方法抛出的异常均不受检。private static <T> T reportJoin(Object r) {if (r instanceof AltResult) {
Throwable x;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if (x instanceof CompletionException)
throw (CompletionException)x;
throw new CompletionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
相应的 get 和 join 方法实现。
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
可以看出,get 和 join 方法分别先调用 reportGet,reportJoin, 若得到的空结果,会继续调用 waitingGet 方法,只是参数分别为 true 和 false,waitingGet 方法的实现需要先了解剩余的核心函数以及 Completion 子类,稍后再看。
一些与异步操作的准备:
/** * 标识是异步方法产生的任务的接口,对于异步行为的监控,debug,追踪会很有用。* 在 jdk8 的 CompletableFuture 实现中,它有三个直接实现类,AsyncRun,* AsyncSupply 以及前面提到过的 Completion。*/
public static interface AsynchronousCompletionTask {
}
// 判断是否使用 ForkJoinPool 的 common 线程池,在 ForkJoinTask 中持有该线程池的引用。// 判断规则是可用 cpu 核数大于 1.
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
// 异步线程池,根据上述判断,决定使用 commonPool 还是 ThreadPerTaskExecutor,// 后者是一个对每一个任务都新建一个线程的 low 逼线程池。private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/** low 逼线程池源码,没什么可说的 */
static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) {new Thread(r).start();}
}
static Executor screenExecutor(Executor e) {if (!useCommonPool && e == ForkJoinPool.commonPool())
// 判断参数执行器(线程池的父接口,一般会传入线程池)是否需要屏蔽,// 如果参数就是 ForkJoinPool.commonPool() 并且经前面的系统判断
//useCommonPool 为 false,则强制使用 asyncPool。return asyncPool;
if (e == null) throw new NullPointerException();
// 非空且通过验证,返回参数 e
return e;
}
为异步做的这些准备很好理解,屏蔽不合理的线程池使用,在用户提供的线程池,commonPool 和 ThreadPerTaskExecutor 之中择一,在后续的操作中需要使用它们。
还有两个重要的核心函数,是道格大神的神作。
final void postComplete() {
CompletableFuture<?> f = this; Completion h;// 初始化 f 为 this
while ((h = f.stack) != null ||//1,f 的栈非空
(f != this && (h = (f = this).stack) != null)) {//2 f 的栈为空且不是 this,重置
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {//3 h 出栈
if (t != null) {//4 出栈的 h 不是最后一个元素,最后一个元素直接执行 7 即可,减少一次循环 cas 竞态
if (f != this) {// f 不是 this
pushStack(h);//5 将 f 刚出栈的 h(顶)入 this 的栈(顶)continue;
}
h.next = null; //6 detach 帮助 gc
}
//tryFire 参数为 NESTED,即 -1,这是它唯一一次使用。f = (d = h.tryFire(NESTED)) == null ? this : d;//7 f 栈的最后一个元素或者就是 this 栈中的元素
}
}
}
这寥寥数行代码的含金量不可小觑。它应在将要完成时调用,很明显,它会将当前 CompletableFuture 的栈以及传递依赖的其他 CompletableFuture 的栈清空。为了便于解释,在相应的代码上打出了编号,下面详细分析。
调用该方法,首先进入 1,此时 f 是当前 CompletableFuture,h 是它的 stack,满足不为空的判断,进入 3.
到达 3 时,将栈顶 Completion h 出栈,一般除非并发多个线程对同一个 CompletableFuture 调用 postComplete,否则一定会成功并到达 4。若出现多个线程调用,cas 失败,则重新循环。
到达 4 后,若发现 f 的栈已空,则直接进入 7,否则判断 f 是否为当前 CompletableFuture,若是,则进行 6,取消 h 和 t 的关联,若不是则进入 5,将 h(f 中刚刚移除的栈顶)压入当前 Completable 的栈并重新循环。
显然,只要处理当前 CompletableFuture 的栈,就一定会执行 7,只要处理的是另一个 CompletableFuture 的栈,就会将其出栈,然后压入当前 CompletableFuture 的栈。
在 7 处,会尝试执行栈顶的 Completion 的 tryFile 方法,它会返回一个可能为 null 的 CompletableFuture,若非空,则赋给 f,否则将 this 赋给 f。
所以这段方法的真实执行流程:当前 CompletableFuture 的栈中元素逐个出栈并 tryFile,发现新的 CompletableFuture,将它的元素反向压入本 CompletableFuture 的栈,压入结束后,继续对栈中元素逐个出栈并 tryFire,发现非空 CompletableFuture 则继续上述过程。直到本 CompletableFuture 的栈中不再有元素(此时 tryFire 返回的 CompletableFuture 栈也是空的)为止。
膜拜道格大神的同时,顺便点一下,这似乎是一种避免递归的方式。只不过 tryFire 返回的 CompletableFuture 中的栈元素将会反向执行。
/* 遍历栈并去除死亡任务 /
final void cleanStack() {for (Completion p = null, q = stack; q != null;) {// 初始条件,q 指向 null 时终止。Completion s = q.next;// 循环内第一行,q 永远指向栈顶,s 永远指向栈顶第二个元素或者 null
if (q.isLive()) {// a 只要 q 存活,就将 p 指向 q,并将 q 指向 s
p = q;
q = s;
}
else if (p == null) {//b q 不存活,p 是 null,两种可能,从未见到存活的节点,或执行过最后的重启
casStack(q, s);/ 将 q 出栈
q = stack;// 变量 q 重新指向新的栈顶。}
else {
p.next = s;// q 已死亡,且当前已经找到过存活的元素。p 指向 q 的下一个元素 s,从而将 q 出栈
if (p.isLive())// c 判断 p 是否存活,而 p 只能是 null 或者最近一个存活的 Completion
q = s;//6.q 前进
else {//4
p = null; //d 重新将 p 置 null 并将 q 指向当前的栈,重启循环。q = stack;
}
}
}
}
为了让这段代码的说明更加清晰,不妨举个简单的例子说明。
假定当前 CompletableFuture 的栈中有 1 - 9 个元素,其中 14568 在调用 cleanStack 方法时已死亡,在执行过程中,也出现方法执行过程中出现死亡的状态。
进入循环,p 为 null,q 指向 1,满足循环条件,开始第一轮循环。
第一轮循环进入后,s 指向 2,p 为 null,q 指向 1,是个死亡对象,因此在第一个判断条件 a 处未能通过,b 判断条件为真,q 被移除,循环结束,此时 p 为 null,q 指向 2,栈变为 2 -9.
第二轮循环进入,s 指向 3,p 为 null,q 指向 2,是个存活对象,进入 a,循环结束,p 指向 2,q 指向 3。栈依旧为 2 -9.
第三轮循环进入,s 指向 4,p 为 2,q 指向 3,是存活对象,进入 a,循环结束,p 指向 3,q 指向 4,栈保持 2 - 9 不变。
第四轮循环进入,s 指向 5,p 为 3,q 指向 4,是个死亡对象,p 非空且存活,进入 c,则 p 保持为 3,3 的 next 指向 5,q 指向 5. 循环结束,栈变为 2356789.
第五轮循环进入,s 指向 6,p 指向 3,q 指向 5,是个死亡对象,p 非空且存活,进入 c,p 保持为 3,3 的 next 指向 6,q 指向 6,循环结束,栈变为 236789.
第六轮循环进入,s 指向 7,p 指向 3,q 指向 6,是个死亡对象,假定此时 3 死亡,则 3 的 next 指向 7,进入 d 分支,p 为 null,q 为 2,栈为 23789.
第七轮循环进入,s 指向 3,p 为 null,q 指向 2,是个存活对象,p 指向 2,q 指向 3,栈依旧为 23789.
第八轮循环进入,s 指向 4,p 指向 2,q 指向 3,是个死亡对象,p 非空且存活,进入 c,则 p 保持为 2,q 指向 7,3 的 next 指向 7,栈变 2789.
第九轮进入,s 指向 8,p 指向 2,q 指向 7,是个存活对象,进入 a 分支,p 变为 7,q 变为 8,栈保持 2789. 假定此步之后 2 死亡,但此时 p 已经指向 7.
第十轮进入,s 指向 9,p 指向 7,q 指向 8,是个死亡对象,p 当前指向 7 且存活,所以尽管 2 不存活,仍旧进入分支 c,p 保持为 7,q 指向 9,7 的 next 指向 9. 栈为 279.
第十一轮,s 为 null,p 指向 7,q 指向 9,是个存活对象,则进入 a 分支,p 变为 9,q 变为 null,栈保持 279.
因 q 为 null,循环终止。栈经过清理只剩下 279 三个元素,其中 2 因为巧合而死亡且未被清理。
下面回到 Completion,Completion 是一个抽象类,前面已经简单展示它的源码,它的子类如下:
可以看到有三个直接子类,CoCompletion,Signaller 和 UniCompletion。UniCompletion 又有若干子类,它们分别作为一些 CompletionStage 中声明方法的实现工具,很明显,道格大神在此处大量使用了策略模式。
先来简单看一下 CoCompletion 的实现:
static final class CoCompletion extends Completion {
//CoCompletion 完全委托给 base 执行。BiCompletion<?,?,?> base;
CoCompletion(BiCompletion<?,?,?> base) {this.base = base;}
final CompletableFuture<?> tryFire(int mode) {
BiCompletion<?,?,?> c; CompletableFuture<?> d;
if ((c = base) == null || (d = c.tryFire(mode)) == null)
//base 未指定,或 base 的 tryFire 返回 null,则返回 null。return null;
base = null; // 解除关联,再 isLive 判断为死亡。// 返回的 d 就是 base 的 tryFire 返回的非空 CompletableFuture
return d;
}
final boolean isLive() {
BiCompletion<?,?,?> c;
// 存活标准,base 非空且 base 的 dep 非空。return (c = base) != null && c.dep != null;
}
}
CoCompletion 虽然是 Completion 的直接子类,但它依赖了 BiCompletion,且 BiCompletion 是 UniCompletion 的直接子类,先来看 UniCompletion.
abstract static class UniCompletion<T,V> extends Completion {
Executor executor;// 用来执行任务的执行器
CompletableFuture<V> dep; // 要完成的依赖 CompletableFuture
CompletableFuture<T> src; // 作为行为源的 CompletableFuture
UniCompletion(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src) {this.executor = executor; this.dep = dep; this.src = src;}
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {//1
//compareAndSetForkJoinTaskTag 是 ForkJoinTask 的方法,利用 cas,保证任何一种情况下,该行为只能执行一次。if (e == null)
// 不精确的说法是同步调用返回 true,异步调用返回 false,然后在线程池中执行。// 有 e 代表要在执行器中执行,尽管大多数情况下 e 都是线程池实例,会异步运行任务。但对于 Executor 来说,完全可以实现在同一个线程执行。return true;//2.
// 对于 3 这行代码,道格大神注释就写了个 disable,为此我翻了大量代码,发现根本过不了上面 cas 这一关,所以个人有两个理解://1. 对于当前 Completion 而言,它的线程池只能用来做一次事,在 claim 之后立即置空,尽管此时还没有执行 action,也不允许当前 Completion 使用它做别的事了。//2. 减少了一个指向该线程池的引用,线程池也有被 gc 的时候吧。就算不 gc,关闭虚拟机或者 dump 的时候也能少做点事。executor = null; // 3.
e.execute(this);// 使用该线程池异步执行,回忆上面 Completion 的声明,它实现了 runnable,在 run 方法中 tryFire(ASYNC), 参数 ASYNC 是正数。}
return false;
}
final boolean isLive() { return dep != null;}
}
尽管 UniCompletion 本身代码不多,但是有关代码却很绕,后面会从 CompletableFuture 调用开始说明一个完整的工作流,作者本来有几次都已经十分艰难的“确定发现问题”,写出了“问题”,但最终还是在描述过程中启动大脑自我否定,不得不佩服道格大神强大的逻辑和大脑。
很明显,UniCompletion 是一个可以拥有执行器的 Completion,它是两个操作的结合,dep 为要最终执行的依赖操作,src 为来源 CompletableFuture,tryFire 没有默认实现,它的子类分别根据不同情况实现了该方法,实现的方式依旧是优雅的策略模式。
claim 方法要在执行 action 前调用,若 claim 方法返回 false,则不能调用 action,原则上要保证 action 只执行一次。
claim 的意思是声称,开个玩笑,在美剧行尸走肉第四季,有一伙武装分子解决为了解决内部分配问题的提出了一个办法,对任何事物只看谁先喊一句”claimed“,代表”我要了“。调用 claim 方法和稍后运行 action 的动作发生在一个线程,因此需要该线程尝试去 claim 这个 action,claim 成功则执行,claim 不成功则不执行。
但在提供 Executor 的前提下,claim 除了声明以外,还会直接在方法内使用该 executor 执行 tryFire,间接地执行 action,并返回 false,避免调用者也执行 action,因为有 cas 的效果,多次 claim 只有第一次可能返回 true。
接下来看 BiCompletion,它也是一个抽象类,不同在于它有两个源,也就是它的成员 dep 要等到另外两个成员 CompletableFuture(src,snd)完成,具体的依赖关系要看子类实现。
abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
CompletableFuture<U> snd; // 第二个源 action
BiCompletion(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src, CompletableFuture<U> snd) {super(executor, dep, src); this.snd = snd;
}
}
BiCompletion 有多个实现类,看名称可以看到 Apply,Accept,Run 等字眼,前面已经讨论过相应的语义。
以 OrApply 为例
static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
Function<? super T,? extends V> fn;
OrApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
CompletableFuture<U> snd,
Function<? super T,? extends V> fn) {
// 构造函数,多传一个函数,该函数就是 dep 对应的 action。super(executor, dep, src, snd); this.fn = fn;
}
//tryFire 父类没有实现
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null ||// 没有 dep,则没有相应的依赖行为,已经执行过的 dep 会是 null。// 执行 orApply 返回 false,则返回 null。最后一个参数仅当 mode 是 ASYNC(只有它大于 1)时会是 this
!d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
// 到此可能是运行过,或者不满执行 fn 的条件,返回 null。return null;
// 前面 dep 不是 null,执行 orApply 也成功了,则解除引用关联,下次运行会直接返回 null,也不影响 gc。// 回忆前面看过的核心函数 postComplete, 会对 CompletableFuture 中栈上的所有 Completion 进行 tryFire,// 返回非 null 则进行类似递归的操作,很明显,在调用 postComplete
// 方法时,dep 为 null 会返回一个 null,避免了再次 tryFire。dep = null; src = null; snd = null; fn = null;
// 正常运行结束,调用 dep 的 postFire 并返回。return d.postFire(a, b, mode);
}
}
orApply 方法定义在 CompletionFuture。前面没有叙述。它不是作者称为的”核心函数“(即各种 Completion 都能使用到)。
final <R,S extends R> boolean orApply(CompletableFuture<R> a,
CompletableFuture<S> b,
Function<? super R, ? extends T> f,
OrApply<R,S,T> c) {
Object r; Throwable x;
if (a == null || b == null ||
// 为 r 赋值用于后续的计算,因为是 or,r 优先取第一个,第一个源 action 未完成的情况下再取第二个。((r = a.result) == null && (r = b.result) == null) || f == null)
// 首先检测两个源 action,若 a 和 b 均未完成,则说明依赖 dep 不可被执行,返回 false。return false;
// 仅当当前(dep)未完成(result 为 null)时,可进行完成工作。tryComplete: if (result == null) {
try {
// 前面说过,c 不为 null 说明是异步执行,需要先去尝试 claim 这个 action。if (c != null && !c.claim())
// 异步且 claim 不成功,返回 false。return false;
if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {
// 如果 r 表示异常,调用 completeThrowable 核心函数并结束代码块,直接返回 true。completeThrowable(x, r);
break tryComplete;
}
// 第一个非空的 action(a 或 b)结果代表异常,但 ex 是 null,则将 r 置为 null 并返回 true。r = null;
}
// r 不代表异常结果,直接强转,用该结果作为 action 的参数,执行 action,用结果作为当前的 result。出现异常则进入 catch 块。@SuppressWarnings("unchecked") R rr = (R) r;
completeValue(f.apply(rr));
} catch (Throwable ex) {
// 上述代码出现异常,调用 completeThrowable 完成 dep(this)completeThrowable(ex);
}
}
return true;
}
正常运行结束还会调用 dep 的 postFire,它也位于 CompletableFuture 中,但它只供 BiCompletion 在 tryFire 成功之后才可使用,该方法源码如下:
final CompletableFuture<T> postFire(CompletableFuture<?> a,
CompletableFuture<?> b, int mode) {
// 对于 ab 两个源,先处理 b,后处理 a
if (b != null && b.stack != null) {
// b 存在且 b 的栈还有元素
if (mode < 0 || b.result == null)
// 当为 NESTED(只有它的值是 -1)时,或者 b 没有结果时,对 b 进行清栈。调用 postFire 意味着 d 执行 tryFire 成功,// 即 d 获得了结果,而这前提是 ab 之一已执行成功 (orApply 的含义),所以 ab 可能是其一完成。b.cleanStack();
else
// 非 NESTED, 则对 b 进行 postComplete, 该方法内部又会对 b 的栈上的每一个 Completion 执行 tryFire,而且用 NESTED 模式。b.postComplete();}
// 接下来对 a 直接进行 postFire,并沿用 mode。return postFire(a, mode);
}
对 a 进行 postComplete 的方法如下:
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {if (a != null && a.stack != null) {
// 栈非空
if (mode < 0 || a.result == null)
// 类似上面的逻辑,是 NESTED 模式时或者 a 未完成时,对 a 进行清栈,否则对 a 执行 postComplete.
a.cleanStack();
else
a.postComplete();}
// 处理 a 之后,处理当前(即 dep)if (result != null && stack != null) {
// 有结果且栈非空
if (mode < 0)
//NESTED 模式,直接返回 this。return this;
else
// 非 NESTED 模式,执行 postComplete,其中会对 d 的栈中所有 Completion 进行 tryFire(NESTED),// 并在每一个 tryFire 返回的 CompletableFuture 逆栈执行同一样操作,参见上面的源码。postComplete();}
return null;
}
以上是全部与 OrApply 的实现有关的源码,下面来看一看 OrApply 的应用,再简单梳理一下流程。
在 CompletableFuture 中有三个有关的方法:
可以看到三个方法的签名和调用信息, 这三个方法均是实现自 CompletionStage。关于方法的字意和大致逻辑的推测方法前面已分析。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
// 直接调用 orApplyStage,不指定线程池。return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
// 调用 orApplyStage 方法,外部不提供线程池,使用 asyncPool,关于 asyncPool 前面已分析。return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
// 调用 orApplyStage 方法,但对外面传入的线程池进行屏蔽,条件符合则使用,不符合则更换,屏蔽原则前面已分析。return orApplyStage(screenExecutor(executor), other, fn);
}
可见三个方法均使用了 orApplyStage 方法,只是在参数上有所不同。再来看 orApplyStage 方法。
private <U extends T,V> CompletableFuture<V> orApplyStage(
Executor e, CompletionStage<U> o,
Function<? super T, ? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
// 要执行的函数未提供,或者参数 o 转换的 CompletableFuture 也是 null,则抛出空指针。throw new NullPointerException();
// 新建了一个 dep,后面将它返回,故直接调用实现自 CompletionStage 的方法不用考虑返回空的问题,可以链式调用。CompletableFuture<V> d = new CompletableFuture<V>();
// 如果指定了线程池,直接进入 if。未指定线程池,首先尝试调用 orApply 方法,并以 this 和 b 作参数。// 前面分析过,若条件满足,即 this 和 b 有一个是完成态,则会立即执行 f,结果或异常作为 d 的结果。//d.orApply 的最后一个参数是 null(c),说明是同步操作,不会进行 c.claim 操作。if (e != null || !d.orApply(this, b, f, null)) {
// 指定了线程池,或者尝试 d.orApply 条件不满足,转为异步。// 构建 OrApply 对象压入 Completion 栈。OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
orpush(b, c);
// 压栈后再次尝试同步调用一次 tryFire,前面分析过,tryFire 成功会最终调用相应的 cleanStack,postComplete 等操作,// 将死亡的 Completion(各子类有不同的判定,CoCompletion 判定 base 是 null,有些判断 dep 是 null,而完成一般会把 dep 置 null)// 从栈上移除。c.tryFire(SYNC);
}
return d;
}
public CompletableFuture<T> toCompletableFuture() {
// 直接返回 this
return this;
}
final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {if (c != null) {
// 循环条件,b 不存在或未完成且同时当前 CompletableFuture 未完成。有任何一个完成则终止,若无完成,则执行下面的代码将任务入 this 和 b 的栈。while ((b == null || b.result == null) && result == null) {
// 将 c 压入当前 CompletableFuture 栈并退出循环。if (tryPushStack(c)) {if (b != null && b != this && b.result == null) {
// 存在 b,b 不是当前,b 未完成时。尝试将 c 封装成 CoCompletion 并压入 b 的栈,前面说过
// 这个压入 b 栈的 q 完全依赖于 c,并使用 c 的运行结果。Completion q = new CoCompletion(c);
// 内循环,参数外循环说明。while (result == null && b.result == null &&
!b.tryPushStack(q))
lazySetNext(q, null); // clear on failure
}
break;
}
// 到此说明 c 压入当前栈失败,则将 c 的 next 恢复为 null。lazySetNext(c, null); // clear on failure
}
}
}
简单梳理 OrApply 这一条线的流程,其他线逻辑类似。
当使用 Completable 的 applyToEitherAsync/applyToEither 时,将进入这一条线的代码执行,CompletableFuture 在初步验参后,会封装一个 d 用于表示结果的 CompletableFuture,稍后将会用它作为返回值。随后根据入参不同而进入不停的逻辑。
同步的情况,即未提供 Executor,首先就尝试调用它的 d.uniApply 方法,若此时当前 CompletableFuture 或参数中的另一个 stage 已完成,则用完成的结果直接执行用户指定的 action 并对 d 的结果进行赋值,并进一步完成 d 的后续清栈和 postComplete(1);若此时当前的 Completable 或另一个 stage 未完成,则不满足执行 action 的条件,将当前 Completable 作为第一个 source,另一个 stage 作为第二个 source,封装成一个 OrApply 并压当前 CompletableFuture 和另一个 stage 的栈(2),随后立即以同步方式调用它的 tryFire(1)。
异步的情况,直接封装 OrApply 对象,将由线程池间接调用 tryFire(3),进一步调用 orApply 方法,因为是异步,即使满足了前面的条件(ab 之一正常或异常完成),依旧需要进行 claim,claim 失败则不会执行 action。claim 成功,执行 action 出现异常,则用异常来完成这个 action。
以上三种情况最终都会执行 action,标注了(1)和(3)是很明确的两种情况。
任何一个 CompletableFuture 完成后,都会根据 mode 进行后续处理,其实尽管每个 Completion 都具备一个 next 指针,但每一个 Completion 的完成均不依赖于栈中的其他 Completion,仅在 cleanStack,压栈,postComplete 使用了该栈的结构。现在来回答前面分析时发现的两个问题。
1. 当前 CompletableFuture 在完成后,执行 postComplete,会将它自身的栈中 completion 出栈并执行 action,若要产生新的 CompletableFuture,则将它的栈反向压入自身的栈,然后重复执行出栈 - 执行的操作。反向压栈有问题吗?答案是没有。因为栈中的每一个 Completion 在执行上互不影响,它们的顺序只影响到 cleanStack 和 postComplete 的处理顺序。CompletableFuture 和它的栈元素产生的 CompletableFuture 彼此间有顺序要求,但对同一个 CompletableFuture 的栈内的 Completion 元素彼此间没有顺序要求,决定他们顺序的是对源 CompletionFuture 调用 orApply,thenApply 等等方法的顺序,后续运行也完全独立。只不过在源 CompletableFuture 进行 postComplete 时,执行的顺序将会与原本的”先来后到“相反。
2.cleanStack 到一半,p 指向的 Completion 依旧存活,位于 p 以上的 Completion 已执行完毕,那么不会重新开始循环,p 之前的死亡 Completion 会留在栈中。这也是为什么前面使用 OrApply 来解释这个问题的原因,因为很可能就不存在这个问题。根据前面的源码,仅有 postComplete 触发的 tryFire 会使用 NESTED(-1) 模式,只有 NESTED 模式下,或者源 CompletableFuture 的 result 为 null(未完成)的情况下执行 postFire 才会进入到 cleanStack,否则会进入 postComplete,后者会将所有元素出栈并执行存活元素,显然不存在要考虑存活的问题。而只有 or 且为 BiCompletion 的情况下,才可能出现两个源之一实际并未完成,这样在非 NESTED 模式下调用 cleanStack 方法。
可见 2 的问题是存在的。但它对于整体的运行结果是无影响的,后续该 source 执行完毕,调用自身的 postComplete 时,将已死亡的 Completion 出栈并 tryFire,会发现诸如”dep=null” 等情况,直接返回 null,则 postComplete 方法中的 f 会保持指向 this 并继续迭代下一个栈元素。
目前关于 2 中提到的 cleanStack 的调用只出现在 UniCompletion 成功后调用 postFire 时依赖模式和 result 运行。其实还有一种情况,就是前面提了一次的,属于 future 接口的 get 方法,以及类似的 join 方法。
前面提到,get 和 join 方法都会在获取不到结果是按条件轮循 watingGet 方法,下面来看 waitingGet 方法。
private Object waitingGet(boolean interruptible) {
Signaller q = null;// 信号器
boolean queued = false;// 是否入队
int spins = -1;// 自旋次数
Object r;// 结果引用
// 循环条件是只等待 result,内部有根据扰动决定的 break
while ((r = result) == null) {
// 自旋次数只有第一次进来是负值,后续只能是 0 或其他正数。if (spins < 0)
// 自旋次数,多处理器下初始化为 16,否则为 0,即不自旋。设置值后此次循环结束。spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0;
// 第二次循环时才会判断自旋次数。只要 spins 大于 0 就继续循环,直到达到 0 为止再执行下面的 else 代码。else if (spins > 0) {
// 仅当下一个种子数不小于 0 时,减小一次自旋次数。nextSecondarySeed 是 Thread 类中使用 @Contended 注解标识的变量,// 这与传说中的伪共享有关。if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
// 停止自旋后的第一轮循环,result 依旧是 null,则对 q 进行初始化,关于 Signaller 后续再讲。else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
// 初始化 q 后的下一轮循环(停止自旋后的第二轮),queued 是 false,将上一轮循环初始化的 q 压入栈。else if (!queued)
queued = tryPushStack(q);
// 停止自旋后的若干次循环(上一步可能压栈失败,则下一轮自旋会再次压栈,直到成功)后,判断是否可扰动。else if (interruptible && q.interruptControl < 0) {
// 扰动信号匹配,将 q 的有关字段全部置空,顺带清一下栈,返回 null。q.thread = null;
// 这个清栈的过程,细看上面的解释还有有关的源码,可能会发出一个疑问,cleanStack 只能清除 isLive 判断 false 的 Completion,// 但目前的实现,基本上都只能在 dep 为 null,base 为 null 等仅当 dep 执行完成的情况发生,而 dep 完成的情况是当前 CompletableFuture 的
//result 不是 null,而方法运行到此,很明显 result 必然是 null,那么还有必要清栈吗?// 答案是必要的,首先将来也许能出现存活或死亡状态与 source 的 result 无关的 Completion,那么此处清一下栈也是帮助后面的工作。// 其次,刚才压入栈的 q 在 thread 指向 null 时即已死亡,它也必须要进行清除。cleanStack();
return null;
}
else if (q.thread != null && result == null) {
// q 关联的线程存在,即 q 存活,且依旧没有执行完毕,使用 ForkJoinPool 的阻塞管理机制,q 的策略进行阻塞。try {ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
// 阻塞是可以扰动的,此时会将 q 的扰动控制信号设置为 -1,则下一次循环时将可能进入上一个 else if。q.interruptControl = -1;
}
}
}
// 前面的循环没有 break,能执行到此,只有 result 获得非 null 值的情况。if (q != null) {
// 若 q 不是 null,说明没有在自旋阶段获取到 result,需要对它进行禁用。q.thread = null;
if (q.interruptControl < 0) {if (interruptible)
// 可扰动且有扰动信号,则说明扰动后未能进入上面带有 cleanStack 的那个 else if,// 可能是恰好在这次循环开始时获取到了非空 result,从而退出循环,也可能是参数 interruptible 为假,// 在外部扰动了当前线程后,依旧等到了 result。// 只要发生了扰动,就将结果置 null,外面调用者如果是 join,可以报出扰动。r = null; // report interruption
else
// 如果不可扰动,则中断当前线程(创建 q 的线程)。Thread.currentThread().interrupt();
}
}
// 当前 future 已经有结果,进行 postComplete 逻辑并返回 r。postComplete();
return r;
}
根据该方法的注释,waitingGet 方法只会有两个结果,null(可扰动并且扰动了)和原始的 result。而 get 方法可扰动,也即可返回 null,join 方法不可扰动,只能等待结束或抛出异常。
waitingGet 方法中出现了第三个也是最后一个 Completion 的直接子类 Signaller,前面没有对它进行介绍,不过它也只使用在此处,因此可以一并介绍。
static final class Signaller extends Completion
implements ForkJoinPool.ManagedBlocker {
long nanos; // 计时的情况下,要等待的时间。final long deadline; // 计时的情况下指定不为 0 的值
volatile int interruptControl; // 大于 0 代表可扰动,小于 0 代表已扰动。volatile Thread thread;// 持有的线程
Signaller(boolean interruptible, long nanos, long deadline) {this.thread = Thread.currentThread();
this.interruptControl = interruptible ? 1 : 0;// 不可扰动,赋 0
this.nanos = nanos;
this.deadline = deadline;
}
final CompletableFuture<?> tryFire(int ignore) {//ignore 无用
Thread w; //Signaller 自持有创建者线程,tryFire 只是单纯唤醒创建它的线程。if ((w = thread) != null) {
thread = null;// 释放引用
LockSupport.unpark(w);// 解除停顿。}
// 返回 null,当 action 已执行并进行 postComlete 调用时,f 依旧指向当前 CompletableFuture 引用并解除停顿。return null;
}
public boolean isReleasable() {
// 线程是空,允许释放。这可能是某一次调用本方法或 tryFire 方法造成。if (thread == null)
return true;
if (Thread.interrupted()) {
// 如果调用 isReleasable 方法的线程被扰动了,则置扰动信号为 -1
int i = interruptControl;
interruptControl = -1;
if (i > 0)
// 原扰动信号是”可扰动“,则是本次调用置为”已扰动“,返回 true。return true;
}
// 未定时(deadline 是 0)的情况只能在上面释放,定时的情况,本次计算 nanos(deadline-System.nanoTime())
// 或上次计算的 nanos 不大于 0 时,说明可以释放。if (deadline != 0L &&
(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
// 只要可释放,将创建者线程的引用释放。下次调用直接返回 true,线程运行结束销毁后可被 gc 回收。thread = null;
return true;
}
// 仍持有创建者线程,调用此方法的线程未扰动或当前扰动不是第一次,未定时或不满足定时设置的一律返回 false。return false;
}
public boolean block() {
//block 方法
if (isReleasable())
// 判断可释放,直接 return true。return true;
// 判断 deadline 是 0,说明不计时,默认 park。else if (deadline == 0L)
LockSupport.park(this);
else if (nanos > 0L)
// 计时情况,park 指定 nanos。LockSupport.parkNanos(this, nanos);
// 睡醒后再次返回 isReleasable 的结果。return isReleasable();}
// 创建者线程引用被释放即代表死亡。final boolean isLive() { return thread != null;}
}
Signaller 是一个 Completion 的直接子类,同时实现了 ForkJoinPool 的内部接口 ManagedBlocker,这使得它可以在当 ForkJoinPool 出现大量线程阻塞堆积时避免饥饿。
Signaller 的作用是持有和释放一个线程,并提供相应的阻塞策略。
前面提到的 waitingGet 方法创建了一个 Signaller(interruptible, 0L, 0L),类似的,可以看到 timedGet 方法使用 Signaller(true, nanos, d == 0L ? 1L : d) 来进行阻塞的管理,管理的方法依赖 ForkJoinPool 内部的
ForkJoinPool.managedBlock(q) 来实现,而这用到了被 Signaller 实现的 ForkJoinPool.ManagedBlocker,managedBlock 方法源码如下。
//ForkJoinPool 的 managedBlock 方法。public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();// 调用此方法的线程,即前面的 Signaller 的创建者线程。if ((t instanceof ForkJoinWorkerThread) &&
(p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
// 调用 managedBlock 方法的线程是 ForkJoinWorkerThread,则它可运行在 ForkJoinPool 中。此处要求内部持有 pool 的引用。WorkQueue w = wt.workQueue;
// 循环,只要判断 blocker( 即 Signaller)不可释放。while (!blocker.isReleasable()) {
// 尝试用 ForkJoinPool 对当前线程的工作队列进行补偿。//tryCompensate 方法会尝试减少活跃数并可能创建或释放一个准备阻塞的 worker 线程,// 它会在发生竞态,脏数据,松弛或池终止时返回 false。// 关于 ForkJoinPool 的详情单独准备文章。if (p.tryCompensate(w)) {
try {
// 补偿成功,不停地对线程池尝试先 isReleasable 再 block,任何一个方法返回 true 则终止循环。do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
// 出现任何异常,或循环终止时,控制信号加上一个活跃数单元,因为前面通过补偿才会进入循环,已减少了一个单元。U.getAndAddLong(p, CTL, AC_UNIT);
}
break;
}
}
}
else {
// 当前线程不是 ForkJoinWorkerThread 或不持有 ForkJoinPool 的引用。连续先尝试 isReleasable 再尝试 block,直到有一者返回 true 为止。do {} while (!blocker.isReleasable() &&
!blocker.block());
}
}
关于 ForkJoinPool 本文不做额外介绍,只列举这一个方法,到此为止,对于 CompletableFuture 的主要接口(继承自 CompletionStage)和实现已经描述完毕(其实只过了一个特殊案例的接口,但是前面提到过,其他接口的逻辑和实现方式类似,无非就是 run,active,apply 的更换,或 either,both,then,when 等,有上面的基础,再凭借规则推测语义,源码并不难理解。
CompletableFuture 还有一些独立声明的公有方法,源码也有些非常值得借鉴的地方,如 allOf,anyOf 两个方法。
//anyOf 方法,返回一个 CompletableFuture 对象,任何一个 cfs 列表中的成员进入完成态(正常完成或异常),则它也一并完成,结果一致。public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
// 直接调用 orTree
return orTree(cfs, 0, cfs.length - 1);
}
//allOf 方法,当所有 cfs 列表中的成员进入完成态后完成(使用空结果),或有任何一个列表成员异常完成时完成(使用同一个异常)。public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
// 直接调用 andTree
return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
int lo, int hi) {
// 声明一个后续返回的 dep
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (lo > hi) // 验参
d.result = NIL;
else {
CompletableFuture<?> a, b;
// 折半验证参数并归并。每相邻的两个成员会在一个递归中生成另一个 'd',// 总量奇数的最后一个单独表示这个 d。int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
// 调用 d.biRelay 的中继方法尝试完成。if (!d.biRelay(a, b)) {
// 不满足完成条件,生成一个中继并压栈,再次尝试同步完成。若不满足条件,ab 任何一个完成后都会再间接调用它的 tryFire。BiRelay<?,?> c = new BiRelay<>(d, a, b);
a.bipush(b, c);// 除非 ab 均完成,否则 bipush 要进 ab 两者的栈。c.tryFire(SYNC);
}
}
return d;
}
//biRelay 方法,有前面的基础,很简单,只要 ab 之一任何一个未完成则返回 false,都完成且 dep 未完成则进入相应的正常异常完成策略,// 不论 dep 是否已完成,只要 ab 均已完成,则返回 true
boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
Object r, s; Throwable x;
if (a == null || (r = a.result) == null ||
b == null || (s = b.result) == null)
return false;
//biRelay 是尝试根据两个 CompletableFuture 完成 dep,因为三个 complete* 方法均已做到原子性,也没有 action 要执行,因此它不需要 claim。if (result == null) {if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
completeThrowable(x, s);
else
// 正常情况,用 null 完成。completeNull();}
return true;
}
// 压入栈的 BiRelay
static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
BiRelay(CompletableFuture<Void> dep,
CompletableFuture<T> src,
CompletableFuture<U> snd) {super(null, dep, src, snd);
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null || !d.biRelay(a = src, b = snd))
// 已经完成过,或者未完成,本次也不能完成,返回一个 null
return null;
//BiRelay 通过 BiCompletion 间接继承了 UniCompletion,因此 dep 取 null 代表死亡。// 这样也能规避错误的 tryFire,如当它已被完成,持有的 dep 引用置 null,当 d 进行 postFire 的 postComplete 时会保持 f =this 并持续出栈
//dep 未完成时清栈也能有效移除已完成的任务。src = null; snd = null; dep = null;
return d.postFire(a, b, mode);
}
}
//orTree 类似上面的 andTree,有一个完成或异常,就用它的结果或异常作为返回的 CompletableFuture 的结果或异常。static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
int lo, int hi) {CompletableFuture<Object> d = new CompletableFuture<Object>();
if (lo <= hi) {
CompletableFuture<?> a, b;
int mid = (lo + hi) >>> 1;
// 同上
if ((a = (lo == mid ? cfs[lo] :
orTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
orTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
// 同上,下面简述 orRelay 和 OrRelay
if (!d.orRelay(a, b)) {OrRelay<?,?> c = new OrRelay<>(d, a, b);
// 除非 ab 任何一个已完成,否则 orpush 要进栈,且只进一个栈。a.orpush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
// 很明显,orRelay 就是两个 CompletableFuture 的或关系中继者。final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
Object r;
if (a == null || b == null ||
((r = a.result) == null && (r = b.result) == null))
return false;
// 只要 ab 有一个有结果,立即进行完成工作。if (result == null)
// 前面已介绍过 completeRelay 函数,r 可以是异常。completeRelay(r);
// 只要 ab 有一个完成或异常,即返回 true
return true;
}
static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src,
CompletableFuture<U> snd) {super(null, dep, src, snd);
}
final CompletableFuture<Object> tryFire(int mode) {
CompletableFuture<Object> d;
CompletableFuture<T> a;
CompletableFuture<U> b;
if ((d = dep) == null || !d.orRelay(a = src, b = snd))
return null;
// 置空的理由同上。src = null; snd = null; dep = null;
return d.postFire(a, b, mode);
}
}
后语
到此为止,关于 CompletableFuture 在 JDK8 的源码解析完成。
每一次读道格大神的代码时都会很痛苦,但是当痛苦变成豁然开朗时,接下来的是舒爽。
毫无疑问,道格大神的实现一如既往的优雅和巧妙,甚至不能用技术来形容,而应该用”艺术“来形容。
前面解释过选择 CompletableFuture,因为在现有的若干框架中发现了与它的父接口 CompletionStage 有关,而 CompletableFuture 是官方给出的实现。同时,它也是响应式编程里的典型,在 jdk 出品的 HttpClient 中也有与之结合的部分。我相信前面的简单例子加上源码分析,再加上对 api 的稍加理解,很快就可以上手响应式编程。
简单回顾:
1.CompletableFuture 实现了 CompletionStage 接口的一系列由 either,apply,run,then 等关键字组成的方法,可以链式调用。
2.CompletableFuture 可以不去链式调用,每个 CompletableFuture 均可以单独调用多个 1 中提到的方法,结果是立即尝试执行这些方法,若执行条件不满足,也会自身维护一个栈的结构,栈中的各方法用单向链表的形式连接,在清栈操作或完成后置处理时按这个顺序或局部逆序 fire,但任何时候执行时彼此之间不影响。
3. 调用 1 中的方法并返回新的 CompletableFuture 的 CompletableFuture 在源码的注释中称为 source,即源,生成的即 d 或 dep,作者花了很久去理解这两个概念。源的完成一般为 dep 的基本条件,dep 可以依赖一源或多源的完成,这里的完成包含正常 result 或异常 result。
4.CompletableFuture 对 null 结果和异常结果进行了编码,详情见上面的分析。
5.CompletableFuture 基本无锁,get 或 join 结果除外,但该锁阻塞的是查询线程,而不是用来计算和完成任务的线程,这些线程在 CompletableFuture 的代码中无锁,当然,可以在相应的栈元素中使用锁。
6.CompletableFuture 中的栈以 Completion 为实现,它是 ForkJoinTask,可以在 ForkJoinPool 中运行,它有各种派生类。各派生类可以实现任务的”生死“判断,”fire 操作“,以及对有执行器(线程池或同步的执行器)时的逻辑处理。
7. 通过 CompletableFuture 指定的 action 最多能有三种调用时机,但鉴于这太实现具体,将来随时可能变成四种五种,没有太大的参考价值。首先在 action 被声明时,若没有指定线程池,会进行一次调用尝试(1); 若提供了线程池或(1)的尝试失败(一般是 source 未完成),则将生成的 dep,源,action 封装成一个 Completion 实例并入栈,入栈成功后进行第二次尝试,在这次尝试中,未提供执行器的会同步尝试一次 fire(2); 提供了执行器(线程池是一种异步的执行器)的,需要先 claim,并只能 claim 一次,在 claim 方法中异步地执行,且这种情况下 claim 一定会失败,但会将执行器引用释放,下次 claim 因为 cas 的原因也一定会失败,这样保证了仅一次执行(3)。
一些 api 可以有一个线程池的成员变量,它可以在一开始在入口用线程池异步执行(静态方法为主,如 public static CompletableFuture<Void> runAsync(Runnable runnable) 这种入口方法,它负责第一个 CompletableFuture 的生成。