一个示例回顾Future

一些业务场景咱们须要应用多线程异步执行工作,放慢工作执行速度。

JDK5新增了Future接口,用于形容一个异步计算的后果。

尽管 Future 以及相干应用办法提供了异步执行工作的能力,然而对于后果的获取却是很不不便,咱们必须应用Future.get()的形式阻塞调用线程,或者应用轮询形式判断 Future.isDone 工作是否完结,再获取后果。

这两种解决形式都不是很优雅,相干代码如下:

 @Test    public void testFuture() throws ExecutionException, InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(5);        Future<String> future = executorService.submit(() -> {            Thread.sleep(2000);            return "hello";        });        System.out.println(future.get());        System.out.println("end");    }

与此同时,Future无奈解决多个异步工作须要相互依赖的场景,简略点说就是,主线程须要期待子线程工作执行结束之后在进行执行,这个时候你可能想到了「CountDownLatch」,没错的确能够解决,代码如下。

这里定义两个Future,第一个通过用户id获取用户信息,第二个通过商品id获取商品信息。

@Test    public void testCountDownLatch() throws InterruptedException, ExecutionException {        ExecutorService executorService = Executors.newFixedThreadPool(5);        CountDownLatch downLatch = new CountDownLatch(2);        long startTime = System.currentTimeMillis();        Future<String> userFuture = executorService.submit(() -> {            //模仿查问商品耗时500毫秒            Thread.sleep(500);            downLatch.countDown();            return "用户A";        });        Future<String> goodsFuture = executorService.submit(() -> {            //模仿查问商品耗时500毫秒            Thread.sleep(400);            downLatch.countDown();            return "商品A";        });        downLatch.await();        //模仿主程序耗时工夫        Thread.sleep(600);        System.out.println("获取用户信息:" + userFuture.get());        System.out.println("获取商品信息:" + goodsFuture.get());        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");    }

「运行后果」

获取用户信息:用户A获取商品信息:商品A总共用时1110ms

从运行后果能够看出后果都曾经获取,而且如果咱们不必异步操作,执行工夫应该是:500+400+600 = 1500,用异步操作后理论只用1110。

然而Java8当前我不在认为这是一种优雅的解决形式,接下来来理解下CompletableFuture的应用。

通过CompletableFuture实现下面示例

 @Test    public void testCompletableInfo() throws InterruptedException, ExecutionException {        long startTime = System.currentTimeMillis();            //调用用户服务获取用户根本信息          CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->                  //模仿查问商品耗时500毫秒          {              try {                  Thread.sleep(500);              } catch (InterruptedException e) {                  e.printStackTrace();              }              return "用户A";          });            //调用商品服务获取商品根本信息          CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->                  //模仿查问商品耗时500毫秒          {              try {                  Thread.sleep(400);              } catch (InterruptedException e) {                  e.printStackTrace();              }              return "商品A";          });            System.out.println("获取用户信息:" + userFuture.get());          System.out.println("获取商品信息:" + goodsFuture.get());            //模仿主程序耗时工夫          Thread.sleep(600);          System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");    }

运行后果

获取用户信息:用户A获取商品信息:商品A总共用时1112ms

通过CompletableFuture能够很轻松的实现CountDownLatch的性能,你认为这就完结了,远远不止,CompletableFuture比这要强多了。

比方能够实现:工作1执行完了再执行工作2,甚至工作1执行的后果,作为工作2的入参数等等弱小性能,上面就来学学CompletableFuture的API。

CompletableFuture创立形式

1、罕用的4种创立形式

CompletableFuture源码中有四个静态方法用来执行异步工作

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}public static CompletableFuture<Void> runAsync(Runnable runnable){..}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}

个别咱们用下面的静态方法来创立CompletableFuture,这里也解释下他们的区别:

  • 「supplyAsync」执行工作,反对返回值。
  • 「runAsync」执行工作,没有返回值。

「supplyAsync办法」

//应用默认内置线程池ForkJoinPool.commonPool(),依据supplier构建执行工作public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//自定义线程,依据supplier构建执行工作public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

「runAsync办法」

//应用默认内置线程池ForkJoinPool.commonPool(),依据runnable构建执行工作public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,依据runnable构建执行工作public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

2、后果获取的4种形式

对于后果的获取CompltableFuture类提供了四种形式

//形式一public T get()//形式二public T get(long timeout, TimeUnit unit)//形式三public T getNow(T valueIfAbsent)//形式四public T join()

阐明

  • 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就曾经提供了,后者提供超时解决,如果在指定工夫内未获取后果将抛出超时异样
  • 「getNow」 => 立刻获取后果不阻塞,后果计算已实现将返回后果或计算过程中的异样,如果未计算实现将返回设定的valueIfAbsent值
  • 「join」 => 办法里不会抛出异样

示例

  @Test    public void testCompletableGet() throws InterruptedException, ExecutionException {        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "商品A";        });        // getNow办法测试         System.out.println(cp1.getNow("商品B"));        //join办法测试         CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));        System.out.println(cp2.join());       System.out.println("-----------------------------------------------------");        //get办法测试        CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));        System.out.println(cp3.get());    }

「运行后果」

  • 第一个执行后果为 「商品B」,因为要先睡上1秒后果不能立刻获取
  • join办法获取后果办法里不会抛异样,然而执行后果会抛异样,抛出的异样为CompletionException
  • get办法获取后果办法里将抛出异样,执行后果抛出的异样为ExecutionException

异步回调办法

1、thenRun/thenRunAsync

艰深点讲就是,「做完第一个工作后,再做第二个工作,第二个工作也没有返回值」

示例

 @Test    public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {        long startTime = System.currentTimeMillis();                CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {            try {                //执行工作A                Thread.sleep(600);            } catch (InterruptedException e) {                e.printStackTrace();            }        });        CompletableFuture<Void> cp2 =  cp1.thenRun(() -> {            try {                //执行工作B                Thread.sleep(400);            } catch (InterruptedException e) {                e.printStackTrace();            }        });        // get办法测试        System.out.println(cp2.get());        //模仿主程序耗时工夫        Thread.sleep(600);        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");    }        //运行后果      /**     *  null     *  总共用时1610ms     */

「thenRun 和thenRunAsync有什么区别呢?」

如果你执行第一个工作的时候,传入了一个自定义线程池:

  • 调用thenRun办法执行第二个工作时,则第二个工作和第一个工作是共用同一个线程池。
  • 调用thenRunAsync执行第二个工作时,则第一个工作应用的是你本人传入的线程池,第二个工作应用的是ForkJoin线程池。

阐明: 前面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

2、thenAccept/thenAcceptAsync

第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,然而回调办法是没有返回值的。

示例

 @Test    public void testCompletableThenAccept() throws ExecutionException, InterruptedException {        long startTime = System.currentTimeMillis();        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {            return "dev";        });        CompletableFuture<Void> cp2 =  cp1.thenAccept((a) -> {            System.out.println("上一个工作的返回后果为: " + a);        });             cp2.get();    }

3、 thenApply/thenApplyAsync

示意第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,并且回调办法是有返回值的。

示例

  @Test    public void testCompletableThenApply() throws ExecutionException, InterruptedException {        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {            return "dev";        }).thenApply((a) -> {            if(Objects.equals(a,"dev")){                return "dev";            }            return "prod";        });        System.out.println("以后环境为:" + cp1.get());        //输入: 以后环境为:dev    }

异样回调

当CompletableFuture的工作不论是失常实现还是出现异常它都会调用「whenComplete」这回调函数。

  • 「失常实现」:whenComplete返回后果和下级工作统一,异样为null;
  • 「出现异常」:whenComplete返回后果为null,异样为下级工作的异样;

即调用get()时,失常实现时就获取到后果,出现异常时就会抛出异样,须要你解决该异样。

上面来看看示例

1、只用whenComplete

  @Test    public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {            if (Math.random() < 0.5) {                throw new RuntimeException("出错了");            }            System.out.println("失常完结");            return 0.11;        }).whenComplete((aDouble, throwable) -> {            if (aDouble == null) {                System.out.println("whenComplete aDouble is null");            } else {                System.out.println("whenComplete aDouble is " + aDouble);            }            if (throwable == null) {                System.out.println("whenComplete throwable is null");            } else {                System.out.println("whenComplete throwable is " + throwable.getMessage());            }        });        System.out.println("最终返回的后果 = " + future.get());    }

失常实现,没有异样时:

失常完结whenComplete aDouble is 0.11whenComplete throwable is null最终返回的后果 = 0.11

出现异常时:get()会抛出异样

whenComplete aDouble is nullwhenComplete throwable is java.lang.RuntimeException: 出错了java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

2、whenComplete + exceptionally示例

@Test    public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {            if (Math.random() < 0.5) {                throw new RuntimeException("出错了");            }            System.out.println("失常完结");            return 0.11;        }).whenComplete((aDouble, throwable) -> {            if (aDouble == null) {                System.out.println("whenComplete aDouble is null");            } else {                System.out.println("whenComplete aDouble is " + aDouble);            }            if (throwable == null) {                System.out.println("whenComplete throwable is null");            } else {                System.out.println("whenComplete throwable is " + throwable.getMessage());            }        }).exceptionally((throwable) -> {            System.out.println("exceptionally中异样:" + throwable.getMessage());            return 0.0;        });        System.out.println("最终返回的后果 = " + future.get());    }

