Dubbo 实现原理分析 -SPI& 自适应扩展实现
Duboo 基本概念
Dubbo 整体架构:
evernotecid://D5D92FBE-C671-4B13-BAC8-4D01D3D20F5B/appyinxiangcom/8739769/ENNote/p68?hash=f07431ff8daafff6906882197d395a2a
Dubbo SPI
SPI 全称为 Service Provider Interface,一种服务提供发现机制。可以实现通过配置手段加载相应的实现类。JDK 里提供了一种 SPI 实现,Dubbo 并没有使用 jdk 的 spi,而是自己实现了一个 Dubbo SPI 服务发现机制。
首先看一下 JDK SPI 实现方式
定义一个接口类 Travel
public interface Travel {
public void travel();
}
Travel 有两个实现类
public class CarTravel implements Travel {
@Override
public void travel() {
System.out.println(“travel by car”);
}
}
public class PlaneTravel implements Travel{
@Override
public void travel() {
System.out.println(“travel by plane”);
}
}
接下来需要在 META-INF/services 文件夹下建立接口名称对应的文件,META-INF/services/com.demo.spi.jdk.Travel 文件内容为接口实现类的名称
com.demo.spi.jdk.CarTravel
com.demo.spi.jdk.PlaneTravel
测试使用
public class JdkSpiTest {
public static void main(String[] args) {
ServiceLoader<Travel> sServiceLoader = ServiceLoader.load(Travel.class);
sServiceLoader.forEach(Travel::travel);
}
}
测试结果
travel by car
travel by plane
JAVA SPI vs Dubbo SPI
首先加载路径不同,java spi 路径是 META-INF/services ,dubbo 是 META-INF/services/,META-INF/services/internal,META-INF/dubbo/
加载方式不同,java spi 是全量加载,dubbo 是按需加载接口实现类
dubbo spi 除了加载实现类外还增加了 IOC 和 AOP 特性
下面我们看一下 dubbo spi 的实现首先接口需要有 @SPI 注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
/**
* default extension name
*/
String value() default “”;
}
@SPI
public interface Travel {
public void travel();
}
配置文件采用 key=value 的形式配置
car=com.demo.spi.jdk.CarTravel
plane=com.demo.spi.jdk.PlaneTravel
测试 duubo spi 加载
public class DubboSPITest {
public static void main(String[] args) {
ExtensionLoader<Travel> extensionLoader = ExtensionLoader.getExtensionLoader(Travel.class);
Travel travel = extensionLoader.getExtension(“car”);
travel.travel();
}
}
输出结果
travel by car
Dubbo SPI 实现
1. 扩展类的加载
dubbo spi 是通过 ExtensionLoader 类来实现的,通过 ExtensionLoader 获取接口对应类型的扩展加载器 ExtensionLoader.getExtensionLoader, 第一次获取会新创建然后会缓存到 ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS 中,之后就在缓存中取就可以了
// 获取扩展加载器 ExtensionLoader,type 必须是 @SPI 注解接口
//ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS 缓存
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException(“Extension type == null”);
}
if (!type.isInterface()) {
throw new IllegalArgumentException(“Extension type (” + type + “) is not an interface!”);
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException(“Extension type (” + type +
“) is not an extension, because it is NOT annotated with @” + SPI.class.getSimpleName() + “!”);
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
构造方法
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
获取到接口对应的 ExtensionLoader 后,通过 getExtension(String name) 方法获取 name 对应的扩展对象
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException(“Extension name == null”);
}
if (“true”.equals(name)) {
return getDefaultExtension();
}
// 根据名称 获取 holder 对象,如果不存在则创建一个 holder
//holder 里缓存了 name 对应的实现类
Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
// 双重检查
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 如果为空,创建
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
扩展对象的创建
private T createExtension(String name) {
//1.getExtensionClasses() 获取所有扩展类
// 根据 name 获取扩展类的 Class
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
//2. 通过反射 newInstance() 创建扩展类实例,放到 EXTENSION_INSTANCES 缓存中
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
//3. 依赖注入 IOC
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
//4. 循环创建 Wrapper 实例 AOP
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException(“Extension instance (name: ” + name + “, class: ” +
type + “) couldn’t be instantiated: ” + t.getMessage(), t);
}
}
获取所有 Extension getExtensionClasses
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 通过 loadExtensionClasses 获取所有扩展类,并放到
//cachedClasses 缓存中
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
loadExtensionClasses
private Map<String, Class<?>> loadExtensionClasses() {
// 缓存默认的扩展名
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
// 去 dubbo spi 所在的路径下 META-INF/services/,META-INF/services/internal,META-INF/dubbo/
// 加载扩展类,放到 extensionClasses 中
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace(“org.apache”, “com.alibaba”));
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace(“org.apache”, “com.alibaba”));
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace(“org.apache”, “com.alibaba”));
return extensionClasses;
}
loadDirectory->loadResource->loadClass 最终的类加载实现
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException(“Error occurred when loading extension class (interface: ” +
type + “, class line: ” + clazz.getName() + “), class ”
+ clazz.getName() + ” is not subtype of interface.”);
}
// 如果类有 @Adaptive 注解,缓存到 cachedAdaptiveClass
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz);
} else if (isWrapperClass(clazz)) {
// 如果类的构造方法有加载类的类型,缓存到 Set<Class<?>> cachedWrapperClasses; 实现 aop 的类
cacheWrapperClass(clazz);
} else {
// 如果是普通类
clazz.getConstructor();
// 获取扩展类 name,为空就取默认 name
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException(“No such extension name for the class ” + clazz.getName() + ” in the config ” + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
cacheActivateClass(clazz, names[0]);
for (String n : names) {
// 缓存到 cachedNames
cacheName(clazz, n);
// 存储到 extensionClasses
saveInExtensionClass(extensionClasses, clazz, name);
}
}
}
}
2. dubbo ioc 注入实现
dubbo ioc aop 利用 setter 注入方式实现
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
// 判断是否是 setter 方法
if (isSetter(method)) {
/**
* Check {@link DisableInject} to see if we need auto injection for this property
*/
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}
try {
String property = getSetterProperty(method);
// 从 objectFactory 获取依赖对象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
//setter 注入
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error(“Failed to inject via method ” + method.getName()
+ ” of interface ” + type.getName() + “: ” + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
3. dubbo aop 实现
// 遍历所有 Wrapper 类 创建 Wrapper 实例
//setter 注入 injectExtension
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
Dubbo 扩展点自适应机制
1. 什么自适应扩展
自适应扩展的标记注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
Adaptive 标记在类上,表示扩展点的加载是人工编码完成,目前只有两个类被 @Adaptive,AdaptiveCompiler、AdaptiveExtensionFactory
Adaptive 标记在方法上,表示扩展点的加载是代理完成
AdaptiveCompiler 示例
@Adaptive
public class AdaptiveCompiler implements Compiler {
private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) {
DEFAULT_COMPILER = compiler;
}
@Override
public Class<?> compile(String code, ClassLoader classLoader) {
Compiler compiler;
ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
String name = DEFAULT_COMPILER; // copy reference
if (name != null && name.length() > 0) {
compiler = loader.getExtension(name);
} else {
compiler = loader.getDefaultExtension();
}
return compiler.compile(code, classLoader);
}
}
2. 加载自适应扩展
dubbo 通过 ExtensionLoader.getAdaptiveExtension() 获取自适应扩展实例
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建自适应扩展,并缓存到 cachedAdaptiveInstance
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException(“Failed to create adaptive instance: ” + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException(“Failed to create adaptive instance: ” + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
createAdaptiveExtension() 方法代码
private T createAdaptiveExtension() {
try {
// 反射创建 AdaptiveExtensionClass,并注入依赖
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException(“Can’t create adaptive extension ” + type + “, cause: ” + e.getMessage(), e);
}
}
获取 Class 类 getAdaptiveExtensionClass()
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// 创建自适应扩展类
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
createAdaptiveExtensionClass() 生成自适应扩展类
private Class<?> createAdaptiveExtensionClass() {
//1. 生成自适应扩展类代码
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
//2. 获取编译器
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
//3. 编译生成 class
return compiler.compile(code, classLoader);
}
Protocol 自适应扩展类生成代码 Protocol$Adaptive
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException(“The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!”);
}
public int getDefaultPort() {
throw new UnsupportedOperationException(“The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!”);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException(“url == null”);
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? “dubbo” : url.getProtocol());
if (extName == null)
throw new IllegalStateException(“Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (” + url.toString() + “) use keys([protocol])”);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException(“org.apache.dubbo.rpc.Invoker argument == null”);
if (arg0.getUrl() == null)
throw new IllegalArgumentException(“org.apache.dubbo.rpc.Invoker argument getUrl() == null”);
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? “dubbo” : url.getProtocol());
if (extName == null)
throw new IllegalStateException(“Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (” + url.toString() + “) use keys([protocol])”);
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}