关于java:奇淫巧技CompletableFuture-异步多线程是真的优雅

1次阅读

共计 12281 个字符,预计需要花费 31 分钟才能阅读完成。

一个示例回顾 Future

一些业务场景咱们须要应用多线程异步执行工作,放慢工作执行速度。

JDK5 新增了 Future 接口,用于形容一个异步计算的后果。

尽管 Future 以及相干应用办法提供了异步执行工作的能力,然而对于后果的获取却是很不不便,咱们必须应用 Future.get() 的形式阻塞调用线程,或者应用轮询形式判断 Future.isDone 工作是否完结,再获取后果。

这两种解决形式都不是很优雅,相干代码如下:

 @Test
    public void testFuture() throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<String> future = executorService.submit(() -> {Thread.sleep(2000);
            return "hello";
        });
        System.out.println(future.get());
        System.out.println("end");
    }

与此同时,Future 无奈解决多个异步工作须要相互依赖的场景,简略点说就是,主线程须要期待子线程工作执行结束之后在进行执行,这个时候你可能想到了「CountDownLatch」,没错的确能够解决,代码如下。

这里定义两个 Future,第一个通过用户 id 获取用户信息,第二个通过商品 id 获取商品信息。

@Test
    public void testCountDownLatch() throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(5);
        CountDownLatch downLatch = new CountDownLatch(2);
        long startTime = System.currentTimeMillis();
        Future<String> userFuture = executorService.submit(() -> {
            // 模仿查问商品耗时 500 毫秒
            Thread.sleep(500);
            downLatch.countDown();
            return "用户 A";
        });

        Future<String> goodsFuture = executorService.submit(() -> {
            // 模仿查问商品耗时 500 毫秒
            Thread.sleep(400);
            downLatch.countDown();
            return "商品 A";
        });

        downLatch.await();
        // 模仿主程序耗时工夫
        Thread.sleep(600);
        System.out.println("获取用户信息:" + userFuture.get());
        System.out.println("获取商品信息:" + goodsFuture.get());
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");

    }

「运行后果」

获取用户信息: 用户 A
获取商品信息: 商品 A
总共用时 1110ms

从运行后果能够看出后果都曾经获取,而且如果咱们不必异步操作,执行工夫应该是:500+400+600 = 1500, 用异步操作后理论只用 1110。

然而 Java8 当前我不在认为这是一种优雅的解决形式,接下来来理解下 CompletableFuture 的应用。

通过 CompletableFuture 实现下面示例

 @Test
    public void testCompletableInfo() throws InterruptedException, ExecutionException {long startTime = System.currentTimeMillis();
  
          // 调用用户服务获取用户根本信息
          CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
                  // 模仿查问商品耗时 500 毫秒
          {
              try {Thread.sleep(500);
              } catch (InterruptedException e) {e.printStackTrace();
              }
              return "用户 A";
          });
  
          // 调用商品服务获取商品根本信息
          CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
                  // 模仿查问商品耗时 500 毫秒
          {
              try {Thread.sleep(400);
              } catch (InterruptedException e) {e.printStackTrace();
              }
              return "商品 A";
          });
  
          System.out.println("获取用户信息:" + userFuture.get());
          System.out.println("获取商品信息:" + goodsFuture.get());
  
          // 模仿主程序耗时工夫
          Thread.sleep(600);
          System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }

运行后果

获取用户信息: 用户 A
获取商品信息: 商品 A
总共用时 1112ms

通过 CompletableFuture 能够很轻松的实现 CountDownLatch 的性能,你认为这就完结了,远远不止,CompletableFuture 比这要强多了。

比方能够实现: 工作 1 执行完了再执行工作 2, 甚至工作 1 执行的后果,作为工作 2 的入参数等等弱小性能,上面就来学学 CompletableFuture 的 API。

CompletableFuture 创立形式

1、罕用的 4 种创立形式

CompletableFuture 源码中有四个静态方法用来执行异步工作

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}

