关于express:CompletableFuture实现异步编排

43次阅读

共计 9712 个字符,预计需要花费 25 分钟才能阅读完成。

前言

为什么须要异步执行?

场景:电商零碎中获取一个残缺的商品信息可能分为以下几步:①获取商品根本信息 ②获取商品图片信息 ③获取商品促销流动信息 ④获取商品各种类的根本信息 等操作,如果应用串行形式去执行这些操作,假如每个操作执行 1s,那么用户看到残缺的商品详情就须要 4s 的工夫,如果应用并行形式执行这些操作,可能只须要 1s 就能够实现。所以这就是异步执行的益处。

JDK5 的 Future 接口

Future 接口用于代表异步计算的后果,通过 Future 接口提供的办法能够查看异步计算是否执行实现,或者期待执行后果并获取执行后果,同时还能够勾销执行。
列举 Future 接口的办法:

get():获取工作执行后果,如果工作还没实现则会阻塞期待直到工作执行实现。如果工作被勾销则会抛出 CancellationException 异样,如果工作执行过程产生异样则会抛出 ExecutionException 异样,如果阻塞期待过程中被中断则会抛出 InterruptedException 异样。
get(long timeout,Timeunit unit):带超时工夫的 get()办法,如果阻塞期待过程中超时则会抛出 TimeoutException 异样。
cancel():用于勾销异步工作的执行。如果异步工作曾经实现或者曾经被勾销,或者因为某些起因不能取消,则会返回 false。如果工作还没有被执行,则会返回 true 并且异步工作不会被执行。如果工作曾经开始执行了然而还没有执行实现,若 mayInterruptIfRunning 为 true,则会立刻中断执行工作的线程并返回 true,若 mayInterruptIfRunning 为 false,则会返回 true 且不会中断工作执行线程。
isCanceled():判断工作是否被勾销,如果工作在完结 (失常执行完结或者执行异样完结) 前被勾销则返回 true,否则返回 false。
isDone():判断工作是否曾经实现,如果实现则返回 true,否则返回 false。须要留神的是:工作执行过程中产生异样、工作被勾销也属于工作已实现,也会返回 true。

应用 Future 接口和 Callable 接口实现异步执行:
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 获取商品根本信息(能够应用 Lambda 表达式简化 Callable 接口,这里为了便于察看不应用)Future<String> future1 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {return "获取到商品根本信息";}
});
// 获取商品图片信息
Future<String> future2 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {return "获取商品图片信息";}
});
// 获取商品促销信息
Future<String> future3 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {return "获取商品促销信息";}
});
// 获取商品各种类根本信息
Future<String> future4 = executorService.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {return "获取商品各种类根本信息";}
});
    // 获取后果
try {System.out.println(future1.get());
    System.out.println(future2.get());
    System.out.println(future3.get());
    System.out.println(future4.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executorService.shutdown();
}

}
复制代码

既然 Future 能够实现异步执行并获取后果,为什么还会须要 CompletableFuture?

简述一下 Future 接口的弊病:

不反对手动实现

当提交了一个工作,然而执行太慢了,通过其余门路曾经获取到了工作后果,当初没法把这个工作后果告诉到正在执行的线程,所以必须被动勾销或者始终期待它执行实现。

不反对进一步的非阻塞调用

通过 Future 的 get()办法会始终阻塞到工作实现,然而想在获取工作之后执行额定的工作,因为 Future 不反对回调函数,所以无奈实现这个性能。

不反对链式调用

对于 Future 的执行后果,想持续传到下一个 Future 解决应用,从而造成一个链式的 pipline 调用,这在 Future 中无奈实现。

不反对多个 Future 合并

比方有 10 个 Future 并行执行,想在所有的 Future 运行结束之后,执行某些函数,是无奈通过 Future 实现的。

不反对异样解决

Future 的 API 没有任何的异样解决的 api,所以在异步运行时,如果出了异样问题不好定位。

应用 Future 接口能够通过 get()阻塞式获取后果或者通过轮询+isDone()非阻塞式获取后果,然而前一种办法会阻塞,后一种会消耗 CPU 资源,所以 JDK 的 Future 接口实现异步执行对获取后果不太敌对,所以在 JDK8 时推出了 CompletableFuture 实现异步编排。

CompletableFuture 的应用

CompletableFuture 概述

