作者:京东科技 张天赐
前言
JDK 8 是一次重大的版本升级,新增了十分多的个性,其中之一便是 CompletableFuture
。自此从 JDK 层面真正意义上的反对了基于事件的异步编程范式,补救了 Future
的缺点。
在咱们的日常优化中,最罕用伎俩便是多线程并行执行。这时候就会波及到 CompletableFuture
的应用。
常见应用形式
上面举例一个常见场景。
如果咱们有两个 RPC 近程调用服务,咱们须要获取两个 RPC 的后果后,再进行后续逻辑解决。
public static void main(String[] args) { // 工作 A,耗时 2 秒 int resultA = compute(1); // 工作 B,耗时 2 秒 int resultB = compute(2); // 后续业务逻辑解决 System.out.println(resultA + resultB);}
能够预估到,串行执行起码耗时 4 秒,并且 B 工作并不依赖 A 工作后果。
对于这种场景,咱们通常会抉择并行的形式优化,Demo 代码如下:
public static void main(String[] args) { // 仅简略举例,在生产代码中可别这么写! // 统计耗时的函数 time(() -> { CompletableFuture<Integer> result = Stream.of(1, 2) // 创立异步工作 .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor)) // 聚合 .reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor)); // 期待后果 try { System.out.println("后果:" + result.get()); } catch (ExecutionException | InterruptedException e) { System.err.println("工作执行异样"); } });}输入:[async-1]: 工作执行开始:1[async-2]: 工作执行开始:2[async-1]: 工作执行实现:1[async-2]: 工作执行实现:2后果:3耗时:2 秒
能够看到耗时变成了 2 秒。
存在的问题
剖析
看上去 CompletableFuture
现有性能能够满足咱们诉求。但当咱们引入一些事实常见状况时,一些潜在的有余便裸露进去了。
compute(x)
如果是一个依据入参查问用户某类型优惠券列表的工作,咱们须要查问两种优惠券并组合在一起返回给上游。如果上游要求咱们 2 秒内处理完毕并返回后果,但 compute(x)
耗时却在 0.5 秒 ~ 无穷大稳定。这时候咱们就须要把耗时过长的 compute(x)
工作后果放弃,仅解决在指定工夫内实现的工作,尽可能保障服务可用。
那么以上代码的耗时由耗时最长的服务决定,无奈满足现有诉求。通常咱们会应用 get(long timeout, TimeUnit unit)
来指定获取后果的超时工夫,并且咱们会给 compute(x)
设置一个超时工夫,达到后主动抛异样来中断工作。
public static void main(String[] args) { // 仅简略举例,在生产代码中可别这么写! // 统计耗时的函数 time(() -> { List<CompletableFuture<Integer>> result = Stream.of(1, 2) // 创立异步工作,compute(x) 超时抛出异样 .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor)) .toList(); // 期待后果 int res = 0; for (CompletableFuture<Integer> future : result) { try { res += future.get(2, SECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { System.err.println("工作执行异样或超时"); } } System.out.println("后果:" + res); });}输入:[async-2]: 工作执行开始:2[async-1]: 工作执行开始:1[async-1]: 工作执行实现:1工作执行异样或超时后果:1耗时:2 秒
能够看到,只有咱们可能给 compute(x)
设置一个超时工夫将工作中断,联合 get
、getNow
等获取后果的形式,就能够很好地治理整体耗时。
那么问题也就转变成了,如何给工作设置异步超时工夫呢?
现有做法
当异步工作是一个 RPC 申请时,咱们能够设置一个 JSF 超时,以达到异步超时成果。
当申请是一个 R2M 申请时,咱们也能够管制 R2M 连贯的最大超时工夫来达到成果。
这么看如同咱们都是在依赖三方中间件的能力来治理工作超时工夫?那么就存在一个问题,中间件超时控制能力无限,如果异步工作是中间件 IO 操作 + 本地计算操作怎么办?
用 JSF 超时举一个具体的例子,反编译 JSF 的获取后果代码如下:
public V get(long timeout, TimeUnit unit) throws InterruptedException { // 配置的超时工夫 timeout = unit.toMillis(timeout); // 残余等待时间 long remaintime = timeout - (this.sentTime - this.genTime); if (remaintime <= 0L) { if (this.isDone()) { // 反序列化获取后果 return this.getNow(); } } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) { // 等待时间内工作实现,反序列化获取后果 return this.getNow(); } this.setDoneTime(); // 超时抛出异样 throw this.clientTimeoutException(false);}
当这个工作刚好卡在超时边缘实现时,这个工作的耗时工夫就变成了超时工夫 + 获取后果工夫。而获取后果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。
某些 CPU 使用率高的状况下,就会呈现异步工作没能触发抛出异常中断,导致咱们无奈精确管制超时工夫。对上游来说,本次申请全副失败。
解决形式
JDK 9
这类问题十分常见,如大促场景,服务器 CPU 霎时升高就会呈现以上问题。
那么如何解决呢?其实 JDK 的开发大佬们早有钻研。在 JDK 9,CompletableFuture
正式提供了 orTimeout
、completeTimeout
办法,来精确实现异步超时管制。
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { if (unit == null) throw new NullPointerException(); if (result == null) whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit))); return this;}
JDK 9 orTimeout
其实现原理是通过一个定时工作,在给定工夫之后抛出异样。如果工作在指定工夫内实现,则勾销抛异样的操作。
以上代码咱们按执行程序来看下:
首先执行 new Timeout(this)
。
static final class Timeout implements Runnable { final CompletableFuture<?> f; Timeout(CompletableFuture<?> f) { this.f = f; } public void run() { if (f != null && !f.isDone()) // 抛出超时异样 f.completeExceptionally(new TimeoutException()); }}
通过源码能够看到,Timeout
是一个实现 Runnable 的类,run()
办法负责给传入的异步工作通过 completeExceptionally
CAS 赋值异样,将工作标记为异样实现。
那么谁来触发这个 run()
办法呢?咱们看下 Delayer
的实现。
static final class Delayer { static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) { // 到工夫触发 command 工作 return delayer.schedule(command, delay, unit); } static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("CompletableFutureDelayScheduler"); return t; } } static final ScheduledThreadPoolExecutor delayer; static { (delayer = new ScheduledThreadPoolExecutor( 1, new DaemonThreadFactory())). setRemoveOnCancelPolicy(true); }}
Delayer
其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit)
通过 ScheduledThreadPoolExecutor
实现指定工夫后触发 Timeout
的 run()
办法。
到这里就曾经实现了超时抛出异样的操作。但当工作实现时,就没必要触发 Timeout
了。因而咱们还须要实现一个勾销逻辑。
static final class Canceller implements BiConsumer<Object, Throwable> { final Future<?> f; Canceller(Future<?> f) { this.f = f; } public void accept(Object ignore, Throwable ex) { if (ex == null && f != null && !f.isDone()) // 3 未触发抛异样工作则勾销 f.cancel(false); }}
当工作执行实现,或者工作执行异样时,咱们也就没必要抛出超时异样了。因而咱们能够把 delayer.schedule(command, delay, unit)
返回的定时超时工作勾销,不再触发 Timeout
。 当咱们的异步工作实现,并且定时超时工作未实现的时候,就是咱们勾销的机会。因而咱们能够通过 whenComplete(BiConsumer<? super T, ? super Throwable> action)
来实现。
Canceller
就是一个 BiConsumer
的实现。其持有了 delayer.schedule(command, delay, unit)
返回的定时超时工作,accept(Object ignore, Throwable ex)
实现了定时超时工作未实现后,执行 cancel(boolean mayInterruptIfRunning)
勾销工作的操作。
JDK 8
如果咱们应用的是 JDK 9 或以上,咱们能够间接用 JDK 的实现来实现异步超时操作。那么 JDK 8 怎么办呢?
其实咱们也能够根据上述逻辑简略实现一个工具类来辅助。
以下是咱们营销本人的工具类以及用法,贴出来给大家作为参考,大家也能够本人写的更优雅一些~
调用形式:
CompletableFutureExpandUtils.orTimeout(异步工作, 超时工夫, 工夫单位);
工具类源码:
package com.jd.jr.market.reduction.util;import com.jdpay.market.common.exception.UncheckedException;import java.util.concurrent.*;import java.util.function.BiConsumer;/** * CompletableFuture 扩大工具 * * @author zhangtianci7 */public class CompletableFutureExpandUtils { /** * 如果在给定超时之前未实现,则异样实现此 CompletableFuture 并抛出 {@link TimeoutException} 。 * * @param timeout 在呈现 TimeoutException 异样实现之前期待多长时间,以 {@code unit} 为单位 * @param unit 一个 {@link TimeUnit},联合 {@code timeout} 参数,示意给定粒度单位的持续时间 * @return 入参的 CompletableFuture */ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) { if (null == unit) { throw new UncheckedException("工夫的给定粒度不能为空"); } if (null == future) { throw new UncheckedException("异步工作不能为空"); } if (future.isDone()) { return future; } return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit))); } /** * 超时时异样实现的操作 */ static final class Timeout implements Runnable { final CompletableFuture<?> future; Timeout(CompletableFuture<?> future) { this.future = future; } public void run() { if (null != future && !future.isDone()) { future.completeExceptionally(new TimeoutException()); } } } /** * 勾销不须要的超时的操作 */ static final class Canceller implements BiConsumer<Object, Throwable> { final Future<?> future; Canceller(Future<?> future) { this.future = future; } public void accept(Object ignore, Throwable ex) { if (null == ex && null != future && !future.isDone()) { future.cancel(false); } } } /** * 单例提早调度器,仅用于启动和勾销工作,一个线程就足够 */ static final class Delayer { static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) { return delayer.schedule(command, delay, unit); } static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("CompletableFutureExpandUtilsDelayScheduler"); return t; } } static final ScheduledThreadPoolExecutor delayer; static { delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); delayer.setRemoveOnCancelPolicy(true); } }}
参考资料
- JEP 266: JDK 9 并发包更新提案