远程调用——开篇
目标:介绍之后解读远程调用模块的内容如何编排、介绍 dubbo-rpc-api 中的包结构设计以及最外层的的源码解析。
前言
最近我面临着一个选择,因为 dubbo 2.7.0-release 出现在了仓库里,最近一直在进行 2.7.0 版本的 code review,那我之前说这一系列的文章都是讲述 2.6.x 版本的源代码,我现在要不要选择直接开始讲解 2.7.0 的版本的源码呢?我最后还是决定继续讲解 2.6.x,因为我觉得还是有很多公司在用着 2.6.x 的版本,并且对于升级 2.7.0 的计划应该还没那么快,并且在了解 2.6.x 版本的原理后,再去了解 2.7.0 新增的特性会更加容易,也能够品位到设计者的意图。当然在结束 2.6.x 的重要模块讲解后,我也会对 2.7.0 的新特性以及实现原理做一个全面的分析,2.7.0 作为 dubbo 社区的毕业版,更加强大,敬请期待。
前面讲了很多的内容,现在开始将远程调用 RPC,好像又回到我第一篇文章《dubbo 源码解析(一)Hello,Dubbo》,在这篇文章开头我讲到了什么叫做 RPC,再通俗一点讲,就是我把一个项目的两部分代码分开来,分别放到两台机器上,当我部署在 A 服务器上的应用想要调用部署在 B 服务器上的应用等方法,由于不存在同一个内存空间,不能直接调用。而其实整个 dubbo 都在做远程调用的事情,它涉及到很多内容,比如配置、代理、集群、监控等等,那么这次讲的内容是只关心一对一的调用,dubbo-rpc 远程调用模块抽象各种协议,以及动态代理,Proxy 层和 Protocol 层 rpc 的核心,我将会在本系列中讲到。下面我们来看两张官方文档的图:
暴露服务的时序图:
你会发现其中有我们以前讲到的 Transporter、Server、Registry,而这次的系列将会讲到的就是红色框框内的部分。
引用服务时序图
在引用服务时序图中,对应的也是红色框框的部分。
当阅读完该系列后,希望能对这个调用链有所感悟。接下来看看 dubbo-rpc 的包结构:
可以看到有很多包,很规整,其中 dubbo-rpc-api 是对协议、暴露、引用、代理等的抽象和实现,是 rpc 整个设计的核心内容。其他的包则是 dubbo 支持的 9 种协议,在官方文档也能查看介绍,并且包括一种本地调用 injvm。那么我们再来看看 dubbo-rpc-api 中包结构:
filter 包:在进行服务引用时会进行一系列的过滤。其中包括了很多过滤器。
listener 包:看上面两张服务引用和服务暴露的时序图,发现有两个 listener,其中的逻辑实现就在这个包内
protocol 包:这个包实现了协议的一些公共逻辑
proxy 包:实现了代理的逻辑。
service 包:其中包含了一个需要调用的方法等封装抽象。
support 包:包括了工具类
最外层的实现。
下面的篇幅设计,本文会讲解最外层的源码和 service 下的源码,support 包下的源码我会穿插在其他用到的地方一并讲解,filter、listener、protocol、proxy 以及各类协议的实现各自用一篇来讲。
源码分析
(一)Invoker
public interface Invoker<T> extends Node {
/**
* get service interface.
* 获得服务接口
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
* 调用下一个会话域
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
该接口是实体域,它是 dubbo 的核心模型,其他模型都向它靠拢,或者转化成它,它代表了一个可执行体,可以向它发起 invoke 调用,这个有可能是一个本地的实现,也可能是一个远程的实现,也可能是一个集群的实现。它代表了一次调用
(二)Invocation
public interface Invocation {
/**
* get method name.
* 获得方法名称
* @return method name.
* @serial
*/
String getMethodName();
/**
* get parameter types.
* 获得参数类型
* @return parameter types.
* @serial
*/
Class<?>[] getParameterTypes();
/**
* get arguments.
* 获得参数
* @return arguments.
* @serial
*/
Object[] getArguments();
/**
* get attachments.
* 获得附加值集合
* @return attachments.
* @serial
*/
Map<String, String> getAttachments();
/**
* get attachment by key.
* 获得附加值
* @return attachment value.
* @serial
*/
String getAttachment(String key);
/**
* get attachment by key with default value.
* 获得附加值
* @return attachment value.
* @serial
*/
String getAttachment(String key, String defaultValue);
/**
* get the invoker in current context.
* 获得当前上下文的 invoker
* @return invoker.
* @transient
*/
Invoker<?> getInvoker();
}
Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。
(三)Exporter
public interface Exporter<T> {
/**
* get invoker.
* 获得对应的实体域 invoker
* @return invoker
*/
Invoker<T> getInvoker();
/**
* unexport.
* 取消暴露
* <p>
* <code>
* getInvoker().destroy();
* </code>
*/
void unexport();
}
该接口是暴露服务的接口,定义了两个方法分别是获得 invoker 和取消暴露服务。
(四)ExporterListener
@SPI
public interface ExporterListener {
/**
* The exporter exported.
* 暴露服务
* @param exporter
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Protocol#export(Invoker)
*/
void exported(Exporter<?> exporter) throws RpcException;
/**
* The exporter unexported.
* 取消暴露
* @param exporter
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Exporter#unexport()
*/
void unexported(Exporter<?> exporter);
}
该接口是服务暴露的监听器接口,定义了两个方法是暴露和取消暴露,参数都是 Exporter 类型的。
(五)Protocol
@SPI(“dubbo”)
public interface Protocol {
/**
* Get default port when user doesn’t config the port.
* 获得默认的端口
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent, that is, there’s no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
* 暴露服务方法,
* @param <T> Service type 服务类型
* @param invoker Service invoker 服务的实体域
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which’s returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It’s protocol’s responsibility to implement `Invoker` which’s returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there’s check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
* 引用服务方法
* @param <T> Service type 服务类型
* @param type Service class 服务类名
* @param url URL address for the remote service
* @return invoker service’s local proxy
* @throws RpcException when there’s any error while connecting to the service provider
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources, for example: connection, port, etc. <br>
* 3. Protocol can continue to export and refer new service even after it’s destroyed.
*/
void destroy();
}
该接口是服务域接口,也是协议接口,它是一个可扩展的接口,默认实现的是 dubbo 协议。定义了四个方法,关键的是服务暴露和引用两个方法。
(六)Filter
@SPI
public interface Filter {
/**
* do invoke filter.
* <p>
* <code>
* // before filter
* Result result = invoker.invoke(invocation);
* // after filter
* return result;
* </code>
*
* @param invoker service
* @param invocation invocation.
* @return invoke result.
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
}
该接口是 invoker 调用时过滤器接口,其中就只有一个 invoke 方法。在该方法中对调用进行过滤
(七)InvokerListener
@SPI
public interface InvokerListener {
/**
* The invoker referred
* 在服务引用的时候进行监听
* @param invoker
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Protocol#refer(Class, com.alibaba.dubbo.common.URL)
*/
void referred(Invoker<?> invoker) throws RpcException;
/**
* The invoker destroyed.
* 销毁实体域
* @param invoker
* @see com.alibaba.dubbo.rpc.Invoker#destroy()
*/
void destroyed(Invoker<?> invoker);
}
该接口是实体域的监听器,定义了两个方法,分别是服务引用和销毁的时候执行的方法。
(八)Result
该接口是实体域执行 invoke 的结果接口,里面定义了获得结果异常以及附加值等方法。比较好理解我就不贴代码了。
(九)ProxyFactory
@SPI(“javassist”)
public interface ProxyFactory {
/**
* create proxy.
* 创建一个代理
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create proxy.
* 创建一个代理
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
/**
* create invoker.
* 创建一个实体域
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
该接口是代理工厂接口,它也是个可扩展接口,默认实现 javassist,dubbo 提供两种动态代理方法分别是 javassist/jdk,该接口定义了三个方法,前两个方法是通过 invoker 创建代理,最后一个是通过代理来获得 invoker。
(十)RpcContext
该类就是远程调用的上下文,贯穿着整个调用,例如 A 调用 B,然后 B 调用 C。在服务 B 上,RpcContext 在 B 之前将调用信息从 A 保存到 B。开始调用 C,并在 B 调用 C 后将调用信息从 B 保存到 C。RpcContext 保存了调用信息。
public class RpcContext {
/**
* use internal thread local to improve performance
* 本地上下文
*/
private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
/**
* 服务上下文
*/
private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
/**
* 附加值集合
*/
private final Map<String, String> attachments = new HashMap<String, String>();
/**
* 上下文值
*/
private final Map<String, Object> values = new HashMap<String, Object>();
/**
* 线程结果
*/
private Future<?> future;
/**
* url 集合
*/
private List<URL> urls;
/**
* 当前的 url
*/
private URL url;
/**
* 方法名称
*/
private String methodName;
/**
* 参数类型集合
*/
private Class<?>[] parameterTypes;
/**
* 参数集合
*/
private Object[] arguments;
/**
* 本地地址
*/
private InetSocketAddress localAddress;
/**
* 远程地址
*/
private InetSocketAddress remoteAddress;
/**
* 实体域集合
*/
@Deprecated
private List<Invoker<?>> invokers;
/**
* 实体域
*/
@Deprecated
private Invoker<?> invoker;
/**
* 会话域
*/
@Deprecated
private Invocation invocation;
// now we don’t use the ‘values’ map to hold these objects
// we want these objects to be as generic as possible
/**
* 请求
*/
private Object request;
/**
* 响应
*/
private Object response;
该类中最重要的是它的一些属性,因为该上下文就是用来保存信息的。方法我就不介绍了,因为比较简单。
(十一)RpcException
/**
* 不知道异常
*/
public static final int UNKNOWN_EXCEPTION = 0;
/**
* 网络异常
*/
public static final int NETWORK_EXCEPTION = 1;
/**
* 超时异常
*/
public static final int TIMEOUT_EXCEPTION = 2;
/**
* 基础异常
*/
public static final int BIZ_EXCEPTION = 3;
/**
* 禁止访问异常
*/
public static final int FORBIDDEN_EXCEPTION = 4;
/**
* 序列化异常
*/
public static final int SERIALIZATION_EXCEPTION = 5;
该类是 rpc 调用抛出的异常类,其中封装了五种通用的错误码。
(十二)RpcInvocation
/**
* 方法名称
*/
private String methodName;
/**
* 参数类型集合
*/
private Class<?>[] parameterTypes;
/**
* 参数集合
*/
private Object[] arguments;
/**
* 附加值
*/
private Map<String, String> attachments;
/**
* 实体域
*/
private transient Invoker<?> invoker;
该类实现了 Invocation 接口,是 rpc 的会话域,其中的方法比较简单,主要是封装了上述的属性。
(十三)RpcResult
/**
* 结果
*/
private Object result;
/**
* 异常
*/
private Throwable exception;
/**
* 附加值
*/
private Map<String, String> attachments = new HashMap<String, String>();
该类实现了 Result 接口,是 rpc 的结果实现类,其中关键是封装了以上三个属性。
(十四)RpcStatus
该类是 rpc 的一些状态监控,其中封装了许多的计数器,用来记录 rpc 调用的状态。
1. 属性
/**
* uri 对应的状态集合,key 为 uri,value 为 RpcStatus 对象
*/
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
/**
* method 对应的状态集合,key 是 uri,第二个 key 是方法名 methodName
*/
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
/**
* 已经没用了
*/
private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
/**
* 活跃状态
*/
private final AtomicInteger active = new AtomicInteger();
/**
* 总的数量
*/
private final AtomicLong total = new AtomicLong();
/**
* 失败的个数
*/
private final AtomicInteger failed = new AtomicInteger();
/**
* 总计过期个数
*/
private final AtomicLong totalElapsed = new AtomicLong();
/**
* 失败的累计值
*/
private final AtomicLong failedElapsed = new AtomicLong();
/**
* 最大火气的累计值
*/
private final AtomicLong maxElapsed = new AtomicLong();
/**
* 最大失败累计值
*/
private final AtomicLong failedMaxElapsed = new AtomicLong();
/**
* 成功最大累计值
*/
private final AtomicLong succeededMaxElapsed = new AtomicLong();
/**
* Semaphore used to control concurrency limit set by `executes`
* 信号量用来控制 `execution` 设置的并发限制
*/
private volatile Semaphore executesLimit;
/**
* 用来控制 `execution` 设置的许可证
*/
private volatile int executesPermits;
以上是该类的属性,可以看到保存了很多的计数器,分别用来记录了失败调用成功调用等累计数。
2.beginCount
/**
* 开始计数
* @param url
*/
public static void beginCount(URL url, String methodName) {
// 对该 url 对应对活跃计数器加一
beginCount(getStatus(url));
// 对该方法对活跃计数器加一
beginCount(getStatus(url, methodName));
}
/**
* 以原子方式加 1
* @param status
*/
private static void beginCount(RpcStatus status) {
status.active.incrementAndGet();
}
该方法是增加计数。
3.endCount
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
// url 对应的状态中计数器减一
endCount(getStatus(url), elapsed, succeeded);
// 方法对应的状态中计数器减一
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
// 活跃计数器减一
status.active.decrementAndGet();
// 总计数器加 1
status.total.incrementAndGet();
// 过期的计数器加上过期个数
status.totalElapsed.addAndGet(elapsed);
// 如果最大的过期数小于 elapsed,则设置最大过期数
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
// 如果 rpc 调用成功
if (succeeded) {
// 如果成功的最大值小于 elapsed,则设置成功最大值
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
// 失败计数器加一
status.failed.incrementAndGet();
// 失败的过期数加上 elapsed
status.failedElapsed.addAndGet(elapsed);
// 失败最大值小于 elapsed,则设置失败最大值
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}
该方法是计数器减少。
(十五)StaticContext
该类是系统上下文,仅供内部使用。
/**
* 系统名称
*/
private static final String SYSTEMNAME = “system”;
/**
* 系统上下文集合,仅供内部使用
*/
private static final ConcurrentMap<String, StaticContext> context_map = new ConcurrentHashMap<String, StaticContext>();
/**
* 系统上下文名称
*/
private String name;
上面是该类的属性,它还记录了所有的系统上下文集合。
(十六)EchoService
public interface EchoService {
/**
* echo test.
* 回声测试
* @param message message.
* @return message.
*/
Object $echo(Object message);
}
该接口是回声服务接口,定义了一个一个回声测试的方法,回声测试用于检测服务是否可用,回声测试按照正常请求流程执行,能够测试整个调用是否通畅,可用于监控,所有服务自动实现该接口,只需将任意服务强制转化为 EchoService,就可以用了。
(十七)GenericException
该方法是通用的异常类。
/**
* 异常类名
*/
private String exceptionClass;
/**
* 异常信息
*/
private String exceptionMessage;
比较简单,就封装了两个属性。
(十八)GenericService
public interface GenericService {
/**
* Generic invocation
* 通用的会话域
* @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws Throwable potential exception thrown from the invocation
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
}
该接口是通用的服务接口,同样定义了一个类似 invoke 的方法
后记
该部分相关的源码解析地址:https://github.com/CrazyHZM/i…
该文章讲解了远程调用的开篇,介绍之后解读远程调用模块的内容如何编排、介绍 dubbo-rpc-api 中的包结构设计以及最外层的的源码解析,其中的逻辑不负责,要关注的是其中的一些概念和 dubbo 如何去做暴露服务和引用服务,其中很多的接口定义需要弄清楚。接下来我将开始对 rpc 模块的过滤器进行讲解。