关于java:CompletableFuture实现异步编排

71次阅读

共计 19814 个字符,预计需要花费 50 分钟才能阅读完成。

为什么须要异步执行?

场景:电商零碎中获取一个残缺的商品信息可能分为以下几步:①获取商品根本信息 ②获取商品图片信息 ③获取商品促销流动信息 ④获取商品各种类的根本信息 等操作,如果应用串行形式去执行这些操作,假如每个操作执行 1s,那么用户看到残缺的商品详情就须要 4s 的工夫,如果应用并行形式执行这些操作,可能只须要 1s 就能够实现。所以这就是异步执行的益处。

JDK5 的 Future 接口

Future接口用于 代表异步计算的后果,通过 Future 接口提供的办法能够查看异步计算是否执行实现,或者期待执行后果并获取执行后果,同时还能够勾销执行。

列举 Future 接口的办法:

  • get()获取工作执行后果 ,如果 工作还没实现则会阻塞期待 直到工作执行实现。如果工作被勾销则会抛出 CancellationException 异样,如果工作执行过程产生异样则会抛出 ExecutionException 异样,如果 阻塞期待过程中被中断 则会抛出 InterruptedException 异样。
  • get(long timeout,Timeunit unit)带超时工夫的 get() 办法 ,如果 阻塞期待过程中超时 则会抛出 TimeoutException 异样。
  • cancel()用于勾销异步工作的执行 。如果异步工作曾经实现或者曾经被勾销,或者因为某些起因不能取消,则会返回 false。如果工作还没有被执行,则会返回 true 并且异步工作不会被执行。如果 工作曾经开始执行了然而还没有执行实现 ,若mayInterruptIfRunning 为 true,则会立刻中断执行工作的线程并返回 true,若 mayInterruptIfRunning 为 false,则会返回 true 且不会中断工作执行线程。
  • isCanceled()判断工作是否被勾销 ,如果工作在完结(失常执行完结或者执行异样完结) 前被勾销则返回 true,否则返回 false。
  • isDone()判断工作是否曾经实现,如果实现则返回 true,否则返回 false。须要留神的是:工作执行过程中产生异样、工作被勾销也属于工作已实现,也会返回 true。

应用 Future 接口和 Callable 接口实现异步执行:

public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    // 获取商品根本信息(能够应用 Lambda 表达式简化 Callable 接口,这里为了便于察看不应用)Future<String> future1 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {return "获取到商品根本信息";}
    });
    // 获取商品图片信息
    Future<String> future2 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {return "获取商品图片信息";}
    });
    // 获取商品促销信息
    Future<String> future3 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {return "获取商品促销信息";}
    });
    // 获取商品各种类根本信息
    Future<String> future4 = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {return "获取商品各种类根本信息";}
    });
        // 获取后果
    try {System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
        System.out.println(future4.get());
    } catch (InterruptedException | ExecutionException e) {e.printStackTrace();
    }finally {executorService.shutdown();
    }
}

既然 Future 能够实现异步执行并获取后果,为什么还会须要 CompletableFuture?

简述一下 Future 接口的弊病:

  • 不反对手动实现

    • 当提交了一个工作,然而执行太慢了,通过其余门路曾经获取到了工作后果,当初没法把这个工作后果告诉到正在执行的线程,所以必须被动勾销或者始终期待它执行实现。
  • 不反对进一步的非阻塞调用

    • 通过 Future 的 get() 办法会始终阻塞到工作实现,然而想在获取工作之后执行额定的工作,因为 Future 不反对回调函数,所以无奈实现这个性能。
  • 不反对链式调用

    • 对于 Future 的执行后果,想持续传到下一个 Future 解决应用,从而造成一个链式的 pipline 调用,这在 Future 中无奈实现。
  • 不反对多个 Future 合并

    • 比方有 10 个 Future 并行执行,想在所有的 Future 运行结束之后,执行某些函数,是无奈通过 Future 实现的。
  • 不反对异样解决

    • Future 的 API 没有任何的异样解决的 api,所以在异步运行时,如果出了异样问题不好定位。

应用 Future 接口能够 通过 get() 阻塞式获取后果或者通过轮询+isDone()非阻塞式获取后果 ,然而 前一种办法会阻塞,后一种会消耗 CPU 资源,所以 JDK 的 Future 接口实现异步执行对获取后果不太敌对,所以在JDK8 时推出了 CompletableFuture 实现异步编排


CompletableFuture 的应用

CompletableFuture 概述

