关于java:Skywalking13Skywalking模块加载机制

2次阅读

共计 38024 个字符,预计需要花费 96 分钟才能阅读完成。

模块加载机制

根本概述

ModuleSkywalkingOAP 提供的一种治理性能个性的机制。通过 Module 机制,能够不便的定义模块,并且能够提供多种实现,在配置文件中任意抉择实现。

模块相干配置文件能够参考:Backend setup、Configuration Vocabulary

类图

Skywalking 中模块治理相干性能都在 org.apache.skywalking.oap.server.library.module 包下。

通过类图能够理解 Skywalking 模块机制大抵分成如下几个模块:

  • 模块配置:ApplicationConfigurationModuleConfigurationProviderConfiguration

    • PS:刚好对应 application.yml 三层构造:模块 -> 模块实现 -> 某个模块实现的配置。
  • 模块定义类:ModuleDefine
  • 模块提供类:ModuleProvider
  • 服务:Service
  • 治理类:ModuleManager
  • 一些辅助类

    • ModuleDefineHolder:模块治理类须要实现的接口,提供查找模块相干性能
    • ModuleProviderHolder:模块定义类须要实现的接口,提供获取模块的服务类性能
    • ModuleServiceHolder:模块提供类须要实现的接口,提供注册服务实现、获取服务对象的性能
    • ModuleConfig:模块配置类,模块定义类会将 ProviderConfiguration 映射为 ModuleConfig
    • ApplicationConfigLoaderApplicationConfiguration 的辅助类,将 application.yml 配置文件加载到内存,设置 selector 对应的 Provider 的配置信息

类图源文件:Skywalking-Module.uml

源码解析

ModuleDefine

package org.apache.skywalking.oap.server.library.module;

import java.lang.reflect.Field;
import java.util.Enumeration;
import java.util.Properties;
import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// 模块定义
public abstract class ModuleDefine implements ModuleProviderHolder {private static final Logger LOGGER = LoggerFactory.getLogger(ModuleDefine.class);
    // 模块理论的
    private ModuleProvider loadedProvider = null;

    private final String name;

    public ModuleDefine(String name) {this.name = name;}

    // 模块名
    public final String name() {return name;}

    // 实现类能够定义模块提供的服务类
    public abstract Class[] services();

    /**
     * Run the prepare stage for the module, including finding all potential providers, and asking them to prepare.
     *
     * @param moduleManager of this module
     * @param configuration of this module
     * @throws ProviderNotFoundException when even don't find a single one providers.
     */
    // 筹备阶段,找到 configuration 配置类对应的 ModuleProvider 对象,进行初始化操作
    void prepare(
        // 模块治理对象
        ModuleManager moduleManager, 
        // 模块配置类
        ApplicationConfiguration.ModuleConfiguration configuration,
        // 模块提供类的服务加载器
        ServiceLoader<ModuleProvider> moduleProviderLoader
    ) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException, ModuleStartException {
        // 找到 configuration 配置类对应的 ModuleProvider 对象
        for (ModuleProvider provider : moduleProviderLoader) {if (!configuration.has(provider.name())) {continue;}
            if (provider.module().equals(getClass())) {if (loadedProvider == null) {
                    loadedProvider = provider;
                    loadedProvider.setManager(moduleManager);
                    loadedProvider.setModuleDefine(this);
                } else {throw new DuplicateProviderException(this.name() + "module has one" + loadedProvider.name() + "[" + loadedProvider.getClass().getName() + "] provider already," + provider.name() + "[" + provider.getClass().getName() + "] is defined as 2nd provider.");
                }
            }
        }
        if (loadedProvider == null) {throw new ProviderNotFoundException(this.name() + "module no provider found.");
        }

        // 复制提供类的配置文件至 ModuleConfig 对象
        LOGGER.info("Prepare the {} provider in {} module.", loadedProvider.name(), this.name());
        try {
            copyProperties(loadedProvider.createConfigBeanIfAbsent(), 
                configuration.getProviderConfiguration(loadedProvider.name()), 
                this.name(), 
                loadedProvider.name());
        } catch (IllegalAccessException e) {throw new ModuleConfigException(this.name() + "module config transport to config bean failure.", e);
        }
        // 模块提供对象进入筹备阶段
        loadedProvider.prepare();}

    // 应用反射复制属性
    private void copyProperties(ModuleConfig dest, Properties src, String moduleName, String providerName) throws IllegalAccessException {if (dest == null) {return;}
        Enumeration<?> propertyNames = src.propertyNames();
        while (propertyNames.hasMoreElements()) {String propertyName = (String) propertyNames.nextElement();
            Class<? extends ModuleConfig> destClass = dest.getClass();
            try {Field field = getDeclaredField(destClass, propertyName);
                field.setAccessible(true);
                field.set(dest, src.get(propertyName));
            } catch (NoSuchFieldException e) {LOGGER.warn(propertyName + "setting is not supported in" + providerName + "provider of" + moduleName + "module");
            }
        }
    }