当出现异常时,exceptionally中会捕捉该异样,给出默认返回值0.0。

whenComplete aDouble is nullwhenComplete throwable is java.lang.RuntimeException: 出错了exceptionally中异样:java.lang.RuntimeException: 出错了最终返回的后果 = 0.0

多任务组合回调

1、AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都示意:「当工作一和工作二都实现再执行工作三」

区别在于:

  • 「runAfterBoth」 不会把执行后果当做办法入参,且没有返回值
  • 「thenAcceptBoth」: 会将两个工作的执行后果作为办法入参,传递到指定办法中,且无返回值
  • 「thenCombine」:会将两个工作的执行后果作为办法入参,传递到指定办法中,且有返回值

示例

@Test    public void testCompletableThenCombine() throws ExecutionException, InterruptedException {        //创立线程池        ExecutorService executorService = Executors.newFixedThreadPool(10);        //开启异步工作1        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {            System.out.println("异步工作1,以后线程是:" + Thread.currentThread().getId());            int result = 1 + 1;            System.out.println("异步工作1完结");            return result;        }, executorService);        //开启异步工作2        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {            System.out.println("异步工作2,以后线程是:" + Thread.currentThread().getId());            int result = 1 + 1;            System.out.println("异步工作2完结");            return result;        }, executorService);        //工作组合        CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {            System.out.println("执行工作3,以后线程是:" + Thread.currentThread().getId());            System.out.println("工作1返回值:" + f1);            System.out.println("工作2返回值:" + f2);            return f1 + f2;        }, executorService);        Integer res = task3.get();        System.out.println("最终后果:" + res);    }

「运行后果」

异步工作1,以后线程是:17异步工作1完结异步工作2,以后线程是:18异步工作2完结执行工作3,以后线程是:19工作1返回值:2工作2返回值:2最终后果:4

2、OR组合关系

applyToEither / acceptEither / runAfterEither 都示意:「两个工作,只有有一个工作实现,就执行工作三」

区别在于:

  • 「runAfterEither」:不会把执行后果当做办法入参,且没有返回值
  • 「acceptEither」: 会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且无返回值
  • 「applyToEither」:会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且有返回值

示例

@Test    public void testCompletableEitherAsync() {        //创立线程池        ExecutorService executorService = Executors.newFixedThreadPool(10);        //开启异步工作1        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {            System.out.println("异步工作1,以后线程是:" + Thread.currentThread().getId());            int result = 1 + 1;            System.out.println("异步工作1完结");            return result;        }, executorService);        //开启异步工作2        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {            System.out.println("异步工作2,以后线程是:" + Thread.currentThread().getId());            int result = 1 + 2;            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("异步工作2完结");            return result;        }, executorService);        //工作组合        task.acceptEitherAsync(task2, (res) -> {            System.out.println("执行工作3,以后线程是:" + Thread.currentThread().getId());            System.out.println("上一个工作的后果为:"+res);        }, executorService);    }

运行后果

//通过后果能够看出,异步工作2都没有执行完结,工作3获取的也是1的执行后果异步工作1,以后线程是:17异步工作1完结异步工作2,以后线程是:18执行工作3,以后线程是:19上一个工作的后果为:2

留神

如果把下面的外围线程数改为1也就是

 ExecutorService executorService = Executors.newFixedThreadPool(1);

运行后果就是上面的了,会发现基本没有执行工作3,显然是工作3间接被抛弃了。

异步工作1,以后线程是:17异步工作1完结异步工作2,以后线程是:17

3、多任务组合

  • 「allOf」:期待所有工作实现
  • 「anyOf」:只有有一个工作实现

示例

allOf:期待所有工作实现

   @Test    public void testCompletableAnyOf() throws ExecutionException, InterruptedException {        //创立线程池        ExecutorService executorService = Executors.newFixedThreadPool(10);        //开启异步工作1        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {            int result = 1 + 1;            return result;        }, executorService);        //开启异步工作2        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {            int result = 1 + 2;            return result;        }, executorService);        //开启异步工作3        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {            int result = 1 + 3;            return result;        }, executorService);        //工作组合        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);        //只有有一个有工作实现        Object o = anyOf.get();        System.out.println("实现的工作的后果:" + o);

CompletableFuture应用有哪些留神点

CompletableFuture 使咱们的异步编程更加便当的、代码更加优雅的同时,咱们也要关注下它,应用的一些留神点。

1、Future须要获取返回值,能力获取异样信息

  @Test    public void testWhenCompleteExceptionally() {        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {            if (1 == 1) {                throw new RuntimeException("出错了");            }            return 0.11;        });        //如果不加 get()办法这一行,看不到异样信息        //future.get();    }

Future须要获取返回值,能力获取到异样信息。如果不加 get()/join()办法,看不到异样信息。

小伙伴们应用的时候,留神一下哈,思考是否加try...catch...或者应用exceptionally办法。

2、CompletableFuture的get()办法是阻塞的

CompletableFuture的get()办法是阻塞的,如果应用它来获取异步调用的返回值,须要增加超时工夫。

//反例 CompletableFuture.get();//正例CompletableFuture.get(5, TimeUnit.SECONDS);

3、不倡议应用默认线程池

CompletableFuture代码中又应用了默认的「ForkJoin线程池」,解决的线程个数是电脑「CPU核数-1」。在大量申请过去的时候,解决逻辑简单的话,响应会很慢。个别倡议应用自定义线程池,优化线程池配置参数。

4、自定义线程池时,留神饱和策略

CompletableFuture的get()办法是阻塞的,咱们个别倡议应用future.get(5, TimeUnit.SECONDS)。并且个别倡议应用自定义线程池。

然而如果线程池回绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会间接抛弃工作,不会摈弃异样。因而倡议,CompletableFuture线程池策略最好应用AbortPolicy,而后耗时的异步线程,做好线程池隔离哈。