乐趣区

dubbo源码分析

Dubbo 插件化

Dubbo 的插件化实现非常类似于原生的 JAVA 的 SPI:它只是提供一种协议,并没有提供相关插件化实施的接口。用过的同学都知道,它有一种 java 原生的支持类:ServiceLoader,通过声明接口的实现类,在 META-INF/services 中注册一个实现类,然后通过 ServiceLoader 去生成一个接口实例,当更换插件的时候只需要把自己实现的插件替换到 META-INF/services 中即可。

SPI

Dubo Spi 和 Java SPi 的使用和对比

在 Dubbo 中,SPI 是一个非常核心的机制,贯穿在几乎所有的流程中。弄明白这一块能够帮我们明白 dubo 源码

Dubbo 是基于 Java 原生 SPI 机制思想的一个改进,所以,先从 JAVA SPI 机制开始了解什么时 SPI 以后再去学习 Dubbo 的 SPI 就比较简单

JAVASPI

SPI 全称(service provider interface),是 JDK 内置的一种服务提供发现机制,目前市面上有很多框架都是用它来做服务的扩展发现,大家耳熟能详的如 JDBC、日志框架都有用到;

简单来说,它是一种动态替换发现的机制。举个简单的例子,如果我们定义了一个规范,需要第三方厂商去实现,那么对于我们应用方来说,只需要集成对应厂商的插件,既可以完成对应规范的实现机制。形成一种插拔式的扩展手段。

java 原生 spi 实现
  1. 需要在 classpath 下创建一个目录,该目录命名必须是:META-INF/services
  2. 在该目录下创建一个 properties 文件,该文件需要满足以下几个条件

    1. 文件名必须是扩展的接口的全路径名称
    2. 文件内部描述的是该扩展接口的所有实现类
    3. 文件的编码格式是 UTF-8
  3. 通过 java.util.ServiceLoader 的加载机制来发现 首先定义 api 接口 接下来对应产商实现对应的接口,并且在 resoucesMETA-INF/services中创建对应的文件, 并且通过 properties 规则配置实现类的全路径

​ 以及对应调用方引入 api 接口,和对应产商的 jar

​ 并且在对应的 resouces 中引入接口,如果引入了多个产商的 jar,那么会取到多个产商的东西

SPI 的实际应用

​ SPI 在很多地方有应用,大家可以看看最常用的 java.sql.Driver 驱动。JDK 官方提供了 java.sql.Driver 这个驱动扩展点,但是你们并没有看到 JDK 中有对应的 Driver 实现。那在哪里实现呢?

以连接 Mysql 为例,我们需要添加 mysql-connector-java 依赖。然后,你们可以在这个 jar 包中找到 SPI 的配置信息。如下图,所以 java.sql.Driver 由各个数据库厂商自行实现。这就是 SPI 的实际应用。当然除了这个意外,大家在 spring 的包中也可以看到相应的痕迹

SPI 的缺点
  1. JDK 标准的 SPI 会一次性加载实例化扩展点的所有实现,什么意思呢?就是如果你在 META-INF/service 下的文件里面加了 N 个实现类,那么 JDK 启动的时候都会一次性全部加载。那么如果有的扩展点实现初始化很耗时或者如果有些实现类并没有用到,那么会很浪费资源
  2. 如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因

dubbo SPI 规范

​ 使用原生 spi, 如果路径下有多个实现都会加载进来,如果有一个加载失败,会比较麻烦

目录

​ Dubbo 的 SPI 并非原生的 SPI,Dubbo 的规则是在

  • /META-INF/dubbo
  • /META-INF/internal
  • /META-INF/service

并且基于 SPI 接口去创建一个文件下面以需要实现的接口去创建一个文件,并且在文件中以 properties 规则一样配置实现类的全面以及分配实现的一个名称。

文件名称和接口名称保持一致,文件内容和 SPI 有差异,内容是 key 对应 value

我们看一下 dubbo-cluster 模块的 META-INF.dubbo.internal:

实现自己的扩展点

在 resources 目录下新建文件

META-INF/dubbo/com.alibaba.dubbo.rpc.Protocol文件,文件内容为

defineProtocol=com.gupaoedu.dubbo.protocol.DefineProtocol

实现 Protocol

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;
public class DefineProtocol implements Protocol {

  @Override
  public int getDefaultPort() {return 8888;}

  @Override
  public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {return null;}

  @Override
  public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {return null;}

  @Override
  public void destroy() {}
}

调用

public class App {public static void main(String[] args) throws IOException, InterruptedException {
    ClassPathXmlApplicationContext context = new
      ClassPathXmlApplicationContext
      ("dubbo-client.xml");

    Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).
      getExtension("defineProtocol");
    System.out.println(protocol.getDefaultPort());

    System.in.read();}
}

SPI 源码分析

​ 切入点

Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).
  getExtension("defineProtocol");

dubbo 的扩展点框架主要位于这个包下:

com.alibaba.dubbo.common.extension

大概结构如下:

com.alibaba.dubbo.common.extension
|–factory

    |--AdaptiveExtensionFactory  自适应扩展点工厂
    |--SpiExtensionFactory         SPI 扩展点工厂

|–support

   |--ActivateComparator 

|–Activate 自动激活加载扩展的注解

   |--Adaptive                      自适应扩展点的注解 
   |--ExtensionFactory              扩展点对象生成工厂接口 

|–ExtensionLoader 扩展点加载器,扩展点的查找,校验,加载等核心逻辑的实现类

  |--SPI                           @SPI 告诉当前应用其实一个扩展点例如 Protocol 一定可以在对应的 meta-inf/dubbo.internal 中看到

dubbo-config-spring
|- extension
|–SpringExtensionFactory

其中最核心的类就是 ExtensionLoader,几乎所有特性都在这个类中实现。

ExtensionLoader 没有提供 public 的构造方法,但是提供了一个 public static 的 getExtensionLoader,这个方法就是获取 ExtensionLoader 实例的工厂方法。其 public 成员方法中有三个比较重要的方法:

getActivateExtension:根据条件获取当前扩展可自动激活的实现

getExtension:根据名称获取当前扩展的指定实现

getAdaptiveExtension : 获取当前扩展的自适应实现

Protocol 源码

@SPI("dubbo")
public interface Protocol {

  /**
     * 获取缺省端口,当用户没有配置端口时使用。* 
     * @return 缺省端口
     */
  int getDefaultPort();

