前言

为什么须要异步执行?

场景:电商零碎中获取一个残缺的商品信息可能分为以下几步:①获取商品根本信息 ②获取商品图片信息 ③获取商品促销流动信息 ④获取商品各种类的根本信息 等操作,如果应用串行形式去执行这些操作,假如每个操作执行1s,那么用户看到残缺的商品详情就须要4s的工夫,如果应用并行形式执行这些操作,可能只须要1s就能够实现。所以这就是异步执行的益处。

JDK5的Future接口

Future接口用于代表异步计算的后果,通过Future接口提供的办法能够查看异步计算是否执行实现,或者期待执行后果并获取执行后果,同时还能够勾销执行。
列举Future接口的办法:

get():获取工作执行后果,如果工作还没实现则会阻塞期待直到工作执行实现。如果工作被勾销则会抛出CancellationException异样,如果工作执行过程产生异样则会抛出ExecutionException异样,如果阻塞期待过程中被中断则会抛出InterruptedException异样。
get(long timeout,Timeunit unit):带超时工夫的get()办法,如果阻塞期待过程中超时则会抛出TimeoutException异样。
cancel():用于勾销异步工作的执行。如果异步工作曾经实现或者曾经被勾销,或者因为某些起因不能取消,则会返回false。如果工作还没有被执行,则会返回true并且异步工作不会被执行。如果工作曾经开始执行了然而还没有执行实现,若mayInterruptIfRunning为true,则会立刻中断执行工作的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断工作执行线程。
isCanceled():判断工作是否被勾销,如果工作在完结(失常执行完结或者执行异样完结)前被勾销则返回true,否则返回false。
isDone():判断工作是否曾经实现,如果实现则返回true,否则返回false。须要留神的是:工作执行过程中产生异样、工作被勾销也属于工作已实现,也会返回true。

应用Future接口和Callable接口实现异步执行:
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executorService = Executors.newFixedThreadPool(4);// 获取商品根本信息(能够应用Lambda表达式简化Callable接口,这里为了便于察看不应用)Future<String> future1 = executorService.submit(new Callable<String>() {    @Override    public String call() throws Exception {        return "获取到商品根本信息";    }});// 获取商品图片信息Future<String> future2 = executorService.submit(new Callable<String>() {    @Override    public String call() throws Exception {        return "获取商品图片信息";    }});// 获取商品促销信息Future<String> future3 = executorService.submit(new Callable<String>() {    @Override    public String call() throws Exception {        return "获取商品促销信息";    }});// 获取商品各种类根本信息Future<String> future4 = executorService.submit(new Callable<String>() {    @Override    public String call() throws Exception {        return "获取商品各种类根本信息";    }});    // 获取后果try {    System.out.println(future1.get());    System.out.println(future2.get());    System.out.println(future3.get());    System.out.println(future4.get());} catch (InterruptedException | ExecutionException e) {    e.printStackTrace();}finally {    executorService.shutdown();}

}
复制代码

既然Future能够实现异步执行并获取后果,为什么还会须要CompletableFuture?

简述一下Future接口的弊病:

不反对手动实现

当提交了一个工作,然而执行太慢了,通过其余门路曾经获取到了工作后果,当初没法把这个工作后果告诉到正在执行的线程,所以必须被动勾销或者始终期待它执行实现。

不反对进一步的非阻塞调用

通过Future的get()办法会始终阻塞到工作实现,然而想在获取工作之后执行额定的工作,因为 Future 不反对回调函数,所以无奈实现这个性能。

不反对链式调用

对于Future的执行后果,想持续传到下一个Future解决应用,从而造成一个链式的pipline调用,这在 Future中无奈实现。

不反对多个 Future 合并

比方有10个Future并行执行,想在所有的Future运行结束之后,执行某些函数,是无奈通过Future实现的。

不反对异样解决

Future的API没有任何的异样解决的api,所以在异步运行时,如果出了异样问题不好定位。

应用Future接口能够通过get()阻塞式获取后果或者通过轮询+isDone()非阻塞式获取后果,然而前一种办法会阻塞,后一种会消耗CPU资源,所以JDK的Future接口实现异步执行对获取后果不太敌对,所以在JDK8时推出了CompletableFuture实现异步编排。

CompletableFuture的应用

CompletableFuture概述

JDK8中新减少了一个蕴含50个办法左右的类CompletableFuture,提供了十分弱小的Future的扩大性能,能够帮忙咱们简化异步编程的复杂性,提供了函数式编程的能力,能够通过回调的形式解决计算结果,并且提供了转换和组合CompletableFuture的办法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
复制代码
CompletableFuture类实现了Future接口和CompletionStage接口,即除了能够应用Future接口的所有办法之外,CompletionStage<T>接口提供了更多办法来更好的实现异步编排,并且大量的应用了JDK8引入的函数式编程概念。前面会粗疏的介绍罕用的API。

