1、暴露服务到远程上一篇文章分析了暴露服务到本地,6、Dubbo的服务导出1之导出到本地。接下来分析暴露服务到远程。
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { // 添加动态参数,此动态参数是决定Zookeeper创建临时节点还是持久节点 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } // 步骤1)创建Invoker,这里创建Invoker逻辑和上面一样 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded( Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 步骤2)暴露服务 Exporter<?> exporter = ServiceConfig.protocol.export(wrapperInvoker); exporters.add(exporter); } }} /** * 下面分析步骤2,该方法两大核心逻辑,导出服务和注册服务,服务注册下篇文章分析 */@Overridepublic <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 1. 导出服务,export invoker,本篇文章仅分析第一步导出服务到远程 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // zookeeper://10.101.99.127:2181/com.alibaba.dubbo.registry.RegistryService // ?application=demo-provider&dubbo=2.0.2 URL registryUrl = getRegistryUrl(originInvoker); // registry provider,默认返回ZookeeperRegistry实例 final Registry registry = getRegistry(originInvoker); // dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&default.server=netty4&dubbo=2.0.2&generic=false // &interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8140&side=provider final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // 不配置的话默认返回true boolean register = registeredProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 2.注册服务,这篇文章已经比较长了,决定将步骤2和步骤3新起一篇文章分析,服务暴露之后需要注册到注册中心 if (register) { register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // 3.数据更新订阅 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);} private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 调用protocol的export方法导出服务,默认是采用Dubbo协议,对应DubboProtocol的export方法 // 但是这里protocol.export()并不是先走DubboProtocol的export方法,而是先走 // ProtocolListenerWrapper的wrapper方法 // 因为ProtocolListenerWrapper对DubboProtocol做了一层包装,具体参考 // https://segmentfault.com/a/1190000020387196,核心方法protocal.export() exporter = new ExporterChangeableWrapper<T>( (Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter;}/** * 上述核心方法protocol.export()会先走到ProtocolListenerWrapper的export方法,该方法是在服务暴露上做了 监听器功能的增强,也就是加上了监听器 */@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 如果是注册中心,则暴露该invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 创建一个暴露者监听器包装类对象,暴露服务时这里的protocol是ProtocolFilterWrapper,这里用到了 // Wrapper包装原有的DubboProtocol,可以参考https://segmentfault.com/a/1190000020387196 return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));} /** * ProtocolFilterWrapper的export方法,该方法是在服务暴露上做了过滤器链的增强,也就是加上了过滤器 */@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 如果是注册中心,则直接暴露服务 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 服务提供侧暴露服务,这里通过buildInvokerChain形成了过滤器链 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}/** * 该方法就是创建带Filter链的Invoker对象,倒序的把每一个过滤器串连起来,形成一个invoker */private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 获得过滤器的所有扩展实现类实例集合 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class). getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { // 从最后一个过滤器开始循环,创建一个带有过滤器链的invoker对象 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } // 关键在这里,调用下一个filter代表的invoker,把每一个过滤器串起来 @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last;}// 经过两个Wrapper的export方法包装之后,走到DubboProtocol的export方法,这里是核心方法public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // url形如dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&bind.ip=172.22.213.93&bind.port=20880&dubbo=2.0.2&generic=false // /&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=648&qos.port=22222 // &side=provider×tamp=1569585915258 URL url = invoker.getUrl(); // 获取服务标识,理解成服务坐标也行,由服务组名,服务名,服务版本号以及端口组成,key形如 // com.alibaba.dubbo.demo.DemoService:20880 String key = serviceKey(url); // 创建DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 将<key, exporter>键值对放入缓存中 exporterMap.put(key, exporter); // 本地存根相关代码, export an stub service for dispatching event // 删除,暂时还没有分析本地存根相关 // 启动服务器,重点关注这里 openServer(url); optimizeSerialization(url); return exporter;}
...