    private Field getDeclaredField(Class<?> destClass, String fieldName) throws NoSuchFieldException {if (destClass != null) {Field[] fields = destClass.getDeclaredFields();
            for (Field field : fields) {if (field.getName().equals(fieldName)) {return field;}
            }
            return getDeclaredField(destClass.getSuperclass(), fieldName);
        }
        throw new NoSuchFieldException();}

    // 获取模块定义对应的 Provider 对象
    @Override
    public final ModuleProvider provider() throws DuplicateProviderException, ProviderNotFoundException {if (loadedProvider == null) {throw new ProviderNotFoundException("There is no module provider in" + this.name() + "module!");
        }
        return loadedProvider;
    }
}

ModuleProviderHolder

package org.apache.skywalking.oap.server.library.module;

// 模块提供持有接口,通过该接口,能够获取模块 Provider 对象对应的 Service 持有接口,从而拿到模块 Provider 对象对应的服务对象
public interface ModuleProviderHolder {
    // 获取模块提供对象
    ModuleServiceHolder provider() throws DuplicateProviderException, ProviderNotFoundException;}

ModuleProvider

package org.apache.skywalking.oap.server.library.module;

import java.util.HashMap;
import java.util.Map;
import lombok.Setter;

// 模块提供抽象类,所有的模块提供类都须要继承该抽象类
// 一个模块定义能够配置多个模块提供类,通过在 application.yml 进行切换
public abstract class ModuleProvider implements ModuleServiceHolder {
    // 模块管理器
    @Setter
    private ModuleManager manager;
    // 模块定义对象
    @Setter
    private ModuleDefine moduleDefine;
    // 模块提供对应的服务对象 map
    private final Map<Class<? extends Service>, Service> services = new HashMap<>();

    public ModuleProvider() {}

    protected final ModuleManager getManager() {return manager;}

    // 获取服务提供实现类的 name,须要子类实现
    public abstract String name();

    // 定义模块提供者所实现的模块定义类
    public abstract Class<? extends ModuleDefine> module();

    // 创立模块定义配置对象
    public abstract ModuleConfig createConfigBeanIfAbsent();

    // 筹备阶段(初始化与其余模块无关的事件)public abstract void prepare() throws ServiceNotProvidedException, ModuleStartException;

    // 启动阶段(该阶段模块间能够相互操作)public abstract void start() throws ServiceNotProvidedException, ModuleStartException;

    // 实现后告诉阶段(在所有模块胜利启动后执行)public abstract void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException;

    // 该模块须要依赖的其余模块名
    public abstract String[] requiredModules();

    // 注册服务实现类
    @Override
    public final void registerServiceImplementation(Class<? extends Service> serviceType, Service service) throws ServiceNotProvidedException {if (serviceType.isInstance(service)) {this.services.put(serviceType, service);
        } else {throw new ServiceNotProvidedException(serviceType + "is not implemented by" + service);
        }
    }

    // 确保所有服务被实现
    void requiredCheck(Class<? extends Service>[] requiredServices) throws ServiceNotProvidedException {if (requiredServices == null)
            return;
        for (Class<? extends Service> service : requiredServices) {if (!services.containsKey(service)) {throw new ServiceNotProvidedException("Service:" + service.getName() + "not provided");
            }
        }
        if (requiredServices.length != services.size()) {throw new ServiceNotProvidedException("The" + this.name() + "provider in" + moduleDefine.name() + "moduleDefine provide more service implementations than ModuleDefine requirements.");
        }
    }

    // 获取服务实现对象
    @Override
    public @SuppressWarnings("unchecked")
    <T extends Service> T getService(Class<T> serviceType) throws ServiceNotProvidedException {Service serviceImpl = services.get(serviceType);
        if (serviceImpl != null) {return (T) serviceImpl;
        }
        throw new ServiceNotProvidedException("Service" + serviceType.getName() + "should not be provided, based on moduleDefine define.");
    }

    ModuleDefine getModule() {return moduleDefine;}

    String getModuleName() {return moduleDefine.name();
    }
}

ModuleConfig

package org.apache.skywalking.oap.server.library.module;

// 模块配置类
public abstract class ModuleConfig {}

ModuleServiceHolder

package org.apache.skywalking.oap.server.library.module;