JDK8 中新减少了一个蕴含 50 个办法左右的类 CompletableFuture,提供了十分弱小的 Future 的扩大性能,能够帮忙咱们简化异步编程的复杂性,提供了函数式编程的能力,能够通过回调的形式解决计算结果,并且提供了转换和组合 CompletableFuture 的办法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
复制代码
CompletableFuture 类实现了 Future 接口和 CompletionStage 接口,即除了能够应用 Future 接口的所有办法之外,CompletionStage<T> 接口提供了更多办法来更好的实现异步编排,并且大量的应用了 JDK8 引入的函数式编程概念。前面会粗疏的介绍罕用的 API。

① 创立 CompletableFuture 的形式

应用 new 关键字创立

// 无返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>(“result”);
// 已知返回后果(底层其实也是带参数的结构器赋值)
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture(“result”);
复制代码
创立一个返回后果类型为 String 的 CompletableFuture,能够应用 Future 接口的 get()办法获取该值(同样也会阻塞)。
能够应用无参结构器返回一个没有后果的 CompletableFuture,也能够通过结构器的传参 CompletableFuture 设置好返回后果,或者应用 CompletableFuture.completedFuture(U value)结构一个已知后果的 CompletableFuture。

应用 CompletableFuture 类的动态工厂办法(罕用)

runAsync() 无返回值

// 应用默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 应用自定义线程池(举荐)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
复制代码
runAsync() 办法的参数是 Runnable 接口,这是一个函数式接口,不容许返回值。当须要异步操作且不关怀返回后果的时候能够应用 runAsync()办法。
// 例子
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
    // 通过 Lambda 表达式实现 Runnable 接口
    CompletableFuture.runAsync(()-> System.out.println("获取商品根本信息胜利"), executor).get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executor.shutdown();
}

}
复制代码

supplyAsync() 有返回值

// 应用默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 应用自定义线程池(举荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
复制代码
supplyAsync() 办法的参数是 Supplier<U> 供应型接口(无参有返回值),这也是一个函数式接口,U 是返回后果值的类型。当须要异步操作且关怀返回后果的时候, 能够应用 supplyAsync()办法。
// 例子
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
    // 通过 Lambda 表达式实现执行内容,并返回后果通过 CompletableFuture 接管
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("获取商品信息胜利");
        return "信息";
    }, executor);
    // 输入后果
    System.out.println(completableFuture.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executor.shutdown();
}

}
复制代码

对于第二个参数 Executor executor 阐明

在没有指定第二个参数(即没有指定线程池)时,CompletableFuture 间接应用默认的 ForkJoinPool.commonPool()作为它的线程池执行异步代码。
在理论生产中会应用自定义的线程池来执行异步代码,具体能够参考另一篇文章深刻了解线程池 ThreadPoolExecutor – 掘金 (juejin.cn),外面的第二节有生产中怎么创立自定义线程的例子,能够参考一下。

② 取得异步执行后果

get() 阻塞式获取执行后果

public T get() throws InterruptedException, ExecutionException
复制代码
该办法调用后如果工作还没实现则会阻塞期待直到工作执行实现。如果工作执行过程产生异样则会抛出 ExecutionException 异样,如果阻塞期待过程中被中断则会抛出 InterruptedException 异样。

get(long timeout, TimeUnit unit) 带超时的阻塞式获取执行后果

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
复制代码
该办法调用后如果如果工作还没实现则会阻塞期待直到工作执行实现或者超出 timeout 工夫,如果阻塞期待过程中超时则会抛出 TimeoutException 异样。

getNow(T valueIfAbsent) 立即获取执行后果

public T getNow(T valueIfAbsent)
复制代码
该办法调用后,会立即获取后果不会阻塞期待。如果工作实现则间接返回执行实现后的后果,如果工作没有实现,则返回调用办法时传入的参数 valueIfAbsent 值。

join() 不抛异样的阻塞时获取执行后果

public T join()
复制代码
该办法和 get()办法作用一样,只是不会抛出异样。

complete(T value) 被动触发计算,返回异步是否执行结束

public boolean complete(T value)
复制代码
该办法调用后,会被动触发计算结果,如果此时异步执行并没有实现(此时 boolean 值返回 true),则通过 get()拿到的数据会是 complete()设置的参数 value 值,如果此时异步执行曾经实现(此时 boolean 值返回 false),则通过 get()拿到的就是执行实现的后果。
// 例子
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
    // 通过 Lambda 表达式实现执行内容,并返回后果通过 CompletableFuture 接管
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        // 休眠 2 秒,使得异步执行变慢,会导致被动触发计算先执行,此时返回的 get 就是 555
        try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
        return 666;
    }, executor);
    // 被动触发计算,判断异步执行是否实现
    System.out.println(completableFuture.complete(555));
    // 输入后果
    System.out.println(completableFuture.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executor.shutdown();
}

}