JDK8 中新减少了一个蕴含 50 个办法左右的类 CompletableFuture,提供了十分弱小的 Future 的扩大性能,能够帮忙咱们简化异步编程的复杂性,提供了函数式编程的能力,能够 通过回调的形式 解决计算结果,并且 提供了转换和组合 CompletableFuture 的办法

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

CompletableFuture类实现了 Future 接口和 CompletionStage 接口,即除了能够应用 Future 接口的所有办法之外,CompletionStage<T>接口提供了更多办法来更好的实现异步编排,并且大量的应用了 JDK8 引入的函数式编程概念。前面会粗疏的介绍罕用的 API。


① 创立 CompletableFuture 的形式

  1. 应用 new 关键字创立
// 无返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>("result");
// 已知返回后果(底层其实也是带参数的结构器赋值)CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");

创立一个返回后果类型为 String 的 CompletableFuture,能够应用 Future 接口的 get() 办法获取该值(同样也会阻塞)。

能够应用无参结构器返回一个没有后果的 CompletableFuture,也能够通过结构器的传参 CompletableFuture 设置好返回后果,或者应用 CompletableFuture.completedFuture(U value) 结构一个已知后果的 CompletableFuture。

  1. 应用 CompletableFuture 类的动态工厂办法(罕用)
  1. runAsync() 无返回值
// 应用默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 应用自定义线程池(举荐)public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) 

runAsync()办法的参数是 Runnable 接口,这是一个函数式接口,不容许返回值。当须要异步操作且不关怀返回后果的时候能够应用 runAsync() 办法。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过 Lambda 表达式实现 Runnable 接口
        CompletableFuture.runAsync(()-> System.out.println("获取商品根本信息胜利"), executor).get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}
  1. supplyAsync() 有返回值
// 应用默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 应用自定义线程池(举荐)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

supplyAsync()办法的参数是 Supplier<U> 供应型接口(无参有返回值),这也是一个函数式接口,U是返回后果值的类型 当须要异步操作且关怀返回后果的时候, 能够应用 supplyAsync() 办法。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过 Lambda 表达式实现执行内容,并返回后果通过 CompletableFuture 接管
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("获取商品信息胜利");
            return "信息";
        }, executor);
        // 输入后果
        System.out.println(completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}  

对于第二个参数 Executor executor 阐明

在没有指定第二个参数(即没有指定线程池)时,CompletableFuture 间接应用默认的 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

在理论生产中会应用自定义的线程池来执行异步代码,具体能够参考另一篇文章深刻了解线程池 ThreadPoolExecutor – 掘金 (juejin.cn),外面的第二节有生产中怎么创立自定义线程的例子,能够参考一下。


② 取得异步执行后果

  1. get() 阻塞式获取执行后果
public T get() throws InterruptedException, ExecutionException

该办法调用后如果 工作还没实现则会阻塞期待 直到工作执行实现。如果 工作执行过程产生异样 则会抛出 ExecutionException 异样,如果 阻塞期待过程中被中断 则会抛出 InterruptedException 异样。

  1. get(long timeout, TimeUnit unit) 带超时的阻塞式获取执行后果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

该办法调用后如果如果 工作还没实现则会阻塞期待 直到工作执行实现或者超出 timeout 工夫,如果 阻塞期待过程中超时 则会抛出 TimeoutException 异样。

  1. getNow(T valueIfAbsent) 立即获取执行后果
public T getNow(T valueIfAbsent)

该办法调用后,会立即获取后果不会阻塞期待 。如果工作实现则间接返回执行实现后的后果,如果工作没有实现,则返回调用办法时传入的参数valueIfAbsent 值。

  1. join() 不抛异样的阻塞时获取执行后果
public T join()

该办法和 get() 办法作用一样,只是 不会抛出异样

  1. complete(T value) 被动触发计算,返回异步是否执行结束
public boolean complete(T value)

该办法调用后,会 被动触发计算结果 ,如果此时异步执行并没有实现(此时 boolean 值返回 true),则通过get() 拿到的数据会是 complete() 设置的参数 value 值,如果此时异步执行曾经实现(此时 boolean 值返回 false),则通过 get() 拿到的就是执行实现的后果。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // 通过 Lambda 表达式实现执行内容,并返回后果通过 CompletableFuture 接管
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 休眠 2 秒,使得异步执行变慢,会导致被动触发计算先执行,此时返回的 get 就是 555
            try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
            return 666;
        }, executor);
        // 被动触发计算,判断异步执行是否实现
        System.out.println(completableFuture.complete(555));
        // 输入后果
        System.out.println(completableFuture.get());
    } catch (InterruptedException | ExecutionException e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    true
    555
**/

③ 对执行后果进行解决

whenComplete 期待后面工作执行完再执行以后解决

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)