// 模块服务持有接口
public interface ModuleServiceHolder {
    // 注册服务实现对象
    void registerServiceImplementation(Class<? extends Service> serviceType, Service service) throws ServiceNotProvidedException;
    
    // 获取服务实现对象
    <T extends Service> T getService(Class<T> serviceType) throws ServiceNotProvidedException;
}

Service

package org.apache.skywalking.oap.server.library.module;

// 服务接口
public interface Service {}

ModuleDefineHolder

package org.apache.skywalking.oap.server.library.module;

// 模块定义持有接口
public interface ModuleDefineHolder {
    // 判断是否有该模块
    boolean has(String moduleName);
    
    // 通过模块名获取模块定义对象
    ModuleProviderHolder find(String moduleName) throws ModuleNotFoundRuntimeException;
}

ModuleManager

package org.apache.skywalking.oap.server.library.module;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.ServiceLoader;

// 模块治理类,治理模块的生命周期
public class ModuleManager implements ModuleDefineHolder {
    // 所有模块是否曾经通过筹备阶段
    private boolean isInPrepareStage = true;
    
    // 所有被加载的模块定义对象 map
    private final Map<String, ModuleDefine> loadedModules = new HashMap<>();

    // 初始化所有配置的模块
    public void init(ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
        // 获取配置类中的模块名
        String[] moduleNames = applicationConfiguration.moduleList();
        // SPI 加载所有模块定义对象
        ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
        // SPI 加载所有模块提供对象
        ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
        // 所有配置类中定义的模块,进行筹备阶段
        LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
        for (ModuleDefine module : moduleServiceLoader) {for (String moduleName : moduleNames) {if (moduleName.equals(module.name())) {module.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName), moduleProviderLoader);
                    loadedModules.put(moduleName, module);
                    moduleList.remove(moduleName);
                }
            }
        }
        // 筹备阶段完结
        isInPrepareStage = false;

        if (moduleList.size() > 0) {throw new ModuleNotFoundException(moduleList.toString() + "missing.");
        }

        // 依据模块提供对象中的 requiredModules 办法,确定模块的初始化程序(被依赖的模块后行加载)BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
        // 所有模块进入启动阶段
        bootstrapFlow.start(this);
        // 所有模块进入实现后告诉阶段
        bootstrapFlow.notifyAfterCompleted();}

    // 判断是否有该模块
    @Override
    public boolean has(String moduleName) {return loadedModules.get(moduleName) != null;
    }

    // 通过模块名获取模块定义对象
    @Override
    public ModuleProviderHolder find(String moduleName) throws ModuleNotFoundRuntimeException {assertPreparedStage();
        ModuleDefine module = loadedModules.get(moduleName);
        if (module != null)
            return module;
        throw new ModuleNotFoundRuntimeException(moduleName + "missing.");
    }

    // 断言是否还在筹备阶段,如果还在筹备阶段,则抛出异样
    private void assertPreparedStage() {if (isInPrepareStage) {throw new AssertionError("Still in preparing stage.");
        }
    }
}

BootstrapFlow