/**
输入后果:

true
555

**/
复制代码

③ 对执行后果进行解决

whenComplete 期待后面工作执行完再执行以后解决

public CompletableFuture<T> whenComplete(

    BiConsumer<? super T, ? super Throwable> action)

复制代码
在创立好的初始工作或者是上一个工作后通过链式调用该办法,会在之前工作执行实现后继续执行 whenComplete 里的内容(whenComplete 传入的 action 只是对之前工作的后果进行解决),即应用该办法能够防止后面说到的 Future 接口的问题,不再须要通过阻塞或者轮询的形式去获取后果,而是通过调用该办法等工作执行结束主动调用。
该办法的参数为 BiConsumer<? super T, ? super Throwable> action 消费者接口,能够接管两个参数,一个是工作执行完的后果,一个是执行工作时的异样。
// 例子
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
            .whenComplete((res, ex) -> System.out.println("工作执行结束,后果为" + res + "异样为" + ex)
            );
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}

}

/**
输入后果:

工作执行结束,后果为 666 异样为 null

**/
复制代码

除了上述的办法外,还有一些相似的办法如 XXXAsync()或者是 XXXAsync(XX,Executor executor), 对于这些办法,这里对立阐明,后续文章中将不会再列举

public CompletableFuture<T> whenCompleteAsync(

    BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(

    BiConsumer<? super T, ? super Throwable> action, Executor executor)

复制代码
XXXAsync():示意上一个工作执行实现后,不会再应用之前工作中的线程,而是从新应用从默认线程(ForkJoinPool 线程池)中从新获取新的线程执行当前任务。
XXXAsync(XX,Executor executor):示意不会沿用之前工作的线程,而是应用本人第二个参数指定的线程池从新获取线程执行当前任务。

④ 对执行后果进行生产

thenRun 后面工作执行完后执行当前任务,不关怀后面工作的后果,也没返回值

public CompletableFuture<Void> thenRun(Runnable action)
复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB) 像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,然而工作 B 不须要 A 的后果,并且执行完工作 B 也不会返回后果。
thenRun(Runnable action)的参数为 Runnable 接口即没有传入参数。
// 例子
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
                .thenRun(() -> System.out.println("我都没有参数怎么拿到之前的后果,我也没有返回值。")
            );
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}

}

/**
输入后果:

我都没有参数怎么拿到之前的后果,我也没有返回值。

**/
复制代码

thenAccept 后面工作执行完后执行当前任务,生产后面的后果,没有返回值

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB) 像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,而且工作 B 须要 A 的后果,然而执行完工作 B 不会返回后果。
thenAccept(Consumer<? super T> action)的参数为消费者接口,即能够传入一个参数,该参数为上一个工作的执行后果。
// 例子
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
            .thenAccept((res) -> System.out.println("我能拿到上一个的后果" + res + ", 然而我没法传出去。")
            );
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}

}

/**
输入后果:

我能拿到上一个的后果 666, 然而我没法传出去。

**/
复制代码

thenApply 后面工作执行完后执行当前任务,生产后面的后果,具备返回值

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB) 像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,而且工作 B 须要 A 的后果,并且执行完工作 B 须要有返回后果。
thenApply(Function<? super T,? extends U> fn)的参数为函数式接口,即能够传入一个参数类型为 T,该参数是上一个工作的执行后果,并且函数式接口须要有返回值, 类型为 U。
// 例子
public static void main(String[] args) {

// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
            .thenApply((res) -> {System.out.println("我能拿到上一个的后果" + res + "并且我要将后果传出去");
                    return res;
                }
            ).whenComplete((res, ex) -> System.out.println("后果" + res));
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}

}
/**
输入后果:

我能拿到上一个的后果 666 并且我要将后果传出去
后果 666

**/
复制代码

⑤ 异样解决

exceptionally 异样捕捉,只生产后面工作中呈现的异样信息,具备返回值

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
复制代码
能够通过链式调用该办法来获取异样信息,并且具备返回值。如果某一个工作出现异常被 exceptionally 捕捉到则残余的工作将不会再执行。相似于 Java 异样解决的 catch。
exceptionally(Function<Throwable, ? extends T> fn)的参数是函数式接口,具备一个参数以及返回值,该参数为后面工作的异样信息。

正文完
 0