乐趣区

关于java:Java-CompletableFuture-异步超时实现探索

作者:京东科技 张天赐

前言

JDK 8 是一次重大的版本升级,新增了十分多的个性,其中之一便是 CompletableFuture。自此从 JDK 层面真正意义上的反对了基于事件的异步编程范式,补救了 Future 的缺点。

在咱们的日常优化中,最罕用伎俩便是多线程并行执行。这时候就会波及到 CompletableFuture 的应用。

常见应用形式

上面举例一个常见场景。

如果咱们有两个 RPC 近程调用服务,咱们须要获取两个 RPC 的后果后,再进行后续逻辑解决。

public static void main(String[] args) {
    // 工作 A,耗时 2 秒
    int resultA = compute(1);
    // 工作 B,耗时 2 秒
    int resultB = compute(2);

    // 后续业务逻辑解决
    System.out.println(resultA + resultB);
}

能够预估到,串行执行起码耗时 4 秒,并且 B 工作并不依赖 A 工作后果。

对于这种场景,咱们通常会抉择并行的形式优化,Demo 代码如下:

public static void main(String[] args) {
    // 仅简略举例,在生产代码中可别这么写!// 统计耗时的函数
    time(() -> {CompletableFuture<Integer> result = Stream.of(1, 2)
                                                  // 创立异步工作
                                                  .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                  // 聚合
                                                  .reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));

        // 期待后果
        try {System.out.println("后果:" + result.get());
        } catch (ExecutionException | InterruptedException e) {System.err.println("工作执行异样");
        }
    });
}

输入:[async-1]: 工作执行开始:1
[async-2]: 工作执行开始:2
[async-1]: 工作执行实现:1
[async-2]: 工作执行实现:2
后果:3
耗时:2 秒 

能够看到耗时变成了 2 秒。

存在的问题

剖析

看上去 CompletableFuture 现有性能能够满足咱们诉求。但当咱们引入一些事实常见状况时,一些潜在的有余便裸露进去了。

compute(x) 如果是一个依据入参查问用户某类型优惠券列表的工作,咱们须要查问两种优惠券并组合在一起返回给上游。如果上游要求咱们 2 秒内处理完毕并返回后果,但 compute(x) 耗时却在 0.5 秒 ~ 无穷大稳定。这时候咱们就须要把耗时过长的 compute(x) 工作后果放弃,仅解决在指定工夫内实现的工作,尽可能保障服务可用。

那么以上代码的耗时由耗时最长的服务决定,无奈满足现有诉求。通常咱们会应用 get(long timeout, TimeUnit unit) 来指定获取后果的超时工夫,并且咱们会给 compute(x) 设置一个超时工夫,达到后主动抛异样来中断工作。

public static void main(String[] args) {
    // 仅简略举例,在生产代码中可别这么写!// 统计耗时的函数
    time(() -> {List<CompletableFuture<Integer>> result = Stream.of(1, 2)
                                                        // 创立异步工作,compute(x) 超时抛出异样
                                                        .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                        .toList();

        // 期待后果
        int res = 0;
        for (CompletableFuture<Integer> future : result) {
            try {res += future.get(2, SECONDS);
            } catch (ExecutionException | InterruptedException | TimeoutException e) {System.err.println("工作执行异样或超时");
            }
        }

        System.out.println("后果:" + res);
    });
}

输入:[async-2]: 工作执行开始:2
[async-1]: 工作执行开始:1
[async-1]: 工作执行实现:1
工作执行异样或超时
后果:1
耗时:2 秒 

能够看到,只有咱们可能给 compute(x) 设置一个超时工夫将工作中断,联合 getgetNow 等获取后果的形式,就能够很好地治理整体耗时。

那么问题也就转变成了, 如何给工作设置异步超时工夫呢

现有做法

当异步工作是一个 RPC 申请时,咱们能够设置一个 JSF 超时,以达到异步超时成果。

当申请是一个 R2M 申请时,咱们也能够管制 R2M 连贯的最大超时工夫来达到成果。

这么看如同咱们都是在依赖三方中间件的能力来治理工作超时工夫?那么就存在一个问题,中间件超时控制能力无限,如果异步工作是中间件 IO 操作 + 本地计算操作怎么办?

用 JSF 超时举一个具体的例子,反编译 JSF 的获取后果代码如下:

public V get(long timeout, TimeUnit unit) throws InterruptedException {
    // 配置的超时工夫
    timeout = unit.toMillis(timeout);
    // 残余等待时间
    long remaintime = timeout - (this.sentTime - this.genTime);
    if (remaintime <= 0L) {if (this.isDone()) {
            // 反序列化获取后果
            return this.getNow();}
    } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
        // 等待时间内工作实现,反序列化获取后果
        return this.getNow();}

    this.setDoneTime();
    // 超时抛出异样
    throw this.clientTimeoutException(false);
}

当这个工作刚好卡在超时边缘实现时,这个工作的耗时工夫就变成了超时工夫 + 获取后果工夫。而获取后果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。

某些 CPU 使用率高的状况下,就会呈现异步工作没能触发抛出异常中断,导致咱们无奈精确管制超时工夫。对上游来说,本次申请全副失败。

解决形式

JDK 9

这类问题十分常见,如大促场景,服务器 CPU 霎时升高就会呈现以上问题。

