乐趣区

dubbo源码解析四十八异步化改造

2.7 大揭秘——异步化改造

目标:从源码的角度分析 2.7 的新特性中对于异步化的改造原理。

前言

dubbo 中提供了很多类型的协议,关于协议的系列可以查看下面的文章:

  • dubbo 源码解析(二十四)远程调用——dubbo 协议
  • dubbo 源码解析(二十五)远程调用——hessian 协议
  • dubbo 源码解析(二十六)远程调用——http 协议
  • dubbo 源码解析(二十七)远程调用——injvm 本地调用
  • dubbo 源码解析(二十八)远程调用——memcached 协议
  • dubbo 源码解析(二十九)远程调用——redis 协议
  • dubbo 源码解析(三十)远程调用——rest 协议
  • dubbo 源码解析(三十一)远程调用——rmi 协议
  • dubbo 源码解析(三十二)远程调用——thrift 协议
  • dubbo 源码解析(三十三)远程调用——webservice 协议

官方推荐的是使用 dubbo 协议,而异步调用的支持也是在 dubbo 协议中实现的。

看了我之前写的 2.7 新特性的文章,应该对于异步化改造有个大致的印象。要弄懂异步在什么时候起作用,先要弄懂 dubbo 的服务暴露和引用过程以及消费端发送请求过程和服务端处理请求过程。我在前四篇文章已经讲述了相关内容,异步请求只是 dubbo 的一种请求方式,基于 dubbo 底层的异步 NIO 实现异步调用,对于 Provider 响应时间较长的场景是必须的,它能有效利用 Consumer 端的资源,相对于 Consumer 端使用多线程来说开销较小。可以让消费者无需阻塞等待返回结果。

经过改良后,Provider 端也支持异步处理请求,引用官网的话就是现在 Provider 端异步执行和 Consumer 端异步调用是相互独立的,你可以任意正交组合两端配置。

如何开启和使用异步可以查看以下链接:

Provider 异步执行:http://dubbo.apache.org/zh-cn/docs/user/demos/async-execute-on-provider.html

Consumer 异步调用:http://dubbo.apache.org/zh-cn/docs/user/demos/async-call.html

异步的改造

Listener 做为 Filter 的内部接口

从设计上

  1. 废弃了 Filter 原先的 onResponse() 方法
  2. 在 Filter 接口新增了内部接口 Listener,相关接口设计如下。

    • 优点:职责划分更加明确,进行逻辑分组,增强可读性,Filter 本身应仅是传递调用的响应,而所有回调都放入 Listener。这样做以后可以把之前回调的逻辑从 invoke 里面剥离出来,放到 Listener 的 onResponse 或者 onError 中。
interface Listener {

    /**
     * 回调正常的调用结果
     * @param appResponse
     * @param invoker
     * @param invocation
     */
    void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

    /**
     * 回调异常结果
     * @param t
     * @param invoker
     * @param invocation
     */
    void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
  1. 新增抽象类 ListenableFilter,实现了 Filter 接口,其中只记录了一个该过滤器的内部 Listener 实例。
public abstract class ListenableFilter implements Filter {

    protected Listener listener = null;

    public Listener listener() {
        // 提供该过滤器的内部类 listener
        return listener;
    }
}

异步转同步,新增 InvokeMode

不变的是配置来决定调用方式,变的是在何时去做同步异步的不同逻辑处理。看《dubbo 源码解析(四十六)消费端发送请求过程》讲到的(十四)DubboInvoker 的 doInvoke,在以前的逻辑会直接在 doInvoke 方法中根据配置区分同步、异步、单向调用。现在只单独做了单向调用和需要返回结果的区分,统一先使用 AsyncRpcResult 来表示结果,也就是说一开始统一都是异步调用,然后在调用回到 AsyncToSyncInvoker 的 invoke 中时,才对同步异步做区分,这里新增了 InvokeMode,InvokeMode 现在有三种模式:SYNC, ASYNC, FUTURE。前两种很显而易见,后面一种是调用的返回类型是 Future 类型,代表调用的方法的返回类型是 CompletableFuture 类型,这种模式专门用来支持服务端异步的。看下面的源码。

public static InvokeMode getInvokeMode(URL url, Invocation inv) {
    // 如果返回类型是 future
    if (isReturnTypeFuture(inv)) {return InvokeMode.FUTURE;} else if (isAsync(url, inv)) {
        // 如果是异步调用
        return InvokeMode.ASYNC;
    } else {
        // 如果是同步
        return InvokeMode.SYNC;
    }
}

参考《dubbo 源码解析(四十六)消费端发送请求过程》的(十二)AsyncToSyncInvoker 的 invoke 逻辑,如果是同步模式,就会阻塞调用 get 方法。直到调用成功有结果返回。如果不是同步模式,就直接返回。

ResponseFuture 改为 CompletableFuture

关于 ResponseFuture 可以参考《dubbo 源码解析(十)远程通信——Exchange 层》的(六)ResponseFuture。具体的可以看它的两个实现(七)DefaultFuture 和(八)SimpleFuture。

在这次改造中,最小 JDK 版本从以前的 1.6 变成了 1.8。当然也要用到 1.8 中新特性,其中就包括 CompletableFuture。dubbo 的通信主要有两处,一处是 Consumer 发送请求消息给 Provider,另一处就是 Provider 把结果发送给 Consumer。在 Consumer 发送请求消息给 Provider 的时候,Consumer 不会一直处于等待,而是生成 ResponseFuture 会抛给下游去做其他操作,等到 Provider 把结果返回放入 ResponseFuture,Consumer 可以通过 get 方法获得结果,或者它也支持回调。但是这就暴露了一些问题,也就是为在新特性里提到的缺陷:

  • Future 只支持阻塞式的 get() 接口获取结果。因为 future.get() 会导致线程阻塞。
  • Future 接口无法实现自动回调,而自定义 ResponseFuture 虽支持 callback 回调但支持的异步场景有限,如不支持 Future 间的相互协调或组合等;

针对以上两个不足,CompletableFuture 可以很好的解决它们。

  • 针对第一点不足,因为 CompletableFuture 实现了 CompletionStage 和 Future 接口,所以它还是可以像以前一样通过阻塞或者轮询的方式获得结果。这一点就能保证阻塞式获得结果,也就是同步调用不会被抛弃。当然本身也不是很建议用 get() 这样阻塞的方式来获取结果。
  • 针对第二点不足,首先是自动回调,CompletableFuture 提供了良好的回调方法。比如下面四个方法有关计算结果完成时的处理:
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

当计算完成后,就会执行该方法中的 action 方法。相比于 ResponseFuture,不再需要自己去做回调注册的编码,更加易于理解。

  • 还是针对第二点,自定义的 ResponseFuture 不支持 Future 间的相互协调或组合,CompletableFuture 很好的解决了这个问题,在 CompletableFuture 中以下三个方法实现了 future 之间转化的功能:
public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 function。而且我们还可以将这些操作串联起来,或者将 CompletableFuture 组合起来。这一组函数的功能是当原来的 CompletableFuture 计算完后,将结果传递给函数 fn,将 fn 的结果作为新的 CompletableFuture 计算结果。因此它的功能相当于将 CompletableFuture<T> 转换成 CompletableFuture<U>。

除了转化之外,还有 future 之间组合的支持,例如以下三个方法:

public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

这一组方法接受一个 Function 作为参数,这个 Function 的输入是当前的 CompletableFuture 的计算值,返回结果将是一个新的 CompletableFuture,这个新的 CompletableFuture 会组合原来的 CompletableFuture 和函数返回的 CompletableFuture。

现在就能看出 CompletableFuture 的强大了,它解决了自定义 ResponseFuture 的许多问题,该类有几十个方法,感兴趣的可以去一一尝试。

随处可见的 CompletableFuture

可以看到以前的版本只能在 RpcContext 中进行获取。而经过改良后,首先 RpcContext 一样能过获取,其次在过滤器链返回的 Result 中也能获取,可以从最新的代码中看到,原先的 RpcResult 类已经被去除,而在 AsyncRpcResult 也继承了 CompletableFuture<Result> 类,也就是说有 AsyncRpcResult 的地方,就有 CompletableFuture。并且在后续的 dubbo3.0 中,AsyncRpcResult 将会内置 CompletableFuture 类型的变量,CompletableFuture 的获取方式也会大大增加。

AsyncRpcResult 全面替代 RpcResult

接下来我就来讲解一下 AsyncRpcResult 类。

/**
 * 当相同的线程用于执行另一个 RPC 调用时,并且回调发生时,原来的 RpcContext 可能已经被更改。* 所以我们应该保留当前 RpcContext 实例的引用,并在执行回调之前恢复它。* 存储当前的 RpcContext
 */
private RpcContext storedContext;
/**
 * 存储当前的 ServerContext
 */
private RpcContext storedServerContext;

/**
 * 会话域
 */
private Invocation invocation;

public AsyncRpcResult(Invocation invocation) {
    // 设置会话域
    this.invocation = invocation;
    // 获得当前线程内代表消费者端的 Context
    this.storedContext = RpcContext.getContext();
    // 获得当前线程内代表服务端的 Context
    this.storedServerContext = RpcContext.getServerContext();}

/**
 * 转换成新的 AsyncRpcResult
 * @param asyncRpcResult
 */
public AsyncRpcResult(AsyncRpcResult asyncRpcResult) {this.invocation = asyncRpcResult.getInvocation();
    this.storedContext = asyncRpcResult.getStoredContext();
    this.storedServerContext = asyncRpcResult.getStoredServerContext();}

上面的是 AsyncRpcResult 核心的变量以及构造函数,storedContext 和 storedServerContext 存储了相关的 RpcContext 实例的引用,为的就是防止在回调的时候由于相同的线程用于执行另一个 RPC 调用导致原来的 RpcContext 可能已经被更改。所以存储下来后,我们需要在执行回调之前恢复它。具体的可以看下面的 thenApplyWithContext 方法。

@Override
public Object getValue() {
    // 获得计算的结果
    return getAppResponse().getValue();
}

@Override
public void setValue(Object value) {
    // 创建一个 AppResponse 实例
    AppResponse appResponse = new AppResponse();
    // 把结果放入 AppResponse
    appResponse.setValue(value);
    // 标志该 future 完成,并且把携带结果的 appResponse 设置为该 future 的结果
    this.complete(appResponse);
}

@Override
public Throwable getException() {
    // 获得抛出的异常信息
    return getAppResponse().getException();
}

@Override
public void setException(Throwable t) {
    // 创建一个 AppResponse 实例
    AppResponse appResponse = new AppResponse();
    // 把异常放入 appResponse
    appResponse.setException(t);
    // 标志该 future 完成,并且把携带异常的 appResponse 设置为该 future 的结果
    this.complete(appResponse);
}

@Override
public boolean hasException() {
    // 设置是否有抛出异常
    return getAppResponse().hasException();
}

public Result getAppResponse() {
    // 如果该结果计算完成,则直接调用 get 方法获得结果
    try {if (this.isDone()) {return this.get();
        }
    } catch (Exception e) {
        // This should never happen;
        logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
    }
    // 创建 AppResponse
    return new AppResponse();}

这些实现了 Result 接口的方法,可以发现其中都是调用了 AppResponse 的方法,AppResponse 跟 AsyncRpcResult 一样也继承了 AbstractResult,不过它是作为回调的数据结构。AppResponse 我会在异步化过滤器链回调中讲述。

@Override
public Object recreate() throws Throwable {
    // 强制类型转化
    RpcInvocation rpcInvocation = (RpcInvocation) invocation;
    // 如果返回的是 future 类型
    if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
        // 创建 AppResponse 实例
        AppResponse appResponse = new AppResponse();
        // 创建 future
        CompletableFuture<Object> future = new CompletableFuture<>();
        // appResponse 设置 future 值,因为返回的就是 CompletableFuture 类型
        appResponse.setValue(future);
        // 当该 AsyncRpcResult 完成的时候,把结果放入 future 中,这样返回的就是 CompletableFuture 包裹的结果
        this.whenComplete((result, t) -> {if (t != null) {if (t instanceof CompletionException) {t = t.getCause();
                }
                future.completeExceptionally(t);
            } else {if (result.hasException()) {future.completeExceptionally(result.getException());
                } else {future.complete(result.getValue());
                }
            }
        });
        // 重置
        return appResponse.recreate();} else if (this.isDone()) {
        // 如果完成,则直接重置
        return this.get().recreate();
    }
    // 如果返回类型不是 CompletableFuture,则调用 AppResponse 的重置
    return (new AppResponse()).recreate();}