  /**
     * 暴露远程服务:<br>
     * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必须是幂等的,也就是暴露同一个 URL 的 Invoker 两次,和暴露一次没有区别。<br>
     * 3. export()传入的 Invoker 由框架实现并传入,协议不需要关心。<br>
     * 
     * @param <T> 服务的类型
     * @param invoker 服务的执行体
     * @return exporter 暴露服务的引用,用于取消暴露
     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
     */
  @Adaptive
  <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

  /**
     * 引用远程服务:<br>
     * 1. 当用户调用 refer()所返回的 Invoker 对象的 invoke()方法时,协议需相应执行同 URL 远端 export()传入的 Invoker 对象的 invoke()方法。<br>
     * 2. refer()返回的 Invoker 由协议实现,协议通常需要在此 Invoker 中发送远程请求。<br>
     * 3. 当 url 中有设置 check=false 时,连接失败不能抛出异常,并内部自动恢复。<br>
     * 
     * @param <T> 服务的类型
     * @param type 服务的类型
     * @param url 远程服务的 URL 地址
     * @return invoker 服务的本地代理
     * @throws RpcException 当连接服务提供方失败时抛出
     */
  @Adaptive
  <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

  /**
     * 释放协议:<br>
     * 1. 取消该协议所有已经暴露和引用的服务。<br>
     * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
     * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
     */
  void destroy();}

从上述 Protocol 的源码,可以看到两个注解@SPI("duo")@Adaptive

@SPI : 表示当前这个接口是一个扩展点,可以实现自己的扩展实现

@Adaptive 表示一个自适应扩展点,在方法级别上,会动态生成一个适配器类

getExtensionLoader

Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).
 getExtension("defineProtocol");
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {if (type == null)
    throw new IllegalArgumentException("Extension type == null");
  if (!type.isInterface()) {throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
  }
  if (!withExtensionAnnotation(type)) {
    throw new IllegalArgumentException("Extension type(" + type +
                                  ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + "Annotation!");
  }

  ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
  if (loader == null) {EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
    loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
  }
  return loader;
}

​ 根据一个类型 class 获得一个 ExtensionLoader,需要一个 class 类型的参数,这个参数必须是接口,而且此接口必须要 @SPI 注解注释,否则拒绝处理。检查通过之后首先会检查 ExtensionLoader 缓存中是否已经存在该扩展对应的 ExtensionLoader, 如果有则返回,否则创建一个新的 ExtensionLoader 负责加载此扩展实现,同时缓存起来。所以每一个扩展,dubbo 中只会有一个对应的ExtensionLoader 实例

接下来看下 ExtensionLoader 的私有构造函数:

private ExtensionLoader(Class<?> type) {
  this.type = type;
  objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

​ 这里保存了对应的扩展类型,并且设置了一个额外的 objectFactory 属性,他是一个 ExtensionFactory 类型,ExtensionFactory 主要用于加载扩展的实现:

ExtensionFactory 主要用于加载扩展的实现:

@SPI
public interface ExtensionFactory {

  /**
     * Get extension.
     * 
     * @param type object type.
     * @param name object name.
     * @return object instance.
     */
  <T> T getExtension(Class<T> type, String name);

}

​ ExtensionFactory 有 @SPI 注解,说明当前这个接口是一个扩展点。从 extension 包的结构图可以看到。Dubbo 内部提供了两个实现类:SpiExtensionFactoryAdaptiveExtensionFactory。不同的实现可以以不同的方式来完成扩展点实现的加载。

getAdaptiveExtension()

对应上述 ExtensionLoader 的 getAdaptiveExtension() 我们查看对应的 getAdaptivesion() 方法获得一个自适应的扩展点

如果是配置在类级别上,表示自定义适配器,如果是配置在方法上,表示需要动态生成适配器类

表示当前是自定义扩展点

默认的 ExtensionFactory 实现中,AdaptiveExtensionFactotry@Adaptive 注解注释,也就是它就是 ExtensionFactory 对应的自适应扩展实现(每个扩展点最多只能有一个自适应实现,如果所有实现中没有被 @Adaptive 注释的,那么 dubbo 会动态生成一个自适应实现类),也就是说,所有对 ExtensionFactory 调用的地方,实际上调用的都是 AdpativeExtensionFactory,那么我们看下他的实现代码:

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
    
    private final List<ExtensionFactory> factories;
    
    public AdaptiveExtensionFactory() {ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    public <T> T getExtension(Class<T> type, String name) {for (ExtensionFactory factory : factories) {T extension = factory.getExtension(type, name);
            if (extension != null) {return extension;}
        }
        return null;
    }

​ 这段代码,其实就相当于一个代理入口,它会遍历当前系统中所有的 ExtensionFactory 实现来获取指定的扩展实现,获取到扩展实现,遍历完所有 ExtensionFactory 实现,调用 ExtensionLoader 的 getSupportedExtensions 方法来获取 ExtensionFactory 的所有实现

​ 从前面 ExtensionLoader 的私有构造函数中可以看出,在选择 ExtensionFactory 的时候,并不是调用 getExtension(name)来获取某个具体的实现类,而是调用 getAdaptiveExtension 来获取一个自适应的实现。那么首先我们就来分析一下 getAdaptiveExtension 这个方法的实现吧:

@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {if(createAdaptiveInstanceError == null) {synchronized (cachedAdaptiveInstance) {instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance:" + t.toString(), t);
                    }
                }
            }
        }
        else {throw new IllegalStateException("fail to create adaptive instance:" + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }

    return (T) instance;
}

​ 首先检查缓存的 adaptiveInstance 是否存在,如果存在则直接使用,否则的话调用 createAdaptiveExtension 方法来创建新的 adaptiveInstance 并且缓存起来。也就是说对于某个扩展点,每次调用 ExtensionLoader.getAdaptiveExtension 获取到的都是同一个实例。

createAdaptiveExtension 方法
private T createAdaptiveExtension() {
  try {return injectExtension((T) getAdaptiveExtensionClass().newInstance());
  } catch (Exception e) {throw new IllegalStateException("Can not create adaptive extenstion" + type + ", cause:" + e.getMessage(), e);
  }
}
 在 `createAdaptiveExtension` 方法中,首先通过 `getAdaptiveExtensionClass` 方法获取到最终的自适应实现类型,然后实例化一个自适应扩展实现的实例,最后进行扩展点注入操作 
private Class<?> getAdaptiveExtensionClass() {getExtensionClasses();
    if (cachedAdaptiveClass != null) {return cachedAdaptiveClass;}
    return cachedAdaptiveClass = createAdaptiveExtensionClass();}

上述代码中主要做了两件事情

  1. getExtensionClasses() 加载所有路径下的扩展点
  2. createAdaptiveExtensionClass() 动态创建一个扩展点

cachedAdaptiveClass 这里有个判断,用来判断当前 Protocol 这个扩展点是否存在一个自定义的适配器,如果有,则直接返回自定义适配器,否则,就动态创建,这个值是在 getExtensionClasses 中赋值的,这块代码我们稍后再看

getExtensionClasses()
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String,Class<?>>>();
private Map<String, Class<?>> getExtensionClasses() {//com.alibaba.dubbo.rpc.Protocal=>[xx,xx]
  Map<String, Class<?>> classes = cachedClasses.get();
  if (classes == null) {synchronized (cachedClasses) {classes = cachedClasses.get();
      if (classes == null) {classes = loadExtensionClasses();
        cachedClasses.set(classes);
      }
    }
  }
  return classes;
}

上述代码主要做了几件事

  1. 从 cachedClasses 中获得一个结果,这个结果实际上就是所有的扩展点类,key 对应 name,value 对应 class
  2. 通过双重检查锁判断
  3. 调用loadExtensionClasses(),去加载扩展点的实现
loadExtensionClasses()
// 此方法已经 getExtensionClasses 方法同步过。private Map<String, Class<?>> loadExtensionClasses() {
  //type->Protocol.class
  final SPI defaultAnnotation = type.getAnnotation(SPI.class);
  if(defaultAnnotation != null) {String value = defaultAnnotation.value();
    if(value != null && (value = value.trim()).length() > 0) {String[] names = NAME_SEPARATOR.split(value);
      if(names.length > 1) {throw new IllegalStateException("more than 1 default extension name on extension" + type.getName()
                                        + ":" + Arrays.toString(names));
      }
      if(names.length == 1) cachedDefaultName = names[0];
    }
  }

  Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
  loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
  loadFile(extensionClasses, DUBBO_DIRECTORY);
  loadFile(extensionClasses, SERVICES_DIRECTORY);
  return extensionClasses;
}