package org.apache.skywalking.oap.server.library.module;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// 依据模块提供对象中的 requiredModules 办法,确定模块的初始化程序(被依赖的模块后行加载)class BootstrapFlow {private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapFlow.class);

    private Map<String, ModuleDefine> loadedModules;
    // 按依赖程序排序的模块提供对象列表
    private List<ModuleProvider> startupSequence;

    BootstrapFlow(Map<String, ModuleDefine> loadedModules) throws CycleDependencyException, ModuleNotFoundException {
        this.loadedModules = loadedModules;
        startupSequence = new LinkedList<>();

        // 被依赖的模块后行加载
        makeSequence();}

    @SuppressWarnings("unchecked")
    void start(ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {for (ModuleProvider provider : startupSequence) {LOGGER.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
            provider.requiredCheck(provider.getModule().services());

            provider.start();}
    }

    void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {for (ModuleProvider provider : startupSequence) {provider.notifyAfterCompleted();
        }
    }

    private void makeSequence() throws CycleDependencyException, ModuleNotFoundException {List<ModuleProvider> allProviders = new ArrayList<>();
        // 判断所有被依赖的模块是否存在
        for (final ModuleDefine module : loadedModules.values()) {String[] requiredModules = module.provider().requiredModules();
            if (requiredModules != null) {for (String requiredModule : requiredModules) {if (!loadedModules.containsKey(requiredModule)) {throw new ModuleNotFoundException(requiredModule + "module is required by" + module.provider().getModuleName() + "." + module.provider().name() + ", but not found.");
                    }
                }
            }

            allProviders.add(module.provider());
        }

        do {int numOfToBeSequenced = allProviders.size();
            for (int i = 0; i < allProviders.size(); i++) {ModuleProvider provider = allProviders.get(i);
                String[] requiredModules = provider.requiredModules();
                if (CollectionUtils.isNotEmpty(requiredModules)) {
                    // 是否所有依赖的模块都在 startupSequence 中
                    boolean isAllRequiredModuleStarted = true;
                    for (String module : requiredModules) {
                        boolean exist = false;
                        for (ModuleProvider moduleProvider : startupSequence) {if (moduleProvider.getModuleName().equals(module)) {
                                exist = true;
                                break;
                            }
                        }
                        if (!exist) {
                            isAllRequiredModuleStarted = false;
                            break;
                        }
                    }
                    // 所有依赖的模块都在 startupSequence,则将该模块提供对象退出 startupSequence
                    if (isAllRequiredModuleStarted) {startupSequence.add(provider);
                        allProviders.remove(i);
                        i--;
                    }
                } else {
                    // 如果该模块提供对象不依赖任何其余模块,则退出 startupSequence
                    startupSequence.add(provider);
                    allProviders.remove(i);
                    i--;
                }
            }

            // 如果一次循环后,没有任何一个对象退出到 startupSequence,则证实有循环依赖
            if (numOfToBeSequenced == allProviders.size()) {StringBuilder unSequencedProviders = new StringBuilder();
                allProviders.forEach(provider -> unSequencedProviders.append(provider.getModuleName()).append("[provider=").append(provider.getClass().getName()).append("]\n"));
                throw new CycleDependencyException("Exist cycle module dependencies in \n" + unSequencedProviders.substring(0, unSequencedProviders.length() - 1));
            }
        } while (allProviders.size() != 0); // 当提供对象列表不为空,则始终循环执行上来
    }
}

ApplicationConfiguration

package org.apache.skywalking.oap.server.library.module;

import java.util.HashMap;
import java.util.Properties;

// OAP 利用配置类
public class ApplicationConfiguration {
    // 模块定义配置 map
    private HashMap<String, ModuleConfiguration> modules = new HashMap<>();

    // 模块配置名列表
    public String[] moduleList() {return modules.keySet().toArray(new String[0]);
    }

    // 增加模块定义配置
    public ModuleConfiguration addModule(String moduleName) {ModuleConfiguration newModule = new ModuleConfiguration();
        modules.put(moduleName, newModule);
        return newModule;
    }

    // 判断指定模块名是否存在模块定义配置 map 中
    public boolean has(String moduleName) {return modules.containsKey(moduleName);
    }

    // 获取模块定义配置
    public ModuleConfiguration getModuleConfiguration(String name) {return modules.get(name);
    }

    // 模块定义配置类
    public static class ModuleConfiguration {
        // 模块提供对象 map
        private HashMap<String, ProviderConfiguration> providers = new HashMap<>();

        private ModuleConfiguration() {}

        // 获取模块提供配置
        public Properties getProviderConfiguration(String name) {return providers.get(name).getProperties();}

        // 是否存在模块提供配置
        public boolean has(String name) {return providers.containsKey(name);
        }

        // 增加模块提供配置
        public ModuleConfiguration addProviderConfiguration(String name, Properties properties) {ProviderConfiguration newProvider = new ProviderConfiguration(properties);
            providers.put(name, newProvider);
            return this;
        }
    }

    // 模块提供配置类
    public static class ProviderConfiguration {
        // 模块提供属性
        private Properties properties;

        ProviderConfiguration(Properties properties) {this.properties = properties;}

        private Properties getProperties() {return properties;}
    }
}

ApplicationConfigLoader

package org.apache.skywalking.oap.server.starter.config;

import java.io.FileNotFoundException;
import java.io.Reader;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.PropertyPlaceholderHelper;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.yaml.snakeyaml.Yaml;

