关于java:CompletableFuture实现异步编排

为什么须要异步执行?

场景:电商零碎中获取一个残缺的商品信息可能分为以下几步:①获取商品根本信息 ②获取商品图片信息 ③获取商品促销流动信息 ④获取商品各种类的根本信息 等操作,如果应用串行形式去执行这些操作,假如每个操作执行1s,那么用户看到残缺的商品详情就须要4s的工夫,如果应用并行形式执行这些操作,可能只须要1s就能够实现。所以这就是异步执行的益处。

JDK5的Future接口

Future接口用于代表异步计算的后果,通过Future接口提供的办法能够查看异步计算是否执行实现,或者期待执行后果并获取执行后果,同时还能够勾销执行。

列举Future接口的办法:

  • get()获取工作执行后果,如果工作还没实现则会阻塞期待直到工作执行实现。如果工作被勾销则会抛出CancellationException异样,如果工作执行过程产生异样则会抛出ExecutionException异样,如果阻塞期待过程中被中断则会抛出InterruptedException异样。
  • get(long timeout,Timeunit unit)带超时工夫的get()办法,如果阻塞期待过程中超时则会抛出TimeoutException异样。
  • cancel()用于勾销异步工作的执行。如果异步工作曾经实现或者曾经被勾销,或者因为某些起因不能取消,则会返回false。如果工作还没有被执行,则会返回true并且异步工作不会被执行。如果工作曾经开始执行了然而还没有执行实现,若mayInterruptIfRunning为true,则会立刻中断执行工作的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断工作执行线程。
  • isCanceled()判断工作是否被勾销,如果工作在完结(失常执行完结或者执行异样完结)前被勾销则返回true,否则返回false。
  • isDone()判断工作是否曾经实现,如果实现则返回true,否则返回false。须要留神的是:工作执行过程中产生异样、工作被勾销也属于工作已实现,也会返回true。

应用Future接口和Callable接口实现异步执行:

public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    // 获取商品根本信息(能够应用Lambda表达式简化Callable接口,这里为了便于察看不应用)
    Future<String> future1 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "获取到商品根本信息";
        }
    });
    // 获取商品图片信息
    Future<String> future2 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "获取商品图片信息";
        }
    });
    // 获取商品促销信息
    Future<String> future3 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "获取商品促销信息";
        }
    });
    // 获取商品各种类根本信息
    Future<String> future4 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "获取商品各种类根本信息";
        }
    });
        // 获取后果
    try {
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
        System.out.println(future4.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executorService.shutdown();
    }
}

既然Future能够实现异步执行并获取后果,为什么还会须要CompletableFuture?

简述一下Future接口的弊病:

  • 不反对手动实现

    • 当提交了一个工作,然而执行太慢了,通过其余门路曾经获取到了工作后果,当初没法把这个工作后果告诉到正在执行的线程,所以必须被动勾销或者始终期待它执行实现。
  • 不反对进一步的非阻塞调用

    • 通过Future的get()办法会始终阻塞到工作实现,然而想在获取工作之后执行额定的工作,因为 Future 不反对回调函数,所以无奈实现这个性能。
  • 不反对链式调用

    • 对于Future的执行后果,想持续传到下一个Future解决应用,从而造成一个链式的pipline调用,这在 Future中无奈实现。
  • 不反对多个 Future 合并

    • 比方有10个Future并行执行,想在所有的Future运行结束之后,执行某些函数,是无奈通过Future实现的。
  • 不反对异样解决

    • Future的API没有任何的异样解决的api,所以在异步运行时,如果出了异样问题不好定位。

应用Future接口能够通过get()阻塞式获取后果或者通过轮询+isDone()非阻塞式获取后果,然而前一种办法会阻塞,后一种会消耗CPU资源,所以JDK的Future接口实现异步执行对获取后果不太敌对,所以在JDK8时推出了CompletableFuture实现异步编排


CompletableFuture的应用

CompletableFuture概述

JDK8中新减少了一个蕴含50个办法左右的类CompletableFuture,提供了十分弱小的Future的扩大性能,能够帮忙咱们简化异步编程的复杂性,提供了函数式编程的能力,能够通过回调的形式解决计算结果,并且提供了转换和组合CompletableFuture的办法

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