从代码里面可以看到,在 loadExtensionClasses 中首先会检测扩展点在 @SPI 注解中配置的默认扩展实现的名称,并将其赋值给 cachedDefaultName 属性进行缓存,后面想要获取该扩展点的默认实现名称就可以直接通过访问 cachedDefaultName 字段来完成,比如 getDefaultExtensionName 方法就是这么实现的。从这里的代码中又可以看到,具体的扩展实现类型,是通过调用 loadFile 方法来加载,分别从一下三个地方加载:

META-INF/dubbo/internal

META-INF/dubbo

META-INF/services/

主要逻辑:

  1. 获得当前扩展点的注解,也就是 Protocol.class 这个类的注解@SPI
  2. 判断这个注解不为空,则再次获取 @SPI 中的 value 值
  3. 如果 value 有值,也就是 @SPI("dubbo"),则吧此 dubbo 的值赋值给cachedDefaultName,这就是为什么我们能够通过ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension() 能够获取到 DubboProtocol 这个扩展点的原因
  4. 最后,通过 loadFile 去加载指定路径下的所有扩展点
loadFile()
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {String fileName = dir + type.getName();
  try {
    Enumeration<java.net.URL> urls;
    ClassLoader classLoader = findClassLoader();
    if (classLoader != null) {urls = classLoader.getResources(fileName);
    } else {urls = ClassLoader.getSystemResources(fileName);
    }
    if (urls != null) {while (urls.hasMoreElements()) {java.net.URL url = urls.nextElement();
        try {BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
          try {
            String line = null;
            while ((line = reader.readLine()) != null) {final int ci = line.indexOf('#');
              if (ci >= 0) line = line.substring(0, ci);
              line = line.trim();
              if (line.length() > 0) {
                try {
                  String name = null;
                  int i = line.indexOf('=');
                  if (i > 0) {name = line.substring(0, i).trim();
                    line = line.substring(i + 1).trim();}
                  if (line.length() > 0) {Class<?> clazz = Class.forName(line, true, classLoader);
                    if (! type.isAssignableFrom(clazz)) {
                      throw new IllegalStateException("Error when load extension class(interface:" +
                                                      type + ", class line:" + clazz.getName() + "), class" 
                                                      + clazz.getName() + "is not subtype of interface.");
                    }
                    // 如果在类级别上,表示自定义适配器
                    // 如果是在方法上,表示需要动态生成适配器类
                    if (clazz.isAnnotationPresent(Adaptive.class)) {if(cachedAdaptiveClass == null) {cachedAdaptiveClass = clazz;} else if (! cachedAdaptiveClass.equals(clazz)) {
                        throw new IllegalStateException("More than 1 adaptive class found:"
                                                        + cachedAdaptiveClass.getClass().getName()
                                                        + "," + clazz.getClass().getName());
                      }
                    } else {
                      try {clazz.getConstructor(type);
                        Set<Class<?>> wrappers = cachedWrapperClasses;
                        if (wrappers == null) {cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                          wrappers = cachedWrapperClasses;
                        }
                        wrappers.add(clazz);
                      } catch (NoSuchMethodException e) {clazz.getConstructor();
                        if (name == null || name.length() == 0) {name = findAnnotationName(clazz);
                          if (name == null || name.length() == 0) {if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                && clazz.getSimpleName().endsWith(type.getSimpleName())) {name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();} else {throw new IllegalStateException("No such extension name for the class" + clazz.getName() + "in the config" + url);
                            }
                          }
                        }
                        String[] names = NAME_SEPARATOR.split(name);
                        if (names != null && names.length > 0) {Activate activate = clazz.getAnnotation(Activate.class);
                          if (activate != null) {cachedActivates.put(names[0], activate);
                          }
                          for (String n : names) {if (! cachedNames.containsKey(clazz)) {cachedNames.put(clazz, n);
                            }
                            Class<?> c = extensionClasses.get(n);
                            if (c == null) {extensionClasses.put(n, clazz);
                            } else if (c != clazz) {throw new IllegalStateException("Duplicate extension" + type.getName() + "name" + n + "on" + c.getName() + "and" + clazz.getName());
                            }
                          }
                        }
                      }
                    }
                  }
                } catch (Throwable t) {IllegalStateException e = new IllegalStateException("Failed to load extension class(interface:" + type + ", class line:" + line + ") in" + url + ", cause:" + t.getMessage(), t);
                  exceptions.put(line, e);
                }
              }
            } // end of while read lines
          } finally {reader.close();
          }
        } catch (Throwable t) {
          logger.error("Exception when load extension class(interface:" +
                       type + ", class file:" + url + ") in" + url, t);
        }
      } // end of while urls
    }
  } catch (Throwable t) {
    logger.error("Exception when load extension class(interface:" +
                 type + ", description file:" + fileName + ").", t);
  }
}

