1-源码分析SOFARPC可扩展的机制SPI

41次阅读

共计 9851 个字符,预计需要花费 25 分钟才能阅读完成。

原文链接:https://www.cnblogs.com/luozh…

这几天离职在家,正好没事可以疯狂的输出一下,本来想写 DUBBO 的源码解析的,但是发现写 DUBBO 源码的太多了,所以找一个写的不那么多的框架,所以就选中 SOFARPC 这个框架了。

SOFARPC 是蚂蚁金服开源的一个 RPC 框架,相比 DUBBO 它没有这么多历史的包袱,代码更加简洁,设计思路更加清晰,更加容易去理解其中的代码。

那么为什么要去重写原生的 SPI 呢?官方给出了如下解释:

  1. 按需加载
  2. 可以有别名
  3. 可以有优先级进行排序和覆盖
  4. 可以控制是否单例
  5. 可以在某些场景下使用编码
  6. 可以指定扩展配置位置
  7. 可以排斥其他扩展点

整个流程如下:

我们以 ConsumerBootstrap 为例:

先要有一个抽象类:

@Extensible(singleton = false)
public abstract class ConsumerBootstrap<T> {....}

指定扩展实现类:

@Extension("sofa")
public class DefaultConsumerBootstrap<T> extends ConsumerBootstrap<T> {...}

扩展描述文件 META-INF/services/sofa-rpc/com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap

sofa=com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap

当这些准备完成后,直接调用即可。

ConsumerBootstrap sofa =  ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");

接下来我们看看 ExtensionLoaderFactory 的源码

    /**
     * All extension loader {Class : ExtensionLoader}
     * 这个 map 里面装的是所有 ExtensionLoader
     */
    private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();


    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
        if (loader == null) {
            //get 不到则加上锁
            synchronized (ExtensionLoaderFactory.class) {
                // 防止其他线程操作再 get 一次
                loader = LOADER_MAP.get(clazz);
                if (loader == null) {loader = new ExtensionLoader<T>(clazz, listener);
                    LOADER_MAP.put(clazz, loader);
                }
            }
        }
        return loader;
    }

然后我们看一下 ExtensionLoader 这个类的构造器

    protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
        // 如果正在执行关闭,则将属性置空后直接返回
        if (RpcRunningState.isShuttingDown()) {
            this.interfaceClass = null;
            this.interfaceName = null;
            this.listener = null;
            this.factory = null;
            this.extensible = null;
            this.all = null;
            return;
        }
        // 接口为空,既不是接口,也不是抽象类
        if (interfaceClass == null ||
                !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
        }
        // 当前加载的接口类名
        this.interfaceClass = interfaceClass;
        // 接口名字
        this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
        this.listener = listener;
        // 接口上必须要有 Extensible 注解
        Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
        if (extensible == null) {
            throw new IllegalArgumentException("Error when load extensible interface" + interfaceName + ", must add annotation @Extensible.");
        } else {this.extensible = extensible;}
        // 如果是单例,那么 factory 不为空
        this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
        // 这个属性里面是这个接口的所有实现类
        this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
        if (autoLoad) {
            // 获取到扩展点加载的路径
            List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
            for (String path : paths) {
                // 根据路径加载文件
                loadFromFile(path);
            }
        }
    }

拿到所有的扩展点加载的路径后进入到 loadFromFile 中进行文件的加载

    protected synchronized void loadFromFile(String path) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("Loading extension of extensible {} from path: {}", interfaceName, path);
        }
        // 默认如果不指定文件名字,就是接口名
        String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();
        String fullFileName = path + file;
        try {ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());
            loadFromClassLoader(classLoader, fullFileName);
        } catch (Throwable t) {if (LOGGER.isErrorEnabled()) {
                LOGGER.error("Failed to load extension of extensible" + interfaceName + "from path:" + fullFileName,
                    t);
            }
        }
    }
    
    
    protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable {Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName)
            : ClassLoader.getSystemResources(fullFileName);
        // 可能存在多个文件。if (urls != null) {while (urls.hasMoreElements()) {
                // 读取一个文件
                URL url = urls.nextElement();
                if (LOGGER.isDebugEnabled()) {LOGGER.debug("Loading extension of extensible {} from classloader: {} and file: {}",
                        interfaceName, classLoader, url);
                }
                BufferedReader reader = null;
                try {reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
                    String line;
                    while ((line = reader.readLine()) != null) {readLine(url, line);
                    }
                } catch (Throwable t) {if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("Failed to load extension of extensible" + interfaceName
                            + "from classloader:" + classLoader + "and file:" + url, t);
                    }
                } finally {if (reader != null) {reader.close();
                    }
                }
            }
        }
    }

