关于java:搞定-CompletableFuture并发异步编程和编写串行程序还有什么区别你们要的多图长文

34次阅读

共计 12197 个字符,预计需要花费 31 分钟才能阅读完成。

  • 你有一个思维,我有一个思维,咱们替换后,一个人就有两个思维
  • If you can NOT explain it simply, you do NOT understand it well enough

前言

上一篇文章 不会用 Java Future,我狐疑你泡茶没我快 全面剖析了 Future,通过它咱们能够获取线程的执行后果,它尽管解决了 Runnable 的“三无”短板,然而它本身还是有短板:

不能手动实现计算

假如你应用 Future 运行子线程调用近程 API 来获取某款产品的最新价格,服务器因为洪灾宕机了,此时如果你想手动完结计算,而是想返回上次缓存中的价格,这是 Future 做不到的

调用 get() 办法会阻塞程序

Future 不会告诉你它的实现,它提供了一个 get()办法,程序调用该办法会阻塞直到后果可用为止,没有方法利用回调函数附加到 Future,并在 Future 的后果可用时主动调用它

不能链式执行

烧水泡茶中,通过构造函数传参做到多个工作的链式执行,万一有更多的工作,或是工作链的执行程序有变,对原有程序的影响都是十分大的

整合多个 Future 执行后果形式轻便

假如有多个 Future 并行执行,须要在这些工作全副执行实现之后做后续操作,Future 自身是做不到的,须要借助工具类 Executors 的办法

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)

没有异样解决

Future 同样没有提供很好的异样解决计划

上一篇文章看 Future 感觉是发现了新天地,这么一说有感觉回到了解放前

对于 Java 后端的同学,在 Java1.8 之前想实现异步编程,还想避开上述这些懊恼,ReactiveX 应该是一个常见解决方案(做 Android 的应该会有理解)。如果相熟前端同学,ES6 Promise(男朋友的承诺)也解决了异步编程的懊恼

天下语言都在彼此借鉴相应长处,Java 作为老牌劲旅天然也要解决上述问题。又是那个男人,并发巨匠 Doug Lea 忧天下程序员之忧,解天下程序员之困扰,在 Java1.8 版本(Lambda 横空出世)中,新增了一个并发工具类 CompletableFuture,它的呈现,让人在泡茶过程中,品味到了不一样的滋味 ……

几个重要 Lambda 函数

CompletableFuture 在 Java1.8 的版本中呈现,天然也得搭上 Lambda 的逆风车,为了更好的了解 CompletableFuture,这里我须要先介绍一下几个 Lambda 函数,咱们只须要关注它们的以下几点就能够:

  • 参数承受模式
  • 返回值模式
  • 函数名称

Runnable

Runnable 咱们曾经说过无数次了,无参数,无返回值

@FunctionalInterface
public interface Runnable {public abstract void run();
}

Function

Function<T, R> 承受一个参数,并且有返回值

@FunctionalInterface
public interface Function<T, R> {R apply(T t);
}

Consumer

Consumer<T> 承受一个参数,没有返回值

@FunctionalInterface
public interface Consumer<T> {void accept(T t);
}

Supplier

Supplier<T> 没有参数,有一个返回值

@FunctionalInterface
public interface Supplier<T> {T get();
}

BiConsumer

BiConsumer<T, U> 承受两个参数(Bi,英文单词词根,代表两个的意思),没有返回值

@FunctionalInterface
public interface BiConsumer<T, U> {void accept(T t, U u);

好了,咱们做个小汇总

有些同学可能有疑难,为什么要关注这几个函数式接口,因为 CompletableFuture 的函数命名以及其作用都是和这几个函数式接口高度相干的,一会你就会发现了

前戏做足,终于能够进入正题了 CompletableFuture

CompletableFuture

类构造

老规矩,先从类构造看起:

实现了 Future 接口

实现了 Future 接口,那就具备 Future 接口的相干个性,请脑补 Future 那少的可怜的 5 个办法,这里不再赘述,具体请查看 不会用 Java Future,我狐疑你泡茶没我快

实现了 CompletionStage 接口

CompletionStage 这个接口还是挺生疏的,中文直译过去是【完工阶段】,如果将烧水泡茶比喻成一项大工程,他们的完工阶段体现是不一样的