解析指定路径下的文件,获取对应的扩展点,通过反射的方式进行实例化之后,put 到 extensionClasses 这个 Map 集合中

调用 loadFile 方法,代码比较长,主要做了几个事情,有几个变量会赋值

cachedAdaptiveClass : 当前 Extension 类型对应的 AdaptiveExtension 类型(只能一个)

cachedWrapperClasses : 当前 Extension 类型对应的所有 Wrapper 实现类型(无顺序)

cachedActivates : 当前 Extension 实现自动激活实现缓存(map, 无序)

cachedNames : 扩展点实现类对应的名称(如配置多个名称则值为第一个)

当 loadExtensionClasses 方法执行完成之后,还有以下变量被赋值:

cachedDefaultName : 当前扩展点的默认实现名称

当 getExtensionClasses 方法执行完成之后,除了上述变量被赋值之外,还有以下变量被赋值:

cachedClasses : 扩展点实现名称对应的实现类(一个实现类可能有多个名称)

其实也就是说,在调用了 getExtensionClasses 方法之后,当前扩展点对应的实现类的一些信息就已经加载进来了并且被缓存了。后面的许多操作都可以直接通过这些缓存数据来进行处理了。

createAdaptiveExtensionClass()
private Class<?> createAdaptiveExtensionClass() {
  // 生成字节码代码
  String code = createAdaptiveExtensionClassCode();
  // 获得类加载器
  ClassLoader classLoader = findClassLoader();
  com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
  // 动态编译字节码
  return compiler.compile(code, classLoader);
}

动态生成适配器代码,以及动态编译

  1. createAdaptiveExtensionClassCode , 动态创建一个字节码文件,返回 code 这个字符串
  2. 通过 compilier.compile 进行编译(默认情况下使用的是 javassist)
  3. 通过 classLoader 载入到 jvm 中

上生成的 code 和类是

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  }

  public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  }

  public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {if (arg1 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg1;
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
      throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
  }

  public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
      throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
      throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
  }
}
动态生成类的主要功能
  1. 从 url 或者拓展接口获取扩展接口实现类的名称
  2. 根据名称,获取实现了ExtensionLoader.getExtensionLoader(扩展接口类).getExtension(扩展接口实现类名称),然后调用类的方法

refer 是一个引用,得到一个 url 的参数,通过参数判断是走哪个协议发布服务

回到 createAdaptiveExtension 方法,他调用了 getExtesionClasses 方法加载扩展点实现信息完成之后,就可以直接通过判断 cachedAdaptiveClass 缓存字段是否被赋值盘确定当前扩展点是否有默认的 AdaptiveExtension 实现。如果没有,那么就调用 createAdaptiveExtensionClass 方法来动态生成一个。在 dubbo 的扩展点框架中大量的使用了缓存技术。

创建自适应扩展点实现类型和实例化就已经完成了,下面就来看下扩展点自动注入的实现

injectExtension
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
    
    private final List<ExtensionFactory> factories;
    
    public AdaptiveExtensionFactory() {ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    public <T> T getExtension(Class<T> type, String name) {for (ExtensionFactory factory : factories) {T extension = factory.getExtension(type, name);
            if (extension != null) {return extension;}
        }
        return null;
    }

}

这里可以看到,扩展点自动注入的一句就是根据 setter 方法对应的参数类型和 property 名称从 ExtensionFactory 中查询,如果有返回扩展点实例,那么就进行注入操作。到这里 getAdaptiveExtension 方法就分析完毕了。

需要明白一点 dubbo 的内部传参基本上都是基于 Url 来实现的,也就是说 Dubbo 是基于 URL 驱动的技术

所以,适配器类的目的是在运行期获取扩展的真正实现来调用,解耦接口和实现,这样的话要不我们自己实现适配器类,要不 dubbo 帮我们生成,而这些都是通过 Adpative 来实现。

到目前为止,我们的 AdaptiveExtension 的主线走完了,可以简单整理一下他们的调用关系如下

总结

在整个过程中,最重要的两个方法 getExtensionClasses 和 createAdaptiveExtensionClass

getExtensionClasses

这个方法主要是读取 META-INF/services、META-INF/dubbo、META-INF/internal 目录下的文件内容

分析每一行,如果发现其中有哪个类的 annotation 是 @Adaptive,就找到对应的 AdaptiveClass。如果没有的话,就动态创建一个

createAdaptiveExtensionClass

该方法是在 getExtensionClasses 方法找不到 AdaptiveClass 的情况下被调用,该方法主要是通过字节码的方式在内存中新生成一个类,它具有 AdaptiveClass 的功能,Protocol 就是通过这种方式获得 AdaptiveClass 类的。

源码分析

加载顺序

NamespaceHandler: 注册 BeanDefinitionParser,利用它来解析
BeanDefinitionParser:解析配置文件的元素
spring 会默认加载 jar 包下/META-INF/spring.handlers 找到对应的 NamespaceHandler

initializingBean

当 spring 容器初始化完以后,会调用 afterPropertiesSet 方法

DisposableBean

bean 被销毁的时候调用 destory 方法

ApplicationContextAware

容器初始化完成之后会主动注入 applicationContext

ApplicationListener

事件监听

BeanNameAware

对象初始化完之后会获取 bean 的本身属性
delay 能够控制延迟发布

源码解读要点

首先我们要关注的是服务的发布和服务的消费这两个主要的流程,那么就可以基于这个点去找到源码分析的突破口。那么自然而然我们就可以想到 spring 的配置

Dubbo 的接入实现

Dubbo 中 spring 扩展就是使用 spring 的自定义类型,所以同样也有 NamespaceHandler、BeanDefinitionParser。而 NamespaceHandler 是 DubboNamespaceHandler

public class DubboNamespaceHandler extends NamespaceHandlerSupport {  
  static {Version.checkDuplicate(DubboNamespaceHandler.class);   
  }   

  public void init() {registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));    
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));   
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); 
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); 
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); 
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));  
    registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); 

  }
} 

BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我们向看 <dubbo:service> 的配置,就直接看 DubboBeanDefinitionParser 中

这个里面主要做了一件事,把不同的配置分别转化成 spring 容器中的 bean 对象

application对应ApplicationConfig

registry对应RegistryConfig

monitor对应MonitorConfig

provider对应ProviderConfig

consumer对应ConsumerConfig

为了在 spring 启动的时候,也相应的启动 provider 发布服务注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,加入了两个 bean