在创立好的初始工作或者是上一个工作后通过链式调用该办法,会在 之前工作执行实现后继续执行 whenComplete 里的内容(whenComplete传入的 action 只是对之前工作的后果进行解决),即应用该办法能够防止后面说到的 Future 接口的问题,不再须要通过阻塞或者轮询的形式去获取后果,而是通过调用该办法等工作执行结束主动调用。

该办法的参数为 BiConsumer<? super T, ? super Throwable> action 消费者接口,能够接管两个参数,一个是工作执行完的后果,一个是执行工作时的异样

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture.supplyAsync(() -> 666, executor)
                .whenComplete((res, ex) -> System.out.println("工作执行结束,后果为" + res + "异样为" + ex)
                );
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    工作执行结束,后果为 666 异样为 null
**/

除了上述的办法外,还有一些相似的办法如 XXXAsync() 或者是XXXAsync(XX,Executor executor), 对于这些办法,这里对立阐明,后续文章中将不会再列举

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

XXXAsync():示意上一个工作执行实现后,不会再应用之前工作中的线程,而是 从新应用从默认线程(ForkJoinPool 线程池)中从新获取新的线程执行当前任务

XXXAsync(XX,Executor executor):示意不会沿用之前工作的线程,而是 应用本人第二个参数指定的线程池从新获取线程执行当前任务


④ 对执行后果进行生产

  1. thenRun 后面工作执行完后执行当前任务,不关怀后面工作的后果,也没返回值
public CompletableFuture<Void> thenRun(Runnable action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,然而工作 B 不须要 A 的后果,并且执行完工作 B 也不会返回后果

thenRun(Runnable action)的参数为 Runnable 接口 即没有传入参数

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture.supplyAsync(() -> 666, executor)
                    .thenRun(() -> System.out.println("我都没有参数怎么拿到之前的后果,我也没有返回值。")
                );
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    我都没有参数怎么拿到之前的后果,我也没有返回值。**/
  1. thenAccept 后面工作执行完后执行当前任务,生产后面的后果,没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,而且工作 B 须要 A 的后果,然而执行完工作 B 不会返回后果

thenAccept(Consumer<? super T> action)的参数为消费者接口,即能够传入一个参数,该参数为上一个工作的执行后果。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture.supplyAsync(() -> 666, executor)
                .thenAccept((res) -> System.out.println("我能拿到上一个的后果" + res + ", 然而我没法传出去。")
                );
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    我能拿到上一个的后果 666, 然而我没法传出去。**/
  1. thenApply 后面工作执行完后执行当前任务,生产后面的后果,具备返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

CompletableFuture.supplyAsync(actionA).thenRun(actionB)像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,而且工作 B 须要 A 的后果,并且执行完工作 B 须要有返回后果

thenApply(Function<? super T,? extends U> fn)的参数为函数式接口,即能够传入一个参数类型为 T,该参数是上一个工作的执行后果,并且函数式接口须要有返回值, 类型为 U。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {System.out.println("我能拿到上一个的后果" + res + "并且我要将后果传出去");
                        return res;
                    }
                ).whenComplete((res, ex) -> System.out.println("后果" + res));
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}
/**
输入后果:
    我能拿到上一个的后果 666 并且我要将后果传出去
    后果 666
**/

⑤ 异样解决

  1. exceptionally 异样捕捉,只生产后面工作中呈现的异样信息,具备返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

能够通过链式调用该办法来获取异样信息,并且具备返回值。如果某一个工作出现异常被 exceptionally 捕捉到则残余的工作将不会再执行。相似于 Java 异样解决的 catch。

