关于express:任务编排CompletableFuture从入门到精通

5次阅读

共计 11687 个字符,预计需要花费 30 分钟才能阅读完成。

前言
最近遇到了一个业务场景,波及到多数据源之间的申请的流程编排,正好看到了一篇某团介绍 CompletableFuture 原理和应用的技术文章,次要还是波及应用层面。网上很多文章波及原理的局部讲的不是特地具体且比拟形象。因为波及到多线程的工具必须要了解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下 CompletableFuture 的工作原理
背景
咱们把 Runnable 了解为最根本的线程工作,只具备在线程下执行一段逻辑的能力。为了获取执行的返回值,发明了 Callable 和与其配合应用的 Future。为了将工作之间进行逻辑编排,就诞生了 CompletableFuture。对于如何了解工作的逻辑编排,举一个简略的例子:

关上电脑 - 更新零碎这两个操作是有先后顺序的,然而泡茶和这两个操作没有先后顺序,是能够并行的,而开始办公必须要期待其余操作完结之后能力进行,这就造成了工作编排的执行链。
在 IO 密集型零碎中,相似的场景有很多。因为不同数据集的查问依赖主键不同,A 数据集的查问主键是 B 数据集的一个字段这种状况很常见,通常还须要并发查问多个数据集的数据,所以对于多线程的执行编排是有需要的。
一种解决办法是 CountDownLatch,让线程执行到某个中央后进行期待,直到依赖的工作执行完结。对于一些简略的执行链是能够满足的,然而当编排逻辑简单起来,CountDownLatch 会导致代码难以保护和调试。所以诞生了 CompletableFuture 用来形容和保护工作之间的依赖关系以进行工作编排。在理论利用中,有以下两类场景是适宜应用工作编排的:

多数据源申请的流程编排

非阻塞化网关等 NIO 场景

应用形式
创立与执行
同步办法
和 FutureTask 相似,CompletableFuture 也通过 get()办法获取执行后果。然而不同的是,CompletableFuture 自身能够不承载可执行的工作(相比 FutureTask 则必须承载一个可执行的工作 Callable),通过一个用于标记执行胜利并设置返回值的函数,在应用上也更为灵便,如下:

CompletableFuture<String> demo = new CompletableFuture<>();
demo.complete("success");
System.out.println(demo.get());

复制代码

执行后果:success

和 Future 相似,get()函数也是同步阻塞的,调用 get 函数后线程会阻塞直到调用 complete 办法标记工作曾经执行胜利。
除了手动触发工作的实现,也能够让创建对象的同时就标记工作实现:

CompletableFuture<String> demo = CompletableFuture.completedFuture("success");
System.out.println(demo.get());

复制代码

执行后果:success

异步办法
相比于同步办法,异步执行更为常见。比方上面这个例子:

    CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {System.out.println("do something by thread" + Thread.currentThread().getName());
        return "success";
    });
    System.out.println(demo.get());

复制代码

执行后果:
do something by threadForkJoinPool.commonPool-worker-9
success

supplyAsync 办法接管一个 Supplier 对象,逻辑函数交给线程池中的线程异步执行

默认会应用 ForkJoinPool 的公共线程池来执行代码(不举荐),当然也能够指定线程池,如下:

ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {System.out.println("do something by thread" + Thread.currentThread().getName());
    return "success";
}, executor);
System.out.println(demo.get());

复制代码

执行后果:
do something by threadpool-1-thread-1
success

如果不须要执行后果,也能够用 runAsync 办法:

CompletableFuture.runAsync(() -> {System.out.println("do something by thread" + Thread.currentThread().getName());
});

复制代码

执行后果:
do something by threadForkJoinPool.commonPool-worker-9

多任务编排
多任务编排是 CompletableFuture 的外围,这里列举不同的场景来进行阐明
一元依赖

步骤 2 须要依赖步骤 1 执行结束能力执行,相似主线程的程序执行,能够通过以下形式实现:

  ExecutorService executor = Executors.newFixedThreadPool(4);
  CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {System.out.println("执行【步骤 1】");
    return "【步骤 1 的执行后果】";
  }, executor);

  CompletableFuture<String> step2 = step1.thenApply(result -> {System.out.println("上一步操作后果为:" + result);
    return "【步骤 2 的执行后果】";
  });
  System.out.println("步骤 2 的执行后果:" + step2.get());