ServiceBeanReferenceBean

分别继承了 ServiceConfigReferenceConfig

同时还分别实现了InitializingBeanDisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware

InitializingBean接口为 bean 提供了初始化方法的方式,它只包括 afterPropertiesSet 方法,凡是继承该接口的类,在初始化 bean 的时候会执行该方法。

DisposableBean bean 被销毁的时候,spring 容器会自动执行 destory 方法,比如释放资源

ApplicationContextAware 实现了这个接口的 bean,当 spring 容器初始化的时候,会自动的将 ApplicationContext 注入进来

ApplicationListener ApplicationEvent 事件监听,spring 容器启动后会发一个事件通知

BeanNameAware 获得自身初始化时,本身的 bean 的 id 属性

那么基本的实现思路可以整理出来了

  1. 利用 spring 的解析收集 xml 中的配置信息,然后把这些配置信息存储到 serviceConfig 中
  2. 调用 ServiceConfig 的 export 方法来进行服务的发布和注册

服务的发布

ServiceBean

serviceBean是服务发布的切入点,通过 afterPropertiesSet 方法,调用 export() 方法进行发布。

export为父类 ServiceConfig 中的方法,所以跳转到 SeviceConfig 类中的 export 方法

public synchronized void export() {if (provider != null) {if (export == null) {export = provider.getExport();
    }
    if (delay == null) {delay = provider.getDelay();
    }
  }
  if (export != null && ! export.booleanValue()) {return;}
  if (delay != null && delay > 0) {Thread thread = new Thread(new Runnable() {public void run() {
        try {Thread.sleep(delay);
        } catch (Throwable e) { }
        doExport();}
    });
    thread.setDaemon(true);
    thread.setName("DelayExportServiceThread");
    thread.start();} else {doExport();
  }
}

我们发现,delay 的作用就是延迟暴露,而延迟的方式也很直截了当,Thread.sleep(delay)

  1. export 是 synchronized 修饰的方法。也就是说暴露的过程是原子操作,正常情况下不会出现锁竞争的问题,毕竟初始化过程大多数情况下都是单一线程操作,这里联想到了 spring 的初始化流程,也进行了加锁操作,这里也给我们平时设计一个不错的启示:初始化流程的性能调优优先级应该放的比较低,但是安全的优先级应该放的比较高!
  2. 继续看 doExport() 方法。同样是一堆初始化代码

export 的过程

继续看 doExport(),最终会调用到 doExportUrls()中:

private void doExportUrls() {List<URL> registryURLs = loadRegistries(true);// 是不是获得注册中心的配置
  for (ProtocolConfig protocolConfig : protocols) { // 是不是支持多协议发布
    doExportUrlsFor1Protocol(protocolConfig, registryURLs);
  }
}

对应上述 protocols 长成这样 <dubbo:protocol name="dubbo" port="20880" id ="dubbo"/>protocols 也是根据配置装配出来的,接下来进入对应的duExportUrlsFor1Protocol 方法查看对应 s 具体实现

最终实现逻辑

// 配置为 none 不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// 配置不是 remote 的情况下做本地暴露 (配置为 remote,则表示只暴露远程服务)
  if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);
  }
  // 如果配置不是 local 则暴露为远程服务.(配置为 local,则表示只暴露本地服务)
  if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){if (logger.isInfoEnabled()) {logger.info("Export dubbo service" + interfaceClass.getName() + "to url" + url);
    }
    if (registryURLs != null && registryURLs.size() > 0
        && url.getParameter("register", true)) {for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
        URL monitorUrl = loadMonitor(registryURL);
        if (monitorUrl != null) {url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
        }
        if (logger.isInfoEnabled()) {logger.info("Register dubbo service" + interfaceClass.getName() + "url" + url + "to registry" + registryURL);
        }
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

        Exporter<?> exporter = protocol.export(invoker);
        exporters.add(exporter);
      }
    } else {Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

      Exporter<?> exporter = protocol.export(invoker);
      exporters.add(exporter);
    }
  }
}

从上述代码 doExportUrlsFor1Protocol 方法,先创建两个 URL,分别如下

dubbo://192.168.xx.xx:20880/com.zzjson.IHello

registry://192.168.xx

其对应的 url 就是 servicesproviders的信息

在上面这段代码中可以看到 Dubbo 的比较核心的抽象:Invoker,Invoker 是一个代理类,从 ProxyFactory 中生成。这个地方可以做一个小结

  1. Invoker - 执行具体的远程调用
  2. Protocol – 服务地址的发布和订阅
  3. Exporter – 暴露服务或取消暴露

Protocol$Adaptive

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  }

  public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  }

  public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {if (arg1 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg1;
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
      throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
  }

  public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
      throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
      throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
  }
}

上述代码做两件事

  1. 从 url 中获得 protocol 的协议地址,如果 protocol 为空,表示通过 dubbo 协议发布服务,否则根据配置的谢意类型来发布服务
  2. 调用ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName)

getExtension()

public T getExtension(String name) {if (name == null || name.length() == 0)
    throw new IllegalArgumentException("Extension name == null");
  if ("true".equals(name)) {return getDefaultExtension();
  }
  Holder<Object> holder = cachedInstances.get(name);
  if (holder == null) {cachedInstances.putIfAbsent(name, new Holder<Object>());
    holder = cachedInstances.get(name);
  }
  Object instance = holder.get();
  if (instance == null) {synchronized (holder) {instance = holder.get();
      if (instance == null) {instance = createExtension(name);
        holder.set(instance);
      }
    }
  }
  return (T) instance;
}
createExtension()
@SuppressWarnings("unchecked")
private T createExtension(String name) {Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {throw findException(name);
    }
    try {T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
      // 对获取的实例进行依赖注入
        injectExtension(instance);
      // 在 loadFile 中进行赋值的
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && wrapperClasses.size() > 0) {for (Class<?> wrapperClass : wrapperClasses) {
              // 对实例进行包装,分别调用带 Protocol 参数的构造函数创建实例,然后进行依赖注入
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name:" + name + ", class:" +
                type + ")  could not be instantiated:" + t.getMessage(), t);
    }
}

主要做如下四个事情

  1. 根据 name 获取对应的 class
  2. 根据 calss 创建一个实例
  3. 对获取的实例进行依赖注入
  4. 对实例进行包装,分别调用带 Protocol 参数的构造函数创建实例,然后进行依赖注入
  5. dubbo-rpc-apiresources路径下,找到 com.alibaba.dubbo.rpc.Protocol 文件中存在filter/listener
  6. 遍历 cachedWrapperClassDubboProtocol进行包装,会通过 ProtocolFilterWrapper,ProtocolListenerWrapper 包装