// application.yml 加载类, 三层构造: 模块定义名. 模块提供名. 属性 key
@Slf4j
public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfiguration> {
    // 当不配置模块提供者时,应用 "-"
    private static final String DISABLE_SELECTOR = "-";
    // 该字段抉择模块提供者
    private static final String SELECTOR = "selector";

    private final Yaml yaml = new Yaml();

    @Override
    public ApplicationConfiguration load() throws ConfigFileNotFoundException {ApplicationConfiguration configuration = new ApplicationConfiguration();
        this.loadConfig(configuration);
        this.overrideConfigBySystemEnv(configuration);
        return configuration;
    }

    @SuppressWarnings("unchecked")
    private void loadConfig(ApplicationConfiguration configuration) throws ConfigFileNotFoundException {
        try {Reader applicationReader = ResourceUtils.read("application.yml");
            Map<String, Map<String, Object>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
            if (CollectionUtils.isNotEmpty(moduleConfig)) {selectConfig(moduleConfig);
                moduleConfig.forEach((moduleName, providerConfig) -> {if (providerConfig.size() > 0) {log.info("Get a module define from application.yml, module name: {}", moduleName);
                        ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
                        providerConfig.forEach((providerName, config) -> {log.info("Get a provider define belong to {} module, provider name: {}", moduleName, providerName);
                            final Map<String, ?> propertiesConfig = (Map<String, ?>) config;
                            final Properties properties = new Properties();
                            if (propertiesConfig != null) {propertiesConfig.forEach((propertyName, propertyValue) -> {if (propertyValue instanceof Map) {Properties subProperties = new Properties();
                                        ((Map) propertyValue).forEach((key, value) -> {subProperties.put(key, value);
                                            replacePropertyAndLog(key, value, subProperties, providerName);
                                        });
                                        properties.put(propertyName, subProperties);
                                    } else {properties.put(propertyName, propertyValue);
                                        replacePropertyAndLog(propertyName, propertyValue, properties, providerName);
                                    }
                                });
                            }
                            moduleConfiguration.addProviderConfiguration(providerName, properties);
                        });
                    } else {log.warn("Get a module define from application.yml, but no provider define, use default, module name: {}", moduleName);
                    }
                });
            }
        } catch (FileNotFoundException e) {throw new ConfigFileNotFoundException(e.getMessage(), e);
        }
    }

    private void replacePropertyAndLog(final Object propertyName, final Object propertyValue, final Properties target, final Object providerName) {final String valueString = PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(propertyValue + "", target);
        if (valueString != null) {if (valueString.trim().length() == 0) {target.replace(propertyName, valueString);
                log.info("Provider={} config={} has been set as an empty string", providerName, propertyName);
            } else {
                // Use YAML to do data type conversion.
                final Object replaceValue = yaml.load(valueString);
                if (replaceValue != null) {target.replace(propertyName, replaceValue);
                    log.info("Provider={} config={} has been set as {}", providerName, propertyName, replaceValue.toString());
                }
            }
        }
    }

    private void overrideConfigBySystemEnv(ApplicationConfiguration configuration) {for (Map.Entry<Object, Object> prop : System.getProperties().entrySet()) {overrideModuleSettings(configuration, prop.getKey().toString(), prop.getValue().toString());
        }
    }

    private void selectConfig(final Map<String, Map<String, Object>> moduleConfiguration) {final Set<String> modulesWithoutProvider = new HashSet<>();
        for (final Map.Entry<String, Map<String, Object>> entry : moduleConfiguration.entrySet()) {final String moduleName = entry.getKey();
            final Map<String, Object> providerConfig = entry.getValue();
            if (!providerConfig.containsKey(SELECTOR)) {continue;}
            final String selector = (String) providerConfig.get(SELECTOR);
            final String resolvedSelector = PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(selector, System.getProperties()
            );
            providerConfig.entrySet().removeIf(e -> !resolvedSelector.equals(e.getKey()));

            if (!providerConfig.isEmpty()) {continue;}

            if (!DISABLE_SELECTOR.equals(resolvedSelector)) {throw new ProviderNotFoundException("no provider found for module" + moduleName + "," + "if you're sure it's not required module and want to remove it," + "set the selector to -");
            }

            // now the module can be safely removed
            modulesWithoutProvider.add(moduleName);
        }

        moduleConfiguration.entrySet().removeIf(e -> {final String module = e.getKey();
            final boolean shouldBeRemoved = modulesWithoutProvider.contains(module);
            if (shouldBeRemoved) {log.info("Remove module {} without any provider", module);
            }
            return shouldBeRemoved;
        });
    }

    private void overrideModuleSettings(ApplicationConfiguration configuration, String key, String value) {int moduleAndConfigSeparator = key.indexOf('.');
        if (moduleAndConfigSeparator <= 0) {return;}
        String moduleName = key.substring(0, moduleAndConfigSeparator);
        String providerSettingSubKey = key.substring(moduleAndConfigSeparator + 1);
        ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.getModuleConfiguration(moduleName);
        if (moduleConfiguration == null) {return;}
        int providerAndConfigSeparator = providerSettingSubKey.indexOf('.');
        if (providerAndConfigSeparator <= 0) {return;}
        String providerName = providerSettingSubKey.substring(0, providerAndConfigSeparator);
        String settingKey = providerSettingSubKey.substring(providerAndConfigSeparator + 1);
        if (!moduleConfiguration.has(providerName)) {return;}
        Properties providerSettings = moduleConfiguration.getProviderConfiguration(providerName);
        if (!providerSettings.containsKey(settingKey)) {return;}
        Object originValue = providerSettings.get(settingKey);
        Class<?> type = originValue.getClass();
        if (type.equals(int.class) || type.equals(Integer.class))
            providerSettings.put(settingKey, Integer.valueOf(value));
        else if (type.equals(String.class))
            providerSettings.put(settingKey, value);
        else if (type.equals(long.class) || type.equals(Long.class))
            providerSettings.put(settingKey, Long.valueOf(value));
        else if (type.equals(boolean.class) || type.equals(Boolean.class)) {providerSettings.put(settingKey, Boolean.valueOf(value));
        } else {return;}
        log.info("The setting has been override by key: {}, value: {}, in {} provider of {} module through {}", settingKey, value, providerName, moduleName, "System.properties");
    }
}