复制代码

执行后果:
执行【步骤 1】
上一步操作后果为:【步骤 1 的执行后果】
步骤 2 的执行后果:【步骤 2 的执行后果】

通过 thenApply 办法,接管上一个 CompletableFuture 对象的返回值,其中隐含的逻辑是,该处逻辑只有等上一个 CompletableFuture 对象执行完后才会执行
二元依赖
相比于一元依赖的程序执行链,二元依赖更为常见,比方上面这个场景:

步骤 1 和 2 是并行的,而步骤 3 须要等步骤 1 和 2 执行完之后能力执行,通过 CompletableFuture 是这么实现的:

    ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {System.out.println("执行【步骤 1】");
        return "【步骤 1 的执行后果】";
    }, executor);

    CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {System.out.println("执行【步骤 2】");
        return "【步骤 2 的执行后果】";
    }, executor);

    CompletableFuture<String> step3 = step1.thenCombine(step2, (result1, result2) -> {System.out.println("前两步操作后果别离为:" + result1 + result2);
        return "【步骤 3 的执行后果】";
    });
    
    System.out.println("步骤 3 的执行后果:" + step3.get());

复制代码

执行后果:
执行【步骤 1】
执行【步骤 2】
前两步操作后果别离为:【步骤 1 的执行后果】【步骤 2 的执行后果】
步骤 3 的执行后果:【步骤 3 的执行后果】

通过 thenCombine 办法,期待 step1 和 step2 都执行结束后,获取其返回后果并执行一段新的逻辑
多元依赖
当然还可能有上面这种场景,步骤 M 须要依赖 1 - N 的 N 个前置节点:

这种状况会更为简单,实现形式如下:

    ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {System.out.println("执行【步骤 1】");
        return "【步骤 1 的执行后果】";
    }, executor);
    CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {System.out.println("执行【步骤 2】");
        return "【步骤 2 的执行后果】";
    }, executor);
    CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> {System.out.println("执行【步骤 3】");
        return "【步骤 3 的执行后果】";
    }, executor);

    CompletableFuture<Void> stepM = CompletableFuture.allOf(step1, step2, step3);
    CompletableFuture<String> stepMResult = stepM.thenApply(res -> {
       // 通过 join 函数获取返回值
       String result1 = step1.join();
       String result2 = step2.join();
       String result3 = step3.join();
    
       return result1 + result2 + result3;
    });
    System.out.println("步骤 M 的后果:" + stepMResult.get());

复制代码

执行后果:
执行【步骤 1】
执行【步骤 2】
执行【步骤 3】
步骤 M 的后果:【步骤 1 的执行后果】【步骤 2 的执行后果】【步骤 3 的执行后果】

通过 allOf 函数申明当参数中的所有工作执行结束后,才会执行下一步操作,然而要留神,allOf 自身只是定义节点,真正阻塞的地位是 thenApply 函数。
和之前的形式不同,因为采纳了不定变量,所以要通过 CompletableFuture#join 来获取每个工作的返回值。
除了 allOf 之外,如果咱们须要任意一个工作实现后就执行下一步操作,能够应用 anyOf 办法,如下:

// step1/2/ 3 的定义雷同
    // ...
    CompletableFuture<Object> stepM = CompletableFuture.anyOf(step1, step2, step3);
System.out.println("步骤 M 的后果:" + stepM.get());

复制代码

执行后果:
步骤 M 的后果:【步骤 1 的执行后果】

与 allOf 不同,anyOf 的返回值即为第一个执行结束的工作
工作原理
概念
在讲原理之前,先来理解一下 CompletableFuture 的定义。在实现上,CompletableFuture 继承了 Future 和 CompletionStage

Future 毋庸置疑,CompletableFuture 最根本的能力就是获取异步计算的后果。CompletionStage 则是申明了编排节点的能力,每一个 CompletionStage 都申明了流程树上的一个节点(见下图)

