Dissecting the SkyWalking Query Protocol Through a Metrics Query Example

Overview

The SkyWalking query protocol is based on GraphQL by default. If needed, it can be replaced with a custom one by providing a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.

Capturing a request sent by the SkyWalking UI

  • Request path
    POST http://127.0.0.1:8080/graphql
  • Request body
    {
      "query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n  readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\n    label\n    values {\n      values {value}\n    }\n  }}",
      "variables": {
        "duration": {
          "start": "2021-07-03 1320",
          "end": "2021-07-03 1321",
          "step": "MINUTE"
        },
        "condition": {
          "name": "instance_jvm_thread_runnable_thread_count",
          "entity": {
            "scope": "ServiceInstance",
            "serviceName": "business-zone::projectA",
            "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
            "normal": true
          }
        }
      }
    }
  • Response
    {
      "data": {
        "readMetricsValues": {
          "values": {
            "values": [
              {
                "value": 22
              },
              {
                "value": 22
              }
            ]
          }
        }
      }
    }
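The captured request can also be replayed outside the browser. The following is a minimal sketch using the JDK's java.net.http.HttpClient (Java 11+). The class name GraphqlReplay and the helper buildBody are made up for illustration, and the endpoint is simply the one captured above, which assumes a SkyWalking UI listening locally on port 8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for replaying the captured request; not part of SkyWalking.
class GraphqlReplay {
    // Same query as the captured one, written on a single line.
    static final String QUERY =
        "query queryData($condition: MetricsCondition!, $duration: Duration!) {"
        + " readMetricsValues(condition: $condition, duration: $duration) {"
        + " label values { values { value } } } }";

    // Assembles the JSON body by hand. Assumption: the arguments contain no
    // characters that would need JSON escaping (quotes, backslashes, ...).
    static String buildBody(String start, String end, String metric,
                            String service, String instance) {
        return "{\"query\":\"" + QUERY + "\",\"variables\":{"
            + "\"duration\":{\"start\":\"" + start + "\",\"end\":\"" + end
            + "\",\"step\":\"MINUTE\"},"
            + "\"condition\":{\"name\":\"" + metric + "\",\"entity\":{"
            + "\"scope\":\"ServiceInstance\",\"serviceName\":\"" + service + "\","
            + "\"serviceInstanceName\":\"" + instance + "\",\"normal\":true}}}}";
    }

    public static void main(String[] args) throws Exception {
        String body = buildBody(
            "2021-07-03 1320", "2021-07-03 1321",
            "instance_jvm_thread_runnable_thread_count",
            "business-zone::projectA",
            "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113");

        // GraphQL queries are sent as a JSON document in a POST request.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://127.0.0.1:8080/graphql"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running main requires a reachable SkyWalking instance; buildBody can be inspected on its own without a network connection.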

Locating the corresponding GraphQL definition in the SkyWalking source code

Open the directory oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol and search for readMetricsValues, the keyword from the query template in the request body.
The definition is found in oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls:

    extend type Query {
        # etc...
        # Read time-series values in the duration of required metrics
        readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
        # etc...
    }

Input parameter definitions

    input MetricsCondition {
        # Metrics name, which should be defined in OAL script
        # Such as:
        # Endpoint_avg = from(Endpoint.latency).avg()
        # Then, `Endpoint_avg`
        name: String!
        # Follow entity definition description.
        entity: Entity!
    }

    input Entity {
        # 1. scope=All, no name is required.
        # 2. scope=Service, ServiceInstance and Endpoint, set neccessary serviceName/serviceInstanceName/endpointName
        # 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
        #    serviceName/serviceInstanceName/endpointName is/are the source(s)
        #    destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
        #    set necessary names of sources and destinations.
        scope: Scope!
        serviceName: String
        # Normal service is the service having installed agent or metrics reported directly.
        # Unnormal service is conjectural service, usually detected by the agent.
        normal: Boolean
        serviceInstanceName: String
        endpointName: String
        destServiceName: String
        # Normal service is the service having installed agent or metrics reported directly.
        # Unnormal service is conjectural service, usually detected by the agent.
        destNormal: Boolean
        destServiceInstanceName: String
        destEndpointName: String
    }

    # The Duration defines the start and end time for each query operation.
    # Fields: `start` and `end`
    #   represents the time span. And each of them matches the step.
    #   ref https://www.ietf.org/rfc/rfc3339.txt
    #   The time formats are
    #       `SECOND` step: yyyy-MM-dd HHmmss
    #       `MINUTE` step: yyyy-MM-dd HHmm
    #       `HOUR` step: yyyy-MM-dd HH
    #       `DAY` step: yyyy-MM-dd
    #       `MONTH` step: yyyy-MM
    # Field: `step`
    #   represents the accurate time point.
    # e.g.
    #   if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
    #   then
    #       metrics from the following time points expected
    #       2017-11-08 9:00 -> 2017-11-08 19:00
    #       there are 11 time points (hours) in the time span.
    input Duration {
        start: String!
        end: String!
        step: Step!
    }

    enum Step {
        DAY
        HOUR
        MINUTE
        SECOND
    }
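The time formats in the Duration comment above map directly onto java.time patterns. The sketch below formats a timestamp for each step and counts the time points in an inclusive span, reproducing the "11 time points" example from the schema comment. DurationFormats and its methods are hypothetical names used only for illustration; this is not how the OAP itself computes the points.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration of the Duration formats quoted above; not SkyWalking code.
class DurationFormats {
    enum Step {
        DAY("yyyy-MM-dd", ChronoUnit.DAYS),
        HOUR("yyyy-MM-dd HH", ChronoUnit.HOURS),
        MINUTE("yyyy-MM-dd HHmm", ChronoUnit.MINUTES),
        SECOND("yyyy-MM-dd HHmmss", ChronoUnit.SECONDS);

        private final DateTimeFormatter formatter;
        private final ChronoUnit unit;

        Step(String pattern, ChronoUnit unit) {
            this.formatter = DateTimeFormatter.ofPattern(pattern);
            this.unit = unit;
        }

        // Renders a timestamp in the textual form expected for this step.
        String format(LocalDateTime time) {
            return formatter.format(time);
        }

        // Number of time points in the span, counting both start and end.
        long timePoints(LocalDateTime start, LocalDateTime end) {
            return unit.between(start, end) + 1;
        }
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2017, 11, 8, 9, 0);
        LocalDateTime end = LocalDateTime.of(2017, 11, 8, 19, 0);
        System.out.println(Step.HOUR.format(start));          // 2017-11-08 09
        System.out.println(Step.HOUR.timePoints(start, end)); // 11
        System.out.println(Step.MINUTE.format(LocalDateTime.of(2021, 7, 3, 13, 20))); // 2021-07-03 1320
    }
}
```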

Return value definitions

    type MetricsValues {
        # Could be null if no label assigned in the query condition
        label: String
        # Values of this label value.
        values: IntValues
    }

    type IntValues {
        values: [KVInt!]!
    }

    type KVInt {
        id: ID!
        # This is the value, the caller must understand the Unit.
        # Such as:
        # 1. If ask for cpm metric, the unit and result should be count.
        # 2. If ask for response time (p99 or avg), the unit should be millisecond.
        value: Long!
    }

Verifying the SkyWalking UI request with the GraphQL plugin for IntelliJ IDEA

Using the approach from the section "GraphQL in SkyWalking", reproduce the request that the front end sent in the section "Capturing a request sent by the SkyWalking UI".

  • Request template
    query queryData($condition: MetricsCondition!, $duration: Duration!) {
        readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
            label
            values {
                values { id value }
            }
        }
    }
  • Request variables
    {
      "duration": {
        "start": "2021-07-03 1400",
        "end": "2021-07-03 1401",
        "step": "MINUTE"
      },
      "condition": {
        "name": "instance_jvm_thread_runnable_thread_count",
        "entity": {
          "scope": "ServiceInstance",
          "serviceName": "business-zone::projectA",
          "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
          "normal": true
        }
      }
    }
  • Response
    {
      "data": {
        "readMetricsValues": {
          "values": {
            "values": [
              {
                "id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
                "value": 22
              },
              {
                "id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
                "value": 22
              }
            ]
          }
        }
      }
    }
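The id values in this response look opaque, but in this sample they appear to be a time bucket, a Base64-encoded service name with a trailing flag, and a Base64-encoded instance name, joined by "_" and ".". The sketch below decodes an id under that assumption. The layout is inferred from the sample response only, not taken from SkyWalking documentation, and MetricsIdDecoder is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical decoder; the id layout is inferred from the sample response above.
class MetricsIdDecoder {
    // Returns {timeBucket, serviceName, flag, instanceName}.
    static String[] decode(String id) {
        // "_" and "." are not part of the standard Base64 alphabet,
        // so they can safely be used as separators here.
        String[] parts = id.split("_", 3);
        String[] service = parts[1].split("\\.", 2);
        return new String[] {
            parts[0], fromBase64(service[0]), service[1], fromBase64(parts[2])
        };
    }

    private static String fromBase64(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String id = "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_"
            + "ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=";
        for (String field : decode(id)) {
            System.out.println(field);
        }
        // 202107031400
        // business-zone::projectA
        // 1
        // e8cf34a1d54a4058a8c98505877770e2@192.168.50.113
    }
}
```

The decoded names match the serviceName and serviceInstanceName sent in the request variables, and the time bucket matches the requested minute.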

PS: if the query is written inline instead of as a template with variables, the plugin offers code completion while typing.

    query queryData {
        readMetricsValues(
            duration: {start: "2021-07-03 1400", end: "2021-07-03 1401", step: MINUTE},
            condition: {
                name: "instance_jvm_thread_runnable_thread_count",
                entity: {
                    scope: ServiceInstance,
                    serviceName: "business-zone::projectA",
                    serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
                    normal: true
                }
            }
        ) {
            label
            values {
                values { id value }
            }
        }
    }

How the GraphQL schema files are loaded into the program

Searching for metrics-v2.graphqls leads to the loading code in oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java:

    // Initialize the GraphQL engine
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        GraphQLSchema schema = SchemaParser.newParser()
                                           // etc...
                                           .file("query-protocol/metrics-v2.graphqls")
                                           // MetricsQuery implements com.coxautodev.graphql.tools.GraphQLQueryResolver
                                           .resolvers(new MetricsQuery(getManager()))
                                           // etc...
                                           .build()
                                           .makeExecutableSchema();
        this.graphQL = GraphQL.newGraphQL(schema).build();
    }

In the class org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery, find the readMetricsValues method:

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
            final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
            MetricsValues values = new MetricsValues();
            pointOfTimes.forEach(pointOfTime -> {
                String id = pointOfTime.id(
                    condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY"
                );
                final KVInt kvInt = new KVInt();
                kvInt.setId(id);
                kvInt.setValue(0);
                values.getValues().addKVInt(kvInt);
            });
            return values;
        }
        return getMetricsQueryService().readMetricsValues(condition, duration);
    }

    private MetricsQueryService getMetricsQueryService() {
        if (metricsQueryService == null) {
            this.metricsQueryService = moduleManager.find(CoreModule.NAME)
                                                    .provider()
                                                    .getService(MetricsQueryService.class);
        }
        return metricsQueryService;
    }
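The guard at the top of readMetricsValues means that an unknown metric name or an invalid entity does not produce an error: the caller gets one zero-valued entry per time point instead. The following stand-alone sketch mirrors that fallback with simplified, hypothetical types (ZeroFill, KV). Joining the time bucket and the entity id with "_" is an assumption based on the ids seen in the sample response, not a statement about what PointOfTime.id does internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the fallback branch above; not SkyWalking code.
class ZeroFill {
    // Minimal counterpart of KVInt: an id and a long value.
    static final class KV {
        final String id;
        final long value;

        KV(String id, long value) {
            this.id = id;
            this.value = value;
        }
    }

    // Builds one zero-valued entry per time bucket.
    // Assumption: the id is "<timeBucket>_<entityId>", as in the sample response.
    static List<KV> zeroValues(List<String> timeBuckets, String entityId, boolean entityValid) {
        String suffix = entityValid ? entityId : "ILLEGAL_ENTITY";
        List<KV> result = new ArrayList<>();
        for (String bucket : timeBuckets) {
            result.add(new KV(bucket + "_" + suffix, 0));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> buckets = Arrays.asList("202107031400", "202107031401");
        for (KV kv : zeroValues(buckets, "unused", false)) {
            System.out.println(kv.id + " -> " + kv.value);
        }
        // 202107031400_ILLEGAL_ENTITY -> 0
        // 202107031401_ILLEGAL_ENTITY -> 0
    }
}
```

One practical consequence: a chart fed by a misspelled metric name shows a flat line of zeros rather than a failed request, so zeros alone do not prove the metric exists.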

org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        return getMetricQueryDAO().readMetricsValues(
            condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
    }

    private IMetricsQueryDAO getMetricQueryDAO() {
        if (metricQueryDAO == null) {
            metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
        }
        return metricQueryDAO;
    }
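Both MetricsQuery and MetricsQueryService resolve their collaborator lazily through the module manager, by service class, on first use. A plausible benefit of this pattern is that the caller can be constructed before the providing module has registered its services. The sketch below shows the idea with a tiny registry keyed by class; ServiceRegistry, MetricsDao and QueryService are hypothetical names, and the real ModuleManager is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Demonstrates lazy lookup by service type; a simplified stand-in, not SkyWalking code.
class LazyLookupDemo {
    public static void main(String[] args) {
        ServiceRegistry registry = new ServiceRegistry();
        // The query service is created before any DAO has been registered.
        QueryService service = new QueryService(registry);
        // A storage implementation registers itself later.
        registry.register(MetricsDao.class, metric -> 22L);
        System.out.println(service.read("instance_jvm_thread_runnable_thread_count")); // 22
    }
}

// Maps a service interface to its implementation.
class ServiceRegistry {
    private final Map<Class<?>, Object> services = new HashMap<>();

    <T> void register(Class<T> type, T implementation) {
        services.put(type, implementation);
    }

    <T> T getService(Class<T> type) {
        Object service = services.get(type);
        if (service == null) {
            throw new IllegalStateException("No service registered for " + type.getName());
        }
        return type.cast(service);
    }
}

// Hypothetical storage-facing interface, playing the role of IMetricsQueryDAO.
interface MetricsDao {
    long read(String metric);
}

// Looks up its DAO on first use and caches it, like getMetricQueryDAO() above.
class QueryService {
    private final ServiceRegistry registry;
    private MetricsDao dao;

    QueryService(ServiceRegistry registry) {
        this.registry = registry;
    }

    long read(String metric) {
        return getDao().read(metric);
    }

    private MetricsDao getDao() {
        if (dao == null) {
            dao = registry.getService(MetricsDao.class);
        }
        return dao;
    }
}
```

Because the caller only depends on the interface, swapping the storage backend means registering a different implementation; the query code stays unchanged.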

According to the Extend storage documentation, IMetricsQueryDAO is the data access object for metrics queries:

    # Implement all DAOs
    # Here is the list of all DAO interfaces in storage
    IServiceInventoryCacheDAO
    IServiceInstanceInventoryCacheDAO
    IEndpointInventoryCacheDAO
    INetworkAddressInventoryCacheDAO
    IBatchDAO
    StorageDAO
    IRegisterLockDAO
    ITopologyQueryDAO
    IMetricsQueryDAO
    ITraceQueryDAO
    IMetadataQueryDAO
    IAggregationQueryDAO
    IAlarmQueryDAO
    IHistoryDeleteDAO
    IMetricsDAO
    IRecordDAO
    IRegisterDAO
    ILogQueryDAO
    ITopNRecordsQueryDAO
    IBrowserLogQueryDAO

The class diagram shows that IMetricsQueryDAO has four kinds of implementations: ES, ES7, InfluxDB, and SQL.

How the GraphQL engine is registered with the Jetty server

    // Register the GraphQL query handler with the Jetty server
    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {
        JettyHandlerRegister service = getManager().find(CoreModule.NAME)
                                                   .provider()
                                                   .getService(JettyHandlerRegister.class);
        service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
    }

Reading through GraphQLQueryProvider shows that this class is the Provider of QueryModule (the query module).

This also confirms the statement made in the "Overview" section:

The SkyWalking query protocol is based on GraphQL by default. If needed, it can be replaced with a custom one by providing a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.

    @Override
    public String name() {
        return "graphql";
    }

    @Override
    public Class<? extends ModuleDefine> module() {
        return QueryModule.class;
    }

    package org.apache.skywalking.oap.query.graphql;

    import com.google.gson.Gson;
    import com.google.gson.JsonArray;
    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    import com.google.gson.reflect.TypeToken;
    import graphql.ExecutionInput;
    import graphql.ExecutionResult;
    import graphql.GraphQL;
    import graphql.GraphQLError;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.lang.reflect.Type;
    import java.util.List;
    import java.util.Map;
    import javax.servlet.http.HttpServletRequest;
    import lombok.RequiredArgsConstructor;
    import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
    import org.apache.skywalking.oap.server.library.util.CollectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    @RequiredArgsConstructor
    public class GraphQLQueryHandler extends JettyJsonHandler {

        private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);

        private static final String QUERY = "query";
        private static final String VARIABLES = "variables";
        private static final String DATA = "data";
        private static final String ERRORS = "errors";
        private static final String MESSAGE = "message";

        private final Gson gson = new Gson();
        private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
        }.getType();

        private final String path;

        private final GraphQL graphQL;

        @Override
        public String pathSpec() {
            return path;
        }

        @Override
        protected JsonElement doGet(HttpServletRequest req) {
            throw new UnsupportedOperationException("GraphQL only supports POST method");
        }

        @Override
        protected JsonElement doPost(HttpServletRequest req) throws IOException {
            BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
            String line;
            StringBuilder request = new StringBuilder();
            while ((line = reader.readLine()) != null) {
                request.append(line);
            }

            JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);

            return execute(requestJson.get(QUERY)
                                      .getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
        }

        private JsonObject execute(String request, Map<String, Object> variables) {
            try {
                ExecutionInput executionInput = ExecutionInput.newExecutionInput()
                                                              .query(request)
                                                              .variables(variables)
                                                              .build();
                // Run the query through the GraphQL engine
                ExecutionResult executionResult = graphQL.execute(executionInput);
                LOGGER.debug("Execution result is {}", executionResult);
                // Wrap the result for the response
                Object data = executionResult.getData();
                List<GraphQLError> errors = executionResult.getErrors();
                JsonObject jsonObject = new JsonObject();
                if (data != null) {
                    jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
                }

                if (CollectionUtils.isNotEmpty(errors)) {
                    JsonArray errorArray = new JsonArray();
                    errors.forEach(error -> {
                        JsonObject errorJson = new JsonObject();
                        errorJson.addProperty(MESSAGE, error.getMessage());
                        errorArray.add(errorJson);
                    });
                    jsonObject.add(ERRORS, errorArray);
                }
                return jsonObject;
            } catch (final Throwable e) {
                LOGGER.error(e.getMessage(), e);
                JsonObject jsonObject = new JsonObject();
                JsonArray errorArray = new JsonArray();
                JsonObject errorJson = new JsonObject();
                errorJson.addProperty(MESSAGE, e.getMessage());
                errorArray.add(errorJson);
                jsonObject.add(ERRORS, errorArray);
                return jsonObject;
            }
        }
    }

The webapp gateway forwards GraphQL requests to the OAP

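Up to and including v8.6.0 the gateway was Zuul; from v8.7.0 onward it was replaced by Spring Cloud Gateway. Since this is not the focus of this article, it is not covered further here.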

Summary

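By default, the SkyWalking query protocol is implemented with GraphQL, which is highly general-purpose: clients can use it to select exactly the data they need.
For query requirements like SkyWalking's, where the schema is relatively fixed and changes are infrequent, it is a good fit.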

References

  • Extend storage