exceptionally(Function<Throwable, ? extends T> fn)的参数是函数式接口,具备一个参数以及返回值,该参数为后面工作的异样信息。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) throw new RuntimeException("error");
                    return 666;
                }, executor)
                .thenApply((res) -> {System.out.println("不出现异常,后果为" + res);
                    return res;
                }).exceptionally((ex) -> {ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
// 这是不出现异常的状况
不出现异常,后果为 666

// 这是出现异常的状况
java.util.concurrent.CompletionException: java.lang.RuntimeException: error
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: error
        at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 3 more
**/
  1. handle 异样解决,生产后面的后果及异样信息,具备返回值,不会中断后续工作
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

能够通过链式调用该办法能够跟 thenApply() 一样能够生产后面工作的后果并实现本人工作内容,并且具备返回值。不同之处在于出现异常也能够接着往下执行,依据异样参数做进一步解决。

handle(BiFunction<? super T, Throwable, ? extends U> fn)的参数是消费者接口,一个参数是工作执行后果,一个是异样信息,并且具备返回值。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture.supplyAsync(() -> 666, executor)
                .thenApply((res) -> {if (Math.random() < 0.5) throw new RuntimeException("error");
                    return res;
                }).handle((res, ex) -> {System.out.println("后果" + res + "(null 示意之前出现异常导致后果无奈传过来)");
                    return res == null ? -1 : res;
                }).thenApply((res) -> {System.out.println("后果为" + res + "(- 1 示意之前出现异常,通过 handler 使得后果解决成 -1)");
                    return res;
                }).exceptionally((ex) -> {ex.printStackTrace();
                    return -1;
                });
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
// 这是不出现异常的状况
后果 666(null 示意之前出现异常导致后果无奈传过来)
后果为 666(- 1 示意之前出现异常,通过 handler 使得后果解决成 -1)

// 这是出现异常的状况
后果 null(null 示意之前出现异常导致后果无奈传过来)
后果为 -1(- 1 示意之前出现异常,通过 handler 使得后果解决成 -1)
**/

能够看到通过 handle 相似于 Java 异样解决的 finally,出现异常并不会像应用 exceptionally 那样中断后续的工作,而是继续执行,能够通过 handle 为之前出现异常无奈取得的后果从新赋值(依据业务需要设置安全值之类的)。


⑥ 两组工作按程序执行

thenCompose 实现两组工作按前后程序执行

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
复制代码

A.thenCompose(B)相当于工作 A 要排在工作 B 后面,即程序的执行工作 A、工作 B 。该办法的参数是函数式接口,函数式接口的参数是调用者的执行后果,返回值是另一个工作 B。

public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {System.out.println("工作 A 先执行后果为 666");
            return 666;
        }, executor);

        actionA.thenCompose((res) ->  CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 后执行后果加上 333");
            return 333 + res;
        })).whenComplete((res, ex) -> System.out.println(res));
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    工作 A 先执行后果为 666
    工作 B 后执行后果加上 333
    999
**/

⑦ 两组工作谁快用谁

applyToEither 比拟两组工作执行速度,谁快生产谁的执行后果

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)

该办法用于 比拟两组工作的执行速度,谁先执行完就用谁的执行后果

传入参数阐明:第一个参数传入的是另一个工作的执行内容,第二个参数传入的是最终这两个工作谁快返回谁的后果,并通过以后函数式接口进行接管和解决(应用函数式接口,有参且有返回值)。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 A 期待久一点,执行后果为 555");
            return 555;
        }, executor);

        actionA.applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 很快,执行后果为 666");
            return 666;
        }), (res) -> {System.out.println("最终应用的执行后果为" + res);
            return res;
        });
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}
/**
输入后果:
    工作 B 很快,执行后果为 666
    最终应用的执行后果为 666
    工作 A 期待久一点,执行后果为 555
**/

除了 applyToEither 对工作最终后果进行获取并生产,并且具备返回值的办法外,还有两个相似的办法。

// 这个办法成果和下面的一样,比谁快拿谁的后果,不同的是这个办法只生产不具备返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
// 这个办法成果和下面的一样,比谁快拿谁的后果,不同的是这个办法不生产后果也不具备返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)

⑧ 两组工作实现后合并

thenCombine 期待两组工作执行结束后,合并两组工作的执行后果

 public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

该办法用于 两组工作都实现后,将两组工作的执行后果一起交给以后办法的 BiFunction 解决。先实现的工作会期待后者工作实现。

传入参数阐明:第一个参数传入的是另一个工作的执行内容,第二个参数传入的是带两个参数的函数式接口(第一个参数是工作 1 的执行后果,第二个参数是工作 2 的执行后果,具备返回值)。

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 A 期待久一点,执行后果为 333");
            return 333;
        }, executor);

        CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 很快,执行后果为 666");
            return 666;
        }, executor);

        actionA.thenCombine(actionB, (res1, res2) -> {System.out.println("最终应用的执行后果为" + (res1 + res2));
            return res1 + res2;
        });
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    工作 B 很快,执行后果为 666
    工作 A 期待久一点,执行后果为 333
    最终应用的执行后果为 999
**/

除了 thenCombine 对工作最终后果进行获取并生产,并且具备返回值的办法外,还有两个相似的办法。

// 这个办法成果和下面的一样,获取合并后果,不同的是这个办法只生产不具备返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action)
// 这个办法成果和下面的一样,获取合并后果,不同的是这个办法不生产后果也不具备返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action)

⑨ 多任务组合

  1. allOf 实现并行地执行多个工作,期待所有工作执行实现(无需思考执行程序)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