CompletionStage 申明的接口 thenXXX,包含 thenApply、thenCompose 等,定义了节点之间的连贯形式(理论状况更为简单,具体原理参考下节函数剖析),通过这种形式,最终定义出一颗流程树,进而实现了多线程的工作编排。CompletionStage 的办法返回值通常是另一个 CompletionStage,进而形成了链式调用。
构造剖析
CompletableFuture 里蕴含两个变量,result 和 stack

result 很好了解,就是以后节点的执行后果。stack 就比较复杂,是一个无锁并发栈,申明了以后节点执行结束后要触发的节点列表,接下来咱们具体讲一下
CompletableFuture 中的栈设计
Completion 是一个无锁并发栈,申明了以后节点执行结束后要触发的节点列表。在结构上是一个链式节点,其中只蕴含了一个指向下一个节点的 next 对象

咱们能够看到 Completion 有繁多的实现类,示意不同的依赖形式。

咱们晓得,在 CompletableFuture 中的流程编排是通过 thenApply、thenAccept、thenCombine 等形式来实现的,

thenApply:接管上一步的处理结果,进行下一步生产,并返回后果

thenAccept:和 thenApply 相似,不过无后果返回

thenCombine:同时接管两个流程节点,等其都执行结束后一起处理结果

每个函数理论别离对应了一种 Completion 实现类,以方才的三种函数为例,别离对应了 UniApply、UniAccept、UniCombine 三个对象。所以 Completion 能够认为是流程编排逻辑的形象对象,能够了解为流程节点,或者工作节点。
以 UniCompletion 为例,构造如下:
abstract static class UniCompletion<T,V> extends Completion {

Executor executor;                 // 线程池
CompletableFuture<V> dep;          // 实现工作依赖的 cf
CompletableFuture<T> src;          // 实现工作所需资源所在的 cf

/**
 * 如果工作能够被执行则返回 true,通过 FJ 标记位保障只有一个线程判断胜利。* 如果工作是异步的,则在工作启动后通过 tryFire 来执行工作
 */
final boolean claim() {
    Executor e = executor;
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null)
            return true;
        executor = null; // disable
        e.execute(this);
    }
    return false;
}

/**
 * 如果 dep 不为空返回 true,用以判断当前任务节点是否已被激活
 */
final boolean isLive() {return dep != null;}

}
复制代码
先来看 claim 函数,这个比拟容易解释,该函数用于判断工作是否可被执行。通过 compareAndSetForkJoinTaskTag 函数的 CAS 操作保障只有一个线程执行胜利,次要作用就是在多线程状况下确保工作的正确执行。
接下来就是重头戏,源工作与依赖工作,这两个概念是 CompletableFuture 的外围,贯通了所有逻辑的执行,只有了解了这两个概念,能力对执行原理有比拟透彻的了解
源工作与依赖工作
源工作和依赖工作在 UniCompletion 中别离为 src 和 dep 属性,举个具体的例子,比方上面这段代码:
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {

return "A";

});