① 创立CompletableFuture的形式

应用new关键字创立

// 无返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>("result");
// 已知返回后果(底层其实也是带参数的结构器赋值)
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");
复制代码
创立一个返回后果类型为String的CompletableFuture,能够应用Future接口的get()办法获取该值(同样也会阻塞)。
能够应用无参结构器返回一个没有后果的CompletableFuture,也能够通过结构器的传参CompletableFuture设置好返回后果,或者应用CompletableFuture.completedFuture(U value)结构一个已知后果的CompletableFuture。

应用CompletableFuture类的动态工厂办法(罕用)

runAsync() 无返回值

// 应用默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 应用自定义线程池(举荐)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
复制代码
runAsync()办法的参数是Runnable接口,这是一个函数式接口,不容许返回值。当须要异步操作且不关怀返回后果的时候能够应用runAsync()办法。
// 例子
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executor = Executors.newFixedThreadPool(4);try {    // 通过Lambda表达式实现Runnable接口    CompletableFuture.runAsync(()-> System.out.println("获取商品根本信息胜利"), executor).get();} catch (InterruptedException | ExecutionException e) {    e.printStackTrace();}finally {    executor.shutdown();}

}
复制代码

supplyAsync() 有返回值

// 应用默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 应用自定义线程池(举荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
复制代码
supplyAsync()办法的参数是Supplier<U>供应型接口(无参有返回值),这也是一个函数式接口,U是返回后果值的类型。当须要异步操作且关怀返回后果的时候,能够应用supplyAsync()办法。
// 例子
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executor = Executors.newFixedThreadPool(4);try {    // 通过Lambda表达式实现执行内容,并返回后果通过CompletableFuture接管    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {        System.out.println("获取商品信息胜利");        return "信息";    }, executor);    // 输入后果    System.out.println(completableFuture.get());} catch (InterruptedException | ExecutionException e) {    e.printStackTrace();}finally {    executor.shutdown();}

}
复制代码

对于第二个参数Executor executor阐明

在没有指定第二个参数(即没有指定线程池)时,CompletableFuture间接应用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
在理论生产中会应用自定义的线程池来执行异步代码,具体能够参考另一篇文章深刻了解线程池ThreadPoolExecutor - 掘金 (juejin.cn),外面的第二节有生产中怎么创立自定义线程的例子,能够参考一下。

② 取得异步执行后果

get() 阻塞式获取执行后果

public T get() throws InterruptedException, ExecutionException
复制代码
该办法调用后如果工作还没实现则会阻塞期待直到工作执行实现。如果工作执行过程产生异样则会抛出ExecutionException异样,如果阻塞期待过程中被中断则会抛出InterruptedException异样。

get(long timeout, TimeUnit unit) 带超时的阻塞式获取执行后果

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
复制代码
该办法调用后如果如果工作还没实现则会阻塞期待直到工作执行实现或者超出timeout工夫,如果阻塞期待过程中超时则会抛出TimeoutException异样。

getNow(T valueIfAbsent) 立即获取执行后果

public T getNow(T valueIfAbsent)
复制代码
该办法调用后,会立即获取后果不会阻塞期待。如果工作实现则间接返回执行实现后的后果,如果工作没有实现,则返回调用办法时传入的参数valueIfAbsent值。

join() 不抛异样的阻塞时获取执行后果

public T join()
复制代码
该办法和get()办法作用一样,只是不会抛出异样。

complete(T value) 被动触发计算,返回异步是否执行结束

public boolean complete(T value)
复制代码
该办法调用后,会被动触发计算结果,如果此时异步执行并没有实现(此时boolean值返回true),则通过get()拿到的数据会是complete()设置的参数value值,如果此时异步执行曾经实现(此时boolean值返回false),则通过get()拿到的就是执行实现的后果。
// 例子
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executor = Executors.newFixedThreadPool(4);try {    // 通过Lambda表达式实现执行内容,并返回后果通过CompletableFuture接管    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {        // 休眠2秒,使得异步执行变慢,会导致被动触发计算先执行,此时返回的get就是555        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }        return 666;    }, executor);    // 被动触发计算,判断异步执行是否实现    System.out.println(completableFuture.complete(555));    // 输入后果    System.out.println(completableFuture.get());} catch (InterruptedException | ExecutionException e) {    e.printStackTrace();}finally {    executor.shutdown();}

}

/**
输入后果:

true555

**/
复制代码

③ 对执行后果进行解决

whenComplete 期待后面工作执行完再执行以后解决

public CompletableFuture<T> whenComplete(

    BiConsumer<? super T, ? super Throwable> action)