个别咱们用下面的静态方法来创立 CompletableFuture,这里也解释下他们的区别:

  • 「supplyAsync」执行工作,反对返回值。
  • 「runAsync」执行工作,没有返回值。

「supplyAsync 办法」

// 应用默认内置线程池 ForkJoinPool.commonPool(),依据 supplier 构建执行工作
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 自定义线程,依据 supplier 构建执行工作
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

「runAsync 办法」

// 应用默认内置线程池 ForkJoinPool.commonPool(),依据 runnable 构建执行工作
public static CompletableFuture<Void> runAsync(Runnable runnable) 
// 自定义线程,依据 runnable 构建执行工作
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

2、后果获取的 4 种形式

对于后果的获取 CompltableFuture 类提供了四种形式

// 形式一
public T get()
// 形式二
public T get(long timeout, TimeUnit unit)
// 形式三
public T getNow(T valueIfAbsent)
// 形式四
public T join()

阐明

  • 「get()和 get(long timeout, TimeUnit unit)」 => 在 Future 中就曾经提供了,后者提供超时解决,如果在指定工夫内未获取后果将抛出超时异样
  • 「getNow」 => 立刻获取后果不阻塞,后果计算已实现将返回后果或计算过程中的异样,如果未计算实现将返回设定的 valueIfAbsent 值
  • 「join」 => 办法里不会抛出异样

示例

  @Test
    public void testCompletableGet() throws InterruptedException, ExecutionException {CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "商品 A";
        });

        // getNow 办法测试 
        System.out.println(cp1.getNow("商品 B"));

        //join 办法测试 
        CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp2.join());
       System.out.println("-----------------------------------------------------");
        //get 办法测试
        CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp3.get());
    }

「运行后果」

  • 第一个执行后果为 「商品 B」,因为要先睡上 1 秒后果不能立刻获取
  • join 办法获取后果办法里不会抛异样,然而执行后果会抛异样,抛出的异样为 CompletionException
  • get 办法获取后果办法里将抛出异样,执行后果抛出的异样为 ExecutionException

异步回调办法

1、thenRun/thenRunAsync

艰深点讲就是,「做完第一个工作后,再做第二个工作, 第二个工作也没有返回值」

示例

 @Test
    public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {long startTime = System.currentTimeMillis();
        
        CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {
            try {
                // 执行工作 A
                Thread.sleep(600);
            } catch (InterruptedException e) {e.printStackTrace();
            }

        });

        CompletableFuture<Void> cp2 =  cp1.thenRun(() -> {
            try {
                // 执行工作 B
                Thread.sleep(400);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        });

        // get 办法测试
        System.out.println(cp2.get());

        // 模仿主程序耗时工夫
        Thread.sleep(600);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }
    
    // 运行后果  
    /**
     *  null
     *  总共用时 1610ms
     */

「thenRun 和 thenRunAsync 有什么区别呢?」

如果你执行第一个工作的时候,传入了一个自定义线程池:

  • 调用 thenRun 办法执行第二个工作时,则第二个工作和第一个工作是共用同一个线程池。
  • 调用 thenRunAsync 执行第二个工作时,则第一个工作应用的是你本人传入的线程池,第二个工作应用的是 ForkJoin 线程池。

阐明: 前面介绍的 thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等,它们之间的区别也是这个。

2、thenAccept/thenAcceptAsync

第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,然而回调办法是没有返回值的。

示例

 @Test
    public void testCompletableThenAccept() throws ExecutionException, InterruptedException {long startTime = System.currentTimeMillis();
        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {return "dev";});
        CompletableFuture<Void> cp2 =  cp1.thenAccept((a) -> {System.out.println("上一个工作的返回后果为:" + a);
        });
     
        cp2.get();}

3、thenApply/thenApplyAsync

示意第一个工作执行实现后,执行第二个回调办法工作,会将该工作的执行后果,作为入参,传递到回调办法中,并且回调办法是有返回值的。

