CompletableFuture
前言
CompletableFuture继承于java.util.concurrent.Future,它自身具备Future的所有个性,并且基于JDK1.8的流式编程以及Lambda表达式等实现一元操作符、异步性以及事件驱动编程模型,能够用来实现多线程的串行关系,并行关系,聚合关系。它的灵活性和更弱小的性能是Future无法比拟的。
一、创立形式
1. 用默认线程池
CompletableFuture<String> future = new CompletableFuture<>();
默认应用 ForkJoinPool.commonPool(),commonPool是一个会被很多工作 共享 的线程池,比方同一 JVM 上的所有 CompletableFuture、并行 Stream 都将共享 commonPool,commonPool 设计时的指标场景是运行 非阻塞的 CPU 密集型工作,为最大化利用 CPU,其线程数默认为 CPU 数量 - 1。
2. 用自定义线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy());CompletableFuture.runAsync(() -> System.out.println("Hello World!"), pool);复制代码
二、应用示例
1. 构建异步工作
办法
有无返回值
形容
runAsync
无
进行数据处理,接管前一步骤传递的数据,无返回值。
supplyAsync
有
进行数据处理,接管前一步骤传递的数据,解决加工后返回。返回数据类型能够和前一步骤返回的数据类型不同。
(1)runAsync
源码
public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); }复制代码
示例
public static void runAsync() { //应用默认线程池 CompletableFuture cf = CompletableFuture.runAsync(() -> System.out.println("Hello World!")); assertFalse(cf.isDone()); //应用自定义线程池 CompletableFuture.runAsync(() -> System.out.println("Hello World!"), Executors.newSingleThreadExecutor()); }复制代码
(2)supplyAsync
源码
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier);}复制代码
示例
public static void supplyAsync() throws ExecutionException, InterruptedException { CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> { try { //ForkJoinPool.commonPool-worker-1线程 System.out.println(Thread.currentThread().getName()); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); //阻塞期待3秒 String result = f.get(); //main线程 System.out.println(Thread.currentThread().getName()); System.out.println(result);}复制代码
2. 单任务后果生产
办法
有无返回值
形容
thenApply
有
在前一个阶段上利用thenApply函数,将上一阶段实现的后果作为以后阶段的入参
thenAccept
无返回值
生产前一阶段的后果
thenRun
无返回值,并且无入参
当上一阶段实现后,执行本阶段的工作
(1)thenApply
源码
public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn);}复制代码
示例
public static void thenApply() throws ExecutionException, InterruptedException { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { System.out.println(s); return s.toUpperCase(); }).thenApply(s->{ System.out.println(s); return s + ":body"; }); System.out.println(cf.get());}复制代码
then
意味着这个阶段的动作产生以后的阶段失常实现之后。本例中,以后节点实现,返回字符串message
。
Apply
意味着返回的阶段将会对后果前一阶段的后果利用一个函数。
函数的执行会被阻塞
(2)thenAccept
源码
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier);}复制代码
示例
public static void thenAccept() throws InterruptedException { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "message"; }).thenAccept((consumer) -> { System.out.println(consumer); });}复制代码
(3)thenRun
源码
public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action);}复制代码
示例
public static void thenRun() throws InterruptedException { CompletableFuture.supplyAsync(() -> { //执行异步工作 System.out.println("执行工作"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "success"; }).thenRun(() -> { // Computation Finished. System.out.println("上一阶段工作执行实现"); }); Thread.sleep(2000);}复制代码
3. 合并后果生产
办法
有无返回值
形容
thenCombine
有
合并另外一个工作,两个工作都实现后,执行BiFunction,入参为两个工作后果,返回新后果
thenAcceptBoth
无
合并另外一个工作,两个工作都实现后,执行这个办法期待第一个阶段的实现(大写转换), 它的后果传给一个指定的返回CompletableFuture函数,它的后果就是返回的CompletableFuture的后果,入参为两个工作后果,不返回新后果
runAfterBoth
无返回值无入参
合并另外一个工作,两个工作都实现后,执行Runnable,留神,这里的两个工作是同时执行
(1)thenCombine
如果CompletableFuture依赖两个后面阶段的后果, 它复合两个阶段的后果再返回一个后果,咱们就能够应用thenCombine()
函数。整个流水线是同步的。
源码
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn);}复制代码
示例
public static void thenCombine() { CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { System.out.println("processing a..."); return "hello"; }); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> { System.out.println("processing b..."); return " world"; }); CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> { System.out.println("processing c..."); return ", I'm CodingTao!"; }); cfA.thenCombine(cfB, (resultA, resultB) -> { System.out.println(resultA + resultB); // hello world return resultA + resultB; }).thenCombine(cfC, (resultAB, resultC) -> { System.out.println(resultAB + resultC); // hello world, I'm CodingTao! return resultAB + resultC; });}复制代码
(2)thenAcceptBoth
源码
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) { return biAcceptStage(null, other, action);}复制代码
示例
private static void thenAcceptBoth() { CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB"); cfA.thenAcceptBoth(cfB, (resultA, resultB) -> { //resultA,resultB System.out.println(resultA+","+resultB); });}复制代码
(3)runAfterBoth
源码
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) { return biRunStage(null, other, action);}复制代码
示例
private static void runAfterBoth() { CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("process a"); return "resultA"; }); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("process b"); return "resultB"; }); cfB.runAfterBoth(cfA, () -> { //resultA,resultB System.out.println("工作A和工作B同时实现"); }); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); }}复制代码
4. 任一后果生产
办法
有无返回值
形容
applyToEither
有
其中任一工作实现后,执行Function,后果转换,入参为已实现的工作后果。返回新后果,要求两个工作后果为同一类型
acceptEither
无
其中任一工作实现后,执行Consumer,生产后果,入参为已实现的工作后果。不返回新后果,要求两个工作后果为同一类型
runAfterEither
无返回值无入参
其中任一工作实现后,执行Runnable,生产后果,无入参。不返回新后果,不要求两个工作后果为同一类型
场景
假如查问商品a,有两种形式,A和B,然而A和B的执行速度不一样,心愿哪个先返回就用那个的返回值。
(1)applyToEither
源码
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) { return orApplyStage(null, other, fn);}复制代码
示例
private static void applyToEither() throws ExecutionException, InterruptedException { CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "通过形式A获取商品a"; }); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "通过形式B获取商品a"; }); CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "后果:" + product); //后果:通过形式A获取商品a System.out.println(futureC.get());}复制代码
(2)acceptEither
(3)runAfterEither
5. 级联工作
办法
有无返回值
形容
thenCompose
有
当原工作实现后,以其后果为参数,返回一个新的工作(而不是新后果,相似flatMap)
(1)thenCompose
这个办法期待第一个阶段的实现(大写转换), 它的后果传给一个指定的返回CompletableFuture函数,它的后果就是返回的CompletableFuture的后果。
源码
public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn);}复制代码
示例
private static void thenCompose() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) .thenApply(s -> upper + s)); // MESSAGEmessage System.out.println(cf.join());}复制代码
6. 单任务后果或异样生产
办法
有无返回值
形容
handle
有
工作实现后执行BiFunction,后果转换,入参为后果或者异样,返回新后果
whenComplete
无
工作实现后执行BiConsumer,后果生产,入参为后果或者异样,不返回新后果
exceptionally
无
工作异样,则执行Function,异样转换,入参为原工作的异样信息,若原工作无异样,则返回原工作后果,即不执行转换
异样流程
CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");复制代码
下面的代码中,工作 A、B、C、D 顺次执行,如果工作 A 抛出异样(当然下面的代码不会抛出异样),那么前面的工作都得不到执行。如果工作 C 抛出异样,那么工作 D 得不到执行。
那么咱们怎么解决异样呢?看上面的代码,咱们在工作 A 中抛出异样,并对其进行解决:
(1)handle
示例
private static void handle() { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") // 工作 C 抛出异样 .thenApply(resultB -> {throw new RuntimeException();}) // 解决工作 C 的返回值或异样 .handle(new BiFunction<Object, Throwable, Object>() { @Override public Object apply(Object re, Throwable throwable) { if (throwable != null) { return "errorResultC"; } return re; } }) .thenApply(resultC -> { System.out.println("resultC:" + resultC); return resultC + " resultD"; }); System.out.println(future.join());}复制代码
(2)whenComplete
示例
private static void whenComplete() throws ExecutionException, InterruptedException { // 创立异步执行工作: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } }); //cf执行实现后会将执行后果和执行过程中抛出的异样传入回调办法 // 如果是失常执行,a=1.2,b则传入的异样为null //如果异样执行,a=null,b则传入异样信息 CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(b!=null){ System.out.println("error stack trace->"); b.printStackTrace(); }else{ System.out.println("run succ,result->"+a); } System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); }); //期待子工作执行实现 System.out.println("main thread start wait,time->"+System.currentTimeMillis()); //如果cf是失常执行的,cf2.get的后果就是cf执行的后果 //如果cf是执行异样,则cf2.get会抛出异样 System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis());}复制代码
(3)exceptionally
7. 合并多个complete为一个
办法
形容
allOf
合并多个complete为一个,期待全副实现
anyOf
合并多个complete为一个,期待其中之一实现
(1)allOf
咱们在解决业务时,有时会有多任务异步解决,同步返回后果的状况
- 采纳多线程执异步行某种工作,比方在不同主机查问磁盘列表信息。
- 将执行后果收集,分组分类,解决。
- 将解决当前的后果给予展现。
示例
// 创立异步执行工作: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; }); CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(1500); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return 3.2; }); CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); try { Thread.sleep(1300); } catch (InterruptedException e) { }// throw new RuntimeException("test"); System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); return 2.2; }); //allof期待所有工作执行实现才执行cf4,如果有一个工作异样终止,则cf4.get时会抛出异样,都是失常执行,cf4.get返回null //anyOf是只有一个工作执行实现,无论是失常执行或者执行异样,都会执行cf4,cf4.get的后果就是已执行实现的工作的执行后果 CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{ if(b!=null){ System.out.println("error stack trace->"); b.printStackTrace(); }else{ System.out.println("run succ,result->"+a); } }); System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis()); //期待子工作执行实现 System.out.println("cf4 run result->"+cf4.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis());复制代码
获取返回值办法
public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) { CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()])); return allFuturesResult.thenApply(v -> futuresList.stream(). map(future -> future.join()). collect(Collectors.<T>toList()) );}复制代码
(2)anyOf
CompletableFuture.anyOf()和其名字介绍的一样,当任何一个CompletableFuture实现的时候【雷同的后果类型】,返回一个新的CompletableFuture。
示例
private static void anyOf() throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 2"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 3"; }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); System.out.println(anyOfFuture.get()); // Result of Future 2 }复制代码
三、其余相干api
1. future接口
(1)isDone()
判断工作是否实现。三种实现状况:normally(失常执行结束)、exceptionally(执行异样)、via cancellation(勾销)
(2)get()
阻塞获取后果或抛出受检测异样,须要显示进行try...catch解决。
(3)get(long timeout,TimeUnit unit)
超时阻塞获取后果
(4)cancel(boolean mayInterruptIfRunning)
勾销工作,若一个工作未实现,则以CancellationException异样。其相干未实现的子工作也会以CompletionException完结
(5)isCancelled()
是否已勾销,在工作失常执行实现前勾销,才为true。否则为false。
2. CompletableFuture接口
(1)join
阻塞获取后果或抛出非受检异样。
(2)getNow(T valueIfAbsent)
若当前任务无后果,则返回valueIfAbsent,否则返回已实现工作的后果。
(3)complete(T value)
设置工作后果,工作失常完结,之后的工作状态为已实现。
(4)completeExceptionally(Throwable ex)
设置工作异样后果,工作异样完结,之后的工作状态为已实现。
(5)isCompletedExceptionally()
判断工作是否异样完结。异样可能的起因有:勾销、显示设置工作异样后果、工作动作执行异样等。
(6)getNumberOfDependents()
返回依赖当前任务的工作数量,次要用于监控。
(7)orTimeout(long timeout,TimeUnit unit) jdk9
设置工作实现超时工夫,若在指定工夫内未失常实现,则工作会以异样(TimeoutException)完结。
(8)completeOnTimeout(T value,long timeout,TimeUnit unit) jdk9
设置工作实现超时工夫,若在指定工夫内未失常实现,则以给定的value为工作后果
四、实战
1. API网关做接口的聚合
//这两个参数从内部取得Long userId = 10006L;String orderId = "XXXXXXXXXXXXXXXXXXXXXX";//从用户服务获取用户信息UserInfo userInfo = userService.getUserInfo(userId);//从用订单务获取订单信息OrderInfo orderInfo = orderService.getOrderInfo(orderId);//返回两者的聚合DTOreturn new OrderDetailDTO(userInfo,orderInfo);复制代码
上面三个内部接口的信息肯定是不相关联的,也就是能够并行获取,三个接口的后果都获取结束之后做一次数据聚合到DTO即可,也就是聚合的耗时大抵是这三个接口中耗时最长的接口的响应工夫
@Servicepublic class OrderDetailService { /** * 建设一个线程池专门交给CompletableFuture应用 */ private final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); @Autowired private UserService userService; @Autowired private OrderService orderService; public OrderDetailDTO getOrderDetail(Long userId, String orderId) throws Exception { CompletableFuture<UserInfo> userInfoCompletableFuture = CompletableFuture.supplyAsync(() -> userService.getUserInfo(userId), executor); CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(orderId), executor); CompletableFuture<OrderDetailDTO> result = userInfoCompletableFuture.thenCombineAsync(orderInfoCompletableFuture, OrderDetailDTO::new, executor); return result.get(); }}复制代码
五、区别
(1)whenComplete和handle区别
whenComplete
与 handle
办法就相似于 try..catch..finanlly
中 finally
代码块。无论是否产生异样,都将会执行的。这两个办法区别在于 handle
反对返回后果。
(2)thenApply与thenCompose的异同
对于thenApply,fn函数是一个对一个已实现的stage或者说CompletableFuture的的返回值进行计算、操作;
对于thenCompose,fn函数是对另一个CompletableFuture进行计算、操作。
(3)有无Async的区别
没有Async的在CompleteableFuture调用它的线程定义的线程上运行,因而通常不晓得在哪里执行该线程。如果后果曾经可用,它可能会立刻执行。
有Async的无论环境如何,都在环境定义的执行程序上运行。为此CompletableFuture通常ForkJoinPool.commonPool()。
参考:《2020最新Java根底精讲视频教程和学习路线!》
链接:https://juejin.cn/post/694387...