OAL 如何通过动静生成的 Class 类,保留数据

前置工作

OAL 如何将动静生成的 SourceDispatcher 增加到 DispatcherManager

    // org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load    public void load(OALDefine define) throws ModuleStartException {        if (oalDefineSet.contains(define)) {            // each oal define will only be activated once            return;        }        try {            OALEngine engine = loadOALEngine(define);            // 设置Stream注解监听器,用来解决org.apache.skywalking.oap.server.core.analysis.Stream注解            StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);            engine.setStreamListener(streamAnnotationListener);                        // org.apache.skywalking.oap.server.core.source.SourceReceiverImpl#getDispatcherDetectorListener            // 获取的就是org.apache.skywalking.oap.server.core.analysis.DispatcherManager对象            engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)                                                      .provider()                                                      .getService(SourceReceiver.class)                                                      .getDispatcherDetectorListener());                        // 调用的就是 org.apache.skywalking.oal.rt.OALRuntime#start            engine.start(OALEngineLoaderService.class.getClassLoader());                        // 告诉所有的监听器            engine.notifyAllListeners();            oalDefineSet.add(define);        } catch (ReflectiveOperationException | OALCompileException e) {            throw new ModuleStartException(e.getMessage(), e);        }    }

org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load 办法做了如下操作:

  1. 设置 Stream 注解监听器,用来获取指标类的根本信息,并进行相应解决