接下来进入到 readLine,这个方法主要是读取 prop 文件里面的每一行记录,并加载该实现类的类文件校验完后将文件添加到 all 属性中

    protected void readLine(URL url, String line) {
        // 读取文件里面的一行记录,并将这行记录用 = 号分割
        String[] aliasAndClassName = parseAliasAndClassName(line);
        if (aliasAndClassName == null || aliasAndClassName.length != 2) {return;}
        // 别名
        String alias = aliasAndClassName[0];
        // 包名
        String className = aliasAndClassName[1];
        // 读取配置的实现类
        Class tmp;
        try {tmp = ClassUtils.forName(className, false);
        } catch (Throwable e) {if (LOGGER.isWarnEnabled()) {LOGGER.warn("Extension {} of extensible {} is disabled, cause by: {}",
                    className, interfaceName, ExceptionUtils.toShortString(e, 2));
            }
            if (LOGGER.isDebugEnabled()) {LOGGER.debug("Extension" + className + "of extensible" + interfaceName + "is disabled.", e);
            }
            return;
        }
        if (!interfaceClass.isAssignableFrom(tmp)) {
            throw new IllegalArgumentException("Error when load extension of extensible" + interfaceName +
                "from file:" + url + "," + className + "is not subtype of interface.");
        }
        Class<? extends T> implClass = (Class<? extends T>) tmp;

        // 检查是否有可扩展标识
        Extension extension = implClass.getAnnotation(Extension.class);
        if (extension == null) {
            throw new IllegalArgumentException("Error when load extension of extensible" + interfaceName +
                "from file:" + url + "," + className + "must add annotation @Extension.");
        } else {String aliasInCode = extension.value();
            if (StringUtils.isBlank(aliasInCode)) {
                // 扩展实现类未配置 @Extension 标签
                throw new IllegalArgumentException("Error when load extension of extensible" + interfaceClass +
                    "from file:" + url + "," + className + "'s alias of @Extension is blank");
            }
            if (alias == null) {
                // spi 文件里没配置,用代码里的
                alias = aliasInCode;
            } else {
                // spi 文件里配置的和代码里的不一致
                if (!aliasInCode.equals(alias)) {
                    throw new IllegalArgumentException("Error when load extension of extensible" + interfaceName +
                        "from file:" + url + ", aliases of" + className + "are" +
                        "not equal between" + aliasInCode + "(code) and" + alias + "(file).");
                }
            }
            // 接口需要编号,实现类没设置
            if (extensible.coded() && extension.code() < 0) {
                throw new IllegalArgumentException("Error when load extension of extensible" + interfaceName +
                    "from file:" + url + ", code of @Extension must >=0 at" + className + ".");
            }
        }
        // 不可以是 default 和 *
        if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) {
            throw new IllegalArgumentException("Error when load extension of extensible" + interfaceName +
                "from file:" + url + ", alias of @Extension must not \"default\"and \"*\"at" + className + ".");
        }
        // 检查是否有存在同名的
        ExtensionClass old = all.get(alias);
        ExtensionClass<T> extensionClass = null;
        if (old != null) {
            // 如果当前扩展可以覆盖其它同名扩展
            if (extension.override()) {
                // 如果优先级还没有旧的高,则忽略
                if (extension.order() < old.getOrder()) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("Extension of extensible {} with alias {} override from {} to {} failure," +
                            "cause by: order of old extension is higher",
                            interfaceName, alias, old.getClazz(), implClass);
                    }
                } else {if (LOGGER.isInfoEnabled()) {LOGGER.info("Extension of extensible {} with alias {}: {} has been override to {}",
                            interfaceName, alias, old.getClazz(), implClass);
                    }
                    // 如果当前扩展可以覆盖其它同名扩展
                    extensionClass = buildClass(extension, implClass, alias);
                }
            }
            // 如果旧扩展是可覆盖的
            else {if (old.isOverride() && old.getOrder() >= extension.order()) {
                    // 如果已加载覆盖扩展,再加载到原始扩展
                    if (LOGGER.isInfoEnabled()) {LOGGER.info("Extension of extensible {} with alias {}: {} has been loaded, ignore origin {}",
                            interfaceName, alias, old.getClazz(), implClass);
                    }
                } else {
                    // 如果不能被覆盖,抛出已存在异常
                    throw new IllegalStateException(
                        "Error when load extension of extensible" + interfaceClass + "from file:" + url +
                            ", Duplicate class with same alias:" + alias + "," + old.getClazz() + "and" + implClass);
                }
            }
        } else {extensionClass = buildClass(extension, implClass, alias);
        }
        if (extensionClass != null) {
            // 检查是否有互斥的扩展点
            for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) {ExtensionClass existed = entry.getValue();
                if (extensionClass.getOrder() >= existed.getOrder()) {
                    // 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展
                    String[] rejection = extensionClass.getRejection();
                    if (CommonUtils.isNotEmpty(rejection)) {for (String rej : rejection) {existed = all.get(rej);
                            if (existed == null || extensionClass.getOrder() < existed.getOrder()) {continue;}
                            ExtensionClass removed = all.remove(rej);
                            if (removed != null) {if (LOGGER.isInfoEnabled()) {
                                    LOGGER.info("Extension of extensible {} with alias {}: {} has been reject by new {}",
                                        interfaceName, removed.getAlias(), removed.getClazz(), implClass);
                                }
                            }
                        }
                    }
                } else {String[] rejection = existed.getRejection();
                    if (CommonUtils.isNotEmpty(rejection)) {for (String rej : rejection) {if (rej.equals(extensionClass.getAlias())) {
                                // 被其它扩展排掉
                                if (LOGGER.isInfoEnabled()) {
                                    LOGGER.info("Extension of extensible {} with alias {}: {} has been reject by old {}",
                                        interfaceName, alias, implClass, existed.getClazz());
                                    return;
                                }
                            }
                        }
                    }
                }
            }

            loadSuccess(alias, extensionClass);
        }
    }