private Map<String, Class<?>> getExtensionClasses() {Map<String, Class<?>> classes = cachedClasses.get();
  if (classes == null) {synchronized (cachedClasses) {classes = cachedClasses.get();
      if (classes == null) {classes = loadExtensionClasses();
        cachedClasses.set(classes);
      }
    }
  }
  return classes;
}

总结

ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); 这段代码中,当 extNameregistry的时候,我们不需要再次去阅读这块代码了,直接可以在扩展点中找到相应的实现扩展类,/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol配置如下

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol

所以我们定位到 RegistryProtocolRegistryProtocol 这个类中的 export 方法

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  //export invoker
  final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
  //registry provider
  final Registry registry = getRegistry(originInvoker);
  final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
  registry.register(registedProviderUrl);
  // 订阅 override 数据
  // FIXME 提供者订阅时,会影响同一 JVM 即暴露服务,又引用同一服务的的场景,因为 subscribed 以服务名为缓存的 key,导致订阅信息覆盖。final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
  final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
  overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  // 保证每次 export 都返回一个新的 exporter 实例
  return new Exporter<T>() {public Invoker<T> getInvoker() {return exporter.getInvoker();
    }
    public void unexport() {
      try {exporter.unexport();
      } catch (Throwable t) {logger.warn(t.getMessage(), t);
      }
      try {registry.unregister(registedProviderUrl);
      } catch (Throwable t) {logger.warn(t.getMessage(), t);
      }
      try {overrideListeners.remove(overrideSubscribeUrl);
        registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
      } catch (Throwable t) {logger.warn(t.getMessage(), t);
      }
    }
  };
}

doLoalExport()

上述 doLoaclExport 源码为:

private Protocol protocol;

public void setProtocol(Protocol protocol) {this.protocol = protocol;}
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){String key = getCacheKey(originInvoker);
  ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
  if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
      if (exporter == null) {final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
        exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
        bounds.put(key, exporter);
      }
    }
  }
  return (ExporterChangeableWrapper<T>) exporter;
}

上述代码 Protocol 代码是怎么复制的呢,是在 injectExtension 方法中对已经加载的扩展点的属性进行依赖注入了

protocol 发布服务

因此我们知道 protocol 是一个自适应扩展点,Protocol$Adaptive,然后调用这个自适应扩展点中的 export 方法,这个时候传入的协议地址应该是

dubbo://127.0.0.1/xxx因此在 Protocol$Adaptive.export 方法中,ExtensionLoader.getExtension(Protocol.class).getExtension就是基于 DubboProtocol 协议发布服务了么?当然不是,此处获取的不是一个单纯的 DubboProtocol 扩展点,而是会通过 WrapperProtocl进行装饰,装饰器分别为 ProtocolFilterWrapper 或者是 ProtoclListenerWrapper,至于为什么MockProtocol 为什么不在装饰器里面呢?我们可以想到,在 ExtensionLoader.loadFile 这段代码的时候,装饰器必须要有一个 Protocol 的构造方法,如下

public ProtocolFilterWrapper(Protocol protocol){if (protocol == null) {throw new IllegalArgumentException("protocol == null");
  }
  this.protocol = protocol;
}

至此,我们可以知道 Protocol$Adaptive 中的 export 方法会调用 ProtocolFilterWrapper 以及 ProtocolListenerWrapper 类的方法

ProtocolFilterWrapper

public class ProtocolFilterWrapper implements Protocol {

  private final Protocol protocol;

  public ProtocolFilterWrapper(Protocol protocol) {if (protocol == null) {throw new IllegalArgumentException("protocol == null");
    }
    this.protocol = protocol;
  }
// 此方法读取所有的 filter 类,利用这些类封装 invoker
  private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 自动激活扩展点,根据条件获取当前扩展可自动激活的实现
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);
        final Invoker<T> next = last;
        last = new Invoker<T>() {

          @Override
          public Class<T> getInterface() {return invoker.getInterface();
          }

          @Override
          public URL getUrl() {return invoker.getUrl();
          }

          @Override
          public boolean isAvailable() {return invoker.isAvailable();
          }

          @Override
          public Result invoke(Invocation invocation) throws RpcException {return filter.invoke(next, invocation);
          }

          @Override
          public void destroy() {invoker.destroy();
          }

          @Override
          public String toString() {return invoker.toString();
          }
        };
      }
    }
    return last;
  }

  @Override
  public int getDefaultPort() {return protocol.getDefaultPort();
  }

  @Override
  public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
  }

  @Override
  public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
  }

  @Override
  public void destroy() {protocol.destroy();
  }

}

ProtocolFilterWrapper这个类非常重要

  1. 他有一个参数为 Protocol protocol 构造参数
  2. 它实现了 Protocol 接口
  3. 使用了责任链模式,对 exportrefer函数进行了封装

我们查看如下文件dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter

可以看到,invoker 通过如下的 Filter 组装成一个责任链

其中涉及到很多功能,包括权限验证,异常,超时等等,当然可以预计计算调用时间等等应该也是在这其中的某个类实现的,这里我们可以看到 exportrefer过程都会被 filter 过滤

ProtocolListenerWrapper

在这里我们可以看到 exportrefer分别对应了不同的 Wrapper;export 对应的 ListenerExporterWrapper 这块暂不去分析,因为此地方并没有提供实现类

public class ProtocolListenerWrapper implements Protocol {

  private final Protocol protocol;

  public ProtocolListenerWrapper(Protocol protocol){if (protocol == null) {throw new IllegalArgumentException("protocol == null");
    }
    this.protocol = protocol;
  }

  public int getDefaultPort() {return protocol.getDefaultPort();
  }

  public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);
    }
    return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                                          Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                                                                       .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
  }

  public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {return protocol.refer(type, url);
    }
    return new ListenerInvokerWrapper<T>(protocol.refer(type, url), 
                                         Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                           .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
  }

  public void destroy() {protocol.destroy();
  }

}

我们看一下 dubboProtocol 的 export 方法:openServer(url)

DuoProtocol.export()

通过上述代码分析完以后,我们能够定位到 DubboProtocol.export() 方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();

  // export service.
  String key = serviceKey(url);
  DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  exporterMap.put(key, exporter);

  //export an stub service for dispaching event
  Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
  Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
  if (isStubSupportEvent && !isCallbackservice){String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
    if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){if (logger.isWarnEnabled()){logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                                              "], has set stubproxy support event ,but no stub methods founded."));
      }
    } else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
    }
  }