该方法是重置,本来也是直接调用了 AppResponse 的方法,不过因为支持了以 CompletableFuture 为返回类型的服务方法调用,所以这里做了一些额外的逻辑,也就是把结果用 CompletableFuture 包裹,作为返回的结果放入 AppResponse 实例中。可以对标使用了 CompletableFuture 签名的服务。

@Override
public Result thenApplyWithContext(Function<Result, Result> fn) {
    // 当该 AsyncRpcResult 完成后,结果作为参数先执行 beforeContext,再执行 fn,最后执行 andThen
    this.thenApply(fn.compose(beforeContext).andThen(afterContext));
    // You may need to return a new Result instance representing the next async stage,
    // like thenApply will return a new CompletableFuture.
    return this;
}


/**
 * tmp context to use when the thread switch to Dubbo thread.
 * 临时的 RpcContext,当用户线程切换为 Dubbo 线程时候使用
 */
/**
 * 临时的 RpcContext
 */
private RpcContext tmpContext;
private RpcContext tmpServerContext;

private Function<Result, Result> beforeContext = (appResponse) -> {
    // 获得当前线程消费者端的 RpcContext
    tmpContext = RpcContext.getContext();
    // 获得当前线程服务端的 RpcContext
    tmpServerContext = RpcContext.getServerContext();
    // 重新设置消费者端的 RpcContext
    RpcContext.restoreContext(storedContext);
    // 重新设置服务端的 RpcContext
    RpcContext.restoreServerContext(storedServerContext);
    return appResponse;
};

