听说你还不知道CompletableFuture

40次阅读

共计 4903 个字符,预计需要花费 13 分钟才能阅读完成。

java8 已经在日常开发编码中非常普遍了,掌握运用好它可以在开发中运用几行精简代码就可以完成所需功能。
今天将介绍 CompletableFuture 的在生产环境如何使用实践。CompletableFuture 类作为 Java 8 Concurrency API 改进而引入,熟悉的同学应该了解在 Java 9 也有对 CompletableFuture 有一些改进,橘子之后再进入讲解。
阅读这篇文章需要知道的前置知识点有,函数式编程,线程池原理等。还不熟悉的同学可以看看之前的文章,话不多说,开始吧。

为了更好的表达,我们结合例子讲解,假设今天小橘收到 TL 任务,要求完成实时拉取数据的功能,完成后告知拉取完成。假设拉取数据需要从 A,B,C 三个服务中获取,拉取完成推送需要调用 D 服务。

需求变更 1:拉取数据需要从 E 服务获取,但是会依赖从 A 服务获取的结果。
需求变更 2:从 A 服务一次能拉去一万 + 数据,但是 E 服务的性能支撑不了大调用,在 Provider 端有限流兜底。
需求变更 3:拉取数据过程中需要保证数据完整性,不能出现统计错误。

为什么使用 CompletableFuture

橘友们说了,这个可以用 jdk5.0 提供的 Future 来实现,我们将拉取数据需要用到的从 A,B,C 三个服务接口放到 FutureTask 中,异步的去执行获取数据结果,然后再同步调用 D 服务。
OK,简单实现这个功能没有问题,但是有什么缺陷,需要怎么可以改进嘛?
我们通过源码注释可以看到 Future 类返回的结果需要阻塞等待 get 方法返回结果,它提供了 isDone() 方法检测异步计算是否已经结束,get() 方法等待异步操作结束,以及获取计算的结果。等到所有 Future 任务完成,通知线程获取结果并合并。

从性能上,需要等待 Future 集合中的所有任务都完成(此需求没问题,接着往下看), 从健壮性上,Futrue 接口没有方法去进行计算组合或者处理可能出现的错误。从功能扩展上,Future 接口无法进行多个异步计算之间相互独立,同时第二个又依赖于第一个的结果。而今天的主角 CompletableFuture 都可以满足上述功能,具有大约 50 种不同的构成,结合,执行异步计算步骤和处理错误。(全部学习完所有方法是不现实的,掌握灵魂和核心方法即可依法炮制)

CompletableFuture API 使用

API 太多,简单列举。读者自行学习即可,本文重点不在介绍 api

/**
   任务 A 执行完执行 B,执行 B 不需要依赖 A 的结果同时 B 不返回结果。*/
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}); 
/**
   任务 A 执行完执行 B,B 执行依赖 A 结果同时 B 不返回结果
*/
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
/**
   任务 A 执行完执行 B,B 执行依赖 A 结果同时 B 返回结果
*/
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + "resultB");

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "orange")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "csong"));
//true
assertEquals("orangecsong", completableFuture.get());

你的疑问:该 thenCompose 方法不和 thenApply 一样实现结果合并计算嘛?

刚学习时候确实有点迷惑,其实他们的内部形式是不一样的,它们与 Java 8 中可用的 Stream 和 Optional 类的 map 和 flatMap 方法是有着类似的设计思路在里面的。都是接收一个 CompletableFuture 并将其应用于计算结果,但 thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另一个 CompletableFuture 对象。

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "orange")
    .thenCombine(CompletableFuture.supplyAsync(() -> "chizongzi"), (s1, s2) -> s1 + s2));

assertEquals("orange chizongzi", completableFuture.get());