ConfigLoader

package org.apache.skywalking.oap.server.starter.config;

// 配置加载接口
public interface ConfigLoader<T> {T load() throws ConfigFileNotFoundException;
}

OAPServerBootstrap

package org.apache.skywalking.oap.server.starter;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader;
import org.apache.skywalking.oap.server.starter.config.ConfigLoader;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;

// OAP 启动类,加载配置文件,初始化模块
@Slf4j
public class OAPServerBootstrap {public static void start() {String mode = System.getProperty("mode");
        // 启动模式
        RunningMode.setMode(mode);

        ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
        ModuleManager manager = new ModuleManager();
        try {
            // 从配置文件中加载配置
            ApplicationConfiguration applicationConfiguration = configLoader.load();
            // 初始化模块
            manager.init(applicationConfiguration);

            // 将启动工夫发送给 Telemetry
            manager.find(TelemetryModule.NAME)
                   .provider()
                   .getService(MetricsCreator.class)
                   .createGauge("uptime", "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
                   // Set uptime to second
                   .setValue(System.currentTimeMillis() / 1000d);

            if (RunningMode.isInitMode()) {log.info("OAP starts up in init mode successfully, exit now...");
                System.exit(0);
            }
        } catch (Throwable t) {log.error(t.getMessage(), t);
            System.exit(1);
        }
    }
}

OAPServerStartUp

package org.apache.skywalking.oap.server.starter;

// OAP 启动类
public class OAPServerStartUp {public static void main(String[] args) {OAPServerBootstrap.start();
    }
}

Skywalking OAP 启动流程剖析模块加载机制

时序图

源文件:OAPServerStartUp.sdt

案例:存储模块加载剖析

配置文件

application.yml 配置文件,能够看出。

模块是以三层构造来定义的:

  • 第一层:模块定义名
  • 第二层:模块提供名 / selector
  • 第三层:模块提供配置信息 / selector 抉择的模块提供配置
storage:
  selector: ${SW_STORAGE:h2}
  elasticsearch:
    nameSpace: ${SW_NAMESPACE:""}
    clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
    # etc...
  elasticsearch7:
    nameSpace: ${SW_NAMESPACE:""}
    clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
    # etc...
  h2:
    driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
    url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db;DB_CLOSE_DELAY=-1}
    # etc...
  mysql:
    properties:
      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
      dataSource.user: ${SW_DATA_SOURCE_USER:root}
      dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
      # etc...
  tidb:
    properties:
      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
      dataSource.user: ${SW_DATA_SOURCE_USER:root}
      dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
      # etc...
  influxdb:
    url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086}
    user: ${SW_STORAGE_INFLUXDB_USER:root}
    password: ${SW_STORAGE_INFLUXDB_PASSWORD:}
    # etc...

加载配置

通过 org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader#load 的调用,org.apache.skywalking.oap.server.starter.OAPServerBootstrap#start 获取到了所有须要加载的模块,其中包含存储模块。

org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader#selectConfig 也通过 storage.selector=h2,存储模块只保留了 h2 的配置信息:

storage:
    h2:
    driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
    url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db;DB_CLOSE_DELAY=-1}
    # etc...

筹备阶段

org.apache.skywalking.oap.server.library.module.ModuleManager#init 中,通过 SPI 加载了模块定义对象,存储模块对应的定义类如下:

PS:能够看到定义了大量 Service 接口

package org.apache.skywalking.oap.server.core.storage;

import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;

/**
 * StorageModule provides the capabilities(services) to interact with the database. With different databases, this
 * module could have different providers, such as currently, H2, MySQL, ES, TiDB.
 */
public class StorageModule extends ModuleDefine {

    public static final String NAME = "storage";

    public StorageModule() {super(NAME);
    }