示例

  @Test
    public void testCompletableThenApply() throws ExecutionException, InterruptedException {CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {return "dev";}).thenApply((a) -> {if(Objects.equals(a,"dev")){return "dev";}
            return "prod";
        });

        System.out.println("以后环境为:" + cp1.get());

        // 输入: 以后环境为:dev
    }

异样回调

当 CompletableFuture 的工作不论是失常实现还是出现异常它都会调用 「whenComplete」 这回调函数。

  • 「失常实现」:whenComplete 返回后果和下级工作统一,异样为 null;
  • 「出现异常」:whenComplete 返回后果为 null,异样为下级工作的异样;

即调用 get()时,失常实现时就获取到后果,出现异常时就会抛出异样,须要你解决该异样。

上面来看看示例

1、只用 whenComplete

  @Test
    public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");
            }
            System.out.println("失常完结");
            return 0.11;

        }).whenComplete((aDouble, throwable) -> {if (aDouble == null) {System.out.println("whenComplete aDouble is null");
            } else {System.out.println("whenComplete aDouble is" + aDouble);
            }
            if (throwable == null) {System.out.println("whenComplete throwable is null");
            } else {System.out.println("whenComplete throwable is" + throwable.getMessage());
            }
        });
        System.out.println("最终返回的后果 =" + future.get());
    }

失常实现,没有异样时:

失常完结
whenComplete aDouble is 0.11
whenComplete throwable is null
最终返回的后果 = 0.11

出现异常时:get()会抛出异样

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了

java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

2、whenComplete + exceptionally 示例

@Test
    public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");
            }
            System.out.println("失常完结");
            return 0.11;

        }).whenComplete((aDouble, throwable) -> {if (aDouble == null) {System.out.println("whenComplete aDouble is null");
            } else {System.out.println("whenComplete aDouble is" + aDouble);
            }
            if (throwable == null) {System.out.println("whenComplete throwable is null");
            } else {System.out.println("whenComplete throwable is" + throwable.getMessage());
            }
        }).exceptionally((throwable) -> {System.out.println("exceptionally 中异样:" + throwable.getMessage());
            return 0.0;
        });

        System.out.println("最终返回的后果 =" + future.get());
    }

当出现异常时,exceptionally 中会捕捉该异样,给出默认返回值 0.0。

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
exceptionally 中异样:java.lang.RuntimeException: 出错了
最终返回的后果 = 0.0

多任务组合回调

1、AND 组合关系

thenCombine / thenAcceptBoth / runAfterBoth 都示意:「当工作一和工作二都实现再执行工作三」

区别在于:

  • 「runAfterBoth」 不会把执行后果当做办法入参,且没有返回值
  • 「thenAcceptBoth」: 会将两个工作的执行后果作为办法入参,传递到指定办法中,且无返回值
  • 「thenCombine」:会将两个工作的执行后果作为办法入参,传递到指定办法中,且有返回值

示例

@Test
    public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
        // 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 开启异步工作 1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 1,以后线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步工作 1 完结");
            return result;
        }, executorService);

        // 开启异步工作 2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 2,以后线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步工作 2 完结");
            return result;
        }, executorService);

        // 工作组合
        CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {System.out.println("执行工作 3,以后线程是:" + Thread.currentThread().getId());
            System.out.println("工作 1 返回值:" + f1);
            System.out.println("工作 2 返回值:" + f2);
            return f1 + f2;
        }, executorService);

        Integer res = task3.get();
        System.out.println("最终后果:" + res);
    }

「运行后果」

异步工作 1,以后线程是:17
异步工作 1 完结
异步工作 2,以后线程是:18
异步工作 2 完结
执行工作 3,以后线程是:19
工作 1 返回值:2
工作 2 返回值:2
最终后果:4

2、OR 组合关系

applyToEither / acceptEither / runAfterEither 都示意:「两个工作,只有有一个工作实现,就执行工作三」