// 暴露服务
  openServer(url);

  return exporter;
}

接着调用openServer

private void openServer(URL url) {
  // find server.
  String key = url.getAddress();//116.62.221.6:20880
  //client 也可以暴露一个只有 server 可以调用的服务。boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
  if (isServer) {ExchangeServer server = serverMap.get(key);
    if (server == null) {// 没有的话就创建服务
      serverMap.put(key, createServer(url));
    } else {
      //server 支持 reset, 配合 override 功能使用
      server.reset(url);
    }
  }
}

createServer()

继续看其中的 createServer 方法:

private ExchangeServer createServer(URL url) {
  // 默认开启 server 关闭时发送 readonly 事件
  url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
  // 默认开启 heartbeat
  url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
  String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

  if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
    throw new RpcException("Unsupported server type:" + str + ", url:" + url);

  url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
  ExchangeServer server;
  try {server = Exchangers.bind(url, requestHandler);
  } catch (RemotingException e) {throw new RpcException("Fail to start server(url:" + url + ")" + e.getMessage(), e);
  }
  str = url.getParameter(Constants.CLIENT_KEY);
  if (str != null && str.length() > 0) {Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type:" + str);
    }
  }
  return server;
}

创建服务,然后开启心跳监测,默认使用netty。组装 url

Exchanger.bind()

发现 ExchangeServer 是通过 Exchangers 创建的,直接看 Exchanger.bind 方法

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");
  }
  if (handler == null) {throw new IllegalArgumentException("handler == null");
  }
  url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
  return getExchanger(url).bind(url, handler);
}

getExchanger 方法实际上调用的是 ExtensionLoader 的相关方法,这里的 ExtensionLoader 是 dubbo 插件化的核心,我们会在后面的插件化讲解中详细讲解,这里我们只需要知道 Exchanger 的默认实现只有一个:HeaderExchanger。上面一段代码最终调用的是:

public static Exchanger getExchanger(URL url) {String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
  return getExchanger(type);
}

public static Exchanger getExchanger(String type) {return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
public class HeaderExchanger implements Exchanger {

  public static final String NAME = "header";

  public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  }

  public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  }

}

可以看到 Server 与 Client 实例均是在这里创建的,HeaderExchangeServer 需要一个 Server 类型的参数,来自Transporters.bind()

Transporters.bind()

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");
  }
  if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");
  }
  ChannelHandler handler;
  if (handlers.length == 1) {handler = handlers[0];
  } else {handler = new ChannelHandlerDispatcher(handlers);
  }
  return getTransporter().bind(url, handler);
} 

getTransporter()获取的实例来源于配置,默认返回一个 NettyTransporter:

NettyTransport.bind()

通过 NettyTranport 创建基于 Nettyserver服务

public class NettyTransporter implements Transporter {

  public static final String NAME = "netty";

  public Server bind(URL url, ChannelHandler listener) throws RemotingException {return new NettyServer(url, listener);
  }

  public Client connect(URL url, ChannelHandler listener) throws RemotingException {return new NettyClient(url, listener);
  }

}

在调用 HeaderExchanger.bind() 方法的时候,是先 new 一个 HeaderExchangeServer 这个 server 是干嘛呢,是对当前这个链接去建立心跳机制

public class HeaderExchangeServer implements ExchangeServer {protected final Logger        logger = LoggerFactory.getLogger(getClass());

  private final ScheduledExecutorService scheduled   = Executors.newScheduledThreadPool(1,new NamedThreadFactory("dubbo-remoting-server-heartbeat",  true);

  // 心跳定时器
  private ScheduledFuture<?> heatbeatTimer;

  // 心跳超时,毫秒。缺省 0,不会执行心跳。private int                            heartbeat;

  private int                            heartbeatTimeout;

  private final Server server;

  private volatile boolean closed = false;

  public HeaderExchangeServer(Server server) {if (server == null) {throw new IllegalArgumentException("server == null");
    }
    this.server = server;
    this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (heartbeatTimeout < heartbeat * 2) {throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    // 心跳
    startHeatbeatTimer();}
  private void startHeatbeatTimer() {
    // 关闭心跳定时
    stopHeartbeatTimer();
    if (heartbeat > 0) {
      // 每隔 heartbeat 时间执行一次
      heatbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
          // 获取 channels
          public Collection<Channel> getChannels() {
            return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels() );
          }
        }, heartbeat, heartbeatTimeout),
        heartbeat, heartbeat,TimeUnit.MILLISECONDS);
    }
  }
    // 关闭心跳定时
  private void stopHeartbeatTimer() {
    try {
      ScheduledFuture<?> timer = heatbeatTimer;
      if (timer != null && ! timer.isCancelled()) {timer.cancel(true);
      }
    } catch (Throwable t) {logger.warn(t.getMessage(), t);
    } finally {heatbeatTimer =null;}
  }

}

心跳线程 HeartBeatTask 在超时时间之内发送数据,在超时时间之外,是客户端的话,重连;是服务端,那么关闭服务端发布。

服务的注册

前面我们知道,基于 Spring 的这个解析入口,到发布服务的过程,接着基于 DubboProtocol 去发布,最终调用 Netty 的 api 创建了一个NettyServer

那么继续沿着 RegistryProtocol.export 这个方法,来看看注册服务的代码

RegistryProtocol.export()

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  //export invoker
  final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
  //registry provider
  final Registry registry = getRegistry(originInvoker);
  final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
  registry.register(registedProviderUrl);
  // 订阅 override 数据
  // FIXME 提供者订阅时,会影响同一 JVM 即暴露服务,又引用同一服务的的场景,因为 subscribed 以服务名为缓存的 key,导致订阅信息覆盖。final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
  final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
  overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  // 保证每次 export 都返回一个新的 exporter 实例
  return new Exporter<T>() {public Invoker<T> getInvoker() {return exporter.getInvoker();
    }
    public void unexport() {
      try {exporter.unexport();
      } catch (Throwable t) {logger.warn(t.getMessage(), t);
      }
      try {registry.unregister(registedProviderUrl);
      } catch (Throwable t) {logger.warn(t.getMessage(), t);
      }
      try {overrideListeners.remove(overrideSubscribeUrl);
        registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
      } catch (Throwable t) {logger.warn(t.getMessage(), t);
      }
    }
  };
}

getRegistry()

private RegistryFactory registryFactory;

