作者:vivo 互联网服务器团队 - Wang Fei、LinYupan
Dubbo 泛化调用个性能够在不依赖服务接口 API 包的场景中发动近程调用, 这种个性特地适宜框架集成和网关类利用开发。
本文联合在理论开发过程中所遇到的须要近程调用多个三方零碎的问题,论述了如何利用 Dubbo 泛化调用来简化开发升高零碎耦合性的我的项目实际,最初对 Dubbo 泛化调用的原理进行了深度解析。
一、背景
对立配置平台是一个提供终端设备各个模块进行文件配置和文件下发能力的平台,模块开发在后盾服务器进行文件配置,而后终端设备能够依照特定规定获取对应的配置文件,文件下发能够依照多种设施维度进行下发,具体我的项目框架能够加入下图:
现有的下发策略,都是由模块开发在对立配置后盾服务器进行下发维度配置,文件是否下发到对应终端设备,由用户在本平台所抉择的维度所确定。
然而其余业务方也存在在公司外部的 A / B 试验平台配置下发规定,来借助对立配置平台每天轮询服务器申请新文件的能力,然而在对立配置平台配置的文件是否可能下发由 A / B 试验平台来确定,A/ B 试验平台内会配置对应的规定以及配置对立配置平台对应的文件 id,而后对立配置平台就须要针对申请调用 A / B 试验平台接口来判断文件是否能够下发。
随着公司外部试验平台的减少,越来越多这种由三方平台来决定文件是否下发的对接需要,如何更好更快的应答这种相似的对接需要,是咱们须要去深刻思考的问题。
二、计划选型
原有对立配置的下发逻辑是先找到所有能够下发的文件,而后判断单个配置文件是否满足设施维度,如果满足则能够下发。当初在对接 A / B 试验平台当前,文件是否能下发还须要由内部零碎来确定,过后设计时思考过两种计划:
- 计划一:
同样先找到所有能够下发的文件,而后针对单个文件依照①设施维度判断是否匹配,而后②调用 A / B 试验平台的接口获取这台设施能够下发的文件 Id,再调用③灰度试验平台获取这台设施能够下发的文件 id,最初将前三步获取到的配置文件 id 进行汇总失去能够下发的文件,如下图所示。
计划一突破了原来文件是否可能下发的判断逻辑,当初除了原有的判断逻辑,还须要额定步骤调用其余零碎来追加另外能够下发的文件。并且后续不可避免对接其余三方零碎,计划一须要一直减少调用三方接口的逻辑来追加能够下发的文件 id。此外惯例的 dubbo 调用在 provider 端须要引入其余试验零碎的二方库以及模型类型,减少了对立配置零碎和其余零碎的强耦合性。
- 计划二:
利用 Dubbo 泛化调用高级个性形象一个下发维度(近程调用),专门用于其余想由三方试验零碎来决定是否下发文件的场景,如下图所示:
计划二对立形象一个近程调用下发维度,能够放弃原有的判断逻辑,也就是先把零碎中所有能够下发的文件先查找进去,而后依据设施维度进行匹配,如果某一个文件配置的是近程调用维度,那么查找这个近程调用维度所蕴含的函数名称、参数类型数组和参数值对象数组,而后调用三方接口,从而判断这个文件是否能够下发到设施,最终获取到能够下发的文件 id 列表。
此外,利用 Dubbo 泛化调用高级个性,调用方并不关怀提供者的接口的具体定义,只须要关注调用哪个办法,传什么参数以及接管到什么返回后果即可,这样防止须要依赖服务提供者的二方库以及模型类元,这样能够大大降低 consumer 端和 provider 端的耦合性。
综合下面的剖析,咱们最终确定了计划二采取利用 Dubbo 泛化调用来形象一个对立维度的形式,上面来看一下具体的实现。
三、具体实现
1.GenericService 是 Dubbo 提供的泛化接口,用来进行泛化调用。只提供了一个 $invoke 办法,三个入口参数别离为函数名称、参数类型数组和参数值对象数组。
package com.alibaba.dubbo.rpc.service;
/**
* Generic service interface
*
* @export
*/
public interface GenericService {
/**
* Generic invocation
*
* @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws Throwable potential exception thrown from the invocation
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
- 创立服务援用配置对象 ReferenceConfig。
private ReferenceConfig<GenericService> buildReferenceConfig(RemoteDubboRestrictionConfig config) {ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(applicationConfig);
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(config.getInterfaceName());
referenceConfig.setVersion(config.getVersion());
referenceConfig.setGeneric(Boolean.TRUE.toString());
referenceConfig.setCheck(false);
referenceConfig.setTimeout(DUBBO_INVOKE_TIMEOUT);
referenceConfig.setRetries(DUBBO_INVOKE_RETRIES);
return referenceConfig;
}
3. 设置申请参数及服务调用, 这里利用在后盾所配置的残缺办法名、参数类型数组和参数值数组就能够进行服务调用。
public List<Integer> invoke(RemoteDubboRestrictionConfig config, ConfigFileListQuery listQuery) {
// 因为 ReferenceConfig 很分量,外面封装了所有与注册核心及服务提供方连贯,所以这里做了缓存
GenericService genericService = prepareGenericService(config);
// 构建参数
Map<String, Object> params = buildParams(listQuery);
String method = config.getMethod();
String[] parameterTypeArray = new String[]{Map.class.getName()};
Object[] parameterValueArray = new Object[]{params};
long begin = System.currentTimeMillis();
Assert.notNull(genericService, "cannot find genericService");
// 具体调用
Object result = genericService.$invoke(method, parameterTypeArray, parameterValueArray);
if (logger.isDebugEnabled()) {long duration = System.currentTimeMillis() - begin;
logger.debug("Dubbo 调用后果:{}, 耗时: {}", result, duration);
}
return result == null ? Collections.emptyList() : (List<Integer>) result;
}
那么为什么 Dubbo 泛化调用所波及的调用方并不关怀提供者的接口的具体定义,只须要关注调用哪个办法,传什么参数以及接管到什么返回后果即可呢?
在解说泛化调用的实现原理之前,先简略讲述一下间接调用的原理。
四、Dubbo 间接调用相干原理
Dubbo 的间接调用相干原理波及到两个方面:Dubbo 服务裸露原理和 Dubbo 服务生产原理
4.1 Dubbo 服务裸露原理
4.1.1 服务近程裸露的整体流程
在整体上看,Dubbo 框架做服务裸露分为两大部分,第一步将持有的服务实例通过代理转 换成 Invoker, 第二步会把 Invoker 通过具体的协定(比方 Dubbo)转换成 Exporter, 框架做了 这层形象也大大不便了性能扩大。
这里的 Invoker 能够简略了解成一个实在的服务对象实例,是 Dubbo 框架实体域,所有模型都会向它聚拢,可向它发动 invoke 调用。它可能是一个本地的实 现,也可能是一个近程的实现,还可能是一个集群实现。
源代码如下:
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {logger.info("Export dubbo service" + interfaceClass.getName() + "to url" + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {logger.info("Register dubbo service" + interfaceClass.getName() + "url" + url + "to registry" + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 向注册核心注册服务信息
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
首先将实现类 ref 封装为 Invoker,之后将 invoker 转换为 exporter,最初将 exporter 放入缓存 Listexporters 中。
4.1.2 服务裸露的细节
4.1.2.1 将实现类 ref 封装为 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
① dubbo 近程裸露的入口在 ServiceBean 的 export()办法,因为 servicebean’ 继承了 serviceconfig 类,于是真正执行裸露的逻辑是 serviceconfig 的 doExport()办法。
② Dubbo 反对雷同服务裸露多个协定,比方同时裸露 Dubbo 和 REST 协定,也反对多个注册核心,比方 zookeeper 和 nacos,框架外部会顺次 对应用的协定都做一次服务裸露,每个协定注册元数据都会写入多个注册核心,具体是执行 doExportUrlsFor1Protocol。
③ 而后通过动静代理的形式创立 Invoker 对象,在服务端生成 AbstractProxylnvoker 实例,所有实在的办法调用都会委托给代理,而后代理转发给服务实现者 ref 调用;动静代理个别有:JavassistProxyFactory 和 JdkProxyFactory 两种形式,这里所选用的 JavassistProxyFactory。
4.1.2.2 将 invoker 转换为 exporter
Exporter exporter= protocol.export(wrapperInvoker);
Exporter<?> exporter = protocol.export(wrapperInvoker);
在将服务实例 ref 转换成 Invoker 之后,开始执行服务裸露过程。
这里会通过一系列的过滤器链路,最终会通过 RegistryProtocol#export 进行更细粒度的管制,比方先进行服务裸露再注册服务元数据。注册核心在做服务裸露时顺次 做了以下几件事件:
- 委托具体协定 (Dubbo) 进行服务裸露,创立 NettyServer 监听端口和保留服务实例。
- 创立注册核心对象,与注册核心创立 TCP 连贯。
- 注册服务元数据到注册核心。
- 订阅 configurators 节点,监听服务动静属性变更事件。
- 服务销毁收尾工作,比方敞开端口、反注册服务信息等。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
//TODO 注册服务元数据
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
这里咱们重点解说委托具体协定进行服务裸露的过程 doLocalExport(final InvokeroriginInvoker)。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
(Exporter) protocol.export(invokerDelegete)办法又会通过一系列的拦截器进行解决,最终调用 DubboProtocol 的 export 办法。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
这里很重要的一点就是将 exporter 放到了缓存,这里的 key 是 serviceGroup/serviceName:serviceVersion:port 这样模式,这里最初获取到的是 com.alibaba.dubbo.demo.DemoService:20880,之后创立 DubboExporter。这里的内存缓存 exporterMap 是很重要的一个属性,在后续消费者调用服务提供者时会被被再次应用到。
至此,服务器提供者的近程裸露流程就根本介绍结束。
4.2 Dubbo 服务生产的实现原理
4.2.1 服务生产的整体流程
在整体上看,Dubbo 框架做服务生产也分为两大部分,第一步通过持有近程服务实例生成 Invoker, 这个 Invoker 在客户端是外围的近程代理对象。第二步会把 Invoker 通过动静代理转换 成实现用户接口的动静代理援用。这里的 Invoker 承载了网络连接、服务调用和重试等性能,在 客户端,它可能是一个近程的实现,也可能是一个集群实现。
源代码如下:
public Object getObject() throws Exception {return get();
}
public synchronized T get() {if (destroyed) {throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {init();
}
return ref;
}
private void init() {
...
ref = createProxy(map);
}
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 创立服务代理
return (T) proxyFactory.getProxy(invoker);
}
4.2.2 服务生产的细节
4.2.2.1 应用 Protocol 将 interfaceClass 转化为 Invoker
invoker = refprotocol.refer(interfaceClass, url);
① 服务援用的入口点在 ReferenceBean#getObject,因为 Referencebean’ 继承了 serviceconfig 类,接着会调用 Reference 的 get 办法。
② 而后依据援用的接口类型将持有近程服务实例生成 Invoker。
③ 通过一系列的过滤器链,最初调用 RegistryProtocol 的 doRefer 办法。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
这段逻辑次要实现了注册核心实例的创立,元数据注册到注册核心及订阅的性能。
具体近程 Invoker 是在哪里创立的呢?客户端调用拦截器又是在哪里结构的呢?
当在 directory.subscrib()中 第一次发动订阅时会进行一次数据拉取操作,同时触发 RegistryDirectory#notify 办法,这里 的告诉数据是某一个类别的全量数据,比方 providers 和 routers 类别数据。当告诉 providers 数 据时,在 RegistryDirectory#toInvokers 办法内实现 Invoker 转换。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
......
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
.........
if (enabled) {invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
外围代码
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
这里会通过一系列的过滤器链,而后最终调用 DubboProtocol 的 refer 办法,来创立具体的 invoker。
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
这里返回的 invoker 会用来更新 RegistryDirectory 的 methodInvokerMap 属性,最终在理论调用生产端办法时,会依据 method 找到对应的 invoker 列表。
private void refreshInvoker(List<URL> invokerUrls) {if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {invokerUrls.addAll(this.cachedInvokerUrls);
} else {this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {return;}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {logger.warn("destroyUnusedInvokers error.", e);
}
}
}
4.2.2.2 应用 ProxyFactory 创立代理
(T) proxyFactory.getProxy(invoker)
上述的 proxyFactory 是 ProxyFactory$Adaptive 实例,其 getProxy 外部最终失去是一个被 StubProxyFactoryWrapper 包装后的 JavassistProxyFactory。间接来看 JavassistProxyFactory.getProxy 办法。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
【invoker】:MockClusterInvoker 实例
【interfaces】:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
咱们最终返回的代理对象其实是一个 proxy0 对象,当咱们调用其 sayHello 办法时,其调用外部的 handler.invoke 办法。
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.demo.DemoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy0 implements DemoService {public static Method[] methods;
private InvocationHandler handler;
public String sayHello(String paramString) {Object[] arrayOfObject = new Object[1];
arrayOfObject[0] = paramString;
Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
return (String) localObject;
}
public proxy0() {}
public proxy0(InvocationHandler paramInvocationHandler) {this.handler = paramInvocationHandler;}
}
Dubbo 泛化调用个别是服务器提供者都采纳间接裸露的模式,消费者端采纳服务泛化调用的模式,所以这里重点探讨 Dubbo 泛化调用与间接调用在消费者端的服务援用和发动生产的区别与分割。
五、Dubbo 泛化调用与间接调用的区别与分割
5.1 通过持有近程服务实例生成 Invoker
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 创立服务代理
return (T) proxyFactory.getProxy(invoker);
}
这里的 interfaceClass 的起源不一样,createProxy(Mapmap) 是在 ReferenceConfig 的 init()办法中调用的,具体的 interfaceClass 依据是否是返回调用会有所区别,具体看如下代码:
private void init() {
...
if (ProtocolUtils.isGeneric(getGeneric())) {interfaceClass = GenericService.class;} else {
try {interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
...
ref = createProxy(map);
}
间接调用:interfaceClass→com.alibaba.dubbo.demo.DemoService
泛化调用:interfaceClass→com.alibaba.dubbo.rpc.service.GenericService
最终获取的 invoker 也不一样
间接调用:
interface com.alibaba.dubbo.demo.DemoService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=24932&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640744945905&side=consumer×tamp=1640745033688
泛化调用:
interface com.alibaba.dubbo.rpc.service.GenericService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=true&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=27952&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640748337173&side=consumer×tamp=1640748368427
5.2 服务发动生产流程
在 4.2.2 服务消费者发动申请细节第①步是将申请参数(办法名,办法参数类型,办法参数值,服务名,附加参数)封装成一个 Invocation。
间接调用的 RpcInvoaction 如下:
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={}]
泛化调用的 RpcInvoaction 如下:
RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
咱们能够发现这里生成的 RpcInvocation 对象是有区别的,然而服务提供者裸露的服务是不会发生变化的,所以这里必然有一个转换过程,这里的参数转换要害就在于服务提供者端的 GenericImplFilter 类。
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {protected final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {logger.info("----------------GenericFilter-------------------------");
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isBlank(generic)) {generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {...} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {...}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {return new RpcResult(new GenericException(result.getException()));
}
RpcResult rpcResult;
if (ProtocolUtils.isJavaGenericSerialization(generic)) {...} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {...} else {rpcResult = new RpcResult(PojoUtils.generalize(result.getValue()));
}
rpcResult.setAttachments(result.getAttachments());
return rpcResult;
} catch (NoSuchMethodException e) {throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {throw new RpcException(e.getMessage(), e);
}
}
return invoker.invoke(inv);
}
}
外围流程:
① 是否是泛化调用判断
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
② 参数的提取
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
③ 参数的序列化,再结构新的 RpcInvocation 对象
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
}
...
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
序列化前 RpcInvocation 对象:
RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
序列化后 RpcInvocation 对象:
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]
前面的调用逻辑就和间接调用的是统一的了,比方从本地缓存从本地缓存中 Map<string, list<invoker>> methodInvokerMap 中获取 key 为 sayHello(指定办法名)的 List<invoker>,接着进行后续的调用。
那么什么时候触发 GenericFilter 的 invoke 办法呢,这里其实就和过滤器的调用链建设有关系了,从 GenericFilter 类上的注解,咱们能够看到 @Activate(group = Constants.PROVIDER, order = -20000),阐明是在服务提供者端失效的。
另外,服务提供者端是如何晓得调用是间接调用还是泛化调用的,这里就波及到与服务提供者端 GenericFilter 对应的消费者端的 GenericImplFilter 类,代码如下:
/**
* GenericImplInvokerFilter
*/
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
if (ProtocolUtils.isGeneric(generic)
&& !Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {...}
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {Object[] args = (Object[]) invocation.getArguments()[2];
if (ProtocolUtils.isJavaGenericSerialization(generic)) {for (Object arg : args) {if (!(byte[].class == arg.getClass())) {error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {for (Object arg : args) {if (!(arg instanceof JavaBeanDescriptor)) {error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
((RpcInvocation) invocation).setAttachment(Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
return invoker.invoke(invocation);
}
5.3 泛化调用的整体流程图
六、总结
高内聚低耦合是咱们架构设计的一个重要指标,而 Dubbo 的泛化调用个性仅仅只需晓得服务的残缺接口门路、申请参数类型和申请参数值就能够间接进行调用获取申请后果,可能防止依赖特定三方 jar 包,从而升高了零碎的耦合性。在日常学习和开发的过程中,咱们除了须要关注一门技术的惯例应用办法以外,还须要关注一些高级个性,从而做出更适合的架构设计。
参考资料:
- 《深刻了解 Apache Dubbo 与实战》诣极, 林琳
- Dubbo 源码剖析 - 泛化调用
- Dubbo 泛化调用应用及原理解析