作者:京东科技 张天赐
前言
JDK 8 是一次重大的版本升级,新增了十分多的个性,其中之一便是 CompletableFuture
。自此从 JDK 层面真正意义上的反对了基于事件的异步编程范式,补救了 Future
的缺点。
在咱们的日常优化中,最罕用伎俩便是多线程并行执行。这时候就会波及到 CompletableFuture
的应用。
常见应用形式
上面举例一个常见场景。
如果咱们有两个 RPC 近程调用服务,咱们须要获取两个 RPC 的后果后,再进行后续逻辑解决。
public static void main(String[] args) {
// 工作 A,耗时 2 秒
int resultA = compute(1);
// 工作 B,耗时 2 秒
int resultB = compute(2);
// 后续业务逻辑解决
System.out.println(resultA + resultB);
}
能够预估到,串行执行起码耗时 4 秒,并且 B 工作并不依赖 A 工作后果。
对于这种场景,咱们通常会抉择并行的形式优化,Demo 代码如下:
public static void main(String[] args) {
// 仅简略举例,在生产代码中可别这么写!// 统计耗时的函数
time(() -> {CompletableFuture<Integer> result = Stream.of(1, 2)
// 创立异步工作
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
// 聚合
.reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
// 期待后果
try {System.out.println("后果:" + result.get());
} catch (ExecutionException | InterruptedException e) {System.err.println("工作执行异样");
}
});
}
输入:[async-1]: 工作执行开始:1
[async-2]: 工作执行开始:2
[async-1]: 工作执行实现:1
[async-2]: 工作执行实现:2
后果:3
耗时:2 秒
能够看到耗时变成了 2 秒。
存在的问题
剖析
看上去 CompletableFuture
现有性能能够满足咱们诉求。但当咱们引入一些事实常见状况时,一些潜在的有余便裸露进去了。
compute(x)
如果是一个依据入参查问用户某类型优惠券列表的工作,咱们须要查问两种优惠券并组合在一起返回给上游。如果上游要求咱们 2 秒内处理完毕并返回后果,但 compute(x)
耗时却在 0.5 秒 ~ 无穷大稳定。这时候咱们就须要把耗时过长的 compute(x)
工作后果放弃,仅解决在指定工夫内实现的工作,尽可能保障服务可用。
那么以上代码的耗时由耗时最长的服务决定,无奈满足现有诉求。通常咱们会应用 get(long timeout, TimeUnit unit)
来指定获取后果的超时工夫,并且咱们会给 compute(x)
设置一个超时工夫,达到后主动抛异样来中断工作。
public static void main(String[] args) {
// 仅简略举例,在生产代码中可别这么写!// 统计耗时的函数
time(() -> {List<CompletableFuture<Integer>> result = Stream.of(1, 2)
// 创立异步工作,compute(x) 超时抛出异样
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
.toList();
// 期待后果
int res = 0;
for (CompletableFuture<Integer> future : result) {
try {res += future.get(2, SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {System.err.println("工作执行异样或超时");
}
}
System.out.println("后果:" + res);
});
}
输入:[async-2]: 工作执行开始:2
[async-1]: 工作执行开始:1
[async-1]: 工作执行实现:1
工作执行异样或超时
后果:1
耗时:2 秒
能够看到,只有咱们可能给 compute(x)
设置一个超时工夫将工作中断,联合 get
、getNow
等获取后果的形式,就能够很好地治理整体耗时。
那么问题也就转变成了, 如何给工作设置异步超时工夫呢 ?
现有做法
当异步工作是一个 RPC 申请时,咱们能够设置一个 JSF 超时,以达到异步超时成果。
当申请是一个 R2M 申请时,咱们也能够管制 R2M 连贯的最大超时工夫来达到成果。
这么看如同咱们都是在依赖三方中间件的能力来治理工作超时工夫?那么就存在一个问题,中间件超时控制能力无限,如果异步工作是中间件 IO 操作 + 本地计算操作怎么办?
用 JSF 超时举一个具体的例子,反编译 JSF 的获取后果代码如下:
public V get(long timeout, TimeUnit unit) throws InterruptedException {
// 配置的超时工夫
timeout = unit.toMillis(timeout);
// 残余等待时间
long remaintime = timeout - (this.sentTime - this.genTime);
if (remaintime <= 0L) {if (this.isDone()) {
// 反序列化获取后果
return this.getNow();}
} else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
// 等待时间内工作实现,反序列化获取后果
return this.getNow();}
this.setDoneTime();
// 超时抛出异样
throw this.clientTimeoutException(false);
}
当这个工作刚好卡在超时边缘实现时,这个工作的耗时工夫就变成了超时工夫 + 获取后果工夫。而获取后果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。
某些 CPU 使用率高的状况下,就会呈现异步工作没能触发抛出异常中断,导致咱们无奈精确管制超时工夫。对上游来说,本次申请全副失败。
解决形式
JDK 9
这类问题十分常见,如大促场景,服务器 CPU 霎时升高就会呈现以上问题。
那么如何解决呢?其实 JDK 的开发大佬们早有钻研。在 JDK 9,CompletableFuture
正式提供了 orTimeout
、completeTimeout
办法,来精确实现异步超时管制。
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
return this;
}
JDK 9 orTimeout
其实现原理是通过一个定时工作,在给定工夫之后抛出异样。如果工作在指定工夫内实现,则勾销抛异样的操作。
以上代码咱们按执行程序来看下:
首先执行 new Timeout(this)
。
static final class Timeout implements Runnable {
final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f) {this.f = f;}
public void run() {if (f != null && !f.isDone())
// 抛出超时异样
f.completeExceptionally(new TimeoutException());
}
}
通过源码能够看到,Timeout
是一个实现 Runnable 的类,run()
办法负责给传入的异步工作通过 completeExceptionally
CAS 赋值异样,将工作标记为异样实现。
那么谁来触发这个 run()
办法呢?咱们看下 Delayer
的实现。
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
// 到工夫触发 command 工作
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {public Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
Delayer
其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit)
通过 ScheduledThreadPoolExecutor
实现指定工夫后触发 Timeout
的 run()
办法。
到这里就曾经实现了超时抛出异样的操作。但当工作实现时,就没必要触发 Timeout
了。因而咱们还须要实现一个勾销逻辑。
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) {this.f = f;}
public void accept(Object ignore, Throwable ex) {if (ex == null && f != null && !f.isDone())
// 3 未触发抛异样工作则勾销
f.cancel(false);
}
}
当工作执行实现,或者工作执行异样时,咱们也就没必要抛出超时异样了。因而咱们能够把 delayer.schedule(command, delay, unit)
返回的定时超时工作勾销,不再触发 Timeout
。当咱们的异步工作实现,并且定时超时工作未实现的时候,就是咱们勾销的机会。因而咱们能够通过 whenComplete(BiConsumer<? super T, ? super Throwable> action)
来实现。
Canceller
就是一个 BiConsumer
的实现。其持有了 delayer.schedule(command, delay, unit)
返回的定时超时工作,accept(Object ignore, Throwable ex)
实现了定时超时工作未实现后,执行 cancel(boolean mayInterruptIfRunning)
勾销工作的操作。
JDK 8
如果咱们应用的是 JDK 9 或以上,咱们能够间接用 JDK 的实现来实现异步超时操作。那么 JDK 8 怎么办呢?
其实咱们也能够根据上述逻辑简略实现一个工具类来辅助。
以下是咱们营销本人的工具类以及用法,贴出来给大家作为参考,大家也能够本人写的更优雅一些~
调用形式:
CompletableFutureExpandUtils.orTimeout(异步工作, 超时工夫, 工夫单位);
工具类源码:
package com.jd.jr.market.reduction.util;
import com.jdpay.market.common.exception.UncheckedException;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
* CompletableFuture 扩大工具
*
* @author zhangtianci7
*/
public class CompletableFutureExpandUtils {
/**
* 如果在给定超时之前未实现,则异样实现此 CompletableFuture 并抛出 {@link TimeoutException}。*
* @param timeout 在呈现 TimeoutException 异样实现之前期待多长时间,以 {@code unit} 为单位
* @param unit 一个 {@link TimeUnit},联合 {@code timeout} 参数,示意给定粒度单位的持续时间
* @return 入参的 CompletableFuture
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {if (null == unit) {throw new UncheckedException("工夫的给定粒度不能为空");
}
if (null == future) {throw new UncheckedException("异步工作不能为空");
}
if (future.isDone()) {return future;}
return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
}
/**
* 超时时异样实现的操作
*/
static final class Timeout implements Runnable {
final CompletableFuture<?> future;
Timeout(CompletableFuture<?> future) {this.future = future;}
public void run() {if (null != future && !future.isDone()) {future.completeExceptionally(new TimeoutException());
}
}
}
/**
* 勾销不须要的超时的操作
*/
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> future;
Canceller(Future<?> future) {this.future = future;}
public void accept(Object ignore, Throwable ex) {if (null == ex && null != future && !future.isDone()) {future.cancel(false);
}
}
}
/**
* 单例提早调度器,仅用于启动和勾销工作,一个线程就足够
*/
static final class Delayer {static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {public Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureExpandUtilsDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
static {delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
}
}
参考资料
- JEP 266: JDK 9 并发包更新提案