CompletableFuture 由 Java 8 提供,是实现异步化的工具类,上手难度较低,且功能强大,反对通过函数式编程的形式对各类操作进行组合编排。相比于 ListenableFuture,CompletableFuture 无效晋升了代码的可读性,解决了“回调天堂”的问题。本文次要讲述 CompletableFuture 的原理与实际,同时联合了美团外卖商家端 API 的异步化实战,心愿能对从事相干开发的同学有所帮忙或启发。
0 背景
随着订单量的持续上升,美团外卖各零碎服务面临的压力也越来越大。作为外卖链路的外围环节,商家端提供了商家接单、配送等一系列外围性能,业务对系统吞吐量的要求也越来越高。而商家端 API 服务是流量入口,所有商家端流量都会由其调度、聚合,对外面向商家提供性能接口,对内调度各个上游服务获取数据进行聚合,具备显明的 I / O 密集型(I/O Bound)特点。在以后日订单规模已达千万级的状况下,应用同步加载形式的弊病逐步浮现,因而咱们开始思考将同步加载改为并行加载的可行性。
1 为何须要并行加载
外卖商家端 API 服务是典型的 I / O 密集型(I/O Bound)服务。除此之外,美团外卖商家端交易业务还有两个比拟大的特点:
- 服务端必须一次返回订单卡片所有内容 :依据商家端和服务端的“增量同步协定 注 1 ”,服务端必须一次性返回订单的所有信息,蕴含订单主信息、商品、结算、配送、用户信息、骑手信息、餐损、退款、客服赔付(参照上面订单卡片截图)等,须要从上游三十多个服务中获取数据。在特定条件下,如第一次登录和长时间没登录的状况下,客户端会分页拉取多个订单,这样发动的近程调用会更多。
- 商家端和服务端 交互频繁:商家对订单状态变动敏感,多种推拉机制保障每次变更可能触达商家,导致 App 和服务端的交互频繁,每次变更须要拉取订单最新的全部内容。
在外卖交易链路如此大的流量下,为了保障商家的用户体验,保障接口的高性能,并行从上游获取数据就成为必然。
2 并行加载的实现形式
并行从上游获取数据,从 IO 模型上来讲分为 同步模型 和异步模型。
2.1 同步模型
从各个服务获取数据最常见的是同步调用,如下图所示:
在同步调用的场景下,接口耗时长、性能差,接口响应时长 T > T1+T2+T3+……+Tn,这时为了缩短接口的响应工夫,个别会应用线程池的形式并行获取数据,商家端订单卡片的组装正是应用了这种形式。
这种形式因为以下两个起因,导致资源利用率比拟低:
- CPU 资源大量节约在阻塞期待上 ,导致 CPU 资源利用率低。在 Java 8 之前,个别会通过回调的形式来缩小阻塞,然而大量应用回调,又引发臭名远扬的 回调天堂 问题,导致代码可读性和可维护性大大降低。
- 为了减少并发度,会引入更多额定的线程池,随着 CPU 调度线程数的减少,会导致更重大的资源争用,贵重的 CPU 资源被损耗在上下文切换上,而且线程自身也会占用系统资源,且不能有限减少。
同步模型下,会导致 硬件资源无奈充分利用,零碎吞吐量容易达到瓶颈。
2.2 NIO 异步模型
咱们次要通过以下两种形式来缩小线程池的调度开销和阻塞工夫:
- 通过 RPC NIO 异步调用的形式能够升高线程数,从而升高调度(上下文切换)开销,如 Dubbo 的异步调用能够参考《dubbo 调用端异步》一文。
- 通过引入 CompletableFuture(下文简称 CF)对业务流程进行编排,升高依赖之间的阻塞。本文次要讲述 CompletableFuture 的应用和原理。
2.3 为什么会抉择 CompletableFuture?
咱们首先对业界宽泛风行的解决方案做了横向调研,次要包含 Future、CompletableFuture注 2 、RxJava、Reactor。它们的个性比照如下:
Future | CompletableFuture | RxJava | Reactor | |
---|---|---|---|---|
Composable(可组合) | ❌ | ✔️ | ✔️ | ✔️ |
Asynchronous(异步) | ✔️ | ✔️ | ✔️ | ✔️ |
Operator fusion(操作交融) | ❌ | ❌ | ✔️ | ✔️ |
Lazy(提早执行) | ❌ | ❌ | ✔️ | ✔️ |
Backpressure(回压) | ❌ | ❌ | ✔️ | ✔️ |
- 可组合:能够将多个依赖操作通过不同的形式进行编排,例如 CompletableFuture 提供 thenCompose、thenCombine 等各种 then 结尾的办法,这些办法就是对“可组合”个性的反对。
- 操作交融:将数据流中应用的多个操作符以某种形式联合起来,进而升高开销(工夫、内存)。
- 提早执行:操作不会立刻执行,当收到明确批示时操作才会触发。例如 Reactor 只有当有订阅者订阅时,才会触发操作。
- 回压:某些异步阶段的处理速度跟不上,间接失败会导致大量数据的失落,对业务来说是不能承受的,这时须要反馈上游生产者升高调用量。
RxJava 与 Reactor 显然更加弱小,它们提供了更多的函数调用形式,反对更多个性,但同时也带来了更大的学习老本。而咱们本次整合最须要的个性就是“异步”、“可组合”,综合思考后,咱们抉择了学习老本绝对较低的 CompletableFuture。
3 CompletableFuture 应用与原理
3.1 CompletableFuture 的背景和定义
3.1.1 CompletableFuture 解决的问题
CompletableFuture 是由 Java 8 引入的,在 Java8 之前咱们个别通过 Future 实现异步。
- Future 用于示意异步计算的后果,只能通过阻塞或者轮询的形式获取后果,而且不反对设置回调办法,Java 8 之前若要设置回调个别会应用 guava 的 ListenableFuture,回调的引入又会导致臭名远扬的回调天堂(上面的例子会通过 ListenableFuture 的应用来具体进行展现)。
- CompletableFuture 对 Future 进行了扩大,能够通过设置回调的形式解决计算结果,同时也反对组合操作,反对进一步的编排,同时肯定水平解决了回调天堂的问题。
上面将举例来说明,咱们通过 ListenableFuture、CompletableFuture 来实现异步的差别。假如有三个操作 step1、step2、step3 存在依赖关系,其中 step3 的执行依赖 step1 和 step2 的后果。
Future(ListenableFuture)的实现(回调天堂)如下:
ExecutorService executor = Executors.newFixedThreadPool(5);
ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);
ListenableFuture<String> future1 = guavaExecutor.submit(() -> {
//step 1
System.out.println("执行 step 1");
return "step1 result";
});
ListenableFuture<String> future2 = guavaExecutor.submit(() -> {
//step 2
System.out.println("执行 step 2");
return "step2 result";
});
ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);
Futures.addCallback(future1And2, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> result) {System.out.println(result);
ListenableFuture<String> future3 = guavaExecutor.submit(() -> {System.out.println("执行 step 3");
return "step3 result";
});
Futures.addCallback(future3, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {System.out.println(result);
}
@Override
public void onFailure(Throwable t) {}}, guavaExecutor);
}
@Override
public void onFailure(Throwable t) {}}, guavaExecutor);
CompletableFuture 的实现如下:
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println("执行 step 1");
return "step1 result";
}, executor);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println("执行 step 2");
return "step2 result";
});
cf1.thenCombine(cf2, (result1, result2) -> {System.out.println(result1 + "," + result2);
System.out.println("执行 step 3");
return "step3 result";
}).thenAccept(result3 -> System.out.println(result3));
显然,CompletableFuture 的实现更为简洁,可读性更好。
3.1.2 CompletableFuture 的定义
CompletableFuture 实现了两个接口(如上图所示):Future、CompletionStage。Future 示意异步计算的后果,CompletionStage 用于示意异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个 CompletionStage 触发的,随着以后步骤的实现,也可能会触发其余一系列 CompletionStage 的执行。从而咱们能够依据理论业务对这些步骤进行多样化的编排组合,CompletionStage 接口正是定义了这样的能力,咱们能够通过其提供的 thenAppy、thenCompose 等函数式编程办法来组合编排这些步骤。
3.2 CompletableFuture 的应用
上面咱们通过一个例子来解说 CompletableFuture 如何应用,应用 CompletableFuture 也是构建依赖树的过程。一个 CompletableFuture 的实现会触发另外一系列依赖它的 CompletableFuture 的执行:
如上图所示,这里描述的是一个业务接口的流程,其中包含 CF1\CF2\CF3\CF4\CF5 共 5 个步骤,并描述了这些步骤之间的依赖关系,每个步骤能够是一次 RPC 调用、一次数据库操作或者是一次本地办法调用等,在应用 CompletableFuture 进行异步化编程时,图中的每个步骤都会产生一个 CompletableFuture 对象,最终后果也会用一个 CompletableFuture 来进行示意。
依据 CompletableFuture 依赖数量,能够分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。
3.2.1 零依赖:CompletableFuture 的创立
咱们先看下如何不依赖其余 CompletableFuture 来创立新的 CompletableFuture:
如上图红色链路所示,接口接管到申请后,首先发动两个异步调用 CF1、CF2,次要有三种形式:
ExecutorService executor = Executors.newFixedThreadPool(5);
//1、应用 runAsync 或 supplyAsync 发动异步调用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {return "result1";}, executor);
//2、CompletableFuture.completedFuture()间接创立一个已实现状态的 CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
//3、先初始化一个未实现的 CompletableFuture,而后通过 complete()、completeExceptionally(),实现该 CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");
第三种形式的一个典型应用场景,就是将回调办法转为 CompletableFuture,而后再依赖 CompletableFure 的能力进行调用编排,示例如下:
@FunctionalInterface
public interface ThriftAsyncCall {void invoke() throws TException;
}
/**
* 该办法为美团外部 rpc 注册监听的封装,能够作为其余实现的参照
* OctoThriftCallback 为 thrift 回调办法
* ThriftAsyncCall 为自定义函数,用来示意一次 thrift 调用(定义如上)*/
public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) {
// 新建一个未实现的 CompletableFuture
CompletableFuture<T> resultFuture = new CompletableFuture<>();
// 监听回调的实现,并且与 CompletableFuture 同步状态
callback.addObserver(new OctoObserver<T>() {
@Override
public void onSuccess(T t) {resultFuture.complete(t);
}
@Override
public void onFailure(Throwable throwable) {resultFuture.completeExceptionally(throwable);
}
});
if (thriftCall != null) {
try {thriftCall.invoke();
} catch (TException e) {resultFuture.completeExceptionally(e);
}
}
return resultFuture;
}
3.2.2 一元依赖:依赖一个 CF
如上图红色链路所示,CF3,CF5 别离依赖于 CF1 和 CF2,这种对于单个 CompletableFuture 的依赖能够通过 thenApply、thenAccept、thenCompose 等办法来实现,代码如下所示:
CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
//result1 为 CF1 的后果
//......
return "result3";
});
CompletableFuture<String> cf5 = cf2.thenApply(result2 -> {
//result2 为 CF2 的后果
//......
return "result5";
});
3.2.3 二元依赖:依赖两个 CF
如上图红色链路所示,CF4 同时依赖于两个 CF1 和 CF2,这种二元依赖能够通过 thenCombine 等回调来实现,如下代码所示:
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {
//result1 和 result2 别离为 cf1 和 cf2 的后果
return "result4";
});
3.2.4 多元依赖:依赖多个 CF
如上图红色链路所示,整个流程的完结依赖于三个步骤 CF3、CF4、CF5,这种多元依赖能够通过 allOf
或anyOf
办法来实现,区别是当须要多个依赖全副实现时应用allOf
,当多个依赖中的任意一个实现即可时应用anyOf
,如下代码所示:
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFuture<String> result = cf6.thenApply(v -> {
// 这里的 join 并不会阻塞,因为传给 thenApply 的函数是在 CF3、CF4、CF5 全副实现时,才会执行。result3 = cf3.join();
result4 = cf4.join();
result5 = cf5.join();
// 依据 result3、result4、result5 组装最终 result;
return "result";
});
3.3 CompletableFuture 原理
CompletableFuture 中蕴含两个字段:result和stack。result 用于存储以后 CF 的后果,stack(Completion)示意以后 CF 实现后须要触发的依赖动作(Dependency Actions),去触发依赖它的 CF 的计算,依赖动作能够有多个(示意有多个依赖它的 CF),以栈(Treiber stack)的模式存储,stack 示意栈顶元素。
这种形式相似“观察者模式”,依赖动作(Dependency Action)都封装在一个独自 Completion 子类中。上面是 Completion 类关系结构图。CompletableFuture 中的每个办法都对应了图中的一个 Completion 的子类,Completion 自身是 观察者 的基类。
- UniCompletion 继承了 Completion,是一元依赖的基类,例如 thenApply 的实现类 UniApply 就继承自 UniCompletion。
- BiCompletion 继承了 UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如 thenCombine 的实现类 BiRelay 就继承自 BiCompletion。
3.3.1 CompletableFuture 的设计思维
依照相似“观察者模式”的设计思维,原理剖析能够从“观察者”和“被观察者”两个方面着手。因为回调品种多,但构造差别不大,所以这里单以一元依赖中的 thenApply 为例,不再枚举全副回调类型。如下图所示:
3.3.1.1 被观察者
- 每个 CompletableFuture 都能够被看作一个被观察者,其外部有一个 Completion 类型的链表成员变量 stack,用来存储注册到其中的所有观察者。当被观察者执行实现后会弹栈 stack 属性,顺次告诉注册到其中的观察者。下面例子中步骤 fn2 就是作为观察者被封装在 UniApply 中。
- 被观察者 CF 中的 result 属性,用来存储返回后果数据。这里可能是一次 RPC 调用的返回值,也可能是任意对象,在下面的例子中对应步骤 fn1 的执行后果。
3.3.1.2 观察者
CompletableFuture 反对很多回调办法,例如 thenAccept、thenApply、exceptionally 等,这些办法接管一个函数类型的参数 f,生成一个 Completion 类型的对象(即观察者),并将入参函数 f 赋值给 Completion 的成员变量 fn,而后查看以后 CF 是否已处于实现状态(即 result != null),如果已实现间接触发 fn,否则将观察者 Completion 退出到 CF 的观察者链 stack 中,再次尝试触发,如果被观察者未执行完则其执行结束之后告诉触发。
- 观察者中的 dep 属性:指向其对应的 CompletableFuture,在下面的例子中 dep 指向 CF2。
- 观察者中的 src 属性:指向其依赖的 CompletableFuture,在下面的例子中 src 指向 CF1。
- 观察者 Completion 中的 fn 属性:用来存储具体的期待被回调的函数。这里须要留神的是不同的回调办法(thenAccept、thenApply、exceptionally 等)接管的函数类型也不同,即 fn 的类型有很多种,在下面的例子中 fn 指向 fn2。
3.3.2 整体流程
3.3.2.1 一元依赖
这里依然以 thenApply 为例来阐明一元依赖的流程:
- 将观察者 Completion 注册到 CF1,此时 CF1 将 Completion 压栈。
- 当 CF1 的操作运行实现时,会将后果赋值给 CF1 中的 result 属性。
- 顺次弹栈,告诉观察者尝试运行。
初步流程设计如上图所示,这里有几个对于注册与告诉的并发问题,大家能够思考下:
Q1:在观察者注册之前,如果 CF 曾经执行实现,并且曾经发出通知,那么这时观察者因为错过了告诉是不是将永远不会被触发呢?
A1:不会。在注册时查看依赖的 CF 是否曾经实现。如果未实现(即 result == null)则将观察者入栈,如果已实现(result != null)则间接触发观察者操作。
Q2:在”入栈“前会有”result == null“的判断,这两个操作为非原子操作,CompletableFufure 的实现也没有对两个操作进行加锁,实现工夫在这两个操作之间,观察者依然得不到告诉,是不是依然无奈触发?
A2:不会。入栈之后再次查看 CF 是否实现,如果实现则触发。
Q3:当依赖多个 CF 时,观察者会被压入所有依赖的 CF 的栈中,每个 CF 实现的时候都会进行,那么会不会导致一个操作被屡次执行呢?如下图所示,即当 CF1、CF2 同时实现时,如何防止 CF3 被屡次触发。
A3:CompletableFuture 的实现是这样解决该问题的:观察者在执行之前会先通过 CAS 操作设置一个状态位,将 status 由 0 改为 1。如果观察者曾经执行过了,那么 CAS 操作将会失败,勾销执行。
通过对以上 3 个问题的剖析能够看出,CompletableFuture 在解决并行问题时,全程无加锁操作,极大地提高了程序的执行效率。咱们将并行问题思考纳入之后,能够失去欠缺的整体流程图如下所示:
CompletableFuture 反对的回调办法非常丰盛,然而正如上一章节的整体流程图所述,他们的整体流程是统一的。所有回调复用同一套流程架构,不同的回调监听通过 策略模式 实现差异化。
3.3.2.2 二元依赖
咱们以 thenCombine 为例来阐明二元依赖:
thenCombine 操作示意依赖两个 CompletableFuture。其观察者实现类为 BiApply,如上图所示,BiApply 通过 src 和 snd 两个属性关联被依赖的两个 CF,fn 属性的类型为 BiFunction。与单个依赖不同的是,在依赖的 CF 未实现的状况下,thenCombine 会尝试将 BiApply 压入这两个被依赖的 CF 的栈中,每个被依赖的 CF 实现时都会尝试触发观察者 BiApply,BiApply 会查看两个依赖是否都实现,如果实现则开始执行。这里为了解决反复触发的问题,同样用的是上一章节提到的 CAS 操作,执行时会先通过 CAS 设置状态位,防止反复触发。
3.3.2.3 多元依赖
依赖多个 CompletableFuture 的回调办法包含 allOf
、anyOf
,区别在于allOf
观察者实现类为 BiRelay,须要所有被依赖的 CF 实现后才会执行回调;而 anyOf
观察者实现类为 OrRelay,任意一个被依赖的 CF 实现后就会触发。二者的实现形式都是将多个被依赖的 CF 构建成一棵均衡二叉树,执行后果层层告诉,直到根节点,触发回调监听。
3.3.3 小结
本章节为 CompletableFuture 实现原理的科普,旨在尝试不粘贴源码,而通过结构图、流程图以及搭配文字描述把 CompletableFuture 的实现原理讲述分明。把艰涩的源码翻译为“整体流程”章节的流程图,并且将并发解决的逻辑融入,便于大家了解。
4 实际总结
在商家端 API 异步化的过程中,咱们遇到了一些问题,这些问题有的会比拟荫蔽,上面把这些问题的解决教训整理出来。心愿能帮忙到更多的同学,大家能够少踩一些坑。
4.1 线程阻塞问题
4.1.1 代码执行在哪个线程上?
要正当治理线程资源,最根本的前提条件就是要在写代码时,分明地晓得每一行代码都将执行在哪个线程上。上面咱们看一下 CompletableFuture 的执行线程状况。
CompletableFuture 实现了 CompletionStage 接口,通过丰盛的回调办法,反对各种组合操作,每种组合场景都有同步和异步两种办法。
同步办法(即不带 Async 后缀的办法)有两种状况。
- 如果注册时被依赖的操作曾经执行实现,则间接由以后线程执行。
- 如果注册时被依赖的操作还未执行完,则由回调线程执行。
异步办法(即带 Async 后缀的办法):能够抉择是否传递线程池参数 Executor 运行在指定线程池中;当不传递 Executor 时,会应用 ForkJoinPool 中的共用线程池 CommonPool(CommonPool 的大小是 CPU 核数 -1,如果是 IO 密集的利用,线程数可能成为瓶颈)。
例如:
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
// 业务操作
return "";
}, threadPool1);
// 此时,如果 future1 中的业务操作曾经执行结束并返回,则该 thenApply 间接由以后 main 线程执行;否则,将会由执行以上业务操作的 threadPool1 中的线程执行。future1.thenApply(value -> {System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
return value + "1";
});
// 应用 ForkJoinPool 中的共用线程池 CommonPool
future1.thenApplyAsync(value -> {
//do something
return value + "1";
});
// 应用指定线程池
future1.thenApplyAsync(value -> {
//do something
return value + "1";
}, threadPool1);
4.2 线程池须知
4.2.1 异步回调要传线程池
后面提到,异步回调办法能够抉择是否传递线程池参数 Executor,这里咱们倡议 强制传线程池,且依据理论状况做线程池隔离。
当不传递线程池时,会应用 ForkJoinPool 中的公共线程池 CommonPool,这里所有调用将共用该线程池,外围线程数 = 处理器数量 -1(单核外围线程数为 1),所有异步回调都会共用该 CommonPool,外围与非核心业务都竞争同一个池中的线程,很容易成为零碎瓶颈。手动传递线程池参数能够更不便的调节参数,并且能够给不同的业务调配不同的线程池,以求资源隔离,缩小不同业务之间的互相烦扰。
4.2.2 线程池循环援用会导致死锁
public Object doGet() {ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
//do sth
return CompletableFuture.supplyAsync(() -> {System.out.println("child");
return "child";
}, threadPool1).join();// 子工作}, threadPool1);
return cf1.join();}
如上代码块所示,doGet 办法第三行通过 supplyAsync 向 threadPool1 申请线程,并且外部子工作又向 threadPool1 申请线程。threadPool1 大小为 10,当同一时刻有 10 个申请达到,则 threadPool1 被打满,子工作申请线程时进入阻塞队列排队,然而父工作的实现又依赖于子工作,这时因为子工作得不到线程,父工作无奈实现。主线程执行 cf1.join()进入阻塞状态,并且永远无奈复原。
为了修复该问题,须要将父工作与子工作做线程池隔离,两个工作申请不同的线程池,防止循环依赖导致的阻塞。
4.2.3 异步 RPC 调用留神不要阻塞 IO 线程池
服务异步化后很多步骤都会依赖于异步 RPC 调用的后果,这时须要特地留神一点,如果是应用基于 NIO(比方 Netty)的异步 RPC,则返回后果是由 IO 线程负责设置的,即回调办法由 IO 线程触发,CompletableFuture 同步回调(如 thenApply、thenAccept 等无 Async 后缀的办法)如果依赖的异步 RPC 调用的返回后果,那么这些同步回调将运行在 IO 线程上,而整个服务只有一个 IO 线程池,这时须要保障同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行实现前,IO 线程将始终被占用,影响整个服务的响应。
4.3 其余
4.3.1 异样解决
因为异步执行的工作在其余线程上执行,而异样信息存储在线程栈中,因而以后线程除非阻塞期待返回后果,否则无奈通过 try\catch 捕捉异样。CompletableFuture 提供了异样捕捉回调 exceptionally,相当于同步调用中的 try\catch。应用办法如下所示:
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;// 外部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);// 业务办法,外部会发动异步 rpc 调用
return remarkResultFuture
.exceptionally(err -> {// 通过 exceptionally 捕捉异样,打印日志并返回默认值
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, err);
return 0;
});
}
有一点须要留神,CompletableFuture 在回调办法中对异样进行了包装。大部分异样会封装成 CompletionException 后抛出,真正的异样存储在 cause 属性中,因而如果调用链中通过了回调办法解决那么就须要用 Throwable.getCause()办法提取真正的异样。然而,有些状况下会间接返回真正的异样(Stack Overflow 的探讨),最好应用工具类提取异样,如下代码所示:
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;// 外部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);// 业务办法,外部会发动异步 rpc 调用
return remarkResultFuture
.thenApply(result -> {// 这里减少了一个回调办法 thenApply,如果产生异样 thenApply 外部会通过 new CompletionException(throwable) 对异样进行包装
// 这里是一些业务操作
})
.exceptionally(err -> {// 通过 exceptionally 捕捉异样,这里的 err 曾经被 thenApply 包装过,因而须要通过 Throwable.getCause()提取异样
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));
return 0;
});
}
下面代码中用到了一个自定义的工具类 ExceptionUtils,用于 CompletableFuture 的异样提取,在应用 CompletableFuture 做异步编程时,能够间接应用该工具类解决异样。实现代码如下:
public class ExceptionUtils {public static Throwable extractRealException(Throwable throwable) {
// 这里判断异样类型是否为 CompletionException、ExecutionException,如果是则进行提取,否则间接返回。if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {if (throwable.getCause() != null) {return throwable.getCause();
}
}
return throwable;
}
}
4.3.2 积淀的工具办法介绍
在实际过程中咱们积淀了一些通用的工具办法,在应用 CompletableFuture 开发时能够间接拿来应用,详情参见“附录”。
5 异步化收益
通过异步化革新,美团商家端 API 零碎的性能失去显著晋升,与革新前比照的收益如下:
- 外围接口吞吐量大幅晋升,其中订单轮询接口革新前 TP99 为 754ms,革新后降为 408ms。
- 服务器数量缩小 1 /3。
6 参考文献
- CompletableFuture (Java Platform SE 8)
- java – Does CompletionStage always wrap exceptions in CompletionException? – Stack Overflow
- exception – Surprising behavior of Java 8 CompletableFuture exceptionally method – Stack Overflow
- 文档 | Apache Dubbo
7 名词解释及备注
注 1:“增量同步”是指商家客户端与服务端之间的订单增量数据同步协定,客户端应用该协定获取新增订单以及状态发生变化的订单。
注 2:本文波及到的所有技术点依赖的 Java 版本为 JDK 8,CompletableFuture 反对的个性剖析也是基于该版本。
附录
自定义函数
@FunctionalInterface
public interface ThriftAsyncCall {void invoke() throws TException ;
}
CompletableFuture 解决工具类
/**
* CompletableFuture 封装工具类
*/
@Slf4j
public class FutureUtils {
/**
* 该办法为美团外部 rpc 注册监听的封装,能够作为其余实现的参照
* OctoThriftCallback 为 thrift 回调办法
* ThriftAsyncCall 为自定义函数,用来示意一次 thrift 调用(定义如上)*/
public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) {CompletableFuture<T> thriftResultFuture = new CompletableFuture<>();
callback.addObserver(new OctoObserver<T>() {
@Override
public void onSuccess(T t) {thriftResultFuture.complete(t);
}
@Override
public void onFailure(Throwable throwable) {thriftResultFuture.completeExceptionally(throwable);
}
});
if (thriftCall != null) {
try {thriftCall.invoke();
} catch (TException e) {thriftResultFuture.completeExceptionally(e);
}
}
return thriftResultFuture;
}
/**
* 设置 CF 状态为失败
*/
public static <T> CompletableFuture<T> failed(Throwable ex) {CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(ex);
return completableFuture;
}
/**
* 设置 CF 状态为胜利
*/
public static <T> CompletableFuture<T> success(T result) {CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFuture.complete(result);
return completableFuture;
}
/**
* 将 List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>
*/
public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
/**
* 将 List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>
* 多用于分页查问的场景
*/
public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.flatMap(listFuture -> listFuture.join().stream())
.collect(Collectors.toList())
);
}
/*
* 将 List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>>
* @Param mergeFunction 自定义 key 抵触时的 merge 策略
*/
public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(Collection<CompletableFuture<Map<K, V>>> completableFutures, BinaryOperator<V> mergeFunction) {
return CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction)));
}
/**
* 将 List<CompletableFuture<T>> 转为 CompletableFuture<List<T>>,并过滤调 null 值
*/
public static <T> CompletableFuture<List<T>> sequenceNonNull(Collection<CompletableFuture<T>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.filter(e -> e != null)
.collect(Collectors.toList())
);
}
/**
* 将 List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>,并过滤调 null 值
* 多用于分页查问的场景
*/
public static <T> CompletableFuture<List<T>> sequenceListNonNull(Collection<CompletableFuture<List<T>>> completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.flatMap(listFuture -> listFuture.join().stream().filter(e -> e != null))
.collect(Collectors.toList())
);
}
/**
* 将 List<CompletableFuture<Map<K, V>>> 转为 CompletableFuture<Map<K, V>>
* @Param filterFunction 自定义过滤策略
*/
public static <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> completableFutures,
Predicate<? super T> filterFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.map(CompletableFuture::join)
.filter(filterFunction)
.collect(Collectors.toList())
);
}
/**
* 将 List<CompletableFuture<List<T>>> 转为 CompletableFuture<List<T>>
* @Param filterFunction 自定义过滤策略
*/
public static <T> CompletableFuture<List<T>> sequenceList(Collection<CompletableFuture<List<T>>> completableFutures,
Predicate<? super T> filterFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream()
.flatMap(listFuture -> listFuture.join().stream().filter(filterFunction))
.collect(Collectors.toList())
);
}
/**
* 将 CompletableFuture<Map<K,V>> 的 list 转为 CompletableFuture<Map<K,V>>。多个 map 合并为一个 map。如果 key 抵触,采纳新的 value 笼罩。*/
public static <K, V> CompletableFuture<Map<K, V>> sequenceMap(Collection<CompletableFuture<Map<K, V>>> completableFutures) {
return CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> completableFutures.stream().map(CompletableFuture::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> b)));
}}
异样提取工具类
public class ExceptionUtils {
/**
* 提取真正的异样
*/
public static Throwable extractRealException(Throwable throwable) {if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {if (throwable.getCause() != null) {return throwable.getCause();
}
}
return throwable;
}
}
打印日志
@Slf4j
public abstract class AbstractLogAction<R> {
protected final String methodName;
protected final Object[] args;
public AbstractLogAction(String methodName, Object... args) {
this.methodName = methodName;
this.args = args;
}
protected void logResult(R result, Throwable throwable) {if (throwable != null) {boolean isBusinessError = throwable instanceof TBase || (throwable.getCause() != null && throwable
.getCause() instanceof TBase);
if (isBusinessError) {logBusinessError(throwable);
} else if (throwable instanceof DegradeException || throwable instanceof DegradeRuntimeException) {// 这里为外部 rpc 框架抛出的异样,应用时能够酌情批改
if (RhinoSwitch.getBoolean("isPrintDegradeLog", false)) {log.error("{} degrade exception, param:{} , error:{}", methodName, args, throwable);
}
} else {log.error("{} unknown error, param:{} , error:{}", methodName, args, ExceptionUtils.extractRealException(throwable));
}
} else {if (isLogResult()) {log.info("{} param:{} , result:{}", methodName, args, result);
} else {log.info("{} param:{}", methodName, args);
}
}
}
private void logBusinessError(Throwable throwable) {log.error("{} business error, param:{} , error:{}", methodName, args, throwable.toString(), ExceptionUtils.extractRealException(throwable));
}
private boolean isLogResult() {
// 这里是动静配置开关,用于动态控制日志打印,开源动静配置核心能够应用 nacos、apollo 等,如果我的项目没有应用配置核心则能够删除
return RhinoSwitch.getBoolean(methodName + "_isLogResult", false);
}}
日志解决实现类
/**
* 产生异样时,依据是否为业务异样打印日志。* 跟 CompletableFuture.whenComplete 配合应用,不扭转 completableFuture 的后果(失常 OR 异样)*/
@Slf4j
public class LogErrorAction<R> extends AbstractLogAction<R> implements BiConsumer<R, Throwable> {public LogErrorAction(String methodName, Object... args) {super(methodName, args);
}
@Override
public void accept(R result, Throwable throwable) {logResult(result, throwable);
}
}
打印日志形式
completableFuture
.whenComplete(new LogErrorAction<>("orderService.getOrder", params));
异常情况返回默认值
/**
* 当产生异样时返回自定义的值
*/
public class DefaultValueHandle<R> extends AbstractLogAction<R> implements BiFunction<R, Throwable, R> {
private final R defaultValue;
/**
* 当返回值为空的时候是否替换为默认值
*/
private final boolean isNullToDefault;
/**
* @param methodName 办法名称
* @param defaultValue 当异样产生时自定义返回的默认值
* @param args 办法入参
*/
public DefaultValueHandle(String methodName, R defaultValue, Object... args) {super(methodName, args);
this.defaultValue = defaultValue;
this.isNullToDefault = false;
}
/**
* @param isNullToDefault
* @param defaultValue 当异样产生时自定义返回的默认值
* @param methodName 办法名称
* @param args 办法入参
*/
public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {super(methodName, args);
this.defaultValue = defaultValue;
this.isNullToDefault = isNullToDefault;
}
@Override
public R apply(R result, Throwable throwable) {logResult(result, throwable);
if (throwable != null) {return defaultValue;}
if (result == null && isNullToDefault) {return defaultValue;}
return result;
}
public static <R> DefaultValueHandle.DefaultValueHandleBuilder<R> builder() {return new DefaultValueHandle.DefaultValueHandleBuilder<>();
}
public static class DefaultValueHandleBuilder<R> {
private boolean isNullToDefault;
private R defaultValue;
private String methodName;
private Object[] args;
DefaultValueHandleBuilder() {}
public DefaultValueHandle.DefaultValueHandleBuilder<R> isNullToDefault(final boolean isNullToDefault) {
this.isNullToDefault = isNullToDefault;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> defaultValue(final R defaultValue) {
this.defaultValue = defaultValue;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> methodName(final String methodName) {
this.methodName = methodName;
return this;
}
public DefaultValueHandle.DefaultValueHandleBuilder<R> args(final Object... args) {
this.args = args;
return this;
}
public DefaultValueHandle<R> build() {return new DefaultValueHandle<R>(this.isNullToDefault, this.defaultValue, this.methodName, this.args);
}
public String toString() {return "DefaultValueHandle.DefaultValueHandleBuilder(isNullToDefault=" + this.isNullToDefault + ", defaultValue=" + this.defaultValue + ", methodName=" + this.methodName + ", args=" + Arrays.deepToString(this.args) + ")";
}
}
默认返回值利用示例
completableFuture.handle(new DefaultValueHandle<>("orderService.getOrder", Collections.emptyMap(), params));
本文作者
长发、旭孟、向鹏,均来自美团外卖商家组技术团队。
招聘信息
美团外卖商家组技术团队,通过技术手段服务于百万商家,涵盖客户、合同、商品、交易、成长等多个业务方向构建商家端系统,同时晋升餐饮外卖商家的数字化经营程度,帮忙美团建设丰盛的供应,为用户提供更加丰盛、多样的可选择性。
美团外卖商家零碎,既有日千万量级订单下的稳定性挑战,又具备 B 端特有的业务复杂性,同时也在商家生态、商家经营、智能硬件等方向翻新与摸索。通过在高可用、畛域驱动设计、微服务等技术方向继续实际,积攒了丰盛的技术教训。
欢送退出美团外卖商家组技术团队,感兴趣的同学能够将简历发送至 pingxumeng@meituan.com
浏览美团技术团队更多技术文章合集
前端 | 算法 | 后端 | 数据 | 平安 | 运维 | iOS | Android | 测试
| 在公众号菜单栏对话框回复【2021 年货】、【2020 年货】、【2019 年货】、【2018 年货】、【2017 年货】等关键词,可查看美团技术团队历年技术文章合集。
| 本文系美团技术团队出品,著作权归属美团。欢送出于分享和交换等非商业目标转载或应用本文内容,敬请注明“内容转载自美团技术团队”。本文未经许可,不得进行商业性转载或者应用。任何商用行为,请发送邮件至 tech@meituan.com 申请受权。