区别在于:

  • 「runAfterEither」:不会把执行后果当做办法入参,且没有返回值
  • 「acceptEither」: 会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且无返回值
  • 「applyToEither」:会将曾经执行实现的工作,作为办法入参,传递到指定办法中,且有返回值

示例

@Test
    public void testCompletableEitherAsync() {
        // 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 开启异步工作 1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 1,以后线程是:" + Thread.currentThread().getId());

            int result = 1 + 1;
            System.out.println("异步工作 1 完结");
            return result;
        }, executorService);

        // 开启异步工作 2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("异步工作 2,以后线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {Thread.sleep(3000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("异步工作 2 完结");
            return result;
        }, executorService);

        // 工作组合
        task.acceptEitherAsync(task2, (res) -> {System.out.println("执行工作 3,以后线程是:" + Thread.currentThread().getId());
            System.out.println("上一个工作的后果为:"+res);
        }, executorService);
    }

运行后果

// 通过后果能够看出,异步工作 2 都没有执行完结,工作 3 获取的也是 1 的执行后果
异步工作 1,以后线程是:17
异步工作 1 完结
异步工作 2,以后线程是:18
执行工作 3,以后线程是:19
上一个工作的后果为:2

留神

如果把下面的外围线程数改为 1 也就是

 ExecutorService executorService = Executors.newFixedThreadPool(1);

运行后果就是上面的了,会发现基本没有执行工作 3,显然是工作 3 间接被抛弃了。

异步工作 1,以后线程是:17
异步工作 1 完结
异步工作 2,以后线程是:17

3、多任务组合

  • 「allOf」:期待所有工作实现
  • 「anyOf」:只有有一个工作实现

示例

allOf:期待所有工作实现

   @Test
    public void testCompletableAnyOf() throws ExecutionException, InterruptedException {
        // 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 开启异步工作 1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 1;
            return result;
        }, executorService);

        // 开启异步工作 2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 2;
            return result;
        }, executorService);

        // 开启异步工作 3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 3;
            return result;
        }, executorService);

        // 工作组合
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);
        // 只有有一个有工作实现
        Object o = anyOf.get();
        System.out.println("实现的工作的后果:" + o);

CompletableFuture 应用有哪些留神点

CompletableFuture 使咱们的异步编程更加便当的、代码更加优雅的同时,咱们也要关注下它,应用的一些留神点。

1、Future 须要获取返回值,能力获取异样信息


  @Test
    public void testWhenCompleteExceptionally() {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (1 == 1) {throw new RuntimeException("出错了");
            }
            return 0.11;
        });

        // 如果不加 get()办法这一行,看不到异样信息
        //future.get();}

Future 须要获取返回值,能力获取到异样信息。如果不加 get()/join()办法,看不到异样信息。

小伙伴们应用的时候,留神一下哈, 思考是否加 try…catch… 或者应用 exceptionally 办法。

2、CompletableFuture 的 get()办法是阻塞的

CompletableFuture 的 get()办法是阻塞的,如果应用它来获取异步调用的返回值,须要增加超时工夫。

// 反例
 CompletableFuture.get();
// 正例
CompletableFuture.get(5, TimeUnit.SECONDS);

3、不倡议应用默认线程池

CompletableFuture 代码中又应用了默认的「ForkJoin 线程池」,解决的线程个数是电脑「CPU 核数 -1」。在大量申请过去的时候,解决逻辑简单的话,响应会很慢。个别倡议应用自定义线程池,优化线程池配置参数。

4、自定义线程池时,留神饱和策略

CompletableFuture 的 get()办法是阻塞的,咱们个别倡议应用 future.get(5, TimeUnit.SECONDS)。并且个别倡议应用自定义线程池。

然而如果线程池回绝策略是 DiscardPolicy 或者 DiscardOldestPolicy,当线程池饱和时,会间接抛弃工作,不会摈弃异样。因而倡议,CompletableFuture 线程池策略最好应用 AbortPolicy,而后耗时的异步线程,做好线程池隔离哈。

正文完
 0