共计 14790 个字符,预计需要花费 37 分钟才能阅读完成。
CompletableFuture
前言
CompletableFuture 继承于 java.util.concurrent.Future,它自身具备 Future 的所有个性,并且基于 JDK1.8 的流式编程以及 Lambda 表达式等实现一元操作符、异步性以及事件驱动编程模型,能够用来实现多线程的串行关系,并行关系,聚合关系。它的灵活性和更弱小的性能是 Future 无法比拟的。
一、创立形式
1. 用默认线程池
CompletableFuture<String> future = new CompletableFuture<>();
默认应用 ForkJoinPool.commonPool(),commonPool 是一个会被很多工作 共享 的线程池,比方同一 JVM 上的所有 CompletableFuture、并行 Stream 都将共享 commonPool,commonPool 设计时的指标场景是运行 非阻塞的 CPU 密集型工作,为最大化利用 CPU,其线程数默认为 CPU 数量 – 1。
2. 用自定义线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
CompletableFuture.runAsync(() -> System.out.println("Hello World!"), pool);
复制代码
二、应用示例
1. 构建异步工作
办法
有无返回值
形容
runAsync
无
进行数据处理,接管前一步骤传递的数据,无返回值。
supplyAsync
有
进行数据处理,接管前一步骤传递的数据,解决加工后返回。返回数据类型能够和前一步骤返回的数据类型不同。
(1)runAsync
源码
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}
复制代码
示例
public static void runAsync() {
// 应用默认线程池
CompletableFuture cf = CompletableFuture.runAsync(() -> System.out.println("Hello World!"));
assertFalse(cf.isDone());
// 应用自定义线程池
CompletableFuture.runAsync(() -> System.out.println("Hello World!"), Executors.newSingleThreadExecutor());
}
复制代码
(2)supplyAsync
源码
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
复制代码
示例
public static void supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
try {
//ForkJoinPool.commonPool-worker- 1 线程
System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {e.printStackTrace();
}
return "hello";
});
// 阻塞期待 3 秒
String result = f.get();
//main 线程
System.out.println(Thread.currentThread().getName());
System.out.println(result);
}
复制代码
2. 单任务后果生产
办法
有无返回值
形容
thenApply
有
在前一个阶段上利用 thenApply 函数,将上一阶段实现的后果作为以后阶段的入参
thenAccept
无返回值
生产前一阶段的后果
thenRun
无返回值,并且无入参
当上一阶段实现后,执行本阶段的工作
(1)thenApply
源码
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(asyncPool, fn);
}
复制代码
示例
public static void thenApply() throws ExecutionException, InterruptedException {CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {System.out.println(s);
return s.toUpperCase();}).thenApply(s->{System.out.println(s);
return s + ":body";
});
System.out.println(cf.get());
}
复制代码
then
意味着这个阶段的动作产生以后的阶段失常实现之后。本例中,以后节点实现,返回字符串message
。
Apply
意味着返回的阶段将会对后果前一阶段的后果利用一个函数。
函数的执行会被 阻塞
(2)thenAccept
源码
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
复制代码
示例
public static void thenAccept() throws InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "message";}).thenAccept((consumer) -> {System.out.println(consumer);
});
}
复制代码
(3)thenRun
源码
public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);
}
复制代码
示例
public static void thenRun() throws InterruptedException {CompletableFuture.supplyAsync(() -> {
// 执行异步工作
System.out.println("执行工作");
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
return "success";
}).thenRun(() -> {
// Computation Finished.
System.out.println("上一阶段工作执行实现");
});
Thread.sleep(2000);
}
复制代码
3. 合并后果生产
办法
有无返回值
形容
thenCombine
有
合并另外一个工作,两个工作都实现后,执行 BiFunction,入参为两个工作后果,返回新后果
thenAcceptBoth
无
合并另外一个工作,两个工作都实现后,执行这个办法期待第一个阶段的实现(大写转换),它的后果传给一个指定的返回 CompletableFuture 函数,它的后果就是返回的 CompletableFuture 的后果,入参为两个工作后果,不返回新后果
runAfterBoth
无返回值无入参
合并另外一个工作,两个工作都实现后,执行 Runnable,留神,这里的两个工作是同时执行
(1)thenCombine
如果 CompletableFuture 依赖两个后面阶段的后果,它复合两个阶段的后果再返回一个后果,咱们就能够应用 thenCombine()
函数。整个流水线是同步的。
源码
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);
}
复制代码
示例
public static void thenCombine() {CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> {System.out.println("processing a...");
return "hello";
});
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> {System.out.println("processing b...");
return "world";
});
CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> {System.out.println("processing c...");
return ", I'm CodingTao!";
});
cfA.thenCombine(cfB, (resultA, resultB) -> {System.out.println(resultA + resultB); // hello world
return resultA + resultB;
}).thenCombine(cfC, (resultAB, resultC) -> {System.out.println(resultAB + resultC); // hello world, I'm CodingTao!
return resultAB + resultC;
});
}
复制代码
(2)thenAcceptBoth
源码
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);
}
复制代码
示例
private static void thenAcceptBoth() {CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {
//resultA,resultB
System.out.println(resultA+","+resultB);
});
}
复制代码
(3)runAfterBoth
源码
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {return biRunStage(null, other, action);
}
复制代码
示例
private static void runAfterBoth() {CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(5000);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("process a");
return "resultA";
});
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(5000);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("process b");
return "resultB";
});
cfB.runAfterBoth(cfA, () -> {
//resultA,resultB
System.out.println("工作 A 和工作 B 同时实现");
});
try {Thread.sleep(6000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
复制代码
4. 任一后果生产
办法
有无返回值
形容
applyToEither
有
其中任一工作实现后,执行 Function,后果转换,入参为已实现的工作后果。返回新后果,要求两个工作后果为同一类型
acceptEither
无
其中任一工作实现后,执行 Consumer,生产后果,入参为已实现的工作后果。不返回新后果,要求两个工作后果为同一类型
runAfterEither
无返回值无入参
其中任一工作实现后,执行 Runnable,生产后果,无入参。不返回新后果,不要求两个工作后果为同一类型
场景
假如查问商品 a,有两种形式,A 和 B,然而 A 和 B 的执行速度不一样,心愿哪个先返回就用那个的返回值。
(1)applyToEither
源码
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(null, other, fn);
}
复制代码
示例
private static void applyToEither() throws ExecutionException, InterruptedException {CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
return "通过形式 A 获取商品 a";
});
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
}
return "通过形式 B 获取商品 a";
});
CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "后果:" + product);
// 后果: 通过形式 A 获取商品 a
System.out.println(futureC.get());
}
复制代码
(2)acceptEither
(3)runAfterEither
5. 级联工作
办法
有无返回值
形容
thenCompose
有
当原工作实现后,以其后果为参数,返回一个新的工作(而不是新后果,相似 flatMap)
(1)thenCompose
这个办法期待第一个阶段的实现(大写转换),它的后果传给一个指定的返回 CompletableFuture 函数,它的后果就是返回的 CompletableFuture 的后果。
源码
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);
}
复制代码
示例
private static void thenCompose() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
.thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
.thenApply(s -> upper + s));
// MESSAGEmessage
System.out.println(cf.join());
}
复制代码
6. 单任务后果或异样生产
办法
有无返回值
形容
handle
有
工作实现后执行 BiFunction,后果转换,入参为后果或者异样,返回新后果
whenComplete
无
工作实现后执行 BiConsumer,后果生产,入参为后果或者异样,不返回新后果
exceptionally
无
工作异样,则执行 Function,异样转换,入参为原工作的异样信息,若原工作无异样,则返回原工作后果,即不执行转换
异样流程
CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + "resultB")
.thenApply(resultB -> resultB + "resultC")
.thenApply(resultC -> resultC + "resultD");
复制代码
下面的代码中,工作 A、B、C、D 顺次执行,如果工作 A 抛出异样(当然下面的代码不会抛出异样),那么前面的工作都得不到执行。如果工作 C 抛出异样,那么工作 D 得不到执行。
那么咱们怎么解决异样呢?看上面的代码,咱们在工作 A 中抛出异样,并对其进行解决:
(1)handle
示例
private static void handle() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + "resultB")
// 工作 C 抛出异样
.thenApply(resultB -> {throw new RuntimeException();})
// 解决工作 C 的返回值或异样
.handle(new BiFunction<Object, Throwable, Object>() {
@Override
public Object apply(Object re, Throwable throwable) {if (throwable != null) {return "errorResultC";}
return re;
}
})
.thenApply(resultC -> {System.out.println("resultC:" + resultC);
return resultC + "resultD";
});
System.out.println(future.join());
}
复制代码
(2)whenComplete
示例
private static void whenComplete() throws ExecutionException, InterruptedException {
// 创立异步执行工作:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
try {Thread.sleep(2000);
} catch (InterruptedException e) { }
if(true){throw new RuntimeException("test");
}else{System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
return 1.2;
}
});
//cf 执行实现后会将执行后果和执行过程中抛出的异样传入回调办法
// 如果是失常执行,a=1.2,b 则传入的异样为 null
// 如果异样执行,a=null,b 则传入异样信息
CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
try {Thread.sleep(2000);
} catch (InterruptedException e) { }
if(b!=null){System.out.println("error stack trace->");
b.printStackTrace();}else{System.out.println("run succ,result->"+a);
}
System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
});
// 期待子工作执行实现
System.out.println("main thread start wait,time->"+System.currentTimeMillis());
// 如果 cf 是失常执行的,cf2.get 的后果就是 cf 执行的后果
// 如果 cf 是执行异样,则 cf2.get 会抛出异样
System.out.println("run result->"+cf2.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
复制代码
(3)exceptionally
7. 合并多个 complete 为一个
办法
形容
allOf
合并多个 complete 为一个,期待全副实现
anyOf
合并多个 complete 为一个,期待其中之一实现
(1)allOf
咱们在解决业务时,有时会有多任务异步解决,同步返回后果的状况
- 采纳多线程执异步行某种工作,比方在不同主机查问磁盘列表信息。
- 将执行后果收集,分组分类,解决。
- 将解决当前的后果给予展现。
示例
// 创立异步执行工作:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"start job1,time->"+System.currentTimeMillis());
try {Thread.sleep(2000);
} catch (InterruptedException e) { }
System.out.println(Thread.currentThread()+"exit job1,time->"+System.currentTimeMillis());
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"start job2,time->"+System.currentTimeMillis());
try {Thread.sleep(1500);
} catch (InterruptedException e) { }
System.out.println(Thread.currentThread()+"exit job2,time->"+System.currentTimeMillis());
return 3.2;
});
CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+"start job3,time->"+System.currentTimeMillis());
try {Thread.sleep(1300);
} catch (InterruptedException e) { }
// throw new RuntimeException("test");
System.out.println(Thread.currentThread()+"exit job3,time->"+System.currentTimeMillis());
return 2.2;
});
//allof 期待所有工作执行实现才执行 cf4,如果有一个工作异样终止,则 cf4.get 时会抛出异样,都是失常执行,cf4.get 返回 null
//anyOf 是只有一个工作执行实现,无论是失常执行或者执行异样,都会执行 cf4,cf4.get 的后果就是已执行实现的工作的执行后果
CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{if(b!=null){System.out.println("error stack trace->");
b.printStackTrace();}else{System.out.println("run succ,result->"+a);
}
});
System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis());
// 期待子工作执行实现
System.out.println("cf4 run result->"+cf4.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
复制代码
获取返回值办法
public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
CompletableFuture<Void> allFuturesResult =
CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
return allFuturesResult.thenApply(v ->
futuresList.stream().
map(future -> future.join()).
collect(Collectors.<T>toList())
);
}
复制代码
(2)anyOf
CompletableFuture.anyOf()和其名字介绍的一样,当任何一个 CompletableFuture 实现的时候【雷同的后果类型】,返回一个新的 CompletableFuture。
示例
private static void anyOf() throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {throw new IllegalStateException(e);
}
return "Result of Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {throw new IllegalStateException(e);
}
return "Result of Future 2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {throw new IllegalStateException(e);
}
return "Result of Future 3";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(anyOfFuture.get()); // Result of Future 2
}
复制代码
三、其余相干 api
1. future 接口
(1)isDone()
判断工作是否实现。三种实现状况:normally(失常执行结束)、exceptionally(执行异样)、via cancellation(勾销)
(2)get()
阻塞获取后果或抛出受检测异样,须要显示进行 try…catch 解决。
(3)get(long timeout,TimeUnit unit)
超时阻塞获取后果
(4)cancel(boolean mayInterruptIfRunning)
勾销工作,若一个工作未实现,则以 CancellationException 异样。其相干未实现的子工作也会以 CompletionException 完结
(5)isCancelled()
是否已勾销,在工作失常执行实现前勾销,才为 true。否则为 false。
2. CompletableFuture 接口
(1)join
阻塞获取后果或抛出非受检异样。
(2)getNow(T valueIfAbsent)
若当前任务无后果,则返回 valueIfAbsent,否则返回已实现工作的后果。
(3)complete(T value)
设置工作后果,工作失常完结,之后的工作状态为已实现。
(4)completeExceptionally(Throwable ex)
设置工作异样后果,工作异样完结,之后的工作状态为已实现。
(5)isCompletedExceptionally()
判断工作是否异样完结。异样可能的起因有:勾销、显示设置工作异样后果、工作动作执行异样等。
(6)getNumberOfDependents()
返回依赖当前任务的工作数量,次要用于监控。
(7)orTimeout(long timeout,TimeUnit unit) jdk9
设置工作实现超时工夫,若在指定工夫内未失常实现,则工作会以异样 (TimeoutException) 完结。
(8)completeOnTimeout(T value,long timeout,TimeUnit unit) jdk9
设置工作实现超时工夫,若在指定工夫内未失常实现,则以给定的 value 为工作后果
四、实战
1. API 网关做接口的聚合
// 这两个参数从内部取得
Long userId = 10006L;
String orderId = "XXXXXXXXXXXXXXXXXXXXXX";
// 从用户服务获取用户信息
UserInfo userInfo = userService.getUserInfo(userId);
// 从用订单务获取订单信息
OrderInfo orderInfo = orderService.getOrderInfo(orderId);
// 返回两者的聚合 DTO
return new OrderDetailDTO(userInfo,orderInfo);
复制代码
上面三个内部接口的信息肯定是不相关联的,也就是能够并行获取,三个接口的后果都获取结束之后做一次数据聚合到 DTO 即可,也就是聚合的耗时大抵是这三个接口中耗时最长的接口的响应工夫
@Service
public class OrderDetailService {
/**
* 建设一个线程池专门交给 CompletableFuture 应用
*/
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
@Autowired
private UserService userService;
@Autowired
private OrderService orderService;
public OrderDetailDTO getOrderDetail(Long userId, String orderId) throws Exception {CompletableFuture<UserInfo> userInfoCompletableFuture = CompletableFuture.supplyAsync(() -> userService.getUserInfo(userId), executor);
CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(orderId), executor);
CompletableFuture<OrderDetailDTO> result
= userInfoCompletableFuture.thenCombineAsync(orderInfoCompletableFuture, OrderDetailDTO::new, executor);
return result.get();}
}
复制代码
五、区别
(1)whenComplete 和 handle 区别
whenComplete
与 handle
办法就相似于 try..catch..finanlly
中 finally
代码块。无论是否产生异样,都将会执行的。这两个办法区别在于 handle
反对返回后果。
(2)thenApply 与 thenCompose 的异同
对于 thenApply,fn 函数是一个对一个已实现的 stage 或者说 CompletableFuture 的的返回值进行计算、操作;
对于 thenCompose,fn 函数是对另一个 CompletableFuture 进行计算、操作。
(3)有无 Async 的区别
没有 Async 的在 CompleteableFuture 调用它的线程定义的线程上运行,因而通常不晓得在哪里执行该线程。如果后果曾经可用,它可能会立刻执行。
有 Async 的无论环境如何,都在环境定义的执行程序上运行。为此 CompletableFuture 通常 ForkJoinPool.commonPool()。
参考:《2020 最新 Java 根底精讲视频教程和学习路线!》
链接:https://juejin.cn/post/694387…