thenCombine 方法旨在当你想要使用多个计算结果时,而后续的处理同时需要依赖返回值,第一个计算结果返回 “orange”,第二个计算结果返回 “chizongzi”,对结果进行拼接,那么结果就是 ”orange chizongzi” 啦。你可能会问如果结果无需处理呢?thenAcceptBoth 将可以实现你的功能。那么它和 thenApply 的区别又是啥呢?
thenCompose() 方法是使用前一个 Future 作为参数。它会直接使结果变新的 Future,而不是我们在 thenApply() 中到的嵌套 Future,而是用来连接两个 CompletableFuture,是生成一个新的 CompletableFuture,因此,如果想要继续嵌套链接 CompletableFuture 方法,那么最好使用 thenCompose()。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}

当我们需要并行执行多个任务时,我们通常希望等待所有它们执行,然后处理它们的组合结果。CompletableFuture 提供了 allOf 静态方法允许等待所有的完成任务,但是它返回类型是 CompletableFuture。局限性在于它不会返回所有任务的综合结果。相反,你必须手动从 Futures 获取结果。那么怎么解决呢,CompletableFuture 提供了 join() 可以解决,这里小橘用 Stream 实现同样可以的。

String multiFutures= Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Today is sun", multiFutures);

那么 CompletableFuture 针对异常是如何处理的呢?

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + "resultB")
    .thenApply(resultB -> resultB + "resultC")

// 如果 resultA,resultB,resultC 在获取中有异常

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {throw new RuntimeException();
}).exceptionally(ex -> "errorResultA")
  .thenApply(resultA -> resultA + "resultB")
  .thenApply(resultB -> resultB + "resultC")

上面的代码中,任务 A 抛出异常,然后通过 exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。如果 inovke future.join 方法结果将会输出 “errorResultA resultB result C”

上述方法基本就是底层函数式 api 的使用,聪明的橘友们实践起来吧!

CompletableFuture 例子

Talk is cheap , show me code。自从上篇 你还在担心 rpc 接口超时吗 文章末尾讲述大批量调用,其中是顺序 invoke 调用,其实我们分析,异步调用利用 CompletableFuture 需要怎么实现呢?

/**
 @Description:
 @author: orangeCs
 @create: 2020-06-25
*/
public class AsyncInvokeUtil {private AsyncInvokeUtil() {}

    /**
      @param paramList 源数据(需处理数据载体)@param buildParam 中转函数 (获取的结果做一层 trans, 来满足调用服务条件)
      @param transParam 中转函数 (获取的结果做一层 trans, 来满足调用服务条件)
      @param processFunction 中转处理函数
      @param size 分批大小
      @param executorService 暴露外部自定义实现线程池(demo 没判空, 可以做成非必传)@param <R>
      @param <T>
      @param <P>
      @param <k>
      @return
      @throws ExecutionException
      @throws InterruptedException
    */
    public static <R, T, P, k> List<R> partitionAsyncInvokeWithRes(List<T> paramList,
                                                                      Function<List<T>, P> buildParam,
                                                                      Function<P, List<k>> transParam,
                                                                      Function<List<k>,List<R>> processFunction,
                                                                      Integer size,
                                                                      ExecutorService executorService) throws ExecutionException, InterruptedException {List<CompletableFuture<List<R>>> completableFutures = Lists.partition(paramList, size).stream()
                .map(buildParam)
                .map(transParam)
                .map(eachList -> CompletableFuture.supplyAsync(() ->
                        processFunction.apply(eachList), executorService))
                .collect(Collectors.toList());
        //get
        CompletableFuture<Void> finishCompletableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
        finishCompletableFuture.get();
        return completableFutures.stream().map(CompletableFuture::join)
                .filter(Objects::nonNull).reduce(new ArrayList<>(), (resList1, resList2) -> {resList1.addAll(resList2);
                    return resList1;
                });
    }

}       

仅仅这一篇文章是不够的,任何知识都是长期积累,反复思考才能变成自己的东西,在浮躁的社会,我们年轻人切勿浮躁,今天介绍到这里了,喜欢博主的朋友们记得点个关注哦。

正文完
 0