一个示例回顾Future
一些业务场景咱们须要应用多线程异步执行工作,放慢工作执行速度。
JDK5新增了Future
接口,用于形容一个异步计算的后果。
尽管 Future 以及相干应用办法提供了异步执行工作的能力,然而对于后果的获取却是很不不便,咱们必须应用Future.get()
的形式阻塞调用线程,或者应用轮询形式判断 Future.isDone
工作是否完结,再获取后果。
这两种解决形式都不是很优雅,相干代码如下:
@Test public void testFuture() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); Future<String> future = executorService.submit(() -> { Thread.sleep(2000); return "hello"; }); System.out.println(future.get()); System.out.println("end"); }
与此同时,Future无奈解决多个异步工作须要相互依赖的场景,简略点说就是,主线程须要期待子线程工作执行结束之后在进行执行,这个时候你可能想到了「CountDownLatch」,没错的确能够解决,代码如下。
这里定义两个Future,第一个通过用户id获取用户信息,第二个通过商品id获取商品信息。
@Test public void testCountDownLatch() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(5); CountDownLatch downLatch = new CountDownLatch(2); long startTime = System.currentTimeMillis(); Future<String> userFuture = executorService.submit(() -> { //模仿查问商品耗时500毫秒 Thread.sleep(500); downLatch.countDown(); return "用户A"; }); Future<String> goodsFuture = executorService.submit(() -> { //模仿查问商品耗时500毫秒 Thread.sleep(400); downLatch.countDown(); return "商品A"; }); downLatch.await(); //模仿主程序耗时工夫 Thread.sleep(600); System.out.println("获取用户信息:" + userFuture.get()); System.out.println("获取商品信息:" + goodsFuture.get()); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }
「运行后果」
获取用户信息:用户A获取商品信息:商品A总共用时1110ms
从运行后果能够看出后果都曾经获取,而且如果咱们不必异步操作,执行工夫应该是:500+400+600 = 1500,用异步操作后理论只用1110。
然而Java8当前我不在认为这是一种优雅的解决形式,接下来来理解下CompletableFuture的应用。
通过CompletableFuture实现下面示例
@Test public void testCompletableInfo() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); //调用用户服务获取用户根本信息 CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> //模仿查问商品耗时500毫秒 { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "用户A"; }); //调用商品服务获取商品根本信息 CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() -> //模仿查问商品耗时500毫秒 { try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); System.out.println("获取用户信息:" + userFuture.get()); System.out.println("获取商品信息:" + goodsFuture.get()); //模仿主程序耗时工夫 Thread.sleep(600); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }
运行后果
获取用户信息:用户A获取商品信息:商品A总共用时1112ms
通过CompletableFuture能够很轻松的实现CountDownLatch的性能,你认为这就完结了,远远不止,CompletableFuture比这要强多了。
比方能够实现:工作1执行完了再执行工作2,甚至工作1执行的后果,作为工作2的入参数等等弱小性能,上面就来学学CompletableFuture的API。
CompletableFuture创立形式
1、罕用的4种创立形式
CompletableFuture源码中有四个静态方法用来执行异步工作
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}public static CompletableFuture<Void> runAsync(Runnable runnable){..}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
个别咱们用下面的静态方法来创立CompletableFuture,这里也解释下他们的区别:
- 「supplyAsync」执行工作,反对返回值。
- 「runAsync」执行工作,没有返回值。
「supplyAsync办法」
//应用默认内置线程池ForkJoinPool.commonPool(),依据supplier构建执行工作public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//自定义线程,依据supplier构建执行工作public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
「runAsync办法」
//应用默认内置线程池ForkJoinPool.commonPool(),依据runnable构建执行工作public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,依据runnable构建执行工作public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
2、后果获取的4种形式
对于后果的获取CompltableFuture类提供了四种形式
//形式一public T get()//形式二public T get(long timeout, TimeUnit unit)//形式三public T getNow(T valueIfAbsent)//形式四public T join()
阐明
:
- 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就曾经提供了,后者提供超时解决,如果在指定工夫内未获取后果将抛出超时异样
- 「getNow」 => 立刻获取后果不阻塞,后果计算已实现将返回后果或计算过程中的异样,如果未计算实现将返回设定的valueIfAbsent值
- 「join」 => 办法里不会抛出异样
示例
:
@Test public void testCompletableGet() throws InterruptedException, ExecutionException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); // getNow办法测试 System.out.println(cp1.getNow("商品B")); //join办法测试 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp2.join()); System.out.println("-----------------------------------------------------"); //get办法测试 CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp3.get()); }
「运行后果」:
- 第一个执行后果为 「商品B」,因为要先睡上1秒后果不能立刻获取
- join办法获取后果办法里不会抛异样,然而执行后果会抛异样,抛出的异样为CompletionException
- get办法获取后果办法里将抛出异样,执行后果抛出的异样为ExecutionException
异步回调办法
1、thenRun/thenRunAsync
艰深点讲就是,「做完第一个工作后,再做第二个工作,第二个工作也没有返回值」。
示例
@Test public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> { try { //执行工作A Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> cp2 = cp1.thenRun(() -> { try { //执行工作B Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } }); // get办法测试 System.out.println(cp2.get()); //模仿主程序耗时工夫 Thread.sleep(600); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } //运行后果 /** * null * 总共用时1610ms */
「thenRun 和thenRunAsync有什么区别呢?」
如果你执行第一个工作的时候,传入了一个自定义线程池:
- 调用thenRun办法执行第二个工作时,则第二个工作和第一个工作是共用同一个线程池。
- 调用thenRunAsync执行第二个工作时,则第一个工作应用的是你本人传入的线程池,第二个工作应用的是ForkJoin线程池。
阐明
: 前面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。
2、thenAccept/thenAcceptAsync
第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,然而回调办法是没有返回值的。
示例
@Test public void testCompletableThenAccept() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }); CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> { System.out.println("上一个工作的返回后果为: " + a); }); cp2.get(); }
3、 thenApply/thenApplyAsync
示意第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,并且回调办法是有返回值的。
示例
@Test public void testCompletableThenApply() throws ExecutionException, InterruptedException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }).thenApply((a) -> { if(Objects.equals(a,"dev")){ return "dev"; } return "prod"; }); System.out.println("以后环境为:" + cp1.get()); //输入: 以后环境为:dev }
异样回调
当CompletableFuture的工作不论是失常实现还是出现异常它都会调用「whenComplete」这回调函数。
- 「失常实现」:whenComplete返回后果和下级工作统一,异样为null;
- 「出现异常」:whenComplete返回后果为null,异样为下级工作的异样;
即调用get()时,失常实现时就获取到后果,出现异常时就会抛出异样,须要你解决该异样。
上面来看看示例
1、只用whenComplete
@Test public void testCompletableWhenComplete() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出错了"); } System.out.println("失常完结"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }); System.out.println("最终返回的后果 = " + future.get()); }
失常实现,没有异样时:
失常完结whenComplete aDouble is 0.11whenComplete throwable is null最终返回的后果 = 0.11
出现异常时:get()会抛出异样
whenComplete aDouble is nullwhenComplete throwable is java.lang.RuntimeException: 出错了java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
2、whenComplete + exceptionally示例
@Test public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出错了"); } System.out.println("失常完结"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }).exceptionally((throwable) -> { System.out.println("exceptionally中异样:" + throwable.getMessage()); return 0.0; }); System.out.println("最终返回的后果 = " + future.get()); }
当出现异常时,exceptionally中会捕捉该异样,给出默认返回值0.0。
whenComplete aDouble is nullwhenComplete throwable is java.lang.RuntimeException: 出错了exceptionally中异样:java.lang.RuntimeException: 出错了最终返回的后果 = 0.0
多任务组合回调
1、AND组合关系
thenCombine / thenAcceptBoth / runAfterBoth都示意:「当工作一和工作二都实现再执行工作三」。
区别在于:
- 「runAfterBoth」 不会把执行后果当做办法入参,且没有返回值
- 「thenAcceptBoth」: 会将两个工作的执行后果作为办法入参,传递到指定办法中,且无返回值
- 「thenCombine」:会将两个工作的执行后果作为办法入参,传递到指定办法中,且有返回值
示例
@Test public void testCompletableThenCombine() throws ExecutionException, InterruptedException { //创立线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步工作1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步工作1,以后线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步工作1完结"); return result; }, executorService); //开启异步工作2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步工作2,以后线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步工作2完结"); return result; }, executorService); //工作组合 CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> { System.out.println("执行工作3,以后线程是:" + Thread.currentThread().getId()); System.out.println("工作1返回值:" + f1); System.out.println("工作2返回值:" + f2); return f1 + f2; }, executorService); Integer res = task3.get(); System.out.println("最终后果:" + res); }
「运行后果」
异步工作1,以后线程是:17异步工作1完结异步工作2,以后线程是:18异步工作2完结执行工作3,以后线程是:19工作1返回值:2工作2返回值:2最终后果:4
2、OR组合关系
applyToEither / acceptEither / runAfterEither 都示意:「两个工作,只有有一个工作实现,就执行工作三」。
区别在于:
- 「runAfterEither」:不会把执行后果当做办法入参,且没有返回值
- 「acceptEither」: 会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且无返回值
- 「applyToEither」:会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且有返回值
示例
@Test public void testCompletableEitherAsync() { //创立线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步工作1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步工作1,以后线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步工作1完结"); return result; }, executorService); //开启异步工作2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步工作2,以后线程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步工作2完结"); return result; }, executorService); //工作组合 task.acceptEitherAsync(task2, (res) -> { System.out.println("执行工作3,以后线程是:" + Thread.currentThread().getId()); System.out.println("上一个工作的后果为:"+res); }, executorService); }
运行后果
//通过后果能够看出,异步工作2都没有执行完结,工作3获取的也是1的执行后果异步工作1,以后线程是:17异步工作1完结异步工作2,以后线程是:18执行工作3,以后线程是:19上一个工作的后果为:2
留神
如果把下面的外围线程数改为1也就是
ExecutorService executorService = Executors.newFixedThreadPool(1);
运行后果就是上面的了,会发现基本没有执行工作3,显然是工作3间接被抛弃了。
异步工作1,以后线程是:17异步工作1完结异步工作2,以后线程是:17
3、多任务组合
- 「allOf」:期待所有工作实现
- 「anyOf」:只有有一个工作实现
示例
allOf:期待所有工作实现
@Test public void testCompletableAnyOf() throws ExecutionException, InterruptedException { //创立线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步工作1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { int result = 1 + 1; return result; }, executorService); //开启异步工作2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { int result = 1 + 2; return result; }, executorService); //开启异步工作3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { int result = 1 + 3; return result; }, executorService); //工作组合 CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3); //只有有一个有工作实现 Object o = anyOf.get(); System.out.println("实现的工作的后果:" + o);
CompletableFuture应用有哪些留神点
CompletableFuture 使咱们的异步编程更加便当的、代码更加优雅的同时,咱们也要关注下它,应用的一些留神点。
1、Future须要获取返回值,能力获取异样信息
@Test public void testWhenCompleteExceptionally() { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (1 == 1) { throw new RuntimeException("出错了"); } return 0.11; }); //如果不加 get()办法这一行,看不到异样信息 //future.get(); }
Future须要获取返回值,能力获取到异样信息。如果不加 get()/join()办法,看不到异样信息。
小伙伴们应用的时候,留神一下哈,思考是否加try...catch...或者应用exceptionally办法。
2、CompletableFuture的get()办法是阻塞的
CompletableFuture的get()办法是阻塞的,如果应用它来获取异步调用的返回值,须要增加超时工夫。
//反例 CompletableFuture.get();//正例CompletableFuture.get(5, TimeUnit.SECONDS);
3、不倡议应用默认线程池
CompletableFuture代码中又应用了默认的「ForkJoin线程池」,解决的线程个数是电脑「CPU核数-1」。在大量申请过去的时候,解决逻辑简单的话,响应会很慢。个别倡议应用自定义线程池,优化线程池配置参数。
4、自定义线程池时,留神饱和策略
CompletableFuture的get()办法是阻塞的,咱们个别倡议应用future.get(5, TimeUnit.SECONDS)。并且个别倡议应用自定义线程池。
然而如果线程池回绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会间接抛弃工作,不会摈弃异样。因而倡议,CompletableFuture线程池策略最好应用AbortPolicy,而后耗时的异步线程,做好线程池隔离哈。