    @Override
    public Class[] services() {return new Class[]{
                IBatchDAO.class,
                StorageDAO.class,
                IHistoryDeleteDAO.class,
                INetworkAddressAliasDAO.class,
                ITopologyQueryDAO.class,
                IMetricsQueryDAO.class,
                ITraceQueryDAO.class,
                IMetadataQueryDAO.class,
                IAggregationQueryDAO.class,
                IAlarmQueryDAO.class,
                ITopNRecordsQueryDAO.class,
                ILogQueryDAO.class,
                IProfileTaskQueryDAO.class,
                IProfileTaskLogQueryDAO.class,
                IProfileThreadSnapshotQueryDAO.class,
                UITemplateManagementDAO.class,
                IBrowserLogQueryDAO.class
        };
    }
}

同时也会调用 org.apache.skywalking.oap.server.library.module.ModuleDefine#prepare 进入筹备阶段

        String[] moduleNames = applicationConfiguration.moduleList();
        ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
        ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);

        LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
        for (ModuleDefine module : moduleServiceLoader) {for (String moduleName : moduleNames) {if (moduleName.equals(module.name())) {module.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName), moduleProviderLoader);
                    loadedModules.put(moduleName, module);
                    moduleList.remove(moduleName);
                }
            }
        }

org.apache.skywalking.oap.server.library.module.ModuleDefine#prepare 会通过传入的配置,只匹配上配置文件抉择的模块提供对象。

        for (ModuleProvider provider : moduleProviderLoader) {if (!configuration.has(provider.name())) {continue;}

            if (provider.module().equals(getClass())) {if (loadedProvider == null) {
                    loadedProvider = provider;
                    loadedProvider.setManager(moduleManager);
                    loadedProvider.setModuleDefine(this);
                } else {throw new DuplicateProviderException(this.name() + "module has one" + loadedProvider.name() + "[" + loadedProvider.getClass().getName() 
                                                         + "] provider already," + provider.name() + "[" + provider.getClass().getName() + "] is defined as 2nd provider.");
                }
            }

        }

        if (loadedProvider == null) {throw new ProviderNotFoundException(this.name() + "module no provider found.");
        }

        LOGGER.info("Prepare the {} provider in {} module.", loadedProvider.name(), this.name());
        try {copyProperties(loadedProvider.createConfigBeanIfAbsent(), configuration.getProviderConfiguration(loadedProvider.name()), this.name(), loadedProvider.name());
        } catch (IllegalAccessException e) {throw new ModuleConfigException(this.name() + "module config transport to config bean failure.", e);
        }
        loadedProvider.prepare();

例如“配置文件”一节抉择的 h2,则加载的提供类为 org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider

它的 prepare 办法如下,能够看到:注册了所有 StorageModule 申明的 Service 接口

    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {Properties settings = new Properties();
        settings.setProperty("dataSourceClassName", config.getDriver());
        settings.setProperty("dataSource.url", config.getUrl());
        settings.setProperty("dataSource.user", config.getUser());
        settings.setProperty("dataSource.password", config.getPassword());
        h2Client = new JDBCHikariCPClient(settings);

        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
        this.registerServiceImplementation(
            StorageDAO.class,
            new H2StorageDAO(getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
        );

        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));

        this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
        this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(h2Client));
        this.registerServiceImplementation(
            ITraceQueryDAO.class, new H2TraceQueryDAO(getManager(),
                h2Client,
                config.getMaxSizeOfArrayColumn(),
                config.getNumOfSearchableValuesPerTag()));
        this.registerServiceImplementation(IBrowserLogQueryDAO.class, new H2BrowserLogQueryDAO(h2Client));
        this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
        this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
        this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
        this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
        this.registerServiceImplementation(
            ILogQueryDAO.class,
            new H2LogQueryDAO(
                h2Client,
                getManager(),
                config.getMaxSizeOfArrayColumn(),
                config.getNumOfSearchableValuesPerTag())
        );

        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
        this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
        this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
        this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(h2Client));
    }

启动阶段

org.apache.skywalking.oap.server.library.module.ModuleManager#init 通过调用 org.apache.skywalking.oap.server.library.module.BootstrapFlow#start 进入启动阶段

    void start(ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {for (ModuleProvider provider : startupSequence) {LOGGER.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
            provider.requiredCheck(provider.getModule().services());

            provider.start();}
    }

例如“配置文件”一节抉择的 h2,则加载的提供类为 org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider

它的 start 办法如下,能够看到:启动 h2client 并监听 ModelCreator

    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {final ConfigService configService = getManager().find(CoreModule.NAME)
                                                        .provider()
                                                        .getService(ConfigService.class);
        final int numOfSearchableTracesTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
        if (numOfSearchableTracesTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
            throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTracesTags
                                               + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
                                               + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
                                               + "]. Potential out of bound in the runtime.");
        }
        final int numOfSearchableLogsTags = configService.getSearchableLogsTags().split(Const.COMMA).length;
        if (numOfSearchableLogsTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
            throw new ModuleStartException("Size of searchableLogsTags[" + numOfSearchableLogsTags
                                               + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
                                               + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
                                               + "]. Potential out of bound in the runtime.");
        }

        MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
                                                   .provider()
                                                   .getService(MetricsCreator.class);
        HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        h2Client.registerChecker(healthChecker);
        try {h2Client.connect();

            H2TableInstaller installer = new H2TableInstaller(h2Client, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag());
            getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
        } catch (StorageException e) {throw new ModuleStartException(e.getMessage(), e);
        }
    }