public void setRegistryFactory(RegistryFactory registryFactory) {this.registryFactory = registryFactory;}
private Registry getRegistry(final Invoker<?> originInvoker){URL registryUrl = originInvoker.getUrl();// 获得 registry://192.168.xx.xx:2181 的协议地址
  if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
    // 得到 zk 的谢意地址
    String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
    //registryUrl 就会变成了 zookeeper://192.168.xx.xx
    registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
  }
  //registryFactory 是什么
  return registryFactory.getRegistry(registryUrl);
}

上述代码很明显了,通过分析,其实就是把 registry 的协议头改成服务提供者配置的协议地址,就是我们配置

<dubbo:registry address="zookeeper://192.168.xx.xx.2181"/>

然后 registryFactory.getRegistry 的目的,就是通过协议地址匹配到对应的注册中心。那 registryFactory 是一个什么样的对象呢,从上述我们可以才,其是一个扩展点,并且我们能够注意到这里面的一个方法上有一个 @Adaptive 的注解,说明了其实一个自适应扩展点,按照我们之前看过的代码,自适应扩展点加在方法层面上,表示会动态生成一个自适应的适配器,所以这个自适应适配器应该是RegistryFactory$Adaptive

@SPI("dubbo")
public interface RegistryFactory {

  /**
     * 连接注册中心.
     * 
     * 连接注册中心需处理契约:<br>
     * 1. 当设置 check=false 时表示不检查连接,否则在连接不上时抛出异常。<br>
     * 2. 支持 URL 上的 username:password 权限认证。<br>
     * 3. 支持 backup=10.20.153.10 备选注册中心集群地址。<br>
     * 4. 支持 file=registry.cache 本地磁盘文件缓存。<br>
     * 5. 支持 timeout=1000 请求超时设置。<br>
     * 6. 支持 session=60000 会话超时或过期设置。<br>
     * 
     * @param url 注册中心地址,不允许为空
     * @return 注册中心引用,总不返回空
     */
  @Adaptive({"protocol"})
  Registry getRegistry(URL url);

}

RegistryFactory$Adaptive

public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {if (arg0 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg0;
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
      throw new IllegalStateException("Fail to get     extension(com.alibaba.dubbo.registry.RegistryFactory)" +"name from url(" + url.toString() + ") usekeys([protocol])")";      
      com.alibaba.dubbo.registry.RegistryFactory extension =       (com.alibaba.dubbo.registry.RegistryFactory)
      ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.Reg                               istryFactory.class).getExtension(extName);
    return extension.getRegistry(arg0);
  }
}

我们拿到这个动态生成的自适应扩展点,看看这段代码中的实现

  1. 从 url 中拿到协议头信息,这个时候的协议头是zookeeper://
  2. 通过 ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper") 去获得一个指定的扩展点,而这个扩展点的配置在
/dubbo-registry/dubbo-registry-zookeeper/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory

内容为

zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory

ZookeeperRegistryFactory

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

  private ZookeeperTransporter zookeeperTransporter;

  public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {this.zookeeperTransporter = zookeeperTransporter;}

  public Registry createRegistry(URL url) {return new ZookeeperRegistry(url, zookeeperTransporter);
  }

}

此方法中并没有 getRegistry 方法,而是在父类AbstractRegistryFactory

public Registry getRegistry(URL url) {url = url.setPath(RegistryService.class.getName())
    .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
    .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
  String key = url.toServiceString();
  // 锁定注册中心获取过程,保证注册中心单一实例
  LOCK.lock();
  try {Registry registry = REGISTRIES.get(key);
    if (registry != null) {return registry;}
    registry = createRegistry(url);
    if (registry == null) {throw new IllegalStateException("Can not create registry" + url);
    }
    REGISTRIES.put(key, registry);
    return registry;
  } finally {
    // 释放锁
    LOCK.unlock();}
}

上述方法

  1. 从缓存 REGISTRIES 中,根据 key 获得对应的Registry
  2. 如果不存在,则创建

createRegistry

    public Registry createRegistry(URL url) {return new ZookeeperRegistry(url, zookeeperTransporter);
    }

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {super(url);
  if (url.isAnyHost()) {throw new IllegalStateException("registry address == null");
  }
  String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
  if (! group.startsWith(Constants.PATH_SEPARATOR)) {group = Constants.PATH_SEPARATOR + group;}
  this.root = group;
  zkClient = zookeeperTransporter.connect(url);
  zkClient.addStateListener(new StateListener() {public void stateChanged(int state) {if (state == RECONNECTED) {
        try {recover();
        } catch (Exception e) {logger.error(e.getMessage(), e);
        }
      }
    }
  });
}

代码分析到这,我们对于 getRegistry 得出结论根据当前注册中心的配置信息,获得一个匹配的注册中心,也就是ZookeeperRegistry registry.register(registedProviderUrl)

继续往下看会调用对应的 registry.register 去把 dubbo:// 的谢意地址注册到 zookeeper 上,这个方法会调用 FailbackRegistry 类中的 register, 因为其父类FailbackRegistry 中存在 register 方法,而这个类重写了此方法,所以我们可以直接定位到 FailbackRegistry 这个类中的 register 方法中

FailbackRegistry.register

@Override
public void register(URL url) {super.register(url);
  failedRegistered.remove(url);
  failedUnregistered.remove(url);
  try {
    // 向服务器端发送注册请求
    doRegister(url);
  } catch (Exception e) {
    Throwable t = e;

    // 如果开启了启动时检测,则直接抛出异常
    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
      && url.getParameter(Constants.CHECK_KEY, true)
      && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
    boolean skipFailback = t instanceof SkipFailbackWrapperException;
    if (check || skipFailback) {if(skipFailback) {t = t.getCause();
      }
      throw new IllegalStateException("Failed to register" + url + "to registry" + getUrl().getAddress() + ", cause:" + t.getMessage(), t);
    } else {logger.error("Failed to register" + url + ", waiting for retry, cause:" + t.getMessage(), t);
    }

    // 将失败的注册请求记录到失败列表,定时重试
    failedRegistered.add(url);
  }
}

从名字上来看,是一个失败重试机制,调用父类的 register 方法把当前 url 添加到缓存集合中,调用子类的 doRegister 方法

protected void doRegister(URL url) {
  try {zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
  } catch (Throwable e) {throw new RpcException("Failed to register" + url + "to zookeeper" + getUrl() + ", cause:" + e.getMessage(), e);
  }
}

可以看到,调用了 zkclient.create 在 zookeeper 中创建节点

最后 RegistryProtocol.export 这个方法之后的代码不再分析了,就是去服务提供端注册一个 zookeeper 监听,当监听发生变化的时候,服务端做相应的处理。

退出移动版