  1. 单看线程 1 或单看线程 2 就是一种串行关系,做完一步之后做下一步
  2. 一起看线程 1 和 线程 2,它们彼此就是并行关系,两个线程做的事彼此独立互补烦扰
  3. 泡茶就是线程 1 和 线程 2 的汇总 / 组合,也就是线程 1 和 线程 2 都实现之后能力到这个阶段(当然也存在线程 1 或 线程 2 任意一个线程完工就能够开启下一阶段的场景)

所以,CompletionStage 接口的作用就做了这点事,所有函数都用于形容工作的时序关系,总结起来就是这个样子:

CompletableFuture 既然实现了两个接口,天然也就会实现相应的办法充分利用其接口个性,咱们走进它的办法来看一看

CompletableFuture 大概有 50 种不同解决串行,并行,组合以及处理错误的办法。小弟屏幕不争气,办法之多,一个屏幕装不下,看到这么多办法,是不是霎时要间接 珍藏——> 吃灰 2 连走人?别放心,咱们依照相应的命名和作用进行分类,分分钟搞定 50 多种办法

串行关系

then 直译【而后】,也就是示意下一步,所以通常是一种串行关系体现, then 前面的单词(比方 run /apply/accept)就是下面说的函数式接口中的形象办法名称了,它的作用和那几个函数式接口的作用是一样一样滴

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
  
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  
CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
  
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

聚合 And 关系

combine... with...both...and... 都是要求两者都满足,也就是 and 的关系了

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
  
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)

聚合 Or 关系

Either...or... 示意两者中的一个,天然也就是 Or 的体现了

<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(、CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)

异样解决

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
        
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
        
       
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

这个异样解决看着还挺吓人的,拿传统的 try/catch/finally 做个比照也就霎时秒懂了

whenComplete 和 handle 的区别如果你看承受的参数函数式接口名称你也就能看出差异了,前者应用 Comsumer, 天然也就不会有返回值;后者应用 Function,天然也就会有返回值

这里并没有全副列举,不过置信很多同学曾经发现了法则:

CompletableFuture 提供的所有回调办法都有两个异步(Async)变体,都像这样

// thenApply() 的变体
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

另外, 办法的名称也都与前戏中说的函数式接口齐全匹配,依照这中法则分类之后,这 50 多个办法看起来是不是很轻松了呢?

根本办法曾经列举的差不多了,接下来咱们通过一些例子来理论演示一下:

案例演示

创立一个 CompletableFuture 对象

创立一个 CompletableFuture 对象并没有什么稀奇的,仍旧是通过构造函数构建

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

这是最简略的 CompletableFuture 对象创立形式,因为它实现了 Future 接口,所以天然就能够通过 get() 办法获取后果

String result = completableFuture.get();

文章结尾曾经说过,get()办法在工作完结之前将始终处在阻塞状态,因为下面创立的 Future 没有返回,所以在这里调用 get() 将会永久性的梗塞

这时就须要咱们调用 complete() 办法手动的完结一个 Future

completableFuture.complete("Future's Result Here Manually");

这时,所有期待这个 Future 的 client 都会返回手动完结的指定后果

runAsync

应用 runAsync 进行异步计算

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {throw new IllegalStateException(e);
    }
    System.out.println("运行在一个独自的线程当中");
});

future.get();

因为应用的是 Runnable 函数式表达式,天然也不会获取到后果

supplyAsync

应用 runAsync 是没有返回后果的,咱们想获取异步计算的返回后果须要应用 supplyAsync() 办法

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {throw new IllegalStateException(e);
            }
            log.info("运行在一个独自的线程当中");
            return "我有返回值";
        });

        log.info(future.get());

因为应用的是 Supplier 函数式表达式,天然能够取得返回后果

咱们曾经屡次说过,get() 办法在 Future 计算实现之前会始终处在 blocking 状态下,对于真正的异步解决,咱们心愿的是能够通过传入回调函数,在 Future 完结时主动调用该回调函数,这样,咱们就不必期待后果

CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> {
          // 能够正文掉做疾速返回 start
            try {TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {throw new IllegalStateException(e);
            }
            log.info("????");
          // 能够正文掉做疾速返回 end
            return "赞";
        })
                .thenApply(first -> {log.info("在看");
                    return first + ", 在看";
                })
                .thenApply(second -> second + ", 转发");

        log.info("三连有没有?");
        log.info(comboText.get());

对 thenApply 的调用并没有阻塞程序打印 log,也就是后面说的通过回调告诉机制,这里你看到 thenApply 应用的是 supplyAsync 所用的线程,如果将 supplyAsync 做疾速返回,咱们再来看一下运行后果:

thenApply 此时应用的是主线程,所以:

串行的后续操作并不一定会和前序操作应用同一个线程

thenAccept

如果你不想从回调函数中返回任何后果,那能够应用 thenAccept

        final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(
                // 模仿远端 API 调用,这里只返回了一个结构的对象
                () -> Product.builder().id(12345L).name("颈椎 / 腰椎治疗仪").build())
                .thenAccept(product -> {log.info("获取到近程 API 产品名称" + product.getName());
                });
        voidCompletableFuture.get();

thenRun

thenAccept 能够从回调函数中获取前序执行的后果,但 thenRun 却不能够,因为它的回调函数式表达式定义中没有任何参数