实现后告诉阶段

org.apache.skywalking.oap.server.library.module.ModuleManager#init 通过调用 org.apache.skywalking.oap.server.library.module.BootstrapFlow#notifyAfterCompleted 进入实现后告诉阶段

    void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {for (ModuleProvider provider : startupSequence) {provider.notifyAfterCompleted();
        }
    }

例如“配置文件”一节抉择的 h2,则加载的提供类为 org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider

它的 notifyAfterCompleted 办法如下,能够看到:不须要做什么

    @Override
    public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {}

H2StorageProvider 残缺源码

package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;

// etc...

/**
 * H2 Storage provider is for demonstration and preview only. I will find that haven't implemented several interfaces,
 * because not necessary, and don't consider about performance very much.
 * <p>
 * If someone wants to implement SQL-style database as storage, please just refer the logic.
 */
@Slf4j
public class H2StorageProvider extends ModuleProvider {

    private H2StorageConfig config;
    private JDBCHikariCPClient h2Client;

    public H2StorageProvider() {config = new H2StorageConfig();
    }

    @Override
    public String name() {return "h2";}

    @Override
    public Class<? extends ModuleDefine> module() {return StorageModule.class;}

    @Override
    public ModuleConfig createConfigBeanIfAbsent() {return config;}

    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {Properties settings = new Properties();
        settings.setProperty("dataSourceClassName", config.getDriver());
        settings.setProperty("dataSource.url", config.getUrl());
        settings.setProperty("dataSource.user", config.getUser());
        settings.setProperty("dataSource.password", config.getPassword());
        h2Client = new JDBCHikariCPClient(settings);

        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
        this.registerServiceImplementation(
            StorageDAO.class,
            new H2StorageDAO(getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
        );

        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));

        this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
        this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(h2Client));
        this.registerServiceImplementation(
            ITraceQueryDAO.class, new H2TraceQueryDAO(getManager(),
                h2Client,
                config.getMaxSizeOfArrayColumn(),
                config.getNumOfSearchableValuesPerTag()));
        this.registerServiceImplementation(IBrowserLogQueryDAO.class, new H2BrowserLogQueryDAO(h2Client));
        this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
        this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
        this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
        this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
        this.registerServiceImplementation(
            ILogQueryDAO.class,
            new H2LogQueryDAO(
                h2Client,
                getManager(),
                config.getMaxSizeOfArrayColumn(),
                config.getNumOfSearchableValuesPerTag())
        );

        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
        this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
        this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
        this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(h2Client));
    }

    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {final ConfigService configService = getManager().find(CoreModule.NAME)
                                                        .provider()
                                                        .getService(ConfigService.class);
        final int numOfSearchableTracesTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
        if (numOfSearchableTracesTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
            throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTracesTags
                                               + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
                                               + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
                                               + "]. Potential out of bound in the runtime.");
        }
        final int numOfSearchableLogsTags = configService.getSearchableLogsTags().split(Const.COMMA).length;
        if (numOfSearchableLogsTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
            throw new ModuleStartException("Size of searchableLogsTags[" + numOfSearchableLogsTags
                                               + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
                                               + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
                                               + "]. Potential out of bound in the runtime.");
        }

        MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
                                                   .provider()
                                                   .getService(MetricsCreator.class);
        HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        h2Client.registerChecker(healthChecker);
        try {h2Client.connect();

            H2TableInstaller installer = new H2TableInstaller(h2Client, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag());
            getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
        } catch (StorageException e) {throw new ModuleStartException(e.getMessage(), e);
        }
    }

    @Override
    public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {}

    @Override
    public String[] requiredModules() {return new String[] {CoreModule.NAME};
    }
}

总结

Skywalking 提供的模块机制是十分低劣的设计,在工作中,如果有多个 N1 的场景,是能够借鉴它的设计的。

参考文档

  • Backend setup
  • Configuration Vocabulary

    分享并记录所学所见

正文完
 0