CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,反对通过函数式编程的形式对各类操作进行组合编排。相比于ListenableFuture,CompletableFuture无效晋升了代码的可读性,解决了“回调天堂”的问题。本文次要讲述CompletableFuture的原理与实际,同时联合了美团外卖商家端API的异步化实战,心愿能对从事相干开发的同学有所帮忙或启发。

0 背景

随着订单量的持续上升,美团外卖各零碎服务面临的压力也越来越大。作为外卖链路的外围环节,商家端提供了商家接单、配送等一系列外围性能,业务对系统吞吐量的要求也越来越高。而商家端API服务是流量入口,所有商家端流量都会由其调度、聚合,对外面向商家提供性能接口,对内调度各个上游服务获取数据进行聚合,具备显明的I/O密集型(I/O Bound)特点。在以后日订单规模已达千万级的状况下,应用同步加载形式的弊病逐步浮现,因而咱们开始思考将同步加载改为并行加载的可行性。

1 为何须要并行加载

外卖商家端API服务是典型的I/O密集型(I/O Bound)服务。除此之外,美团外卖商家端交易业务还有两个比拟大的特点:

  • 服务端必须一次返回订单卡片所有内容:依据商家端和服务端的“增量同步协定注1”,服务端必须一次性返回订单的所有信息,蕴含订单主信息、商品、结算、配送、用户信息、骑手信息、餐损、退款、客服赔付(参照上面订单卡片截图)等,须要从上游三十多个服务中获取数据。在特定条件下,如第一次登录和长时间没登录的状况下,客户端会分页拉取多个订单,这样发动的近程调用会更多。
  • 商家端和服务端交互频繁:商家对订单状态变动敏感,多种推拉机制保障每次变更可能触达商家,导致App和服务端的交互频繁,每次变更须要拉取订单最新的全部内容。

在外卖交易链路如此大的流量下,为了保障商家的用户体验,保障接口的高性能,并行从上游获取数据就成为必然。

2 并行加载的实现形式

并行从上游获取数据,从IO模型上来讲分为同步模型异步模型

2.1 同步模型

从各个服务获取数据最常见的是同步调用,如下图所示:

在同步调用的场景下,接口耗时长、性能差,接口响应时长T > T1+T2+T3+……+Tn,这时为了缩短接口的响应工夫,个别会应用线程池的形式并行获取数据,商家端订单卡片的组装正是应用了这种形式。

这种形式因为以下两个起因,导致资源利用率比拟低:

  • CPU资源大量节约在阻塞期待上,导致CPU资源利用率低。在Java 8之前,个别会通过回调的形式来缩小阻塞,然而大量应用回调,又引发臭名远扬的回调天堂问题,导致代码可读性和可维护性大大降低。
  • 为了减少并发度,会引入更多额定的线程池,随着CPU调度线程数的减少,会导致更重大的资源争用,贵重的CPU资源被损耗在上下文切换上,而且线程自身也会占用系统资源,且不能有限减少。

同步模型下,会导致硬件资源无奈充分利用,零碎吞吐量容易达到瓶颈。

2.2 NIO异步模型

咱们次要通过以下两种形式来缩小线程池的调度开销和阻塞工夫:

  • 通过RPC NIO异步调用的形式能够升高线程数,从而升高调度(上下文切换)开销,如Dubbo的异步调用能够参考《dubbo调用端异步》一文。
  • 通过引入CompletableFuture(下文简称CF)对业务流程进行编排,升高依赖之间的阻塞。本文次要讲述CompletableFuture的应用和原理。

2.3 为什么会抉择CompletableFuture?

咱们首先对业界宽泛风行的解决方案做了横向调研,次要包含Future、CompletableFuture注2、RxJava、Reactor。它们的个性比照如下:

FutureCompletableFutureRxJavaReactor
Composable(可组合)✔️✔️✔️
Asynchronous(异步)✔️✔️✔️✔️
Operator fusion(操作交融)✔️✔️
Lazy(提早执行)✔️✔️
Backpressure(回压)✔️✔️
  • 可组合:能够将多个依赖操作通过不同的形式进行编排,例如CompletableFuture提供thenCompose、thenCombine等各种then结尾的办法,这些办法就是对“可组合”个性的反对。
  • 操作交融:将数据流中应用的多个操作符以某种形式联合起来,进而升高开销(工夫、内存)。
  • 提早执行:操作不会立刻执行,当收到明确批示时操作才会触发。例如Reactor只有当有订阅者订阅时,才会触发操作。
  • 回压:某些异步阶段的处理速度跟不上,间接失败会导致大量数据的失落,对业务来说是不能承受的,这时须要反馈上游生产者升高调用量。

RxJava与Reactor显然更加弱小,它们提供了更多的函数调用形式,反对更多个性,但同时也带来了更大的学习老本。而咱们本次整合最须要的个性就是“异步”、“可组合”,综合思考后,咱们抉择了学习老本绝对较低的CompletableFuture。

3 CompletableFuture应用与原理

3.1 CompletableFuture的背景和定义

3.1.1 CompletableFuture解决的问题

CompletableFuture是由Java 8引入的,在Java8之前咱们个别通过Future实现异步。

  • Future用于示意异步计算的后果,只能通过阻塞或者轮询的形式获取后果,而且不反对设置回调办法,Java 8之前若要设置回调个别会应用guava的ListenableFuture,回调的引入又会导致臭名远扬的回调天堂(上面的例子会通过ListenableFuture的应用来具体进行展现)。
  • CompletableFuture对Future进行了扩大,能够通过设置回调的形式解决计算结果,同时也反对组合操作,反对进一步的编排,同时肯定水平解决了回调天堂的问题。

上面将举例来说明,咱们通过ListenableFuture、CompletableFuture来实现异步的差别。假如有三个操作step1、step2、step3存在依赖关系,其中step3的执行依赖step1和step2的后果。

Future(ListenableFuture)的实现(回调天堂)如下:

ExecutorService executor = Executors.newFixedThreadPool(5);ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);ListenableFuture<String> future1 = guavaExecutor.submit(() -> {    //step 1    System.out.println("执行step 1");    return "step1 result";});ListenableFuture<String> future2 = guavaExecutor.submit(() -> {    //step 2    System.out.println("执行step 2");    return "step2 result";});ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);Futures.addCallback(future1And2, new FutureCallback<List<String>>() {    @Override    public void onSuccess(List<String> result) {        System.out.println(result);        ListenableFuture<String> future3 = guavaExecutor.submit(() -> {            System.out.println("执行step 3");            return "step3 result";        });        Futures.addCallback(future3, new FutureCallback<String>() {            @Override            public void onSuccess(String result) {                System.out.println(result);            }                    @Override            public void onFailure(Throwable t) {            }        }, guavaExecutor);    }    @Override    public void onFailure(Throwable t) {    }}, guavaExecutor);

CompletableFuture的实现如下:

ExecutorService executor = Executors.newFixedThreadPool(5);CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {    System.out.println("执行step 1");    return "step1 result";}, executor);CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {    System.out.println("执行step 2");    return "step2 result";});cf1.thenCombine(cf2, (result1, result2) -> {    System.out.println(result1 + " , " + result2);    System.out.println("执行step 3");    return "step3 result";}).thenAccept(result3 -> System.out.println(result3));

显然,CompletableFuture的实现更为简洁,可读性更好。

3.1.2 CompletableFuture的定义

CompletableFuture实现了两个接口(如上图所示):Future、CompletionStage。Future示意异步计算的后果,CompletionStage用于示意异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着以后步骤的实现,也可能会触发其余一系列CompletionStage的执行。从而咱们能够依据理论业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,咱们能够通过其提供的thenAppy、thenCompose等函数式编程办法来组合编排这些步骤。

3.2 CompletableFuture的应用

上面咱们通过一个例子来解说CompletableFuture如何应用,应用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的实现会触发另外一系列依赖它的CompletableFuture的执行:

如上图所示,这里描述的是一个业务接口的流程,其中包含CF1\CF2\CF3\CF4\CF5共5个步骤,并描述了这些步骤之间的依赖关系,每个步骤能够是一次RPC调用、一次数据库操作或者是一次本地办法调用等,在应用CompletableFuture进行异步化编程时,图中的每个步骤都会产生一个CompletableFuture对象,最终后果也会用一个CompletableFuture来进行示意。