CompletableFuture<String> b = a.thenApply(res -> {

return "B" + res;

});
复制代码
调用 a.thenApply(Function fn)时,能够认为是生成了一个 UniApply 的流程节点(具体怎么生成的下文会提到),其中源工作就是 a,而依赖工作则是 thenApply 的返回值。
换个简略的说法,在下面的代码中,咱们有 a、b 两个工作,b 工作的实现须要依赖于 a 工作的实现,所以 a 会生成一个流程节点(UniApply 对象),其中蕴含了 b 想要执行实现的全副资源(a 的执行后果等),这时 a 工作就叫做源工作(因为 a 工作中有工作资源)。而 b 工作须要依赖 a 工作来实现,所以 b 工作叫做依赖工作。
源工作的实现会触发依赖工作的执行,这个就是工作编排的基本原理
函数剖析
在本节中,CompletableFuture 因为名字太长,会以 cf 来代指
因为 thenAccept、thenCombine 函数等逻辑比拟相似,咱们以最根底的 thenApply 函数为例进行剖析
外围函数
咱们先不要间接从 thenApply、complete 等函数动手,咱们先来看这几个外围函数,不明确做什么的不要紧,先了解这几个函数的原理就好
uniApply
CompletableFuture 的逻辑在于“只有当 X 条件满足时,再执行 Y 逻辑”,uniApply 函数就是负责这样的逻辑判断,首先看源码:
final <S> boolean uniApply(CompletableFuture<S> a,

                        Consumer<? super S> f, UniApply<S> c) {
Object r; Throwable x;
// 1
if (a == null || (r = a.result) == null || f == null)
    return false;

tryComplete: if (result == null) {if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {completeThrowable(x, r);
            break tryComplete;
        }
        r = null;
    }

    try {
        // 2
        if (c != null && !c.claim())
            return false;

        // 3
        S s = (S) r;
        completeValue(f.apply(s));
    } catch (Throwable ex) {completeThrowable(ex);
    }
}
return true;

}
复制代码
整个办法能够分为三段(已在代码中标出),咱们离开来说。
第一段,判断所给的工作节点是否曾经执行结束,如果曾经执行结束则进入下一步
第二段,如果有关联的流程节点,则通过 claim 函数判断当前任务是否可被执行,如果可执行则进入下一步(确保多线程状况下工作的正确执行)
第三段,执行传入的函数并把值设置到以后对象中。
整个逻辑是这样的,首先咱们传入了一个 cf 对象、一个函数,和一个流程节点。只有当传入的 cf 对象执行实现(result 不为空),再执行给定的函数,并把执行后果设置到以后对象中。如果不思考非凡状况,uniApply 办法用一句话解释就是:如果给定的工作曾经执行结束,就执行传入的函数并把执行后果设置到以后对象中
tryFire
uniApply 函数仅仅是一个有条件的函数执行器,真正想要达到工作编排还须要其余函数的参加,咱们先来看 tryFire 办法:
final CompletableFuture<V> tryFire(int mode) {

        CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
    !d.uniApply(a = src, fn, mode > 0 ? null : this))
    return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);

}
复制代码
tryFire 依据关联关系的不同有多种实现,理论执行流程相差不大,这里以罕用的 UniApply 的实现来举例。
首先这个办法接管了一个 mode 参数,有以下几种取值:

-1:流传模式,或者叫嵌套模式。示意工作理论曾经执行结束,只是在传递状态

0:同步模式。工作由以后线程调用解决

1:异步模式。工作须要提交到指定线程池解决