CompletableFuture类实现了Future接口和CompletionStage接口,即除了能够应用Future接口的所有办法之外,CompletionStage<T>接口提供了更多办法来更好的实现异步编排,并且大量的应用了JDK8引入的函数式编程概念。前面会粗疏的介绍罕用的API。


① 创立CompletableFuture的形式

  1. 应用new关键字创立
// 无返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>("result");
// 已知返回后果(底层其实也是带参数的结构器赋值)
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");

创立一个返回后果类型为String的CompletableFuture,能够应用Future接口的get()办法获取该值(同样也会阻塞)。

能够应用无参结构器返回一个没有后果的CompletableFuture,也能够通过结构器的传参CompletableFuture设置好返回后果,或者应用CompletableFuture.completedFuture(U value)结构一个已知后果的CompletableFuture。

  1. 应用CompletableFuture类的动态工厂办法(罕用)
  1. runAsync() 无返回值
// 应用默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 应用自定义线程池(举荐)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) 

runAsync()办法的参数是Runnable接口,这是一个函数式接口,不容许返回值。当须要异步操作且不关怀返回后果的时候能够应用runAsync()办法。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过Lambda表达式实现Runnable接口
        CompletableFuture.runAsync(()-> System.out.println("获取商品根本信息胜利"), executor).get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
  1. supplyAsync() 有返回值
// 应用默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 应用自定义线程池(举荐)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

supplyAsync()办法的参数是Supplier<U>供应型接口(无参有返回值),这也是一个函数式接口,U是返回后果值的类型当须要异步操作且关怀返回后果的时候,能够应用supplyAsync()办法。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过Lambda表达式实现执行内容,并返回后果通过CompletableFuture接管
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("获取商品信息胜利");
            return "信息";
        }, executor);
        // 输入后果
        System.out.println(completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}  

对于第二个参数Executor executor阐明

在没有指定第二个参数(即没有指定线程池)时,CompletableFuture间接应用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。

在理论生产中会应用自定义的线程池来执行异步代码,具体能够参考另一篇文章深刻了解线程池ThreadPoolExecutor – 掘金 (juejin.cn),外面的第二节有生产中怎么创立自定义线程的例子,能够参考一下。


② 取得异步执行后果

  1. get() 阻塞式获取执行后果
public T get() throws InterruptedException, ExecutionException

该办法调用后如果工作还没实现则会阻塞期待直到工作执行实现。如果工作执行过程产生异样则会抛出ExecutionException异样,如果阻塞期待过程中被中断则会抛出InterruptedException异样。

  1. get(long timeout, TimeUnit unit) 带超时的阻塞式获取执行后果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

该办法调用后如果如果工作还没实现则会阻塞期待直到工作执行实现或者超出timeout工夫,如果阻塞期待过程中超时则会抛出TimeoutException异样。

  1. getNow(T valueIfAbsent) 立即获取执行后果
public T getNow(T valueIfAbsent)

该办法调用后,会立即获取后果不会阻塞期待。如果工作实现则间接返回执行实现后的后果,如果工作没有实现,则返回调用办法时传入的参数valueIfAbsent值。

  1. join() 不抛异样的阻塞时获取执行后果
public T join()

该办法和get()办法作用一样,只是不会抛出异样

  1. complete(T value) 被动触发计算,返回异步是否执行结束
public boolean complete(T value)

该办法调用后,会被动触发计算结果,如果此时异步执行并没有实现(此时boolean值返回true),则通过get()拿到的数据会是complete()设置的参数value值,如果此时异步执行曾经实现(此时boolean值返回false),则通过get()拿到的就是执行实现的后果。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过Lambda表达式实现执行内容,并返回后果通过CompletableFuture接管
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 休眠2秒,使得异步执行变慢,会导致被动触发计算先执行,此时返回的get就是555
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 666;
        }, executor);
        // 被动触发计算,判断异步执行是否实现
        System.out.println(completableFuture.complete(555));
        // 输入后果
        System.out.println(completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    true
    555
**/

③ 对执行后果进行解决

whenComplete 期待后面工作执行完再执行以后解决

public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action)

在创立好的初始工作或者是上一个工作后通过链式调用该办法,会在之前工作执行实现后继续执行whenComplete里的内容(whenComplete传入的action只是对之前工作的后果进行解决),即应用该办法能够防止后面说到的Future接口的问题,不再须要通过阻塞或者轮询的形式去获取后果,而是通过调用该办法等工作执行结束主动调用。