private Function<Result, Result> afterContext = (appResponse) -> {
    // 重新把临时的 RpcContext 设置回去
    RpcContext.restoreContext(tmpContext);
    RpcContext.restoreServerContext(tmpServerContext);
    return appResponse;
};

把这几部分代码放在一起时因为当用户线程切换为 Dubbo 线程时候需要用到临时的 RpcContext 来记录,如何使用该 thenApplyWithContext 方法,我也会在异步化过滤器链回调中讲到。

其他的方法比较好理解,我就不一一讲解。

异步化过滤器链回调

如果看过前两篇关于发送请求和处理请求的过程,应该就知道在整个调用链中有许多的过滤器,而 Consumer 和 Provider 分别都有各自的过滤器来做一些功能增强。过滤器有执行链,也有回调链,如果整一个链路都是同步的,那么过滤器一旦增多,链路增长,就会带来请求响应时间的增加,这当然是最不想看到的事情。那如果把过滤器的调用链异步化,那么我们就可以用一个 future 来代替结果抛给下游,让下游不再阻塞。这样就大大降低了响应时间,节省资源,提升 RPC 响应性能。而这里的 future 就是下面要介绍的 AppResponse。那我先来介绍一下如何实现异步化过滤器链回调。我就拿消费端发送请求过程来举例子说明。

参考《dubbo 源码解析(四十六)消费端发送请求过程》的(六)ProtocolFilterWrapper 的内部类 CallbackRegistrationInvoker 的 invoke,可以看到当所有的过滤器执行完后,会遍历每一个过滤器链,获得上面所说的内部接口 Listener 实现类,进行异步回调,因为请求已经在(十四)DubboInvoker 的 doInvoke 中进行了发送,返回给下游一个 AsyncRpcResult,而 AsyncRpcResult 内包裹的是 AppResponse,可以看《dubbo 源码解析(四十七)服务端处理请求过程》的(二十三)AbstractProxyInvoker 的 invoke,当代理类执行相关方法后,会创建一个 AppResponse,把结果放入 AppResponse 中。所以 AsyncRpcResult 中包裹的是 AppResponse,然后调用回调方法 onResponse。并且会执行 thenApplyWithContext 把回调结果放入上下文中。而这个上下文如何避免相同的线程用于执行另一个 RPC 调用导致原来的 RpcContext 可能已经被更改的情况,我也在上面已经说明。

新增 AppResponse

AppResponse 继承了 AbstractResult,同样也是 CompletableFuture<Result> 类型,但是 AppResponse 跟 AsyncRpcResult 职能不一样,AsyncRpcResult 作为一个 future,而 AppResponse 可以说是作为 rpc 调用结果的一个数据结构,它的实现很简单,就是封装了以下三个属性和对应的一些方法。

/**
 * 调用结果
 */
private Object result;

/**
 * rpc 调用时的异常
 */
private Throwable exception;

/**
 * 附加值
 */
private Map<String, String> attachments = new HashMap<String, String>();

前面我也讲了,Provider 处理请求完成后,会把结果放在 AppResponse 内,在整个链路调用过程中 AsyncRpcResult 内部必然会有一个 AppResponse 存在,而为上文提到的过滤器内置接口 Listener 的 onResponse 方法中的 appResponse 就是 AppResponse 类型的,它作为一个回调的数据类型。

后记

该文章讲解了 dubbo 2.7.x 版本对于异步化改造的介绍,上面只是罗列了所有改动的点,没有具体讲述在哪些新增功能上的应用,如果感兴趣,可以参考前几篇的调用过程文章,来看看新增的功能点如何运用上述的设计的,比如 Provider 异步,有一种实现方式就用到了上述的 InvokeMode。接下来一篇我会讲述元数据的改造。

退出移动版