共计 12281 个字符,预计需要花费 31 分钟才能阅读完成。
一个示例回顾 Future
一些业务场景咱们须要应用多线程异步执行工作,放慢工作执行速度。
JDK5 新增了 Future
接口,用于形容一个异步计算的后果。
尽管 Future 以及相干应用办法提供了异步执行工作的能力,然而对于后果的获取却是很不不便,咱们必须应用 Future.get()
的形式阻塞调用线程,或者应用轮询形式判断 Future.isDone
工作是否完结,再获取后果。
这两种解决形式都不是很优雅,相干代码如下:
@Test
public void testFuture() throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> {Thread.sleep(2000);
return "hello";
});
System.out.println(future.get());
System.out.println("end");
}
与此同时,Future 无奈解决多个异步工作须要相互依赖的场景,简略点说就是,主线程须要期待子线程工作执行结束之后在进行执行,这个时候你可能想到了「CountDownLatch」,没错的确能够解决,代码如下。
这里定义两个 Future,第一个通过用户 id 获取用户信息,第二个通过商品 id 获取商品信息。
@Test
public void testCountDownLatch() throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch downLatch = new CountDownLatch(2);
long startTime = System.currentTimeMillis();
Future<String> userFuture = executorService.submit(() -> {
// 模仿查问商品耗时 500 毫秒
Thread.sleep(500);
downLatch.countDown();
return "用户 A";
});
Future<String> goodsFuture = executorService.submit(() -> {
// 模仿查问商品耗时 500 毫秒
Thread.sleep(400);
downLatch.countDown();
return "商品 A";
});
downLatch.await();
// 模仿主程序耗时工夫
Thread.sleep(600);
System.out.println("获取用户信息:" + userFuture.get());
System.out.println("获取商品信息:" + goodsFuture.get());
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
「运行后果」
获取用户信息: 用户 A
获取商品信息: 商品 A
总共用时 1110ms
从运行后果能够看出后果都曾经获取,而且如果咱们不必异步操作,执行工夫应该是:500+400+600 = 1500, 用异步操作后理论只用 1110。
然而 Java8 当前我不在认为这是一种优雅的解决形式,接下来来理解下 CompletableFuture 的应用。
通过 CompletableFuture 实现下面示例
@Test
public void testCompletableInfo() throws InterruptedException, ExecutionException {long startTime = System.currentTimeMillis();
// 调用用户服务获取用户根本信息
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
// 模仿查问商品耗时 500 毫秒
{
try {Thread.sleep(500);
} catch (InterruptedException e) {e.printStackTrace();
}
return "用户 A";
});
// 调用商品服务获取商品根本信息
CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
// 模仿查问商品耗时 500 毫秒
{
try {Thread.sleep(400);
} catch (InterruptedException e) {e.printStackTrace();
}
return "商品 A";
});
System.out.println("获取用户信息:" + userFuture.get());
System.out.println("获取商品信息:" + goodsFuture.get());
// 模仿主程序耗时工夫
Thread.sleep(600);
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
运行后果
获取用户信息: 用户 A
获取商品信息: 商品 A
总共用时 1112ms
通过 CompletableFuture 能够很轻松的实现 CountDownLatch 的性能,你认为这就完结了,远远不止,CompletableFuture 比这要强多了。
比方能够实现: 工作 1 执行完了再执行工作 2, 甚至工作 1 执行的后果,作为工作 2 的入参数等等弱小性能,上面就来学学 CompletableFuture 的 API。
CompletableFuture 创立形式
1、罕用的 4 种创立形式
CompletableFuture 源码中有四个静态方法用来执行异步工作
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
个别咱们用下面的静态方法来创立 CompletableFuture,这里也解释下他们的区别:
- 「supplyAsync」执行工作,反对返回值。
- 「runAsync」执行工作,没有返回值。
「supplyAsync 办法」
// 应用默认内置线程池 ForkJoinPool.commonPool(),依据 supplier 构建执行工作
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 自定义线程,依据 supplier 构建执行工作
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
「runAsync 办法」
// 应用默认内置线程池 ForkJoinPool.commonPool(),依据 runnable 构建执行工作
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 自定义线程,依据 runnable 构建执行工作
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
2、后果获取的 4 种形式
对于后果的获取 CompltableFuture 类提供了四种形式
// 形式一
public T get()
// 形式二
public T get(long timeout, TimeUnit unit)
// 形式三
public T getNow(T valueIfAbsent)
// 形式四
public T join()
阐明
:
- 「get()和 get(long timeout, TimeUnit unit)」 => 在 Future 中就曾经提供了,后者提供超时解决,如果在指定工夫内未获取后果将抛出超时异样
- 「getNow」 => 立刻获取后果不阻塞,后果计算已实现将返回后果或计算过程中的异样,如果未计算实现将返回设定的 valueIfAbsent 值
- 「join」 => 办法里不会抛出异样
示例
:
@Test
public void testCompletableGet() throws InterruptedException, ExecutionException {CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
return "商品 A";
});
// getNow 办法测试
System.out.println(cp1.getNow("商品 B"));
//join 办法测试
CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));
System.out.println(cp2.join());
System.out.println("-----------------------------------------------------");
//get 办法测试
CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));
System.out.println(cp3.get());
}
「运行后果」:
- 第一个执行后果为 「商品 B」,因为要先睡上 1 秒后果不能立刻获取
- join 办法获取后果办法里不会抛异样,然而执行后果会抛异样,抛出的异样为 CompletionException
- get 办法获取后果办法里将抛出异样,执行后果抛出的异样为 ExecutionException
异步回调办法
1、thenRun/thenRunAsync
艰深点讲就是,「做完第一个工作后,再做第二个工作, 第二个工作也没有返回值」。
示例
@Test
public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {long startTime = System.currentTimeMillis();
CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {
try {
// 执行工作 A
Thread.sleep(600);
} catch (InterruptedException e) {e.printStackTrace();
}
});
CompletableFuture<Void> cp2 = cp1.thenRun(() -> {
try {
// 执行工作 B
Thread.sleep(400);
} catch (InterruptedException e) {e.printStackTrace();
}
});
// get 办法测试
System.out.println(cp2.get());
// 模仿主程序耗时工夫
Thread.sleep(600);
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
// 运行后果
/**
* null
* 总共用时 1610ms
*/
「thenRun 和 thenRunAsync 有什么区别呢?」
如果你执行第一个工作的时候,传入了一个自定义线程池:
- 调用 thenRun 办法执行第二个工作时,则第二个工作和第一个工作是共用同一个线程池。
- 调用 thenRunAsync 执行第二个工作时,则第一个工作应用的是你本人传入的线程池,第二个工作应用的是 ForkJoin 线程池。
阐明
: 前面介绍的 thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是这个。
2、thenAccept/thenAcceptAsync
第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,然而回调办法是没有返回值的。
示例
@Test
public void testCompletableThenAccept() throws ExecutionException, InterruptedException {long startTime = System.currentTimeMillis();
CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {return "dev";});
CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> {System.out.println("上一个工作的返回后果为:" + a);
});
cp2.get();}
3、thenApply/thenApplyAsync
示意第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,并且回调办法是有返回值的。
示例
@Test
public void testCompletableThenApply() throws ExecutionException, InterruptedException {CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {return "dev";}).thenApply((a) -> {if(Objects.equals(a,"dev")){return "dev";}
return "prod";
});
System.out.println("以后环境为:" + cp1.get());
// 输入: 以后环境为:dev
}
异样回调
当 CompletableFuture 的工作不论是失常实现还是出现异常它都会调用 「whenComplete」 这回调函数。
- 「失常实现」:whenComplete 返回后果和下级工作统一,异样为 null;
- 「出现异常」:whenComplete 返回后果为 null,异样为下级工作的异样;
即调用 get()时,失常实现时就获取到后果,出现异常时就会抛出异样,须要你解决该异样。
上面来看看示例
1、只用 whenComplete
@Test
public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");
}
System.out.println("失常完结");
return 0.11;
}).whenComplete((aDouble, throwable) -> {if (aDouble == null) {System.out.println("whenComplete aDouble is null");
} else {System.out.println("whenComplete aDouble is" + aDouble);
}
if (throwable == null) {System.out.println("whenComplete throwable is null");
} else {System.out.println("whenComplete throwable is" + throwable.getMessage());
}
});
System.out.println("最终返回的后果 =" + future.get());
}
失常实现,没有异样时:
失常完结
whenComplete aDouble is 0.11
whenComplete throwable is null
最终返回的后果 = 0.11
出现异常时:get()会抛出异样
whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
2、whenComplete + exceptionally 示例
@Test
public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");
}
System.out.println("失常完结");
return 0.11;
}).whenComplete((aDouble, throwable) -> {if (aDouble == null) {System.out.println("whenComplete aDouble is null");
} else {System.out.println("whenComplete aDouble is" + aDouble);
}
if (throwable == null) {System.out.println("whenComplete throwable is null");
} else {System.out.println("whenComplete throwable is" + throwable.getMessage());
}
}).exceptionally((throwable) -> {System.out.println("exceptionally 中异样:" + throwable.getMessage());
return 0.0;
});
System.out.println("最终返回的后果 =" + future.get());
}
当出现异常时,exceptionally 中会捕捉该异样,给出默认返回值 0.0。
whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
exceptionally 中异样:java.lang.RuntimeException: 出错了
最终返回的后果 = 0.0
多任务组合回调
1、AND 组合关系
thenCombine / thenAcceptBoth / runAfterBoth 都示意:「当工作一和工作二都实现再执行工作三」。
区别在于:
- 「runAfterBoth」 不会把执行后果当做办法入参,且没有返回值
- 「thenAcceptBoth」: 会将两个工作的执行后果作为办法入参,传递到指定办法中,且无返回值
- 「thenCombine」:会将两个工作的执行后果作为办法入参,传递到指定办法中,且有返回值
示例
@Test
public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
// 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 开启异步工作 1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 1,以后线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步工作 1 完结");
return result;
}, executorService);
// 开启异步工作 2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 2,以后线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步工作 2 完结");
return result;
}, executorService);
// 工作组合
CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {System.out.println("执行工作 3,以后线程是:" + Thread.currentThread().getId());
System.out.println("工作 1 返回值:" + f1);
System.out.println("工作 2 返回值:" + f2);
return f1 + f2;
}, executorService);
Integer res = task3.get();
System.out.println("最终后果:" + res);
}
「运行后果」
异步工作 1,以后线程是:17
异步工作 1 完结
异步工作 2,以后线程是:18
异步工作 2 完结
执行工作 3,以后线程是:19
工作 1 返回值:2
工作 2 返回值:2
最终后果:4
2、OR 组合关系
applyToEither / acceptEither / runAfterEither 都示意:「两个工作,只有有一个工作实现,就执行工作三」。
区别在于:
- 「runAfterEither」:不会把执行后果当做办法入参,且没有返回值
- 「acceptEither」: 会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且无返回值
- 「applyToEither」:会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且有返回值
示例
@Test
public void testCompletableEitherAsync() {
// 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 开启异步工作 1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 1,以后线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步工作 1 完结");
return result;
}, executorService);
// 开启异步工作 2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 2,以后线程是:" + Thread.currentThread().getId());
int result = 1 + 2;
try {Thread.sleep(3000);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("异步工作 2 完结");
return result;
}, executorService);
// 工作组合
task.acceptEitherAsync(task2, (res) -> {System.out.println("执行工作 3,以后线程是:" + Thread.currentThread().getId());
System.out.println("上一个工作的后果为:"+res);
}, executorService);
}
运行后果
// 通过后果能够看出,异步工作 2 都没有执行完结,工作 3 获取的也是 1 的执行后果
异步工作 1,以后线程是:17
异步工作 1 完结
异步工作 2,以后线程是:18
执行工作 3,以后线程是:19
上一个工作的后果为:2
留神
如果把下面的外围线程数改为 1 也就是
ExecutorService executorService = Executors.newFixedThreadPool(1);
运行后果就是上面的了,会发现基本没有执行工作 3,显然是工作 3 间接被抛弃了。
异步工作 1,以后线程是:17
异步工作 1 完结
异步工作 2,以后线程是:17
3、多任务组合
- 「allOf」:期待所有工作实现
- 「anyOf」:只有有一个工作实现
示例
allOf:期待所有工作实现
@Test
public void testCompletableAnyOf() throws ExecutionException, InterruptedException {
// 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 开启异步工作 1
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
int result = 1 + 1;
return result;
}, executorService);
// 开启异步工作 2
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
int result = 1 + 2;
return result;
}, executorService);
// 开启异步工作 3
CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
int result = 1 + 3;
return result;
}, executorService);
// 工作组合
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);
// 只有有一个有工作实现
Object o = anyOf.get();
System.out.println("实现的工作的后果:" + o);
CompletableFuture 应用有哪些留神点
CompletableFuture 使咱们的异步编程更加便当的、代码更加优雅的同时,咱们也要关注下它,应用的一些留神点。
1、Future 须要获取返回值,能力获取异样信息
@Test
public void testWhenCompleteExceptionally() {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (1 == 1) {throw new RuntimeException("出错了");
}
return 0.11;
});
// 如果不加 get()办法这一行,看不到异样信息
//future.get();}
Future 须要获取返回值,能力获取到异样信息。如果不加 get()/join()办法,看不到异样信息。
小伙伴们应用的时候,留神一下哈, 思考是否加 try…catch… 或者应用 exceptionally 办法。
2、CompletableFuture 的 get()办法是阻塞的
CompletableFuture 的 get()办法是阻塞的,如果应用它来获取异步调用的返回值,须要增加超时工夫。
// 反例
CompletableFuture.get();
// 正例
CompletableFuture.get(5, TimeUnit.SECONDS);
3、不倡议应用默认线程池
CompletableFuture 代码中又应用了默认的「ForkJoin 线程池」,解决的线程个数是电脑「CPU 核数 -1」。在大量申请过去的时候,解决逻辑简单的话,响应会很慢。个别倡议应用自定义线程池,优化线程池配置参数。
4、自定义线程池时,留神饱和策略
CompletableFuture 的 get()办法是阻塞的,咱们个别倡议应用 future.get(5, TimeUnit.SECONDS)。并且个别倡议应用自定义线程池。
然而如果线程池回绝策略是 DiscardPolicy 或者 DiscardOldestPolicy,当线程池饱和时,会间接抛弃工作,不会摈弃异样。因而倡议,CompletableFuture 线程池策略最好应用 AbortPolicy,而后耗时的异步线程,做好线程池隔离哈。