该办法的参数为BiConsumer<? super T, ? super Throwable> action消费者接口,能够接管两个参数,一个是工作执行完的后果,一个是执行工作时的异样

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .whenComplete((res, ex) -> System.out.println("工作执行结束,后果为" + res + " 异样为" + ex)
                );
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    工作执行结束,后果为666 异样为null
**/

除了上述的办法外,还有一些相似的办法如XXXAsync()或者是XXXAsync(XX,Executor executor),对于这些办法,这里对立阐明,后续文章中将不会再列举

public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor)

XXXAsync():示意上一个工作执行实现后,不会再应用之前工作中的线程,而是从新应用从默认线程(ForkJoinPool 线程池)中从新获取新的线程执行当前任务

XXXAsync(XX,Executor executor):示意不会沿用之前工作的线程,而是应用本人第二个参数指定的线程池从新获取线程执行当前任务


④ 对执行后果进行生产

  1. thenRun 后面工作执行完后执行当前任务,不关怀后面工作的后果,也没返回值
public CompletableFuture<Void> thenRun(Runnable action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作A实现后接着执行工作B,然而工作B不须要A的后果,并且执行完工作B也不会返回后果

thenRun(Runnable action)的参数为Runnable接口即没有传入参数

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                    .thenRun(() -> System.out.println("我都没有参数怎么拿到之前的后果,我也没有返回值。")
                );
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    我都没有参数怎么拿到之前的后果,我也没有返回值。
**/
  1. thenAccept 后面工作执行完后执行当前任务,生产后面的后果,没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作A实现后接着执行工作B,而且工作B须要A的后果,然而执行完工作B不会返回后果

thenAccept(Consumer<? super T> action)的参数为消费者接口,即能够传入一个参数,该参数为上一个工作的执行后果。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenAccept((res) -> System.out.println("我能拿到上一个的后果" + res + ",然而我没法传出去。")
                );
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    我能拿到上一个的后果666,然而我没法传出去。
**/
  1. thenApply 后面工作执行完后执行当前任务,生产后面的后果,具备返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作A实现后接着执行工作B,而且工作B须要A的后果,并且执行完工作B须要有返回后果

thenApply(Function<? super T,? extends U> fn)的参数为函数式接口,即能够传入一个参数类型为T,该参数是上一个工作的执行后果,并且函数式接口须要有返回值,类型为U。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {
                        System.out.println("我能拿到上一个的后果" + res + "并且我要将后果传出去");
                        return res;
                    }
                ).whenComplete((res, ex) -> System.out.println("后果" + res));
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
输入后果:
    我能拿到上一个的后果666并且我要将后果传出去
    后果666
**/

⑤ 异样解决

  1. exceptionally 异样捕捉,只生产后面工作中呈现的异样信息,具备返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

能够通过链式调用该办法来获取异样信息,并且具备返回值。如果某一个工作出现异常被exceptionally捕捉到则残余的工作将不会再执行。相似于Java异样解决的catch。