依据CompletableFuture依赖数量,能够分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。

3.2.1 零依赖:CompletableFuture的创立

咱们先看下如何不依赖其余CompletableFuture来创立新的CompletableFuture:

如上图红色链路所示,接口接管到申请后,首先发动两个异步调用CF1、CF2,次要有三种形式:

ExecutorService executor = Executors.newFixedThreadPool(5);//1、应用runAsync或supplyAsync发动异步调用CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {  return "result1";}, executor);//2、CompletableFuture.completedFuture()间接创立一个已实现状态的CompletableFutureCompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");//3、先初始化一个未实现的CompletableFuture,而后通过complete()、completeExceptionally(),实现该CompletableFutureCompletableFuture<String> cf = new CompletableFuture<>();cf.complete("success");

第三种形式的一个典型应用场景,就是将回调办法转为CompletableFuture,而后再依赖CompletableFure的能力进行调用编排,示例如下:

@FunctionalInterfacepublic interface ThriftAsyncCall {    void invoke() throws TException;} /**  * 该办法为美团外部rpc注册监听的封装,能够作为其余实现的参照  * OctoThriftCallback 为thrift回调办法  * ThriftAsyncCall 为自定义函数,用来示意一次thrift调用(定义如上)  */  public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) {   //新建一个未实现的CompletableFuture   CompletableFuture<T> resultFuture = new CompletableFuture<>();   //监听回调的实现,并且与CompletableFuture同步状态   callback.addObserver(new OctoObserver<T>() {       @Override       public void onSuccess(T t) {           resultFuture.complete(t);       }       @Override       public void onFailure(Throwable throwable) {           resultFuture.completeExceptionally(throwable);       }   });   if (thriftCall != null) {       try {           thriftCall.invoke();       } catch (TException e) {           resultFuture.completeExceptionally(e);       }   }   return resultFuture;  }

3.2.2 一元依赖:依赖一个CF

如上图红色链路所示,CF3,CF5别离依赖于CF1和CF2,这种对于单个CompletableFuture的依赖能够通过thenApply、thenAccept、thenCompose等办法来实现,代码如下所示:

CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {  //result1为CF1的后果  //......  return "result3";});CompletableFuture<String> cf5 = cf2.thenApply(result2 -> {  //result2为CF2的后果  //......  return "result5";});

3.2.3 二元依赖:依赖两个CF

如上图红色链路所示,CF4同时依赖于两个CF1和CF2,这种二元依赖能够通过thenCombine等回调来实现,如下代码所示:

CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {  //result1和result2别离为cf1和cf2的后果  return "result4";});

3.2.4 多元依赖:依赖多个CF

如上图红色链路所示,整个流程的完结依赖于三个步骤CF3、CF4、CF5,这种多元依赖能够通过allOfanyOf办法来实现,区别是当须要多个依赖全副实现时应用allOf,当多个依赖中的任意一个实现即可时应用anyOf,如下代码所示:

CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);CompletableFuture<String> result = cf6.thenApply(v -> {  //这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全副实现时,才会执行 。  result3 = cf3.join();  result4 = cf4.join();  result5 = cf5.join();  //依据result3、result4、result5组装最终result;  return "result";});

3.3 CompletableFuture原理

CompletableFuture中蕴含两个字段:resultstack。result用于存储以后CF的后果,stack(Completion)示意以后CF实现后须要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作能够有多个(示意有多个依赖它的CF),以栈(Treiber stack)的模式存储,stack示意栈顶元素。

这种形式相似“观察者模式”,依赖动作(Dependency Action)都封装在一个独自Completion子类中。上面是Completion类关系结构图。CompletableFuture中的每个办法都对应了图中的一个Completion的子类,Completion自身是观察者的基类。

  • UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
  • BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。

3.3.1 CompletableFuture的设计思维

依照相似“观察者模式”的设计思维,原理剖析能够从“观察者”和“被观察者”两个方面着手。因为回调品种多,但构造差别不大,所以这里单以一元依赖中的thenApply为例,不再枚举全副回调类型。如下图所示:

3.3.1.1 被观察者

  1. 每个CompletableFuture都能够被看作一个被观察者,其外部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行实现后会弹栈stack属性,顺次告诉注册到其中的观察者。下面例子中步骤fn2就是作为观察者被封装在UniApply中。
  2. 被观察者CF中的result属性,用来存储返回后果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在下面的例子中对应步骤fn1的执行后果。

3.3.1.2 观察者

CompletableFuture反对很多回调办法,例如thenAccept、thenApply、exceptionally等,这些办法接管一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,而后查看以后CF是否已处于实现状态(即result != null),如果已实现间接触发fn,否则将观察者Completion退出到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行结束之后告诉触发。

  1. 观察者中的dep属性:指向其对应的CompletableFuture,在下面的例子中dep指向CF2。
  2. 观察者中的src属性:指向其依赖的CompletableFuture,在下面的例子中src指向CF1。
  3. 观察者Completion中的fn属性:用来存储具体的期待被回调的函数。这里须要留神的是不同的回调办法(thenAccept、thenApply、exceptionally等)接管的函数类型也不同,即fn的类型有很多种,在下面的例子中fn指向fn2。

3.3.2 整体流程

3.3.2.1 一元依赖

这里依然以thenApply为例来阐明一元依赖的流程:

  1. 将观察者Completion注册到CF1,此时CF1将Completion压栈。
  2. 当CF1的操作运行实现时,会将后果赋值给CF1中的result属性。
  3. 顺次弹栈,告诉观察者尝试运行。

初步流程设计如上图所示,这里有几个对于注册与告诉的并发问题,大家能够思考下:

Q1:在观察者注册之前,如果CF曾经执行实现,并且曾经发出通知,那么这时观察者因为错过了告诉是不是将永远不会被触发呢 ?
A1:不会。在注册时查看依赖的CF是否曾经实现。如果未实现(即result == null)则将观察者入栈,如果已实现(result != null)则间接触发观察者操作。

Q2:在”入栈“前会有”result == null“的判断,这两个操作为非原子操作,CompletableFufure的实现也没有对两个操作进行加锁,实现工夫在这两个操作之间,观察者依然得不到告诉,是不是依然无奈触发?

A2:不会。入栈之后再次查看CF是否实现,如果实现则触发。

Q3:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF实现的时候都会进行,那么会不会导致一个操作被屡次执行呢 ?如下图所示,即当CF1、CF2同时实现时,如何防止CF3被屡次触发。

A3:CompletableFuture的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者曾经执行过了,那么CAS操作将会失败,勾销执行。

通过对以上3个问题的剖析能够看出,CompletableFuture在解决并行问题时,全程无加锁操作,极大地提高了程序的执行效率。咱们将并行问题思考纳入之后,能够失去欠缺的整体流程图如下所示:

CompletableFuture反对的回调办法非常丰盛,然而正如上一章节的整体流程图所述,他们的整体流程是统一的。所有回调复用同一套流程架构,不同的回调监听通过策略模式实现差异化。

3.3.2.2 二元依赖

咱们以thenCombine为例来阐明二元依赖:

thenCombine操作示意依赖两个CompletableFuture。其观察者实现类为BiApply,如上图所示,BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未实现的状况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF实现时都会尝试触发观察者BiApply,BiApply会查看两个依赖是否都实现,如果实现则开始执行。这里为了解决反复触发的问题,同样用的是上一章节提到的CAS操作,执行时会先通过CAS设置状态位,防止反复触发。

3.3.2.3 多元依赖

依赖多个CompletableFuture的回调办法包含allOfanyOf,区别在于allOf观察者实现类为BiRelay,须要所有被依赖的CF实现后才会执行回调;而anyOf观察者实现类为OrRelay,任意一个被依赖的CF实现后就会触发。二者的实现形式都是将多个被依赖的CF构建成一棵均衡二叉树,执行后果层层告诉,直到根节点,触发回调监听。

3.3.3 小结

本章节为CompletableFuture实现原理的科普,旨在尝试不粘贴源码,而通过结构图、流程图以及搭配文字描述把CompletableFuture的实现原理讲述分明。把艰涩的源码翻译为“整体流程”章节的流程图,并且将并发解决的逻辑融入,便于大家了解。

4 实际总结

在商家端API异步化的过程中,咱们遇到了一些问题,这些问题有的会比拟荫蔽,上面把这些问题的解决教训整理出来。心愿能帮忙到更多的同学,大家能够少踩一些坑。

4.1 线程阻塞问题

4.1.1 代码执行在哪个线程上?

要正当治理线程资源,最根本的前提条件就是要在写代码时,分明地晓得每一行代码都将执行在哪个线程上。上面咱们看一下CompletableFuture的执行线程状况。

CompletableFuture实现了CompletionStage接口,通过丰盛的回调办法,反对各种组合操作,每种组合场景都有同步和异步两种办法。

同步办法(即不带Async后缀的办法)有两种状况。

  • 如果注册时被依赖的操作曾经执行实现,则间接由以后线程执行。
  • 如果注册时被依赖的操作还未执行完,则由回调线程执行。

异步办法(即带Async后缀的办法):能够抉择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会应用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的利用,线程数可能成为瓶颈)。

例如:

ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {    System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());    //业务操作    return "";}, threadPool1);//此时,如果future1中的业务操作曾经执行结束并返回,则该thenApply间接由以后main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。future1.thenApply(value -> {    System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());    return value + "1";});//应用ForkJoinPool中的共用线程池CommonPoolfuture1.thenApplyAsync(value -> {//do something  return value + "1";});//应用指定线程池future1.thenApplyAsync(value -> {//do something  return value + "1";}, threadPool1);

4.2 线程池须知

4.2.1 异步回调要传线程池

后面提到,异步回调办法能够抉择是否传递线程池参数Executor,这里咱们倡议强制传线程池,且依据理论状况做线程池隔离

当不传递线程池时,会应用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,外围线程数=处理器数量-1(单核外围线程数为1),所有异步回调都会共用该CommonPool,外围与非核心业务都竞争同一个池中的线程,很容易成为零碎瓶颈。手动传递线程池参数能够更不便的调节参数,并且能够给不同的业务调配不同的线程池,以求资源隔离,缩小不同业务之间的互相烦扰。

4.2.2 线程池循环援用会导致死锁

public Object doGet() {  ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {  //do sth    return CompletableFuture.supplyAsync(() -> {        System.out.println("child");        return "child";      }, threadPool1).join();//子工作    }, threadPool1);  return cf1.join();}

如上代码块所示,doGet办法第三行通过supplyAsync向threadPool1申请线程,并且外部子工作又向threadPool1申请线程。threadPool1大小为10,当同一时刻有10个申请达到,则threadPool1被打满,子工作申请线程时进入阻塞队列排队,然而父工作的实现又依赖于子工作,这时因为子工作得不到线程,父工作无奈实现。主线程执行cf1.join()进入阻塞状态,并且永远无奈复原。

为了修复该问题,须要将父工作与子工作做线程池隔离,两个工作申请不同的线程池,防止循环依赖导致的阻塞。

4.2.3 异步RPC调用留神不要阻塞IO线程池

服务异步化后很多步骤都会依赖于异步RPC调用的后果,这时须要特地留神一点,如果是应用基于NIO(比方Netty)的异步RPC,则返回后果是由IO线程负责设置的,即回调办法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的办法)如果依赖的异步RPC调用的返回后果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时须要保障同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行实现前,IO线程将始终被占用,影响整个服务的响应。

4.3 其余

4.3.1 异样解决

因为异步执行的工作在其余线程上执行,而异样信息存储在线程栈中,因而以后线程除非阻塞期待返回后果,否则无奈通过try\catch捕捉异样。CompletableFuture提供了异样捕捉回调exceptionally,相当于同步调用中的try\catch。应用办法如下所示:

@Autowiredprivate WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//外部接口public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {    CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务办法,外部会发动异步rpc调用    return remarkResultFuture      .exceptionally(err -> {//通过exceptionally 捕捉异样,打印日志并返回默认值         log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, err);         return 0;      });}

有一点须要留神,CompletableFuture在回调办法中对异样进行了包装。大部分异样会封装成CompletionException后抛出,真正的异样存储在cause属性中,因而如果调用链中通过了回调办法解决那么就须要用Throwable.getCause()办法提取真正的异样。然而,有些状况下会间接返回真正的异样(Stack Overflow的探讨),最好应用工具类提取异样,如下代码所示:

@Autowiredprivate WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//外部接口public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {    CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务办法,外部会发动异步rpc调用    return remarkResultFuture          .thenApply(result -> {//这里减少了一个回调办法thenApply,如果产生异样thenApply外部会通过new CompletionException(throwable) 对异样进行包装      //这里是一些业务操作        })      .exceptionally(err -> {//通过exceptionally 捕捉异样,这里的err曾经被thenApply包装过,因而须要通过Throwable.getCause()提取异样         log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));         return 0;      });}

下面代码中用到了一个自定义的工具类ExceptionUtils,用于CompletableFuture的异样提取,在应用CompletableFuture做异步编程时,能够间接应用该工具类解决异样。实现代码如下:

public class ExceptionUtils {    public static Throwable extractRealException(Throwable throwable) {          //这里判断异样类型是否为CompletionException、ExecutionException,如果是则进行提取,否则间接返回。        if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {            if (throwable.getCause() != null) {                return throwable.getCause();            }        }        return throwable;    }}

4.3.2 积淀的工具办法介绍

在实际过程中咱们积淀了一些通用的工具办法,在应用CompletableFuture开发时能够间接拿来应用,详情参见“附录”。

5 异步化收益

通过异步化革新,美团商家端API零碎的性能失去显著晋升,与革新前比照的收益如下:

  • 外围接口吞吐量大幅晋升,其中订单轮询接口革新前TP99为754ms,革新后降为408ms。
  • 服务器数量缩小1/3。

6 参考文献

  1. CompletableFuture (Java Platform SE 8 )
  2. java - Does CompletionStage always wrap exceptions in CompletionException? - Stack Overflow
  3. exception - Surprising behavior of Java 8 CompletableFuture exceptionally method - Stack Overflow
  4. 文档 | Apache Dubbo

7 名词解释及备注

注1:“增量同步”是指商家客户端与服务端之间的订单增量数据同步协定,客户端应用该协定获取新增订单以及状态发生变化的订单。

注2:本文波及到的所有技术点依赖的Java版本为JDK 8,CompletableFuture反对的个性剖析也是基于该版本。

附录

自定义函数

@FunctionalInterfacepublic interface ThriftAsyncCall {    void invoke() throws TException ;}

CompletableFuture解决工具类

/** * CompletableFuture封装工具类 */@Slf4jpublic class FutureUtils {/** * 该办法为美团外部rpc注册监听的封装,能够作为其余实现的参照 * OctoThriftCallback 为thrift回调办法 * ThriftAsyncCall 为自定义函数,用来示意一次thrift调用(定义如上) */public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) {    CompletableFuture<T> thriftResultFuture = new CompletableFuture<>();    callback.addObserver(new OctoObserver<T>() {        @Override        public void onSuccess(T t) {            thriftResultFuture.complete(t);        }        @Override        public void onFailure(Throwable throwable) {            thriftResultFuture.completeExceptionally(throwable);        }    });    if (thriftCall != null) {        try {            thriftCall.invoke();        } catch (TException e) {            thriftResultFuture.completeExceptionally(e);        }    }    return thriftResultFuture;}  /**   * 设置CF状态为失败   */  public static <T> CompletableFuture<T> failed(Throwable ex) {   CompletableFuture<T> completableFuture = new CompletableFuture<>();   completableFuture.completeExceptionally(ex);   return completableFuture;  }  /**   * 设置CF状态为胜利   */  public static <T> CompletableFuture<T> success(T result) {   CompletableFuture<T> completableFuture = new CompletableFuture<>();   completableFuture.complete(result);   return completableFuture;  }  /**   * 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>   */  public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures) {   return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream()                   .map(CompletableFuture::join)                   .collect(Collectors.toList())           );  }  /**   * 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>   * 多用于分页查问的场景   */  public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures) {   return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream()                   .flatMap( listFuture -> listFuture.join().stream())                   .collect(Collectors.toList())           );  }  /*   * 将List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>>   * @Param mergeFunction 自定义key抵触时的merge策略   */  public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(       Collection<CompletableFuture<Map<K, V>>> completableFutures, BinaryOperator<V> mergeFunction) {   return CompletableFuture           .allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream().map(CompletableFuture::join)                   .flatMap(map -> map.entrySet().stream())                   .collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction)));  }  /**   * 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>,并过滤调null值   */  public static <T> CompletableFuture<List<T>> sequenceNonNull(Collection<CompletableFuture<T>> completableFutures) {   return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream()                   .map(CompletableFuture::join)                   .filter(e -> e != null)                   .collect(Collectors.toList())           );  }  /**   * 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>,并过滤调null值   * 多用于分页查问的场景   */  public static <T> CompletableFuture<List<T>> sequenceListNonNull(Collection<CompletableFuture<List<T>>> completableFutures) {   return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream()                   .flatMap( listFuture -> listFuture.join().stream().filter(e -> e != null))                   .collect(Collectors.toList())           );  }  /**   * 将List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>>   * @Param filterFunction 自定义过滤策略   */  public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures,                                                     Predicate<? super T> filterFunction) {   return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream()                   .map(CompletableFuture::join)                   .filter(filterFunction)                   .collect(Collectors.toList())           );  }  /**   * 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>   * @Param filterFunction 自定义过滤策略   */  public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures,                                                         Predicate<? super T> filterFunction) {   return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream()                   .flatMap( listFuture -> listFuture.join().stream().filter(filterFunction))                   .collect(Collectors.toList())           );  }/** * 将CompletableFuture<Map<K,V>>的list转为 CompletableFuture<Map<K,V>>。 多个map合并为一个map。 如果key抵触,采纳新的value笼罩。 */  public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(       Collection<CompletableFuture<Map<K, V>>> completableFutures) {   return CompletableFuture           .allOf(completableFutures.toArray(new CompletableFuture<?>[0]))           .thenApply(v -> completableFutures.stream().map(CompletableFuture::join)                   .flatMap(map -> map.entrySet().stream())                   .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> b)));  }}

