为什么须要异步执行?
场景:电商零碎中获取一个残缺的商品信息可能分为以下几步:①获取商品根本信息 ②获取商品图片信息 ③获取商品促销流动信息 ④获取商品各种类的根本信息 等操作,如果应用串行形式去执行这些操作,假如每个操作执行 1s,那么用户看到残缺的商品详情就须要 4s 的工夫,如果应用并行形式执行这些操作,可能只须要 1s 就能够实现。所以这就是异步执行的益处。
JDK5 的
Future
接口
Future
接口用于 代表异步计算的后果,通过 Future 接口提供的办法能够查看异步计算是否执行实现,或者期待执行后果并获取执行后果,同时还能够勾销执行。
列举 Future
接口的办法:
get()
:获取工作执行后果 ,如果 工作还没实现则会阻塞期待 直到工作执行实现。如果工作被勾销则会抛出CancellationException
异样,如果工作执行过程产生异样则会抛出ExecutionException
异样,如果 阻塞期待过程中被中断 则会抛出InterruptedException
异样。get(long timeout,Timeunit unit)
:带超时工夫的get()
办法 ,如果 阻塞期待过程中超时 则会抛出TimeoutException
异样。cancel()
:用于勾销异步工作的执行 。如果异步工作曾经实现或者曾经被勾销,或者因为某些起因不能取消,则会返回 false。如果工作还没有被执行,则会返回 true 并且异步工作不会被执行。如果 工作曾经开始执行了然而还没有执行实现 ,若mayInterruptIfRunning
为 true,则会立刻中断执行工作的线程并返回 true,若mayInterruptIfRunning
为 false,则会返回 true 且不会中断工作执行线程。isCanceled()
:判断工作是否被勾销 ,如果工作在完结(失常执行完结或者执行异样完结) 前被勾销则返回 true,否则返回 false。isDone()
:判断工作是否曾经实现,如果实现则返回 true,否则返回 false。须要留神的是:工作执行过程中产生异样、工作被勾销也属于工作已实现,也会返回 true。
应用 Future
接口和 Callable
接口实现异步执行:
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 获取商品根本信息(能够应用 Lambda 表达式简化 Callable 接口,这里为了便于察看不应用)Future<String> future1 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {return "获取到商品根本信息";}
});
// 获取商品图片信息
Future<String> future2 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {return "获取商品图片信息";}
});
// 获取商品促销信息
Future<String> future3 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {return "获取商品促销信息";}
});
// 获取商品各种类根本信息
Future<String> future4 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {return "获取商品各种类根本信息";}
});
// 获取后果
try {System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
System.out.println(future4.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executorService.shutdown();
}
}
既然 Future 能够实现异步执行并获取后果,为什么还会须要 CompletableFuture?
简述一下 Future 接口的弊病:
-
不反对手动实现
- 当提交了一个工作,然而执行太慢了,通过其余门路曾经获取到了工作后果,当初没法把这个工作后果告诉到正在执行的线程,所以必须被动勾销或者始终期待它执行实现。
-
不反对进一步的非阻塞调用
- 通过 Future 的
get()
办法会始终阻塞到工作实现,然而想在获取工作之后执行额定的工作,因为 Future 不反对回调函数,所以无奈实现这个性能。
- 通过 Future 的
-
不反对链式调用
- 对于 Future 的执行后果,想持续传到下一个 Future 解决应用,从而造成一个链式的 pipline 调用,这在 Future 中无奈实现。
-
不反对多个 Future 合并
- 比方有 10 个 Future 并行执行,想在所有的 Future 运行结束之后,执行某些函数,是无奈通过 Future 实现的。
-
不反对异样解决
- Future 的 API 没有任何的异样解决的 api,所以在异步运行时,如果出了异样问题不好定位。
应用 Future 接口能够 通过 get()
阻塞式获取后果或者通过轮询+isDone()
非阻塞式获取后果 ,然而 前一种办法会阻塞,后一种会消耗 CPU 资源,所以 JDK 的 Future 接口实现异步执行对获取后果不太敌对,所以在JDK8 时推出了 CompletableFuture 实现异步编排。
CompletableFuture 的应用
CompletableFuture 概述
JDK8 中新减少了一个蕴含 50 个办法左右的类 CompletableFuture,提供了十分弱小的 Future 的扩大性能,能够帮忙咱们简化异步编程的复杂性,提供了函数式编程的能力,能够 通过回调的形式 解决计算结果,并且 提供了转换和组合 CompletableFuture 的办法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture
类实现了 Future
接口和 CompletionStage
接口,即除了能够应用 Future
接口的所有办法之外,CompletionStage<T>
接口提供了更多办法来更好的实现异步编排,并且大量的应用了 JDK8 引入的函数式编程概念。前面会粗疏的介绍罕用的 API。
① 创立 CompletableFuture 的形式
- 应用
new
关键字创立
// 无返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// 已知返回后果
CompletableFuture<String> completableFuture = new CompletableFuture<>("result");
// 已知返回后果(底层其实也是带参数的结构器赋值)CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");
创立一个返回后果类型为 String 的 CompletableFuture,能够应用 Future
接口的 get()
办法获取该值(同样也会阻塞)。
能够应用无参结构器返回一个没有后果的 CompletableFuture,也能够通过结构器的传参 CompletableFuture 设置好返回后果,或者应用 CompletableFuture.completedFuture(U value)
结构一个已知后果的 CompletableFuture。
- 应用 CompletableFuture 类的动态工厂办法(罕用)
runAsync()
无返回值
// 应用默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 应用自定义线程池(举荐)public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
runAsync()
办法的参数是 Runnable 接口,这是一个函数式接口,不容许返回值。当须要异步操作且不关怀返回后果的时候能够应用 runAsync()
办法。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 通过 Lambda 表达式实现 Runnable 接口
CompletableFuture.runAsync(()-> System.out.println("获取商品根本信息胜利"), executor).get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
supplyAsync()
有返回值
// 应用默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 应用自定义线程池(举荐)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
supplyAsync()
办法的参数是 Supplier<U>
供应型接口(无参有返回值),这也是一个函数式接口,U
是返回后果值的类型 。 当须要异步操作且关怀返回后果的时候, 能够应用 supplyAsync()
办法。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 通过 Lambda 表达式实现执行内容,并返回后果通过 CompletableFuture 接管
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("获取商品信息胜利");
return "信息";
}, executor);
// 输入后果
System.out.println(completableFuture.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
对于第二个参数
Executor executor
阐明
在没有指定第二个参数(即没有指定线程池)时,CompletableFuture 间接应用默认的 ForkJoinPool.commonPool()
作为它的线程池执行异步代码。
在理论生产中会应用自定义的线程池来执行异步代码,具体能够参考另一篇文章深刻了解线程池 ThreadPoolExecutor – 掘金 (juejin.cn),外面的第二节有生产中怎么创立自定义线程的例子,能够参考一下。
② 取得异步执行后果
get()
阻塞式获取执行后果
public T get() throws InterruptedException, ExecutionException
该办法调用后如果 工作还没实现则会阻塞期待 直到工作执行实现。如果 工作执行过程产生异样 则会抛出 ExecutionException
异样,如果 阻塞期待过程中被中断 则会抛出 InterruptedException
异样。
get(long timeout, TimeUnit unit)
带超时的阻塞式获取执行后果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
该办法调用后如果如果 工作还没实现则会阻塞期待 直到工作执行实现或者超出 timeout 工夫,如果 阻塞期待过程中超时 则会抛出 TimeoutException
异样。
getNow(T valueIfAbsent)
立即获取执行后果
public T getNow(T valueIfAbsent)
该办法调用后,会立即获取后果不会阻塞期待 。如果工作实现则间接返回执行实现后的后果,如果工作没有实现,则返回调用办法时传入的参数valueIfAbsent
值。
join()
不抛异样的阻塞时获取执行后果
public T join()
该办法和 get()
办法作用一样,只是 不会抛出异样。
complete(T value)
被动触发计算,返回异步是否执行结束
public boolean complete(T value)
该办法调用后,会 被动触发计算结果 ,如果此时异步执行并没有实现(此时 boolean 值返回 true),则通过get()
拿到的数据会是 complete()
设置的参数 value
值,如果此时异步执行曾经实现(此时 boolean 值返回 false),则通过 get()
拿到的就是执行实现的后果。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 通过 Lambda 表达式实现执行内容,并返回后果通过 CompletableFuture 接管
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
// 休眠 2 秒,使得异步执行变慢,会导致被动触发计算先执行,此时返回的 get 就是 555
try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
return 666;
}, executor);
// 被动触发计算,判断异步执行是否实现
System.out.println(completableFuture.complete(555));
// 输入后果
System.out.println(completableFuture.get());
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
true
555
**/
③ 对执行后果进行解决
whenComplete
期待后面工作执行完再执行以后解决
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
在创立好的初始工作或者是上一个工作后通过链式调用该办法,会在 之前工作执行实现后继续执行 whenComplete
里的内容(whenComplete
传入的 action 只是对之前工作的后果进行解决),即应用该办法能够防止后面说到的 Future
接口的问题,不再须要通过阻塞或者轮询的形式去获取后果,而是通过调用该办法等工作执行结束主动调用。
该办法的参数为 BiConsumer<? super T, ? super Throwable> action
消费者接口,能够接管两个参数,一个是工作执行完的后果,一个是执行工作时的异样。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
.whenComplete((res, ex) -> System.out.println("工作执行结束,后果为" + res + "异样为" + ex)
);
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
工作执行结束,后果为 666 异样为 null
**/
除了上述的办法外,还有一些相似的办法如
XXXAsync()
或者是XXXAsync(XX,Executor executor)
, 对于这些办法,这里对立阐明,后续文章中将不会再列举
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
XXXAsync()
:示意上一个工作执行实现后,不会再应用之前工作中的线程,而是 从新应用从默认线程(ForkJoinPool 线程池)中从新获取新的线程执行当前任务。
XXXAsync(XX,Executor executor)
:示意不会沿用之前工作的线程,而是 应用本人第二个参数指定的线程池从新获取线程执行当前任务。
④ 对执行后果进行生产
thenRun
后面工作执行完后执行当前任务,不关怀后面工作的后果,也没返回值
public CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,然而工作 B 不须要 A 的后果,并且执行完工作 B 也不会返回后果。
thenRun(Runnable action)
的参数为 Runnable 接口 即没有传入参数。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
.thenRun(() -> System.out.println("我都没有参数怎么拿到之前的后果,我也没有返回值。")
);
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
我都没有参数怎么拿到之前的后果,我也没有返回值。**/
thenAccept
后面工作执行完后执行当前任务,生产后面的后果,没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,而且工作 B 须要 A 的后果,然而执行完工作 B 不会返回后果。
thenAccept(Consumer<? super T> action)
的参数为消费者接口,即能够传入一个参数,该参数为上一个工作的执行后果。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
.thenAccept((res) -> System.out.println("我能拿到上一个的后果" + res + ", 然而我没法传出去。")
);
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
我能拿到上一个的后果 666, 然而我没法传出去。**/
thenApply
后面工作执行完后执行当前任务,生产后面的后果,具备返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该办法示意:执行工作 A 实现后接着执行工作 B,而且工作 B 须要 A 的后果,并且执行完工作 B 须要有返回后果。
thenApply(Function<? super T,? extends U> fn)
的参数为函数式接口,即能够传入一个参数类型为 T,该参数是上一个工作的执行后果,并且函数式接口须要有返回值, 类型为 U。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
.thenApply((res) -> {System.out.println("我能拿到上一个的后果" + res + "并且我要将后果传出去");
return res;
}
).whenComplete((res, ex) -> System.out.println("后果" + res));
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
我能拿到上一个的后果 666 并且我要将后果传出去
后果 666
**/
⑤ 异样解决
exceptionally
异样捕捉,只生产后面工作中呈现的异样信息,具备返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
能够通过链式调用该办法来获取异样信息,并且具备返回值。如果某一个工作出现异常被 exceptionally
捕捉到则残余的工作将不会再执行。相似于 Java 异样解决的 catch。
exceptionally(Function<Throwable, ? extends T> fn)
的参数是函数式接口,具备一个参数以及返回值,该参数为后面工作的异样信息。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) throw new RuntimeException("error");
return 666;
}, executor)
.thenApply((res) -> {System.out.println("不出现异常,后果为" + res);
return res;
}).exceptionally((ex) -> {ex.printStackTrace();
return -1;
});
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
// 这是不出现异常的状况
不出现异常,后果为 666
// 这是出现异常的状况
java.util.concurrent.CompletionException: java.lang.RuntimeException: error
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: error
at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
**/
handle
异样解决,生产后面的后果及异样信息,具备返回值,不会中断后续工作
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
能够通过链式调用该办法能够跟 thenApply()
一样能够生产后面工作的后果并实现本人工作内容,并且具备返回值。不同之处在于出现异常也能够接着往下执行,依据异样参数做进一步解决。
handle(BiFunction<? super T, Throwable, ? extends U> fn)
的参数是消费者接口,一个参数是工作执行后果,一个是异样信息,并且具备返回值。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture.supplyAsync(() -> 666, executor)
.thenApply((res) -> {if (Math.random() < 0.5) throw new RuntimeException("error");
return res;
}).handle((res, ex) -> {System.out.println("后果" + res + "(null 示意之前出现异常导致后果无奈传过来)");
return res == null ? -1 : res;
}).thenApply((res) -> {System.out.println("后果为" + res + "(- 1 示意之前出现异常,通过 handler 使得后果解决成 -1)");
return res;
}).exceptionally((ex) -> {ex.printStackTrace();
return -1;
});
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
// 这是不出现异常的状况
后果 666(null 示意之前出现异常导致后果无奈传过来)
后果为 666(- 1 示意之前出现异常,通过 handler 使得后果解决成 -1)
// 这是出现异常的状况
后果 null(null 示意之前出现异常导致后果无奈传过来)
后果为 -1(- 1 示意之前出现异常,通过 handler 使得后果解决成 -1)
**/
能够看到通过 handle
相似于 Java 异样解决的 finally,出现异常并不会像应用 exceptionally
那样中断后续的工作,而是继续执行,能够通过 handle 为之前出现异常无奈取得的后果从新赋值(依据业务需要设置安全值之类的)。
⑥ 两组工作按程序执行
thenCompose
实现两组工作按前后程序执行
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
复制代码
A.thenCompose(B)
相当于工作 A 要排在工作 B 后面,即程序的执行工作 A、工作 B 。该办法的参数是函数式接口,函数式接口的参数是调用者的执行后果,返回值是另一个工作 B。
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {System.out.println("工作 A 先执行后果为 666");
return 666;
}, executor);
actionA.thenCompose((res) -> CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 后执行后果加上 333");
return 333 + res;
})).whenComplete((res, ex) -> System.out.println(res));
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
工作 A 先执行后果为 666
工作 B 后执行后果加上 333
999
**/
⑦ 两组工作谁快用谁
applyToEither
比拟两组工作执行速度,谁快生产谁的执行后果
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
该办法用于 比拟两组工作的执行速度,谁先执行完就用谁的执行后果。
传入参数阐明:第一个参数传入的是另一个工作的执行内容,第二个参数传入的是最终这两个工作谁快返回谁的后果,并通过以后函数式接口进行接管和解决(应用函数式接口,有参且有返回值)。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 A 期待久一点,执行后果为 555");
return 555;
}, executor);
actionA.applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 很快,执行后果为 666");
return 666;
}), (res) -> {System.out.println("最终应用的执行后果为" + res);
return res;
});
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
工作 B 很快,执行后果为 666
最终应用的执行后果为 666
工作 A 期待久一点,执行后果为 555
**/
除了
applyToEither
对工作最终后果进行获取并生产,并且具备返回值的办法外,还有两个相似的办法。
// 这个办法成果和下面的一样,比谁快拿谁的后果,不同的是这个办法只生产不具备返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
// 这个办法成果和下面的一样,比谁快拿谁的后果,不同的是这个办法不生产后果也不具备返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
⑧ 两组工作实现后合并
thenCombine
期待两组工作执行结束后,合并两组工作的执行后果
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
该办法用于 两组工作都实现后,将两组工作的执行后果一起交给以后办法的 BiFunction 解决。先实现的工作会期待后者工作实现。
传入参数阐明:第一个参数传入的是另一个工作的执行内容,第二个参数传入的是带两个参数的函数式接口(第一个参数是工作 1 的执行后果,第二个参数是工作 2 的执行后果,具备返回值)。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 A 期待久一点,执行后果为 333");
return 333;
}, executor);
CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 很快,执行后果为 666");
return 666;
}, executor);
actionA.thenCombine(actionB, (res1, res2) -> {System.out.println("最终应用的执行后果为" + (res1 + res2));
return res1 + res2;
});
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
工作 B 很快,执行后果为 666
工作 A 期待久一点,执行后果为 333
最终应用的执行后果为 999
**/
除了
thenCombine
对工作最终后果进行获取并生产,并且具备返回值的办法外,还有两个相似的办法。
// 这个办法成果和下面的一样,获取合并后果,不同的是这个办法只生产不具备返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
// 这个办法成果和下面的一样,获取合并后果,不同的是这个办法不生产后果也不具备返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
⑨ 多任务组合
allOf
实现并行地执行多个工作,期待所有工作执行实现(无需思考执行程序)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
该办法能够 实现并行地执行多个工作,实用于多个工作没有依赖关系,能够相互独立执行的,传入参数为多个工作,没有返回值。
allOf()
办法会期待所有的工作执行结束再返回,能够通过 get()
阻塞确保所有工作执行结束
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 A 期待 2 秒后执行结束");
}, executor);
CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> {System.out.println("工作 B 很快执行结束");
}, executor);
CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> {try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 C 期待 1 秒后执行结束");
}, executor);
CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> {try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 D 期待 5 秒后执行结束");
}, executor);
CompletableFuture.allOf(actionA, actionB, actionC, actionD).get();} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
工作 B 很快执行结束
工作 C 期待 1 秒后执行结束
工作 A 期待 2 秒后执行结束
工作 D 期待 5 秒后执行结束
**/
anyOf
实现并行地执行多个工作,只有有个一个实现的便会返回执行后果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
该办法能够 实现并行地执行多个工作 ,传入参数为多个工作,具备返回值。该办法 不会期待所有工作执行实现后再返回后果,而是当有一个工作实现时,便会返回那个工作的执行后果。
// 例子
public static void main(String[] args) {
// 疾速创立线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
try {CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 A 期待 2 秒后执行结束");
return 555;
}, executor);
CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {System.out.println("工作 B 很快执行结束");
return 666;
}, executor);
CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 C 期待 1 秒后执行结束");
return 999;
}, executor);
CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {e.printStackTrace(); }
System.out.println("工作 D 期待 5 秒后执行结束");
return 888;
}, executor);
System.out.println("最先执行完的返回后果为" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get());
} catch (Exception e) {e.printStackTrace();
}finally {executor.shutdown();
}
}
/**
输入后果:
工作 B 很快执行结束
最先执行完的返回后果为 666
工作 C 期待 1 秒后执行结束
工作 A 期待 2 秒后执行结束
工作 D 期待 5 秒后执行结束
**/
一个应用 CompletableFuture 异步编排的例子
不须要关怀例子中的业务内容,应用时依照本人业务的需要,对不同的需要调用不同 API 即可。
编写工作时次要关怀以下几点:① 是否须要生产之前工作的后果 ② 是否须要返回后果给其余工作生产 ③ 是否要求程序执行(是否容许并行,有没有前置要求)
/**
* 该办法用于获取单个商品的所有信息
* 1\. 商品的根本信息
* 2\. 商品的图片信息
* 3\. 商品的销售属性组合
* 4\. 商品的各种分类根本信息
* 5\. 商品的促销信息
*/
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
// 创立商品 Vo 通过各个工作去欠缺 Vo 的信息
SkuItemVo skuItemVo = new SkuItemVo();
// 获取商品根本信息 查问到后设置进 Vo 中,返回根本信息给后续工作生产(应用自定义的线程池进行异步)CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
// 获取商品的图片信息 获取后设置进 Vo 中,此处不须要生产图片信息,也不须要返回后果。所以应用 runAsync 即可
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
// 获取商品销售属性 因为要利用之前查问到的根本信息,但后续工作不须要生产销售属性(不须要返回后果),所以应用 thenAcceptAsync 生产之前的根本信息,不返回销售信息。CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
// 获取商品各分类根本信息,同样要生产之前的根本信息,但无需返回,所以应用 thenAcceptAsync 即可
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
// 获取商品的促销信息 这个也不须要生产之前工作的后果,也不须要返回后果。所以间接应用 runAsync 即可
CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
if (skuSeckilInfo.getCode() == 0) {SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {});
skuItemVo.setSeckillSkuVo(seckilInfoData);
if (seckilInfoData != null) {long currentTime = System.currentTimeMillis();
if (currentTime > seckilInfoData.getEndTime()) {skuItemVo.setSeckillSkuVo(null);
}
}
}
}, executor);
// 应用 allOf()组合所有工作,并且应用 get()阻塞,期待所有工作实现。// 补充:infoFuture 不能放入 allOf 中,因为 allOf 是并行无序执行(须要多个条件是无依赖性的)的,当下面工作中有须要生产 infoFuture 的后果,所以须要先执行 infoFuture。CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get();
// 最初返回商品 Vo
return skuItemVo;
}