该办法能够 实现并行地执行多个工作,实用于多个工作没有依赖关系,能够相互独立执行的,传入参数为多个工作,没有返回值。

allOf()办法会期待所有的工作执行结束再返回,能够通过 get() 阻塞确保所有工作执行结束

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 A 期待 2 秒后执行结束");
        }, executor);

        CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> {System.out.println("工作 B 很快执行结束");
        }, executor);

        CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> {try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 C 期待 1 秒后执行结束");
        }, executor);

        CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> {try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 D 期待 5 秒后执行结束");
        }, executor);

        CompletableFuture.allOf(actionA, actionB, actionC, actionD).get();} catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    工作 B 很快执行结束
    工作 C 期待 1 秒后执行结束
    工作 A 期待 2 秒后执行结束
    工作 D 期待 5 秒后执行结束
**/
  1. anyOf 实现并行地执行多个工作,只有有个一个实现的便会返回执行后果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

该办法能够 实现并行地执行多个工作 ,传入参数为多个工作,具备返回值。该办法 不会期待所有工作执行实现后再返回后果,而是当有一个工作实现时,便会返回那个工作的执行后果

// 例子
public static void main(String[] args) {
    // 疾速创立线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 A 期待 2 秒后执行结束");
            return 555;
        }, executor);

        CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 很快执行结束");
            return 666;
        }, executor);

        CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 C 期待 1 秒后执行结束");
            return 999;
        }, executor);

        CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println("工作 D 期待 5 秒后执行结束");
            return 888;
        }, executor);

        System.out.println("最先执行完的返回后果为" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get());
    } catch (Exception e) {e.printStackTrace();
    }finally {executor.shutdown();
    }
}

/**
输入后果:
    工作 B 很快执行结束
    最先执行完的返回后果为 666
    工作 C 期待 1 秒后执行结束
    工作 A 期待 2 秒后执行结束
    工作 D 期待 5 秒后执行结束
**/

一个应用 CompletableFuture 异步编排的例子

不须要关怀例子中的业务内容,应用时依照本人业务的需要,对不同的需要调用不同 API 即可。

编写工作时次要关怀以下几点:① 是否须要生产之前工作的后果 ② 是否须要返回后果给其余工作生产 ③ 是否要求程序执行(是否容许并行,有没有前置要求)

/**
 * 该办法用于获取单个商品的所有信息
 * 1\. 商品的根本信息
 * 2\. 商品的图片信息
 * 3\. 商品的销售属性组合
 * 4\. 商品的各种分类根本信息
 * 5\. 商品的促销信息
 */
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
    // 创立商品 Vo 通过各个工作去欠缺 Vo 的信息
    SkuItemVo skuItemVo = new SkuItemVo();

    // 获取商品根本信息 查问到后设置进 Vo 中,返回根本信息给后续工作生产(应用自定义的线程池进行异步)CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {SkuInfoEntity info = this.getById(skuId);
        skuItemVo.setInfo(info);
        return info;
    }, executor);

    // 获取商品的图片信息 获取后设置进 Vo 中,此处不须要生产图片信息,也不须要返回后果。所以应用 runAsync 即可
    CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
        skuItemVo.setImages(imagesEntities);
    }, executor);

    // 获取商品销售属性 因为要利用之前查问到的根本信息,但后续工作不须要生产销售属性(不须要返回后果),所以应用 thenAcceptAsync 生产之前的根本信息,不返回销售信息。CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
        skuItemVo.setSaleAttr(saleAttrVos);
    }, executor);

    // 获取商品各分类根本信息,同样要生产之前的根本信息,但无需返回,所以应用 thenAcceptAsync 即可
    CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesc(spuInfoDescEntity);
    }, executor);

    // 获取商品的促销信息 这个也不须要生产之前工作的后果,也不须要返回后果。所以间接应用 runAsync 即可
    CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
        if (skuSeckilInfo.getCode() == 0) {SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {});
            skuItemVo.setSeckillSkuVo(seckilInfoData);

            if (seckilInfoData != null) {long currentTime = System.currentTimeMillis();
                if (currentTime > seckilInfoData.getEndTime()) {skuItemVo.setSeckillSkuVo(null);
                }
            }
        }
    }, executor);

    // 应用 allOf()组合所有工作,并且应用 get()阻塞,期待所有工作实现。// 补充:infoFuture 不能放入 allOf 中,因为 allOf 是并行无序执行(须要多个条件是无依赖性的)的,当下面工作中有须要生产 infoFuture 的后果,所以须要先执行 infoFuture。CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get();

    // 最初返回商品 Vo
    return skuItemVo;
}

正文完
 0