复制代码
在创立好的初始工作或者是上一个工作后通过链式调用该办法,会在之前工作执行实现后继续执行whenComplete里的内容(whenComplete传入的action只是对之前工作的后果进行解决),即应用该办法能够防止后面说到的Future接口的问题,不再须要通过阻塞或者轮询的形式去获取后果,而是通过调用该办法等工作执行结束主动调用。
该办法的参数为BiConsumer<? super T, ? super Throwable> action消费者接口,能够接管两个参数,一个是工作执行完的后果,一个是执行工作时的异样。
// 例子
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executor = Executors.newFixedThreadPool(4);try {    CompletableFuture.supplyAsync(() -> 666, executor)            .whenComplete((res, ex) -> System.out.println("工作执行结束,后果为" + res + " 异样为" + ex)            );} catch (Exception e) {    e.printStackTrace();}finally {    executor.shutdown();}

}

/**
输入后果:

工作执行结束,后果为666 异样为null

**/
复制代码

除了上述的办法外,还有一些相似的办法如XXXAsync()或者是XXXAsync(XX,Executor executor),对于这些办法,这里对立阐明,后续文章中将不会再列举

public CompletableFuture<T> whenCompleteAsync(

    BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(

    BiConsumer<? super T, ? super Throwable> action, Executor executor)

复制代码
XXXAsync():示意上一个工作执行实现后,不会再应用之前工作中的线程,而是从新应用从默认线程(ForkJoinPool 线程池)中从新获取新的线程执行当前任务。
XXXAsync(XX,Executor executor):示意不会沿用之前工作的线程,而是应用本人第二个参数指定的线程池从新获取线程执行当前任务。

④ 对执行后果进行生产

thenRun 后面工作执行完后执行当前任务,不关怀后面工作的后果,也没返回值

public CompletableFuture<Void> thenRun(Runnable action)
复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作A实现后接着执行工作B,然而工作B不须要A的后果,并且执行完工作B也不会返回后果。
thenRun(Runnable action)的参数为Runnable接口即没有传入参数。
// 例子
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executor = Executors.newFixedThreadPool(4);try {    CompletableFuture.supplyAsync(() -> 666, executor)                .thenRun(() -> System.out.println("我都没有参数怎么拿到之前的后果,我也没有返回值。")            );} catch (Exception e) {    e.printStackTrace();}finally {    executor.shutdown();}

}

/**
输入后果:

我都没有参数怎么拿到之前的后果,我也没有返回值。

**/
复制代码

thenAccept 后面工作执行完后执行当前任务,生产后面的后果,没有返回值

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作A实现后接着执行工作B,而且工作B须要A的后果,然而执行完工作B不会返回后果。
thenAccept(Consumer<? super T> action)的参数为消费者接口,即能够传入一个参数,该参数为上一个工作的执行后果。
// 例子
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executor = Executors.newFixedThreadPool(4);try {    CompletableFuture.supplyAsync(() -> 666, executor)            .thenAccept((res) -> System.out.println("我能拿到上一个的后果" + res + ",然而我没法传出去。")            );} catch (Exception e) {    e.printStackTrace();}finally {    executor.shutdown();}

}

/**
输入后果:

我能拿到上一个的后果666,然而我没法传出去。

**/
复制代码

thenApply 后面工作执行完后执行当前任务,生产后面的后果,具备返回值

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作A实现后接着执行工作B,而且工作B须要A的后果,并且执行完工作B须要有返回后果。
thenApply(Function<? super T,? extends U> fn)的参数为函数式接口,即能够传入一个参数类型为T,该参数是上一个工作的执行后果,并且函数式接口须要有返回值,类型为U。
// 例子
public static void main(String[] args) {

// 疾速创立线程池ExecutorService executor = Executors.newFixedThreadPool(4);try {    CompletableFuture.supplyAsync(() -> 666, executor)            .thenApply((res) -> {                    System.out.println("我能拿到上一个的后果" + res + "并且我要将后果传出去");                    return res;                }            ).whenComplete((res, ex) -> System.out.println("后果" + res));} catch (Exception e) {    e.printStackTrace();}finally {    executor.shutdown();}

}
/**
输入后果:

我能拿到上一个的后果666并且我要将后果传出去后果666

**/
复制代码

⑤ 异样解决

exceptionally 异样捕捉,只生产后面工作中呈现的异样信息,具备返回值

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
复制代码
能够通过链式调用该办法来获取异样信息,并且具备返回值。如果某一个工作出现异常被exceptionally捕捉到则残余的工作将不会再执行。相似于Java异样解决的catch。
exceptionally(Function<Throwable, ? extends T> fn)的参数是函数式接口,具备一个参数以及返回值,该参数为后面工作的异样信息。