CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,反对通过函数式编程的形式对各类操作进行组合编排。相比于ListenableFuture,CompletableFuture无效晋升了代码的可读性,解决了“回调天堂”的问题。本文次要讲述CompletableFuture的原理与实际,同时联合了美团外卖商家端API的异步化实战,心愿能对从事相干开发的同学有所帮忙或启发。
0 背景
随着订单量的持续上升,美团外卖各零碎服务面临的压力也越来越大。作为外卖链路的外围环节,商家端提供了商家接单、配送等一系列外围性能,业务对系统吞吐量的要求也越来越高。而商家端API服务是流量入口,所有商家端流量都会由其调度、聚合,对外面向商家提供性能接口,对内调度各个上游服务获取数据进行聚合,具备显明的I/O密集型(I/O Bound)特点。在以后日订单规模已达千万级的状况下,应用同步加载形式的弊病逐步浮现,因而咱们开始思考将同步加载改为并行加载的可行性。
1 为何须要并行加载
外卖商家端API服务是典型的I/O密集型(I/O Bound)服务。除此之外,美团外卖商家端交易业务还有两个比拟大的特点:
- 服务端必须一次返回订单卡片所有内容:依据商家端和服务端的“增量同步协定注1”,服务端必须一次性返回订单的所有信息,蕴含订单主信息、商品、结算、配送、用户信息、骑手信息、餐损、退款、客服赔付(参照上面订单卡片截图)等,须要从上游三十多个服务中获取数据。在特定条件下,如第一次登录和长时间没登录的状况下,客户端会分页拉取多个订单,这样发动的近程调用会更多。
- 商家端和服务端交互频繁:商家对订单状态变动敏感,多种推拉机制保障每次变更可能触达商家,导致App和服务端的交互频繁,每次变更须要拉取订单最新的全部内容。
在外卖交易链路如此大的流量下,为了保障商家的用户体验,保障接口的高性能,并行从上游获取数据就成为必然。
2 并行加载的实现形式
并行从上游获取数据,从IO模型上来讲分为同步模型和异步模型。
2.1 同步模型
从各个服务获取数据最常见的是同步调用,如下图所示:
在同步调用的场景下,接口耗时长、性能差,接口响应时长T > T1+T2+T3+……+Tn,这时为了缩短接口的响应工夫,个别会应用线程池的形式并行获取数据,商家端订单卡片的组装正是应用了这种形式。
这种形式因为以下两个起因,导致资源利用率比拟低:
- CPU资源大量节约在阻塞期待上,导致CPU资源利用率低。在Java 8之前,个别会通过回调的形式来缩小阻塞,然而大量应用回调,又引发臭名远扬的回调天堂问题,导致代码可读性和可维护性大大降低。
- 为了减少并发度,会引入更多额定的线程池,随着CPU调度线程数的减少,会导致更重大的资源争用,贵重的CPU资源被损耗在上下文切换上,而且线程自身也会占用系统资源,且不能有限减少。
同步模型下,会导致硬件资源无奈充分利用,零碎吞吐量容易达到瓶颈。
2.2 NIO异步模型
咱们次要通过以下两种形式来缩小线程池的调度开销和阻塞工夫:
- 通过RPC NIO异步调用的形式能够升高线程数,从而升高调度(上下文切换)开销,如Dubbo的异步调用能够参考《dubbo调用端异步》一文。
- 通过引入CompletableFuture(下文简称CF)对业务流程进行编排,升高依赖之间的阻塞。本文次要讲述CompletableFuture的应用和原理。
2.3 为什么会抉择CompletableFuture?
咱们首先对业界宽泛风行的解决方案做了横向调研,次要包含Future、CompletableFuture注2、RxJava、Reactor。它们的个性比照如下:
Future | CompletableFuture | RxJava | Reactor | |
---|---|---|---|---|
Composable(可组合) | ❌ | ✔️ | ✔️ | ✔️ |
Asynchronous(异步) | ✔️ | ✔️ | ✔️ | ✔️ |
Operator fusion(操作交融) | ❌ | ❌ | ✔️ | ✔️ |
Lazy(提早执行) | ❌ | ❌ | ✔️ | ✔️ |
Backpressure(回压) | ❌ | ❌ | ✔️ | ✔️ |
- 可组合:能够将多个依赖操作通过不同的形式进行编排,例如CompletableFuture提供thenCompose、thenCombine等各种then结尾的办法,这些办法就是对“可组合”个性的反对。
- 操作交融:将数据流中应用的多个操作符以某种形式联合起来,进而升高开销(工夫、内存)。
- 提早执行:操作不会立刻执行,当收到明确批示时操作才会触发。例如Reactor只有当有订阅者订阅时,才会触发操作。
- 回压:某些异步阶段的处理速度跟不上,间接失败会导致大量数据的失落,对业务来说是不能承受的,这时须要反馈上游生产者升高调用量。
RxJava与Reactor显然更加弱小,它们提供了更多的函数调用形式,反对更多个性,但同时也带来了更大的学习老本。而咱们本次整合最须要的个性就是“异步”、“可组合”,综合思考后,咱们抉择了学习老本绝对较低的CompletableFuture。
3 CompletableFuture应用与原理
3.1 CompletableFuture的背景和定义
3.1.1 CompletableFuture解决的问题
CompletableFuture是由Java 8引入的,在Java8之前咱们个别通过Future实现异步。
- Future用于示意异步计算的后果,只能通过阻塞或者轮询的形式获取后果,而且不反对设置回调办法,Java 8之前若要设置回调个别会应用guava的ListenableFuture,回调的引入又会导致臭名远扬的回调天堂(上面的例子会通过ListenableFuture的应用来具体进行展现)。
- CompletableFuture对Future进行了扩大,能够通过设置回调的形式解决计算结果,同时也反对组合操作,反对进一步的编排,同时肯定水平解决了回调天堂的问题。
上面将举例来说明,咱们通过ListenableFuture、CompletableFuture来实现异步的差别。假如有三个操作step1、step2、step3存在依赖关系,其中step3的执行依赖step1和step2的后果。
Future(ListenableFuture)的实现(回调天堂)如下:
ExecutorService executor = Executors.newFixedThreadPool(5);ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);ListenableFuture<String> future1 = guavaExecutor.submit(() -> { //step 1 System.out.println("执行step 1"); return "step1 result";});ListenableFuture<String> future2 = guavaExecutor.submit(() -> { //step 2 System.out.println("执行step 2"); return "step2 result";});ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);Futures.addCallback(future1And2, new FutureCallback<List<String>>() { @Override public void onSuccess(List<String> result) { System.out.println(result); ListenableFuture<String> future3 = guavaExecutor.submit(() -> { System.out.println("执行step 3"); return "step3 result"; }); Futures.addCallback(future3, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println(result); } @Override public void onFailure(Throwable t) { } }, guavaExecutor); } @Override public void onFailure(Throwable t) { }}, guavaExecutor);
CompletableFuture的实现如下:
ExecutorService executor = Executors.newFixedThreadPool(5);CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { System.out.println("执行step 1"); return "step1 result";}, executor);CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { System.out.println("执行step 2"); return "step2 result";});cf1.thenCombine(cf2, (result1, result2) -> { System.out.println(result1 + " , " + result2); System.out.println("执行step 3"); return "step3 result";}).thenAccept(result3 -> System.out.println(result3));
显然,CompletableFuture的实现更为简洁,可读性更好。
3.1.2 CompletableFuture的定义
CompletableFuture实现了两个接口(如上图所示):Future、CompletionStage。Future示意异步计算的后果,CompletionStage用于示意异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着以后步骤的实现,也可能会触发其余一系列CompletionStage的执行。从而咱们能够依据理论业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,咱们能够通过其提供的thenAppy、thenCompose等函数式编程办法来组合编排这些步骤。
3.2 CompletableFuture的应用
上面咱们通过一个例子来解说CompletableFuture如何应用,应用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的实现会触发另外一系列依赖它的CompletableFuture的执行:
如上图所示,这里描述的是一个业务接口的流程,其中包含CF1\CF2\CF3\CF4\CF5共5个步骤,并描述了这些步骤之间的依赖关系,每个步骤能够是一次RPC调用、一次数据库操作或者是一次本地办法调用等,在应用CompletableFuture进行异步化编程时,图中的每个步骤都会产生一个CompletableFuture对象,最终后果也会用一个CompletableFuture来进行示意。
依据CompletableFuture依赖数量,能够分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。
3.2.1 零依赖:CompletableFuture的创立
咱们先看下如何不依赖其余CompletableFuture来创立新的CompletableFuture:
如上图红色链路所示,接口接管到申请后,首先发动两个异步调用CF1、CF2,次要有三种形式:
ExecutorService executor = Executors.newFixedThreadPool(5);//1、应用runAsync或supplyAsync发动异步调用CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { return "result1";}, executor);//2、CompletableFuture.completedFuture()间接创立一个已实现状态的CompletableFutureCompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");//3、先初始化一个未实现的CompletableFuture,而后通过complete()、completeExceptionally(),实现该CompletableFutureCompletableFuture<String> cf = new CompletableFuture<>();cf.complete("success");
第三种形式的一个典型应用场景,就是将回调办法转为CompletableFuture,而后再依赖CompletableFure的能力进行调用编排,示例如下:
@FunctionalInterfacepublic interface ThriftAsyncCall { void invoke() throws TException;} /** * 该办法为美团外部rpc注册监听的封装,能够作为其余实现的参照 * OctoThriftCallback 为thrift回调办法 * ThriftAsyncCall 为自定义函数,用来示意一次thrift调用(定义如上) */ public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) { //新建一个未实现的CompletableFuture CompletableFuture<T> resultFuture = new CompletableFuture<>(); //监听回调的实现,并且与CompletableFuture同步状态 callback.addObserver(new OctoObserver<T>() { @Override public void onSuccess(T t) { resultFuture.complete(t); } @Override public void onFailure(Throwable throwable) { resultFuture.completeExceptionally(throwable); } }); if (thriftCall != null) { try { thriftCall.invoke(); } catch (TException e) { resultFuture.completeExceptionally(e); } } return resultFuture; }
3.2.2 一元依赖:依赖一个CF
如上图红色链路所示,CF3,CF5别离依赖于CF1和CF2,这种对于单个CompletableFuture的依赖能够通过thenApply、thenAccept、thenCompose等办法来实现,代码如下所示:
CompletableFuture<String> cf3 = cf1.thenApply(result1 -> { //result1为CF1的后果 //...... return "result3";});CompletableFuture<String> cf5 = cf2.thenApply(result2 -> { //result2为CF2的后果 //...... return "result5";});
3.2.3 二元依赖:依赖两个CF
如上图红色链路所示,CF4同时依赖于两个CF1和CF2,这种二元依赖能够通过thenCombine等回调来实现,如下代码所示:
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> { //result1和result2别离为cf1和cf2的后果 return "result4";});
3.2.4 多元依赖:依赖多个CF
如上图红色链路所示,整个流程的完结依赖于三个步骤CF3、CF4、CF5,这种多元依赖能够通过allOf
或anyOf
办法来实现,区别是当须要多个依赖全副实现时应用allOf
,当多个依赖中的任意一个实现即可时应用anyOf
,如下代码所示:
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);CompletableFuture<String> result = cf6.thenApply(v -> { //这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全副实现时,才会执行 。 result3 = cf3.join(); result4 = cf4.join(); result5 = cf5.join(); //依据result3、result4、result5组装最终result; return "result";});
3.3 CompletableFuture原理
CompletableFuture中蕴含两个字段:result和stack。result用于存储以后CF的后果,stack(Completion)示意以后CF实现后须要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作能够有多个(示意有多个依赖它的CF),以栈(Treiber stack)的模式存储,stack示意栈顶元素。
这种形式相似“观察者模式”,依赖动作(Dependency Action)都封装在一个独自Completion子类中。上面是Completion类关系结构图。CompletableFuture中的每个办法都对应了图中的一个Completion的子类,Completion自身是观察者的基类。
- UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
- BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
3.3.1 CompletableFuture的设计思维
依照相似“观察者模式”的设计思维,原理剖析能够从“观察者”和“被观察者”两个方面着手。因为回调品种多,但构造差别不大,所以这里单以一元依赖中的thenApply为例,不再枚举全副回调类型。如下图所示:
3.3.1.1 被观察者
- 每个CompletableFuture都能够被看作一个被观察者,其外部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行实现后会弹栈stack属性,顺次告诉注册到其中的观察者。下面例子中步骤fn2就是作为观察者被封装在UniApply中。
- 被观察者CF中的result属性,用来存储返回后果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在下面的例子中对应步骤fn1的执行后果。
3.3.1.2 观察者
CompletableFuture反对很多回调办法,例如thenAccept、thenApply、exceptionally等,这些办法接管一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,而后查看以后CF是否已处于实现状态(即result != null),如果已实现间接触发fn,否则将观察者Completion退出到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行结束之后告诉触发。
- 观察者中的dep属性:指向其对应的CompletableFuture,在下面的例子中dep指向CF2。
- 观察者中的src属性:指向其依赖的CompletableFuture,在下面的例子中src指向CF1。
- 观察者Completion中的fn属性:用来存储具体的期待被回调的函数。这里须要留神的是不同的回调办法(thenAccept、thenApply、exceptionally等)接管的函数类型也不同,即fn的类型有很多种,在下面的例子中fn指向fn2。
3.3.2 整体流程
3.3.2.1 一元依赖
这里依然以thenApply为例来阐明一元依赖的流程:
- 将观察者Completion注册到CF1,此时CF1将Completion压栈。
- 当CF1的操作运行实现时,会将后果赋值给CF1中的result属性。
- 顺次弹栈,告诉观察者尝试运行。
初步流程设计如上图所示,这里有几个对于注册与告诉的并发问题,大家能够思考下:
Q1:在观察者注册之前,如果CF曾经执行实现,并且曾经发出通知,那么这时观察者因为错过了告诉是不是将永远不会被触发呢 ?
A1:不会。在注册时查看依赖的CF是否曾经实现。如果未实现(即result == null)则将观察者入栈,如果已实现(result != null)则间接触发观察者操作。
Q2:在”入栈“前会有”result == null“的判断,这两个操作为非原子操作,CompletableFufure的实现也没有对两个操作进行加锁,实现工夫在这两个操作之间,观察者依然得不到告诉,是不是依然无奈触发?
A2:不会。入栈之后再次查看CF是否实现,如果实现则触发。
Q3:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF实现的时候都会进行,那么会不会导致一个操作被屡次执行呢 ?如下图所示,即当CF1、CF2同时实现时,如何防止CF3被屡次触发。
A3:CompletableFuture的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者曾经执行过了,那么CAS操作将会失败,勾销执行。
通过对以上3个问题的剖析能够看出,CompletableFuture在解决并行问题时,全程无加锁操作,极大地提高了程序的执行效率。咱们将并行问题思考纳入之后,能够失去欠缺的整体流程图如下所示:
CompletableFuture反对的回调办法非常丰盛,然而正如上一章节的整体流程图所述,他们的整体流程是统一的。所有回调复用同一套流程架构,不同的回调监听通过策略模式实现差异化。
3.3.2.2 二元依赖
咱们以thenCombine为例来阐明二元依赖:
thenCombine操作示意依赖两个CompletableFuture。其观察者实现类为BiApply,如上图所示,BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未实现的状况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF实现时都会尝试触发观察者BiApply,BiApply会查看两个依赖是否都实现,如果实现则开始执行。这里为了解决反复触发的问题,同样用的是上一章节提到的CAS操作,执行时会先通过CAS设置状态位,防止反复触发。
3.3.2.3 多元依赖
依赖多个CompletableFuture的回调办法包含allOf
、anyOf
,区别在于allOf
观察者实现类为BiRelay,须要所有被依赖的CF实现后才会执行回调;而anyOf
观察者实现类为OrRelay,任意一个被依赖的CF实现后就会触发。二者的实现形式都是将多个被依赖的CF构建成一棵均衡二叉树,执行后果层层告诉,直到根节点,触发回调监听。
3.3.3 小结
本章节为CompletableFuture实现原理的科普,旨在尝试不粘贴源码,而通过结构图、流程图以及搭配文字描述把CompletableFuture的实现原理讲述分明。把艰涩的源码翻译为“整体流程”章节的流程图,并且将并发解决的逻辑融入,便于大家了解。
4 实际总结
在商家端API异步化的过程中,咱们遇到了一些问题,这些问题有的会比拟荫蔽,上面把这些问题的解决教训整理出来。心愿能帮忙到更多的同学,大家能够少踩一些坑。
4.1 线程阻塞问题
4.1.1 代码执行在哪个线程上?
要正当治理线程资源,最根本的前提条件就是要在写代码时,分明地晓得每一行代码都将执行在哪个线程上。上面咱们看一下CompletableFuture的执行线程状况。
CompletableFuture实现了CompletionStage接口,通过丰盛的回调办法,反对各种组合操作,每种组合场景都有同步和异步两种办法。
同步办法(即不带Async后缀的办法)有两种状况。
- 如果注册时被依赖的操作曾经执行实现,则间接由以后线程执行。
- 如果注册时被依赖的操作还未执行完,则由回调线程执行。
异步办法(即带Async后缀的办法):能够抉择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会应用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的利用,线程数可能成为瓶颈)。
例如:
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName()); //业务操作 return "";}, threadPool1);//此时,如果future1中的业务操作曾经执行结束并返回,则该thenApply间接由以后main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。future1.thenApply(value -> { System.out.println("thenApply 执行线程:" + Thread.currentThread().getName()); return value + "1";});//应用ForkJoinPool中的共用线程池CommonPoolfuture1.thenApplyAsync(value -> {//do something return value + "1";});//应用指定线程池future1.thenApplyAsync(value -> {//do something return value + "1";}, threadPool1);
4.2 线程池须知
4.2.1 异步回调要传线程池
后面提到,异步回调办法能够抉择是否传递线程池参数Executor,这里咱们倡议强制传线程池,且依据理论状况做线程池隔离。
当不传递线程池时,会应用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,外围线程数=处理器数量-1(单核外围线程数为1),所有异步回调都会共用该CommonPool,外围与非核心业务都竞争同一个池中的线程,很容易成为零碎瓶颈。手动传递线程池参数能够更不便的调节参数,并且能够给不同的业务调配不同的线程池,以求资源隔离,缩小不同业务之间的互相烦扰。
4.2.2 线程池循环援用会导致死锁
public Object doGet() { ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100)); CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> { //do sth return CompletableFuture.supplyAsync(() -> { System.out.println("child"); return "child"; }, threadPool1).join();//子工作 }, threadPool1); return cf1.join();}
如上代码块所示,doGet办法第三行通过supplyAsync向threadPool1申请线程,并且外部子工作又向threadPool1申请线程。threadPool1大小为10,当同一时刻有10个申请达到,则threadPool1被打满,子工作申请线程时进入阻塞队列排队,然而父工作的实现又依赖于子工作,这时因为子工作得不到线程,父工作无奈实现。主线程执行cf1.join()进入阻塞状态,并且永远无奈复原。
为了修复该问题,须要将父工作与子工作做线程池隔离,两个工作申请不同的线程池,防止循环依赖导致的阻塞。
4.2.3 异步RPC调用留神不要阻塞IO线程池
服务异步化后很多步骤都会依赖于异步RPC调用的后果,这时须要特地留神一点,如果是应用基于NIO(比方Netty)的异步RPC,则返回后果是由IO线程负责设置的,即回调办法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的办法)如果依赖的异步RPC调用的返回后果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时须要保障同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行实现前,IO线程将始终被占用,影响整个服务的响应。
4.3 其余
4.3.1 异样解决
因为异步执行的工作在其余线程上执行,而异样信息存储在线程栈中,因而以后线程除非阻塞期待返回后果,否则无奈通过try\catch捕捉异样。CompletableFuture提供了异样捕捉回调exceptionally,相当于同步调用中的try\catch。应用办法如下所示:
@Autowiredprivate WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//外部接口public CompletableFuture<Integer> getCancelTypeAsync(long orderId) { CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务办法,外部会发动异步rpc调用 return remarkResultFuture .exceptionally(err -> {//通过exceptionally 捕捉异样,打印日志并返回默认值 log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, err); return 0; });}
有一点须要留神,CompletableFuture在回调办法中对异样进行了包装。大部分异样会封装成CompletionException后抛出,真正的异样存储在cause属性中,因而如果调用链中通过了回调办法解决那么就须要用Throwable.getCause()办法提取真正的异样。然而,有些状况下会间接返回真正的异样(Stack Overflow的探讨),最好应用工具类提取异样,如下代码所示:
@Autowiredprivate WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//外部接口public CompletableFuture<Integer> getCancelTypeAsync(long orderId) { CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务办法,外部会发动异步rpc调用 return remarkResultFuture .thenApply(result -> {//这里减少了一个回调办法thenApply,如果产生异样thenApply外部会通过new CompletionException(throwable) 对异样进行包装 //这里是一些业务操作 }) .exceptionally(err -> {//通过exceptionally 捕捉异样,这里的err曾经被thenApply包装过,因而须要通过Throwable.getCause()提取异样 log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err)); return 0; });}
下面代码中用到了一个自定义的工具类ExceptionUtils,用于CompletableFuture的异样提取,在应用CompletableFuture做异步编程时,能够间接应用该工具类解决异样。实现代码如下:
public class ExceptionUtils { public static Throwable extractRealException(Throwable throwable) { //这里判断异样类型是否为CompletionException、ExecutionException,如果是则进行提取,否则间接返回。 if (throwable instanceof CompletionException || throwable instanceof ExecutionException) { if (throwable.getCause() != null) { return throwable.getCause(); } } return throwable; }}
4.3.2 积淀的工具办法介绍
在实际过程中咱们积淀了一些通用的工具办法,在应用CompletableFuture开发时能够间接拿来应用,详情参见“附录”。
5 异步化收益
通过异步化革新,美团商家端API零碎的性能失去显著晋升,与革新前比照的收益如下:
- 外围接口吞吐量大幅晋升,其中订单轮询接口革新前TP99为754ms,革新后降为408ms。
- 服务器数量缩小1/3。
6 参考文献
- CompletableFuture (Java Platform SE 8 )
- java - Does CompletionStage always wrap exceptions in CompletionException? - Stack Overflow
- exception - Surprising behavior of Java 8 CompletableFuture exceptionally method - Stack Overflow
- 文档 | Apache Dubbo
7 名词解释及备注
注1:“增量同步”是指商家客户端与服务端之间的订单增量数据同步协定,客户端应用该协定获取新增订单以及状态发生变化的订单。
注2:本文波及到的所有技术点依赖的Java版本为JDK 8,CompletableFuture反对的个性剖析也是基于该版本。
附录
自定义函数
@FunctionalInterfacepublic interface ThriftAsyncCall { void invoke() throws TException ;}
CompletableFuture解决工具类
/** * CompletableFuture封装工具类 */@Slf4jpublic class FutureUtils {/** * 该办法为美团外部rpc注册监听的封装,能够作为其余实现的参照 * OctoThriftCallback 为thrift回调办法 * ThriftAsyncCall 为自定义函数,用来示意一次thrift调用(定义如上) */public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) { CompletableFuture<T> thriftResultFuture = new CompletableFuture<>(); callback.addObserver(new OctoObserver<T>() { @Override public void onSuccess(T t) { thriftResultFuture.complete(t); } @Override public void onFailure(Throwable throwable) { thriftResultFuture.completeExceptionally(throwable); } }); if (thriftCall != null) { try { thriftCall.invoke(); } catch (TException e) { thriftResultFuture.completeExceptionally(e); } } return thriftResultFuture;} /** * 设置CF状态为失败 */ public static <T> CompletableFuture<T> failed(Throwable ex) { CompletableFuture<T> completableFuture = new CompletableFuture<>(); completableFuture.completeExceptionally(ex); return completableFuture; } /** * 设置CF状态为胜利 */ public static <T> CompletableFuture<T> success(T result) { CompletableFuture<T> completableFuture = new CompletableFuture<>(); completableFuture.complete(result); return completableFuture; } /** * 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>> */ public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures) { return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); } /** * 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>> * 多用于分页查问的场景 */ public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures) { return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream() .flatMap( listFuture -> listFuture.join().stream()) .collect(Collectors.toList()) ); } /* * 将List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>> * @Param mergeFunction 自定义key抵触时的merge策略 */ public static <K, V> CompletableFuture<Map<K, V>> sequenceMap( Collection<CompletableFuture<Map<K, V>>> completableFutures, BinaryOperator<V> mergeFunction) { return CompletableFuture .allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream().map(CompletableFuture::join) .flatMap(map -> map.entrySet().stream()) .collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction))); } /** * 将List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>,并过滤调null值 */ public static <T> CompletableFuture<List<T>> sequenceNonNull(Collection<CompletableFuture<T>> completableFutures) { return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream() .map(CompletableFuture::join) .filter(e -> e != null) .collect(Collectors.toList()) ); } /** * 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>,并过滤调null值 * 多用于分页查问的场景 */ public static <T> CompletableFuture<List<T>> sequenceListNonNull(Collection<CompletableFuture<List<T>>> completableFutures) { return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream() .flatMap( listFuture -> listFuture.join().stream().filter(e -> e != null)) .collect(Collectors.toList()) ); } /** * 将List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>> * @Param filterFunction 自定义过滤策略 */ public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures, Predicate<? super T> filterFunction) { return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream() .map(CompletableFuture::join) .filter(filterFunction) .collect(Collectors.toList()) ); } /** * 将List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>> * @Param filterFunction 自定义过滤策略 */ public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures, Predicate<? super T> filterFunction) { return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream() .flatMap( listFuture -> listFuture.join().stream().filter(filterFunction)) .collect(Collectors.toList()) ); }/** * 将CompletableFuture<Map<K,V>>的list转为 CompletableFuture<Map<K,V>>。 多个map合并为一个map。 如果key抵触,采纳新的value笼罩。 */ public static <K, V> CompletableFuture<Map<K, V>> sequenceMap( Collection<CompletableFuture<Map<K, V>>> completableFutures) { return CompletableFuture .allOf(completableFutures.toArray(new CompletableFuture<?>[0])) .thenApply(v -> completableFutures.stream().map(CompletableFuture::join) .flatMap(map -> map.entrySet().stream()) .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> b))); }}
异样提取工具类
public class ExceptionUtils { /** * 提取真正的异样 */ public static Throwable extractRealException(Throwable throwable) { if (throwable instanceof CompletionException || throwable instanceof ExecutionException) { if (throwable.getCause() != null) { return throwable.getCause(); } } return throwable; } }
打印日志
@Slf4j public abstract class AbstractLogAction<R> { protected final String methodName; protected final Object[] args;public AbstractLogAction(String methodName, Object... args) { this.methodName = methodName; this.args = args;}protected void logResult(R result, Throwable throwable) { if (throwable != null) { boolean isBusinessError = throwable instanceof TBase || (throwable.getCause() != null && throwable .getCause() instanceof TBase); if (isBusinessError) { logBusinessError(throwable); } else if (throwable instanceof DegradeException || throwable instanceof DegradeRuntimeException) {//这里为外部rpc框架抛出的异样,应用时能够酌情批改 if (RhinoSwitch.getBoolean("isPrintDegradeLog", false)) { log.error("{} degrade exception, param:{} , error:{}", methodName, args, throwable); } } else { log.error("{} unknown error, param:{} , error:{}", methodName, args, ExceptionUtils.extractRealException(throwable)); } } else { if (isLogResult()) { log.info("{} param:{} , result:{}", methodName, args, result); } else { log.info("{} param:{}", methodName, args); } }}private void logBusinessError(Throwable throwable) { log.error("{} business error, param:{} , error:{}", methodName, args, throwable.toString(), ExceptionUtils.extractRealException(throwable));}private boolean isLogResult() { //这里是动静配置开关,用于动态控制日志打印,开源动静配置核心能够应用nacos、apollo等,如果我的项目没有应用配置核心则能够删除 return RhinoSwitch.getBoolean(methodName + "_isLogResult", false);}}
日志解决实现类
/** * 产生异样时,依据是否为业务异样打印日志。 * 跟CompletableFuture.whenComplete配合应用,不扭转completableFuture的后果(失常OR异样) */@Slf4jpublic class LogErrorAction<R> extends AbstractLogAction<R> implements BiConsumer<R, Throwable> {public LogErrorAction(String methodName, Object... args) { super(methodName, args);}@Overridepublic void accept(R result, Throwable throwable) { logResult(result, throwable);}}
打印日志形式
completableFuture.whenComplete( new LogErrorAction<>("orderService.getOrder", params));
异常情况返回默认值
/** * 当产生异样时返回自定义的值 */public class DefaultValueHandle<R> extends AbstractLogAction<R> implements BiFunction<R, Throwable, R> { private final R defaultValue;/** * 当返回值为空的时候是否替换为默认值 */private final boolean isNullToDefault;/** * @param methodName 办法名称 * @param defaultValue 当异样产生时自定义返回的默认值 * @param args 办法入参 */ public DefaultValueHandle(String methodName, R defaultValue, Object... args) { super(methodName, args); this.defaultValue = defaultValue; this.isNullToDefault = false; }/** * @param isNullToDefault * @param defaultValue 当异样产生时自定义返回的默认值 * @param methodName 办法名称 * @param args 办法入参 */ public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) { super(methodName, args); this.defaultValue = defaultValue; this.isNullToDefault = isNullToDefault; }@Overridepublic R apply(R result, Throwable throwable) { logResult(result, throwable); if (throwable != null) { return defaultValue; } if (result == null && isNullToDefault) { return defaultValue; } return result;}public static <R> DefaultValueHandle.DefaultValueHandleBuilder<R> builder() { return new DefaultValueHandle.DefaultValueHandleBuilder<>();}public static class DefaultValueHandleBuilder<R> { private boolean isNullToDefault; private R defaultValue; private String methodName; private Object[] args; DefaultValueHandleBuilder() { } public DefaultValueHandle.DefaultValueHandleBuilder<R> isNullToDefault(final boolean isNullToDefault) { this.isNullToDefault = isNullToDefault; return this; } public DefaultValueHandle.DefaultValueHandleBuilder<R> defaultValue(final R defaultValue) { this.defaultValue = defaultValue; return this; } public DefaultValueHandle.DefaultValueHandleBuilder<R> methodName(final String methodName) { this.methodName = methodName; return this; } public DefaultValueHandle.DefaultValueHandleBuilder<R> args(final Object... args) { this.args = args; return this; } public DefaultValueHandle<R> build() { return new DefaultValueHandle<R>(this.isNullToDefault, this.defaultValue, this.methodName, this.args); } public String toString() { return "DefaultValueHandle.DefaultValueHandleBuilder(isNullToDefault=" + this.isNullToDefault + ", defaultValue=" + this.defaultValue + ", methodName=" + this.methodName + ", args=" + Arrays.deepToString(this.args) + ")"; }}
默认返回值利用示例
completableFuture.handle(new DefaultValueHandle<>("orderService.getOrder", Collections.emptyMap(), params));
本文作者
长发、旭孟、向鹏,均来自美团外卖商家组技术团队。
招聘信息
美团外卖商家组技术团队,通过技术手段服务于百万商家,涵盖客户、合同、商品、交易、成长等多个业务方向构建商家端系统,同时晋升餐饮外卖商家的数字化经营程度,帮忙美团建设丰盛的供应,为用户提供更加丰盛、多样的可选择性。
美团外卖商家零碎,既有日千万量级订单下的稳定性挑战,又具备B端特有的业务复杂性,同时也在商家生态、商家经营、智能硬件等方向翻新与摸索。通过在高可用、畛域驱动设计、微服务等技术方向继续实际,积攒了丰盛的技术教训。
欢送退出美团外卖商家组技术团队,感兴趣的同学能够将简历发送至pingxumeng@meituan.com
浏览美团技术团队更多技术文章合集
前端 | 算法 | 后端 | 数据 | 平安 | 运维 | iOS | Android | 测试
| 在公众号菜单栏对话框回复【2021年货】、【2020年货】、【2019年货】、【2018年货】、【2017年货】等关键词,可查看美团技术团队历年技术文章合集。
| 本文系美团技术团队出品,著作权归属美团。欢送出于分享和交换等非商业目标转载或应用本文内容,敬请注明“内容转载自美团技术团队”。本文未经许可,不得进行商业性转载或者应用。任何商用行为,请发送邮件至tech@meituan.com申请受权。