异样提取工具类

  public class ExceptionUtils {   /**    * 提取真正的异样    */   public static Throwable extractRealException(Throwable throwable) {       if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {           if (throwable.getCause() != null) {               return throwable.getCause();           }       }       return throwable;   }  }

打印日志

  @Slf4j  public abstract class AbstractLogAction<R> {  protected final String methodName;  protected final Object[] args;public AbstractLogAction(String methodName, Object... args) {    this.methodName = methodName;    this.args = args;}protected void logResult(R result, Throwable throwable) {    if (throwable != null) {        boolean isBusinessError = throwable instanceof TBase || (throwable.getCause() != null && throwable                .getCause() instanceof TBase);        if (isBusinessError) {            logBusinessError(throwable);        } else if (throwable instanceof DegradeException || throwable instanceof DegradeRuntimeException) {//这里为外部rpc框架抛出的异样,应用时能够酌情批改            if (RhinoSwitch.getBoolean("isPrintDegradeLog", false)) {                log.error("{} degrade exception, param:{} , error:{}", methodName, args, throwable);            }        } else {            log.error("{} unknown error, param:{} , error:{}", methodName, args, ExceptionUtils.extractRealException(throwable));        }    } else {        if (isLogResult()) {            log.info("{} param:{} , result:{}", methodName, args, result);        } else {            log.info("{} param:{}", methodName, args);        }    }}private void logBusinessError(Throwable throwable) {    log.error("{} business error, param:{} , error:{}", methodName, args, throwable.toString(), ExceptionUtils.extractRealException(throwable));}private boolean isLogResult() {      //这里是动静配置开关,用于动态控制日志打印,开源动静配置核心能够应用nacos、apollo等,如果我的项目没有应用配置核心则能够删除    return RhinoSwitch.getBoolean(methodName + "_isLogResult", false);}}

日志解决实现类

/** * 产生异样时,依据是否为业务异样打印日志。 * 跟CompletableFuture.whenComplete配合应用,不扭转completableFuture的后果(失常OR异样) */@Slf4jpublic class LogErrorAction<R> extends AbstractLogAction<R> implements BiConsumer<R, Throwable> {public LogErrorAction(String methodName, Object... args) {    super(methodName, args);}@Overridepublic void accept(R result, Throwable throwable) {    logResult(result, throwable);}}

打印日志形式

completableFuture.whenComplete(  new LogErrorAction<>("orderService.getOrder", params));

异常情况返回默认值

/** * 当产生异样时返回自定义的值 */public class DefaultValueHandle<R> extends AbstractLogAction<R> implements BiFunction<R, Throwable, R> {    private final R defaultValue;/** * 当返回值为空的时候是否替换为默认值 */private final boolean isNullToDefault;/** * @param methodName      办法名称 * @param defaultValue 当异样产生时自定义返回的默认值 * @param args            办法入参 */  public DefaultValueHandle(String methodName, R defaultValue, Object... args) {   super(methodName, args);   this.defaultValue = defaultValue;   this.isNullToDefault = false;  }/** * @param isNullToDefault * @param defaultValue 当异样产生时自定义返回的默认值 * @param methodName      办法名称 * @param args            办法入参 */  public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {   super(methodName, args);   this.defaultValue = defaultValue;   this.isNullToDefault = isNullToDefault;  }@Overridepublic R apply(R result, Throwable throwable) {    logResult(result, throwable);    if (throwable != null) {        return defaultValue;    }    if (result == null && isNullToDefault) {        return defaultValue;    }    return result;}public static <R> DefaultValueHandle.DefaultValueHandleBuilder<R> builder() {    return new DefaultValueHandle.DefaultValueHandleBuilder<>();}public static class DefaultValueHandleBuilder<R> {    private boolean isNullToDefault;    private R defaultValue;    private String methodName;    private Object[] args;    DefaultValueHandleBuilder() {    }    public DefaultValueHandle.DefaultValueHandleBuilder<R> isNullToDefault(final boolean isNullToDefault) {        this.isNullToDefault = isNullToDefault;        return this;    }    public DefaultValueHandle.DefaultValueHandleBuilder<R> defaultValue(final R defaultValue) {        this.defaultValue = defaultValue;        return this;    }    public DefaultValueHandle.DefaultValueHandleBuilder<R> methodName(final String methodName) {        this.methodName = methodName;        return this;    }    public DefaultValueHandle.DefaultValueHandleBuilder<R> args(final Object... args) {        this.args = args;        return this;    }    public DefaultValueHandle<R> build() {        return new DefaultValueHandle<R>(this.isNullToDefault, this.defaultValue, this.methodName, this.args);    }    public String toString() {        return "DefaultValueHandle.DefaultValueHandleBuilder(isNullToDefault=" + this.isNullToDefault + ", defaultValue=" + this.defaultValue + ", methodName=" + this.methodName + ", args=" + Arrays.deepToString(this.args) + ")";    }}

默认返回值利用示例

completableFuture.handle(new DefaultValueHandle<>("orderService.getOrder", Collections.emptyMap(), params));

本文作者

长发、旭孟、向鹏,均来自美团外卖商家组技术团队。

招聘信息

美团外卖商家组技术团队,通过技术手段服务于百万商家,涵盖客户、合同、商品、交易、成长等多个业务方向构建商家端系统,同时晋升餐饮外卖商家的数字化经营程度,帮忙美团建设丰盛的供应,为用户提供更加丰盛、多样的可选择性。

美团外卖商家零碎,既有日千万量级订单下的稳定性挑战,又具备B端特有的业务复杂性,同时也在商家生态、商家经营、智能硬件等方向翻新与摸索。通过在高可用、畛域驱动设计、微服务等技术方向继续实际,积攒了丰盛的技术教训。

欢送退出美团外卖商家组技术团队,感兴趣的同学能够将简历发送至pingxumeng@meituan.com

浏览美团技术团队更多技术文章合集

前端 | 算法 | 后端 | 数据 | 平安 | 运维 | iOS | Android | 测试

| 在公众号菜单栏对话框回复【2021年货】、【2020年货】、【2019年货】、【2018年货】、【2017年货】等关键词,可查看美团技术团队历年技术文章合集。

| 本文系美团技术团队出品,著作权归属美团。欢送出于分享和交换等非商业目标转载或应用本文内容,敬请注明“内容转载自美团技术团队”。本文未经许可,不得进行商业性转载或者应用。任何商用行为,请发送邮件至tech@meituan.com申请受权。