exceptionally(Function<Throwable, ? extends T> fn)的参数是函数式接口,具备一个参数以及返回值,该参数为后面工作的异样信息。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> {
                    if (Math.random() < 0.5) throw new RuntimeException("error");
                    return 666;
                }, executor)
                .thenApply((res) -> {
                    System.out.println("不出现异常,后果为" + res);
                    return res;
                }).exceptionally((ex) -> {
                    ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
// 这是不出现异常的状况
不出现异常,后果为666

// 这是出现异常的状况
java.util.concurrent.CompletionException: java.lang.RuntimeException: error
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: error
        at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 3 more
**/
  1. handle 异样解决,生产后面的后果及异样信息,具备返回值,不会中断后续工作
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

能够通过链式调用该办法能够跟thenApply()一样能够生产后面工作的后果并实现本人工作内容,并且具备返回值。不同之处在于出现异常也能够接着往下执行,依据异样参数做进一步解决。

handle(BiFunction<? super T, Throwable, ? extends U> fn)的参数是消费者接口,一个参数是工作执行后果,一个是异样信息,并且具备返回值。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {
                    if (Math.random() < 0.5) throw new RuntimeException("error");
                    return res;
                }).handle((res, ex) -> {
                    System.out.println("后果" + res + "(null示意之前出现异常导致后果无奈传过来)");
                    return res == null ? -1 : res;
                }).thenApply((res) -> {
                    System.out.println("后果为" + res + "(-1示意之前出现异常,通过handler使得后果解决成-1)");
                    return res;
                }).exceptionally((ex) -> {
                    ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
// 这是不出现异常的状况
后果666(null示意之前出现异常导致后果无奈传过来)
后果为666(-1示意之前出现异常,通过handler使得后果解决成-1)

// 这是出现异常的状况
后果null(null示意之前出现异常导致后果无奈传过来)
后果为-1(-1示意之前出现异常,通过handler使得后果解决成-1)
**/

能够看到通过handle相似于Java异样解决的finally,出现异常并不会像应用exceptionally那样中断后续的工作,而是继续执行,能够通过handle为之前出现异常无奈取得的后果从新赋值(依据业务需要设置安全值之类的)。


⑥ 两组工作按程序执行

thenCompose 实现两组工作按前后程序执行

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn)
复制代码

A.thenCompose(B)相当于工作A要排在工作B后面,即程序的执行工作A、工作B。该办法的参数是函数式接口,函数式接口的参数是调用者的执行后果,返回值是另一个工作B。

public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            System.out.println("工作A先执行后果为666");
            return 666;
        }, executor);

        actionA.thenCompose((res) ->  CompletableFuture.supplyAsync(() -> {
            System.out.println("工作B后执行后果加上333");
            return 333 + res;
        })).whenComplete((res, ex) -> System.out.println(res));
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    工作A先执行后果为666
    工作B后执行后果加上333
    999
**/

⑦ 两组工作谁快用谁

applyToEither 比拟两组工作执行速度,谁快生产谁的执行后果

public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn)

该办法用于比拟两组工作的执行速度,谁先执行完就用谁的执行后果

传入参数阐明:第一个参数传入的是另一个工作的执行内容,第二个参数传入的是最终这两个工作谁快返回谁的后果,并通过以后函数式接口进行接管和解决(应用函数式接口,有参且有返回值)。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作A期待久一点,执行后果为555");
            return 555;
        }, executor);

        actionA.applyToEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("工作B很快,执行后果为666");
            return 666;
        }), (res) -> {
            System.out.println("最终应用的执行后果为" + res);
            return res;
        });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}
/**
输入后果:
    工作B很快,执行后果为666
    最终应用的执行后果为666
    工作A期待久一点,执行后果为555
**/

除了applyToEither对工作最终后果进行获取并生产,并且具备返回值的办法外,还有两个相似的办法。

// 这个办法成果和下面的一样,比谁快拿谁的后果,不同的是这个办法只生产不具备返回值
public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action)
// 这个办法成果和下面的一样,比谁快拿谁的后果,不同的是这个办法不生产后果也不具备返回值
public CompletableFuture<Void> runAfterEither(
        CompletionStage<?> other, Runnable action)

⑧ 两组工作实现后合并

thenCombine 期待两组工作执行结束后,合并两组工作的执行后果

 public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

该办法用于两组工作都实现后,将两组工作的执行后果一起交给以后办法的BiFunction解决。先实现的工作会期待后者工作实现。

传入参数阐明:第一个参数传入的是另一个工作的执行内容,第二个参数传入的是带两个参数的函数式接口(第一个参数是工作1的执行后果,第二个参数是工作2的执行后果,具备返回值)。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作A期待久一点,执行后果为333");
            return 333;
        }, executor);

        CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
            System.out.println("工作B很快,执行后果为666");
            return 666;
        }, executor);

        actionA.thenCombine(actionB, (res1, res2) -> {
            System.out.println("最终应用的执行后果为" + (res1 + res2));
            return res1 + res2;
        });
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    工作B很快,执行后果为666
    工作A期待久一点,执行后果为333
    最终应用的执行后果为999
**/

除了thenCombine对工作最终后果进行获取并生产,并且具备返回值的办法外,还有两个相似的办法。

// 这个办法成果和下面的一样,获取合并后果,不同的是这个办法只生产不具备返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action)
// 这个办法成果和下面的一样,获取合并后果,不同的是这个办法不生产后果也不具备返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action)

⑨ 多任务组合

  1. allOf 实现并行地执行多个工作,期待所有工作执行实现(无需思考执行程序)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