@Stream(    name = "instance_jvm_class_loaded_class_count",    scopeId = 11000,    builder = InstanceJvmClassLoadedClassCountMetricsBuilder.class,    processor = MetricsStreamProcessor.class)public class InstanceJvmClassLoadedClassCountMetrics extends LongAvgMetrics implements WithMetadata {    // 省略}
  1. 通过模块管理器,先获取到 SourceReceiver 对象,借由此对象获取到 DispatcherManager 对象
public class SourceReceiverImpl implements SourceReceiver {    @Getter    private final DispatcherManager dispatcherManager;    @Override    public DispatcherDetectorListener getDispatcherDetectorListener() {        return getDispatcherManager();    }}
  1. 启动 OAL 引擎
  2. 告诉所有的监听器

org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners

    @Override    public void notifyAllListeners() throws ModuleStartException {        for (Class metricsClass : metricsClasses) {            try {                // 将动静生成的Metrics增加到MetricsStreamProcessor                streamAnnotationListener.notify(metricsClass);            } catch (StorageException e) {                throw new ModuleStartException(e.getMessage(), e);            }        }        for (Class dispatcherClass : dispatcherClasses) {            try {                // 增加动静生成的SourceDispatch至DispatcherManager                dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);            } catch (Exception e) {                throw new ModuleStartException(e.getMessage(), e);            }        }    }

org.apache.skywalking.oap.server.core.analysis.DispatcherManager#addIfAsSourceDispatcher

    @Override    public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {        if (!aClass.isInterface() && !Modifier.isAbstract(            aClass.getModifiers()) && SourceDispatcher.class.isAssignableFrom(aClass)) {            Type[] genericInterfaces = aClass.getGenericInterfaces();            for (Type genericInterface : genericInterfaces) {                ParameterizedType anInterface = (ParameterizedType) genericInterface;                if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {                    Type[] arguments = anInterface.getActualTypeArguments();                    if (arguments.length != 1) {                        throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());                    }                    Type argument = arguments[0];                    Object source = ((Class) argument).newInstance();                    if (!Source.class.isAssignableFrom(source.getClass())) {                        throw new UnexpectedException(                            "unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");                    }                    Source dispatcherSource = (Source) source;                    SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();                    int scopeId = dispatcherSource.scope();                                        // 应用scope做SourceDispatcher Map的key                    List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scopeId);                    if (dispatchers == null) {                        dispatchers = new ArrayList<>();                        this.dispatcherMap.put(scopeId, dispatchers);                    }                    // 增加                    dispatchers.add(dispatcher);                    LOGGER.info("Dispatcher {} is added into DefaultScopeDefine {}.", dispatcher.getClass()                                                                                                .getName(), scopeId);                }            }        }    }

OAL 如何将动静生成的 Metrics 增加到 MetricsStreamProcessor

与“ OAL  如何将动静生成的 SourceDispatcher  增加到 DispatcherManager ”流程基本一致,都是在 org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners 办法中解决的

    @Override    public void notifyAllListeners() throws ModuleStartException {        for (Class metricsClass : metricsClasses) {            try {                // 将动静生成的Metrics增加到MetricsStreamProcessor                streamAnnotationListener.notify(metricsClass);            } catch (StorageException e) {                throw new ModuleStartException(e.getMessage(), e);            }        }        for (Class dispatcherClass : dispatcherClasses) {            try {                // 增加动静生成的SourceDispatch至DispatcherManager                dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);            } catch (Exception e) {                throw new ModuleStartException(e.getMessage(), e);            }        }    }

org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener#notify

    @Override    public void notify(Class aClass) throws StorageException {        if (aClass.isAnnotationPresent(Stream.class)) {            Stream stream = (Stream) aClass.getAnnotation(Stream.class);            if (stream.processor().equals(RecordStreamProcessor.class)) {                RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);            } else if (stream.processor().equals(MetricsStreamProcessor.class)) {                // 因为所有的Metrics类上的@Stream注解的processor = MetricsStreamProcessor.class,所以只会走该分支                MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);            } else if (stream.processor().equals(TopNStreamProcessor.class)) {                TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);            } else if (stream.processor().equals(NoneStreamProcessor.class)) {                NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);            } else if (stream.processor().equals(ManagementStreamProcessor.class)) {                ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);            } else {                throw new UnexpectedException("Unknown stream processor.");            }        } else {            throw new UnexpectedException(                    "Stream annotation listener could only parse the class present stream annotation.");        }    }

org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create 中,通过一系列的解决,最初将 Worker (处理器)放入 map 中,期待后续被应用

    /**     * Create the workers and work flow for every metrics.     *     * @param moduleDefineHolder pointer of the module define.     * @param stream             definition of the metrics class.     * @param metricsClass       data type of the streaming calculation.     */    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) throws StorageException {        this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);    }    @SuppressWarnings("unchecked")    public void create(ModuleDefineHolder moduleDefineHolder,                       StreamDefinition stream,                       Class<? extends Metrics> metricsClass) throws StorageException {        if (DisableRegister.INSTANCE.include(stream.getName())) {            return;        }        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);        IMetricsDAO metricsDAO;        try {            // 获取@Stream注解上的builder类,并创立Metrics存储DAO对象            metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());        } catch (InstantiationException | IllegalAccessException e) {            throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);        }        ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);        DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)                                                                    .provider()                                                                    .getService(DownSamplingConfigService.class);        MetricsPersistentWorker hourPersistentWorker = null;        MetricsPersistentWorker dayPersistentWorker = null;        MetricsTransWorker transWorker = null;        final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);        /**         * All metrics default are `supportDownSampling` and `insertAndUpdate`, unless it has explicit definition.         */        boolean supportDownSampling = true;        boolean supportUpdate = true;        if (metricsExtension != null) {            supportDownSampling = metricsExtension.supportDownSampling();            supportUpdate = metricsExtension.supportUpdate();        }        if (supportDownSampling) {            if (configService.shouldToHour()) {                Model model = modelSetter.add(                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);                hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);            }            if (configService.shouldToDay()) {                Model model = modelSetter.add(                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);                dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);            }            transWorker = new MetricsTransWorker(                moduleDefineHolder, hourPersistentWorker, dayPersistentWorker);        }        Model model = modelSetter.add(            metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);        MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(            moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);        String remoteReceiverWorkerName = stream.getName() + "_rec";        IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)                                                                       .provider()                                                                       .getService(IWorkerInstanceSetter.class);        workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);        MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);        MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(            moduleDefineHolder, remoteWorker, stream.getName());        // private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();        // 将指标类的Class与MetricsAggregateWorker放入map中        // 当须要解决指标数据时,从map中获取即可        entryWorkers.put(metricsClass, aggregateWorker);    }

SourceReceiver 解决 Source 相干流程

在“从一个案例开始剖析 OAL 原理”一节,聊到了 oap server 将从 agent 收到的指标信息,发送至 SourceReceive
对应的坐标是:org.apache.skywalking.oap.server.analyzer.provider.jvm.JVMSourceDispatcher#sendToClassMetricProcess 

    private void sendToClassMetricProcess(String service,            String serviceId,            String serviceInstance,            String serviceInstanceId,            long timeBucket,            Class clazz) {        // 拼装Source对象        ServiceInstanceJVMClass serviceInstanceJVMClass = new ServiceInstanceJVMClass();        serviceInstanceJVMClass.setId(serviceInstanceId);        serviceInstanceJVMClass.setName(serviceInstance);        serviceInstanceJVMClass.setServiceId(serviceId);        serviceInstanceJVMClass.setServiceName(service);        serviceInstanceJVMClass.setLoadedClassCount(clazz.getLoadedClassCount());        serviceInstanceJVMClass.setUnloadedClassCount(clazz.getUnloadedClassCount());        serviceInstanceJVMClass.setTotalLoadedClassCount(clazz.getTotalLoadedClassCount());        serviceInstanceJVMClass.setTimeBucket(timeBucket);        // 将Source对象发送至SourceReceive进行解决        sourceReceiver.receive(serviceInstanceJVMClass);    }

SourceReceiver 的默认实现类 org.apache.skywalking.oap.server.core.source.SourceReceiverImpl ,将收集到的指标通过 org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward 进行散发

package org.apache.skywalking.oap.server.core.source;import java.io.IOException;import lombok.Getter;import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;public class SourceReceiverImpl implements SourceReceiver {    @Getter    private final DispatcherManager dispatcherManager;    public SourceReceiverImpl() {        this.dispatcherManager = new DispatcherManager();    }    @Override    public void receive(Source source) {        // 通过调配器管理器进行转发        dispatcherManager.forward(source);    }    @Override    public DispatcherDetectorListener getDispatcherDetectorListener() {        return getDispatcherManager();    }    public void scan() throws IOException, InstantiationException, IllegalAccessException {        dispatcherManager.scan();    }}
    // org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward    public void forward(Source source) {        if (source == null) {            return;        }        // 通过source的scope找到对应的调度器        List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());        /**         * Dispatcher is only generated by oal script analysis result.         * So these will/could be possible, the given source doesn't have the dispatcher,         * when the receiver is open, and oal script doesn't ask for analysis.         */        if (dispatchers != null) {            source.prepare();            // 调度器进行散发,OAL动静生成的调度器,也会在这进行散发            for (SourceDispatcher dispatcher : dispatchers) {                dispatcher.dispatch(source);            }        }    }

MetricsStreamProcessor 如何解决 SourceDispatcher 发送过去的指标数据

残缺代码请见“ OAL 如何动静生成 Class 类”下“案例”一节

org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher.ServiceInstanceJVMClassDispatcher#doInstanceJvmClassLoadedClassCount 发送数据至 org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor

package org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher;import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMClass;import org.apache.skywalking.oap.server.core.source.Source;import org.apache.skywalking.oap.server.core.source.oal.rt.metrics.InstanceJvmClassLoadedClassCountMetrics;public class ServiceInstanceJVMClassDispatcher implements SourceDispatcher<ServiceInstanceJVMClass> {    private void doInstanceJvmClassLoadedClassCount(ServiceInstanceJVMClass var1) {        InstanceJvmClassLoadedClassCountMetrics var2 = new InstanceJvmClassLoadedClassCountMetrics();        var2.setTimeBucket(var1.getTimeBucket());        var2.setEntityId(var1.getEntityId());        var2.setServiceId(var1.getServiceId());        var2.combine(var1.getLoadedClassCount(), (long)1);        // 发送数据到指标流处理器        MetricsStreamProcessor.getInstance().in(var2);    }    public void dispatch(Source var1) {        ServiceInstanceJVMClass var2 = (ServiceInstanceJVMClass)var1;        this.doInstanceJvmClassLoadedClassCount(var2);    }    public ServiceInstanceJVMClassDispatcher() {    }}

org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#in 办法中,应用在 org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create 中创立的 Worker 对象,保留数据

    public void in(Metrics metrics) {        MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass());        if (worker != null) {            worker.in(metrics);        }    }

PS:外部再细节一些的数据处理流程,相干的关键字有: DataCarrier 、 Worker 、 StorageModule  ,暂且不表,不是这篇文章的内容。

总结

Skywalking Metrics解决流程

参考文档

  • Extend storage
分享并记录所学所见