那么如何解决呢?其实 JDK 的开发大佬们早有钻研。在 JDK 9,CompletableFuture 正式提供了 orTimeoutcompleteTimeout 办法,来精确实现异步超时管制。

public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
    return this;
}

JDK 9 orTimeout 其实现原理是通过一个定时工作,在给定工夫之后抛出异样。如果工作在指定工夫内实现,则勾销抛异样的操作。

以上代码咱们按执行程序来看下:

首先执行 new Timeout(this)

static final class Timeout implements Runnable {
    final CompletableFuture<?> f;
    Timeout(CompletableFuture<?> f) {this.f = f;}
    public void run() {if (f != null && !f.isDone())
            // 抛出超时异样
            f.completeExceptionally(new TimeoutException());
    }
}

通过源码能够看到,Timeout 是一个实现 Runnable 的类,run() 办法负责给传入的异步工作通过 completeExceptionally CAS 赋值异样,将工作标记为异样实现。

那么谁来触发这个 run() 办法呢?咱们看下 Delayer 的实现。

static final class Delayer {
    static ScheduledFuture<?> delay(Runnable command, long delay,
                                    TimeUnit unit) {
        // 到工夫触发 command 工作
        return delayer.schedule(command, delay, unit);
    }

    static final class DaemonThreadFactory implements ThreadFactory {public Thread newThread(Runnable r) {Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("CompletableFutureDelayScheduler");
            return t;
        }
    }

    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }
}

Delayer 其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit) 通过 ScheduledThreadPoolExecutor 实现指定工夫后触发 Timeout 的 run() 办法。

到这里就曾经实现了超时抛出异样的操作。但当工作实现时,就没必要触发 Timeout 了。因而咱们还须要实现一个勾销逻辑。

static final class Canceller implements BiConsumer<Object, Throwable> {
    final Future<?> f;
    Canceller(Future<?> f) {this.f = f;}
    public void accept(Object ignore, Throwable ex) {if (ex == null && f != null && !f.isDone())
        // 3 未触发抛异样工作则勾销
            f.cancel(false);
    }
}

当工作执行实现,或者工作执行异样时,咱们也就没必要抛出超时异样了。因而咱们能够把 delayer.schedule(command, delay, unit) 返回的定时超时工作勾销,不再触发 Timeout。当咱们的异步工作实现,并且定时超时工作未实现的时候,就是咱们勾销的机会。因而咱们能够通过 whenComplete(BiConsumer<? super T, ? super Throwable> action) 来实现。

Canceller 就是一个 BiConsumer 的实现。其持有了 delayer.schedule(command, delay, unit) 返回的定时超时工作,accept(Object ignore, Throwable ex) 实现了定时超时工作未实现后,执行 cancel(boolean mayInterruptIfRunning) 勾销工作的操作。

JDK 8

如果咱们应用的是 JDK 9 或以上,咱们能够间接用 JDK 的实现来实现异步超时操作。那么 JDK 8 怎么办呢?

其实咱们也能够根据上述逻辑简略实现一个工具类来辅助。

以下是咱们营销本人的工具类以及用法,贴出来给大家作为参考,大家也能够本人写的更优雅一些~

调用形式:

CompletableFutureExpandUtils.orTimeout(异步工作, 超时工夫, 工夫单位);

工具类源码:

package com.jd.jr.market.reduction.util;

import com.jdpay.market.common.exception.UncheckedException;

import java.util.concurrent.*;
import java.util.function.BiConsumer;

/**
 * CompletableFuture 扩大工具
 *
 * @author zhangtianci7
 */
public class CompletableFutureExpandUtils {

    /**
     * 如果在给定超时之前未实现,则异样实现此 CompletableFuture 并抛出 {@link TimeoutException}。*
     * @param timeout 在呈现 TimeoutException 异样实现之前期待多长时间,以 {@code unit} 为单位
     * @param unit    一个 {@link TimeUnit},联合 {@code timeout} 参数,示意给定粒度单位的持续时间
     * @return 入参的 CompletableFuture
     */
    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {if (null == unit) {throw new UncheckedException("工夫的给定粒度不能为空");
        }
        if (null == future) {throw new UncheckedException("异步工作不能为空");
        }
        if (future.isDone()) {return future;}

        return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
    }

    /**
     * 超时时异样实现的操作
     */
    static final class Timeout implements Runnable {
        final CompletableFuture<?> future;

        Timeout(CompletableFuture<?> future) {this.future = future;}

        public void run() {if (null != future && !future.isDone()) {future.completeExceptionally(new TimeoutException());
            }
        }
    }

    /**
     * 勾销不须要的超时的操作
     */
    static final class Canceller implements BiConsumer<Object, Throwable> {
        final Future<?> future;

        Canceller(Future<?> future) {this.future = future;}

        public void accept(Object ignore, Throwable ex) {if (null == ex && null != future && !future.isDone()) {future.cancel(false);
            }
        }
    }

    /**
     * 单例提早调度器,仅用于启动和勾销工作,一个线程就足够
     */
    static final class Delayer {static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {return delayer.schedule(command, delay, unit);
        }

        static final class DaemonThreadFactory implements ThreadFactory {public Thread newThread(Runnable r) {Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureExpandUtilsDelayScheduler");
                return t;
            }
        }

        static final ScheduledThreadPoolExecutor delayer;

        static {delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }
    }
}

参考资料

  1. JEP 266: JDK 9 并发包更新提案
退出移动版