该办法能够实现并行地执行多个工作,实用于多个工作没有依赖关系,能够相互独立执行的,传入参数为多个工作,没有返回值。

allOf()办法会期待所有的工作执行结束再返回,能够通过get()阻塞确保所有工作执行结束

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作A期待2秒后执行结束");
        }, executor);

        CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> {
            System.out.println("工作B很快执行结束");
        }, executor);

        CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作C期待1秒后执行结束");
        }, executor);

        CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> {
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作D期待5秒后执行结束");
        }, executor);

        CompletableFuture.allOf(actionA, actionB, actionC, actionD).get();
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    工作B很快执行结束
    工作C期待1秒后执行结束
    工作A期待2秒后执行结束
    工作D期待5秒后执行结束
**/
  1. anyOf 实现并行地执行多个工作,只有有个一个实现的便会返回执行后果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

该办法能够实现并行地执行多个工作,传入参数为多个工作,具备返回值。该办法不会期待所有工作执行实现后再返回后果,而是当有一个工作实现时,便会返回那个工作的执行后果

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作A期待2秒后执行结束");
            return 555;
        }, executor);

        CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {
            System.out.println("工作B很快执行结束");
            return 666;
        }, executor);

        CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作C期待1秒后执行结束");
            return 999;
        }, executor);

        CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("工作D期待5秒后执行结束");
            return 888;
        }, executor);

        System.out.println("最先执行完的返回后果为" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get());
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        executor.shutdown();
    }
}

/**
输入后果:
    工作B很快执行结束
    最先执行完的返回后果为666
    工作C期待1秒后执行结束
    工作A期待2秒后执行结束
    工作D期待5秒后执行结束
**/

一个应用CompletableFuture异步编排的例子

不须要关怀例子中的业务内容,应用时依照本人业务的需要,对不同的需要调用不同API即可。

编写工作时次要关怀以下几点: ① 是否须要生产之前工作的后果 ② 是否须要返回后果给其余工作生产 ③ 是否要求程序执行(是否容许并行,有没有前置要求)

/**
 * 该办法用于获取单个商品的所有信息
 * 1\. 商品的根本信息
 * 2\. 商品的图片信息
 * 3\. 商品的销售属性组合
 * 4\. 商品的各种分类根本信息
 * 5\. 商品的促销信息
 */
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
    // 创立商品Vo通过各个工作去欠缺Vo的信息
    SkuItemVo skuItemVo = new SkuItemVo();

    // 获取商品根本信息 查问到后设置进Vo中,返回根本信息给后续工作生产 (应用自定义的线程池进行异步)
    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        SkuInfoEntity info = this.getById(skuId);
        skuItemVo.setInfo(info);
        return info;
    }, executor);

    // 获取商品的图片信息 获取后设置进Vo中,此处不须要生产图片信息,也不须要返回后果。所以应用runAsync即可
    CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
        List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
        skuItemVo.setImages(imagesEntities);
    }, executor);

    // 获取商品销售属性 因为要利用之前查问到的根本信息,但后续工作不须要生产销售属性(不须要返回后果),所以应用thenAcceptAsync生产之前的根本信息,不返回销售信息。
    CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
        List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
        skuItemVo.setSaleAttr(saleAttrVos);
    }, executor);

    // 获取商品各分类根本信息,同样要生产之前的根本信息,但无需返回,所以应用thenAcceptAsync即可
    CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesc(spuInfoDescEntity);
    }, executor);

    // 获取商品的促销信息 这个也不须要生产之前工作的后果,也不须要返回后果。所以间接应用runAsync即可
    CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
        R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
        if (skuSeckilInfo.getCode() == 0) {
            SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
            });
            skuItemVo.setSeckillSkuVo(seckilInfoData);

            if (seckilInfoData != null) {
                long currentTime = System.currentTimeMillis();
                if (currentTime > seckilInfoData.getEndTime()) {
                    skuItemVo.setSeckillSkuVo(null);
                }
            }
        }
    }, executor);

    // 应用allOf()组合所有工作,并且应用get()阻塞,期待所有工作实现。
        // 补充:infoFuture不能放入allOf中,因为allOf是并行无序执行(须要多个条件是无依赖性的)的,当下面工作中有须要生产infoFuture的后果,所以须要先执行infoFuture。
    CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get();

    // 最初返回商品Vo
    return skuItemVo;
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理