依据 mode 的不同,理论 tryFire 执行的流程也会产生很大区别。不过归根到底,tryFire 办法的实质是调用了 uniApply 执行一次工作,如果执行胜利,则会清空 dep、src 等本身属性(清空之后 isLive 办法会返回 false,示意工作曾经执行结束),同时通过 postFire 办法执行该工作下的其余依赖工作,实现工作的流传执行。
postFire 办法因为和 tryFire 办法关联比拟亲密,这里放在一起阐明:
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {

    if (a != null && a.stack != null) {if (mode < 0 || a.result == null)
            a.cleanStack();
        else
            a.postComplete();}
    if (result != null && stack != null) {if (mode < 0)
            return this;
        else
            postComplete();}
    return null;

}
复制代码
这里简略概括一下执行原理,如果是嵌套模式,则清理栈内有效工作,并返回对象自身(能够认为什么都没做);否则通过 postComplete 办法执行栈内依赖此工作的其余工作项
postComplete
当一个 CompletionStage 执行实现之后,会触发依赖它的其余 CompletionStage 的执行,这些 Stage 的执行又会触发新一批的 Stage 执行,这就是工作的程序编排
如果说 uniApply 是根底性能,是负责线程平安且恪守依赖程序地执行一个函数,那么 postComplete 就是外围逻辑,负责当一个工作执行结束后触发依赖该工作的其余工作项,先来看源码:
final void postComplete() {

CompletableFuture<?> f = this; Completion h;
// 1
while ((h = f.stack) != null ||
       (f != this && (h = (f = this).stack) != null)) {
    CompletableFuture<?> d; Completion t;

    // 2
    if (f.casStack(h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);
                continue;
            }
            h.next = null;    // detach
        }
        // 3
        f = (d = h.tryFire(NESTED)) == null ? this : d;
    }
}

}
复制代码
在源码上标记了三个地位,别离代表三层构造,首先是第一层 while 循环,只有以后对象栈中还有流程节点,那么就循环执行外部逻辑。
第二层,因为 continue 的存在,和第一层联合起来看就是一个批量压栈的操作,将所有须要触发的依赖树按程序压入以后对象栈中。
第三层,通过 tryFire 按程序触发栈中所有的依赖工作。上节咱们能够看到 tryFire 函数内依据 mode 的不同会触发不同的逻辑,这里 mode 指定为 NESTED 就是为了防止循环调用 postComplete
执行函数
几个外围函数介绍完了,接下来咱们回到最外层,来看看工作是如何执行的,首先咱们以 thenApply 为例剖析外围执行函数
supplyAsync(理论调用为 asyncSupplyStage)
该办法用于提交一个工作到线程池中执行,并将该工作打包为一个 CompletableFuture 节点
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {

if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;

}
复制代码
其中 AsyncSupply 实现了 Runnable 接口,所以了解为一种非凡的工作即可。这种工作在执行实现后会调用 completeValue 将函数执行的后果设置以后对象中。
所以整体逻辑为,首先创立一个 cf 对象,并立刻将工作增加到线程池执行,在执行结束后会将工作执行的后果保留到所创立的 cf 对象中。
complete
public boolean complete(T value) {

boolean triggered = completeValue(value);
postComplete();
return triggered;

}
复制代码
该办法间接调用 completeValue 办法设置值,设置完值之后调用 postComplete 办法来顺次执行后续工作。当调用该办法时,会实现工作的依赖扩散执行
thenApply(理论调用为 uniApplyStage)
private <V> CompletableFuture<V> uniApplyStage(

    Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();

CompletableFuture<V> d =  new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
    push(c);
    c.tryFire(SYNC);
}
return d;

}
复制代码
联合上节剖析的外围函数,咱们很容易能够剖析该函数的流程:执行 function 函数,如果条件不满足则执行失败,会生成一个流程节点并压入栈,同时再通过 tryFire 再尝试执行一次,如果条件仍然不满足,那么只能期待所依赖的工作执行实现后通过 postComplete 触发执行。
get
public T get() throws InterruptedException, ExecutionException {

Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);

}
复制代码
办法外围在于 waitingGet,外部应用了 ForkJoinPool.managedBlock 来阻塞线程直到执行结束
流程剖析
在函数剖析中,咱们理论曾经阐明了工作依赖执行的基本原理,这里为了更为具体地阐明,咱们以一个简略的例子来剖析。
首先咱们抛开所有简单的因素,以最根本的同步串行代码来讲,咱们当初有这样一个对象:

CompletableFuture<String> A = new CompletableFuture<>();

复制代码

而后咱们这时候给其加上了工作编排,减少了一个 thenApply 依赖

AtomicInteger seq = new AtomicInteger(0);
Function<String, String> func = s -> s + "|" + seq.incrementAndGet();

CompletableFuture<String> a = new CompletableFuture<>();
CompletableFuture<String> b = a.thenApply(func);

复制代码

于是咱们就有了这样一个构造,A 的 stack 中压入了一个 Completion 节点,该节点的源工作指向 A 自身,而依赖工作指向了 B,示意 B 工作依赖 A 工作的执行。
接下来咱们再加一条依赖

AtomicInteger seq = new AtomicInteger(0);
Function<String, String> func = s -> s + "|" + seq.incrementAndGet();

CompletableFuture<String> a = new CompletableFuture<>();
CompletableFuture<String> b = a.thenApply(func);
  CompletableFuture<String> c = a.thenApply(func);

复制代码

咱们会发现两个特点:

和栈的性质一样,越晚增加的编排逻辑越早被执行

基于同一个对象衍生进去的流程节点的源工作是统一的

以此类推,thenXXX 的其余逻辑也是相似的原理,当 a 调用 complete 函数时(无论是同步还是异步),都会顺次触发 A 工作的 stack 下挂接的其余依赖工作。而只有 a 没有调用 complete 函数,那么 thenApply 中挂接的依赖工作无论如何都无奈执行(因为 a 对象的 result 属性为空)
注意事项
防止主工作和子工作向同一个线程池中申请线程,因为存在依赖关系,当通过 join 来获取子工作的值时,一旦子工作因为线程队列已满进入阻塞队列,那么将会造成死

正文完
 0