加载完文件后我们再回到

ConsumerBootstrap sofa =  ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");

进入到 getExtension 方法中

    public ExtensionClass<T> getExtensionClass(String alias) {return all == null ? null : all.get(alias);
    }

    public T getExtension(String alias) {
        // 从 all 属性中拿到加载的 class
        ExtensionClass<T> extensionClass = getExtensionClass(alias);
        if (extensionClass == null) {throw new SofaRpcRuntimeException("Not found extension of" + interfaceName + "named: \"" + alias + "\"!");
        } else {
            // 在加载 class 的时候,校验了是否是单例,如果是单例,那么 factory 不为 null
            if (extensible.singleton() && factory != null) {T t = factory.get(alias);
                if (t == null) {synchronized (this) {t = factory.get(alias);
                        if (t == null) {
                            // 实例化
                            t = extensionClass.getExtInstance();
                            // 放入到 factory,单例的 class 下次直接拿就好了,不需要重新创建
                            factory.put(alias, t);
                        }
                    }
                }
                return t;
            } else {
                // 实例化
                return extensionClass.getExtInstance();}
        }
    }

我们进入到 ExtensionClass 看看 getExtInstance 方法

    /**
     * 服务端实例对象(只在是单例的时候保留)* 用 volatile 修饰,保证了可见性
     */
    private volatile transient T       instance;
    
    /**
     * 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象
     *
     * @param argTypes 构造函数参数类型
     * @param args     构造函数参数值
     * @return 扩展点对象实例 ext instance
     */
    public T getExtInstance(Class[] argTypes, Object[] args) {if (clazz != null) {
            try {if (singleton) { // 如果是单例
                    if (instance == null) {synchronized (this) {if (instance == null) {
                                // 通过反射创建实例
                                instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
                            }
                        }
                    }
                    return instance; // 保留单例
                } else {
                    // 通过反射创建实例
                    return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
                }
            } catch (Exception e) {throw new SofaRpcRuntimeException("create" + clazz.getCanonicalName() + "instance error", e);
            }
        }
        throw new SofaRpcRuntimeException("Class of ExtensionClass is null");
    }

看完了 SOFARPC 的扩展类实现后感觉代码写的非常的整洁,逻辑非常的清晰,里面有很多可以学习的地方,比如线程安全用到了双重检查锁和 volatile 保证可见性。

正文完
 0