CompletableFuture.supplyAsync(() -> {// 前序操作}).thenRun(() -> {// 串行的后需操作,无参数也无返回值});

咱们后面同样说过了,每个提供回调办法的函数都有两个异步(Async)变体,异步就是另外起一个线程

        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {log.info("前序操作");
            return "前需操作后果";
        }).thenApplyAsync(result -> {log.info("后续操作");
            return "后续操作后果";
        });

到这里,置信你串行的操作你曾经十分纯熟了

thenCompose

日常的工作中,通常定义的办法都会返回 CompletableFuture 类型,这样会给后续操作留有更多的余地,如果有这样的业务(X 呗是不是都有这样的业务呢?):

// 获取用户信息详情
    CompletableFuture<User> getUsersDetail(String userId) {return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("日拱一兵").build());
    }

    // 获取用户信用评级
    CompletableFuture<Double> getCreditRating(User user) {return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating());
    }

这时,如果咱们还是应用 thenApply() 办法来形容串行关系,返回的后果就会产生 CompletableFuture 的嵌套

        CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L)
                .thenApply(user -> completableFutureCompose.getCreditRating(user));

显然这不是咱们想要的,如果想“拍平”返回后果,thenCompose 办法就派上用场了

CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L)
                .thenCompose(user -> completableFutureCompose.getCreditRating(user));

这个和 Lambda 的 map 和 flatMap 的情理是一样一样滴

thenCombine

如果要聚合两个独立 Future 的后果,那么 thenCombine 就会派上用场了

        CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0);
        CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8);
        
        CompletableFuture<Double> combinedFuture = weightFuture
                .thenCombine(heightFuture, (weight, height) -> {
                    Double heightInMeter = height/100;
                    return weight/(heightInMeter*heightInMeter);
                });

        log.info("身材 BMI 指标 -" + combinedFuture.get());

当然这里少数时解决两个 Future 的关系,如果超过两个 Future,如何解决他们的一些聚合关系呢?

allOf | anyOf

置信你看到办法的签名,你曾经明确他的用途了,这里就不再介绍了

static CompletableFuture<Void>     allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

接下来就是异样的解决了

exceptionally

        Integer age = -1;

        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {if( age < 0) {throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {return "大家都是成年人";} else {return "未成年禁止入内";}
        }).thenApply((str) -> {log.info("游戏开始");
            return str;
        }).exceptionally(ex -> {log.info("必有蹊跷,来者" + ex.getMessage());
            return "Unknown!";
        });

        log.info(maturityFuture.get());

exceptionally 就相当于 catch,出现异常,将会跳过 thenApply 的后续操作,间接捕捉异样,进行一场解决

handle

用多线程,良好的习惯是应用 try/finally 范式,handle 就能够起到 finally 的作用,对上述程序做一个小小的更改,handle 承受两个参数,一个是失常返回值,一个是异样

留神:handle 的写法也算是范式的一种

        Integer age = -1;

        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {if( age < 0) {throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {return "大家都是成年人";} else {return "未成年禁止入内";}
        }).thenApply((str) -> {log.info("游戏开始");
            return str;
        }).handle((res, ex) -> {if(ex != null) {log.info("必有蹊跷,来者" + ex.getMessage());
                return "Unknown!";
            }
            return res;
        });

        log.info(maturityFuture.get());

到这里,对于 CompletableFuture 的根本应用你曾经理解的差不多了,不晓得你是否留神,咱们后面说的带有 Sync 的办法是独自起一个线程来执行,然而咱们并没有创立线程,这是怎么实现的呢?

仔细的敌人如果认真看每个变种函数的第三个办法兴许会发现外面都有一个 Executor 类型的参数,用于指定线程池,因为理论业务中咱们是谨严手动创立线程的,这在 我会手动创立线程,为什么要应用线程池? 文章中明确阐明过;如果没有指定线程池,那天然就会有一个默认的线程池,也就是 ForkJoinPool

private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool 的线程数默认是 CPU 的外围数。然而,在前序文章中明确阐明过:

不要所有业务共用一个线程池,因为,一旦有工作执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个零碎的性能

总结

CompletableFuture 的办法并没有全副介绍齐全,也没必要全副介绍,置信大家依照这个思路来了解 CompletableFuture 也不会有什么大问题了,剩下的就交给 实际 / 工夫 以及本人的领会了

后记

你认为 JDK1.8 CompletableFuture 曾经很完满了是不是,但追去完满的路线上永无止境,Java 9 对CompletableFuture 又做了局部降级和革新

  1. 增加了新的工厂办法
  2. 反对提早和超时解决

    orTimeout()
    completeOnTimeout()
  3. 改良了对子类的反对

详情能够查看:Java 9 CompletableFuture API Improvements. 怎么疾速的切换不同 Java 版本来尝鲜?SDKMAN 对立灵便治理多版本 Java 这篇文章的办法送给你

最初咱们再泡一壶茶,感受一下新变动吧

灵魂诘问

  1. 据说 ForkJoinPool 线程池效率更高,为什么呢?
  2. 如果批量解决异步程序,有什么可用的计划吗?

参考

  1. Java 并发编程实战
  2. Java 并发编程的艺术
  3. Java 并发编程之美
  4. https://www.baeldung.com/java…
  5. https://www.callicoder.com/ja…

日拱一兵 | 原创

正文完
 0