共计 4604 个字符,预计需要花费 12 分钟才能阅读完成。
CompletableFuture 有什么用
- CompletableFuture 是用来描述多线程任务的时序关系的:串行关系,并行关系,聚合关系。
- CompletableFuture 是 Java 8 新增加的 Api, 该类实现,Future 和 CompletionStage 两个接口,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
创建 CompletableFuture 对象
方式一:使用默认线程池
/**
* 创建一个不带返回值得任务。*/
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {// 业务逻辑}
});
/**
* 创建一个带返回值的任务。*/
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 业务逻辑
return null;
}
});
方式二:使用自定义线程池(建议使用)
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 创建一个不带返回值得任务。CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {}
},executor);
// 创建一个带返回值的任务。CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 业务逻辑
return null;
}
},executor);
- 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的 核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I / O 操 作,就会导致线程池中所有线程都阻塞在 I / O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要 根据不同的业务类型创建不同的线程池,以避免互相干扰。
- 创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run()方法或者 supplier.get()方法。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口。
常用 API
- public T get():获取计算结果,该方法为阻塞方法会等待计算结果完成。
- public T get(long timeout,TimeUnit unit):有时间限制的阻塞方法
- public T getNow(T valueIfAbsent)立即获取方法结果,如果没有计算结束则返回传的值
- public T join()和 get() 方法类似也是主动阻塞线程,等待计算结果。和 get() 方法有细微的差别。
- public boolean complete(T value):立即完成计算,并把结果设置为传的值,返回是否设置成功
- public boolean completeExceptionally(Throwable ex):立即完成计算,并抛出异常
理解 CompletionStage 接口
CompletionStage 接口可以清晰地描述任务之间的这种时序关系,时序关系:串行,并行,汇聚。
串行
线程与线程之间的执行顺序是串行的。
//Async 代表的是异步执行 fn
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
-
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四 个系列的接口。
- thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的 是 CompletionStage<R>。
-
而 thenAccept 系列方法里参数 consumer 的类型是接口 Consumer<T>,这个接口里与 CompletionStage 相关 的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回 的是 CompletionStage<Void>。
- thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是 CompletionStage<Void>。
- 这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列 方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。
演示串行
//supplyAsync()启动一个异步 流程
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> "Hello World") //①
.thenApply(s -> s + "girl") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
// 输出结果 HELLO WORLD girl
虽然这是一个异步流程,但任务①②③却是 串行执行的,②依赖①的执行结果,③依赖②的执行结果。
AND 汇聚
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer); CompletionStage<Void> thenAcceptBothAsync(other, consumer); CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
演示:
// 启动一个异步流程 f1
CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{System.out.println("异步任务 -1");
});
// 启动一个异步流程 f2
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
String s = "异步任务 -2";
System.out.println(s);
return s;
});
// 启动一个汇聚流程 f3
CompletableFuture<String> f3 = f1.thenCombine(f2,(a,tf)->{return tf;//tf 是 f2 的值});
// 等待任务 3 执行结果
System.out.println(f3.join());
OR 汇聚
CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的 接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
功能演示:
// 启动一个异步流程 f1
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{int t = new Random().nextInt(10);
System.out.println("f1-t ="+t);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
// 启动一个异步流程 f2
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{int t = new Random().nextInt(10);
System.out.println("f2-t ="+t);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
// 将 f1 和 f2 的值汇总到 f3
CompletableFuture<String> f3 = f1.applyToEither(f2,s ->
Integer.parseInt(f2.join())+Integer.parseInt(s)+""
);
System.out.println(f3.join());
码字不易如果对你有帮助请给个关注
爱技术爱生活 QQ 群: 894109590
正文完
发表至: java
2019-09-24