Skywalking (11): A Case Study of the Skywalking Query Protocol

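This article analyzes the Skywalking query protocol through one example: querying Metrics data.

Overview

The Skywalking query protocol is based on GraphQL by default. If needed, it can be replaced with a custom protocol by providing a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.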

Capturing the request sent by the Skywalking UI

  • Request path
POST http://127.0.0.1:8080/graphql
  • Request body
{"query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n  readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\n    label\n    values {\n      values {value}\n    }\n  }}",
  "variables": {
    "duration": {
      "start": "2021-07-03 1320",
      "end": "2021-07-03 1321",
      "step": "MINUTE"
    },
    "condition": {
      "name": "instance_jvm_thread_runnable_thread_count",
      "entity": {
        "scope": "ServiceInstance",
        "serviceName": "business-zone::projectA",
        "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
        "normal": true
      }
    }
  }
}
  • Response
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {"value": 22},
          {"value": 22}
        ]
      }
    }
  }
}
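
The captured request can also be reproduced outside the browser. The following is a minimal sketch, assuming Java 11+ and only the JDK's java.net.http API; the class name MetricsRequestSketch and its helper methods are hypothetical, and the Content-Type header is an assumption, since the capture above shows only the path and the body.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class MetricsRequestSketch {
    // The same operation the UI sends, written on one line.
    static final String QUERY =
        "query queryData($condition: MetricsCondition!, $duration: Duration!) {"
        + " readMetricsValues(condition: $condition, duration: $duration) {"
        + " label values { values { value } } } }";

    // Wrap a value as a JSON string literal, escaping backslashes, quotes and newlines.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\"";
    }

    // Build the {"query": ..., "variables": ...} body for a MINUTE-step, ServiceInstance-scope query.
    static String buildBody(String metric, String service, String instance, String start, String end) {
        String duration = "{\"start\":" + quote(start) + ",\"end\":" + quote(end) + ",\"step\":\"MINUTE\"}";
        String entity = "{\"scope\":\"ServiceInstance\",\"serviceName\":" + quote(service)
            + ",\"serviceInstanceName\":" + quote(instance) + ",\"normal\":true}";
        String condition = "{\"name\":" + quote(metric) + ",\"entity\":" + entity + "}";
        return "{\"query\":" + quote(QUERY)
            + ",\"variables\":{\"duration\":" + duration + ",\"condition\":" + condition + "}}";
    }

    // Build (but do not send) the POST request. The Content-Type header is an assumption.
    static HttpRequest buildRequest(String endpoint, String body) {
        return HttpRequest.newBuilder(URI.create(endpoint))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }
}
```

To actually send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running OAP or webapp endpoint.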

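Finding the corresponding GraphQL definition in the Skywalking source

Open the directory oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol and search for the keyword readMetricsValues taken from the query template in the request body.
The matching definition is in oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls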

extend type Query {
    # etc...
    # Read time-series values in the duration of required metrics
    readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
    # etc...
}

Input parameter definitions

input MetricsCondition {
    # Metrics name, which should be defined in OAL script
    # Such as:
    # Endpoint_avg = from(Endpoint.latency).avg()
    # Then, `Endpoint_avg`
    name: String!
    # Follow entity definition description.
    entity: Entity!
}

input Entity {
    # 1. scope=All, no name is required.
    # 2. scope=Service, ServiceInstance and Endpoint, set neccessary serviceName/serviceInstanceName/endpointName
    # 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
    #    serviceName/serviceInstanceName/endpointName is/are the source(s)
    #    destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
    #    set necessary names of sources and destinations.
    scope: Scope!
    serviceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    normal: Boolean
    serviceInstanceName: String
    endpointName: String
    destServiceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    destNormal: Boolean
    destServiceInstanceName: String
    destEndpointName: String
}

# The Duration defines the start and end time for each query operation.
# Fields: `start` and `end`
#   represents the time span. And each of them matches the step.
#   ref https://www.ietf.org/rfc/rfc3339.txt
#   The time formats are
#       `SECOND` step: yyyy-MM-dd HHmmss
#       `MINUTE` step: yyyy-MM-dd HHmm
#       `HOUR` step: yyyy-MM-dd HH
#       `DAY` step: yyyy-MM-dd
#       `MONTH` step: yyyy-MM
# Field: `step`
#   represents the accurate time point.
# e.g.
#   if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
#   then
#       metrics from the following time points expected
#       2017-11-08 9:00 -> 2017-11-08 19:00
#       there are 11 time points (hours) in the time span.
input Duration {
    start: String!
    end: String!
    step: Step!
}

enum Step {
    DAY
    HOUR
    MINUTE
    SECOND
}
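
The relationship between `step`, the time formats, and the number of returned points described in the schema comment can be checked with a small calculation. This is a sketch only, using the JDK's java.time; DurationSketch and its methods are hypothetical names, not Skywalking classes, and the parser simply reads the fixed-width formats listed above.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

class DurationSketch {
    // Mirrors the Step enum of the schema, paired with the matching java.time unit.
    enum Step {
        DAY(ChronoUnit.DAYS), HOUR(ChronoUnit.HOURS), MINUTE(ChronoUnit.MINUTES), SECOND(ChronoUnit.SECONDS);

        final ChronoUnit unit;

        Step(ChronoUnit unit) {
            this.unit = unit;
        }
    }

    // Parse "yyyy-MM-dd", "yyyy-MM-dd HH", "yyyy-MM-dd HHmm" or "yyyy-MM-dd HHmmss".
    static LocalDateTime parse(String text) {
        LocalDate date = LocalDate.parse(text.substring(0, 10));
        String time = text.length() > 11 ? text.substring(11) : "";
        int hour = time.length() >= 2 ? Integer.parseInt(time.substring(0, 2)) : 0;
        int minute = time.length() >= 4 ? Integer.parseInt(time.substring(2, 4)) : 0;
        int second = time.length() >= 6 ? Integer.parseInt(time.substring(4, 6)) : 0;
        return date.atTime(hour, minute, second);
    }

    // Both ends are inclusive, hence the +1.
    static long countPoints(Step step, String start, String end) {
        return step.unit.between(parse(start), parse(end)) + 1;
    }
}
```

With the example from the schema comment, countPoints(Step.HOUR, "2017-11-08 09", "2017-11-08 19") gives the 11 hourly points, and the captured MINUTE query from 1320 to 1321 gives 2, which matches the two values in the response.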

Return value definitions

type MetricsValues {
    # Could be null if no label assigned in the query condition
    label: String
    # Values of this label value.
    values: IntValues
}

type IntValues {
    values: [KVInt!]!
}

type KVInt {
    id: ID!
    # This is the value, the caller must understand the Unit.
    # Such as:
    # 1. If ask for cpm metric, the unit and result should be count.
    # 2. If ask for response time (p99 or avg), the unit should be millisecond.
    value: Long!
}

Verifying the Skywalking UI request with the GraphQL IDEA plugin

Using the approach from the section "GraphQL in Skywalking", simulate the request sent by the front end in the section "Capturing the request sent by the Skywalking UI".

  • Request template
query queryData($condition: MetricsCondition!, $duration: Duration!) {
    readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
        label
        values {
            values { id value }
        }
    }
}
  • Request variables
{
  "duration": {
    "start": "2021-07-03 1400",
    "end": "2021-07-03 1401", 
    "step": "MINUTE"
  },
  "condition": {
    "name": "instance_jvm_thread_runnable_thread_count",
    "entity": {
      "scope": "ServiceInstance",
      "serviceName": "business-zone::projectA",
      "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
      "normal": true
    }
  }
}
  • Response
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {
            "id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          },
          {
            "id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          }
        ]
      }
    }
  }
}
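
The `id` values in this response are not opaque. Judging from the two samples, each id is the time bucket, then the Base64-encoded service name followed by `.1`, then the Base64-encoded instance name, joined with `_`. The sketch below decodes one under that assumption; MetricsIdSketch is a hypothetical helper, and reading the `.1` suffix as the `normal` flag is an inference from the request (`"normal": true`), not something the response states.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class MetricsIdSketch {
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    // Returns {timeBucket, serviceName, flag, serviceInstanceName}.
    // Assumed layout: <timeBucket>_<base64(service)>.<flag>_<base64(instance)>
    static String[] split(String id) {
        String[] parts = id.split("_", 3);
        int dot = parts[1].lastIndexOf('.');
        return new String[] {
            parts[0],
            decode(parts[1].substring(0, dot)),
            parts[1].substring(dot + 1),
            decode(parts[2])
        };
    }
}
```

Applied to the first id above, this yields the time bucket 202107031400, the service name business-zone::projectA, the flag 1, and the instance name e8cf34a1d54a4058a8c98505877770e2@192.168.50.113, which are exactly the values sent in the request variables.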

PS: if you write the query inline instead of using a template with variables, the plugin offers code completion.

query queryData {
    readMetricsValues(duration: {start: "2021-07-03 1400",end: "2021-07-03 1401", step: MINUTE},
        condition: {
            name: "instance_jvm_thread_runnable_thread_count",
            entity: {
                scope: ServiceInstance,
                serviceName: "business-zone::projectA",
                serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
                normal: true
            }
        }
    ) {
        label
        values { values { id value } }
    }
}

How the GraphQL schema files are loaded into the program

Searching for metrics-v2.graphqls leads to the loading code in oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java

    // Initialize the GraphQL engine
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        GraphQLSchema schema = SchemaParser.newParser()
                                           // etc...
                                           .file("query-protocol/metrics-v2.graphqls")
                                           .resolvers(new MetricsQuery(getManager())) // MetricsQuery implements com.coxautodev.graphql.tools.GraphQLQueryResolver
                                           // etc...
                                           .build()
                                           .makeExecutableSchema();
        this.graphQL = GraphQL.newGraphQL(schema).build();
    }

In the class org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery, find the readMetricsValues method

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
            final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
            MetricsValues values = new MetricsValues();
            pointOfTimes.forEach(pointOfTime -> {
                String id = pointOfTime.id(condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY");
                final KVInt kvInt = new KVInt();
                kvInt.setId(id);
                kvInt.setValue(0);
                values.getValues().addKVInt(kvInt);
            });
            return values;
        }
        return getMetricsQueryService().readMetricsValues(condition, duration);
    }

    private MetricsQueryService getMetricsQueryService() {
        if (metricsQueryService == null) {
            this.metricsQueryService = moduleManager.find(CoreModule.NAME)
                                                    .provider()
                                                    .getService(MetricsQueryService.class);
        }
        return metricsQueryService;
    }
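
The guard at the top of readMetricsValues is worth noting: when the metric name is unknown or the entity is invalid, the resolver does not fail but returns a zero for every time point in the duration. A minimal sketch of that zero-fill with simplified stand-in types follows; ZeroFillSketch and Point are hypothetical, and joining the time bucket and entity id with `_` is an assumption based on the ids seen in the sample response.

```java
import java.util.ArrayList;
import java.util.List;

class ZeroFillSketch {
    // Hypothetical stand-in for KVInt: an id plus a long value.
    static final class Point {
        final String id;
        final long value;

        Point(String id, long value) {
            this.id = id;
            this.value = value;
        }
    }

    // One zero-valued point per time bucket; a null entityId stands for an invalid entity.
    static List<Point> zeroFill(List<String> timeBuckets, String entityId) {
        String suffix = entityId != null ? entityId : "ILLEGAL_ENTITY";
        List<Point> points = new ArrayList<>();
        for (String bucket : timeBuckets) {
            points.add(new Point(bucket + "_" + suffix, 0L));
        }
        return points;
    }
}
```

The practical consequence is that a typo in the metric name shows up as a flat line of zeros in the response rather than as a GraphQL error.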

org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        return getMetricQueryDAO().readMetricsValues(
            condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
    }

    private IMetricsQueryDAO getMetricQueryDAO() {
        if (metricQueryDAO == null) {
            metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
        }
        return metricQueryDAO;
    }

According to the Extend storage documentation, IMetricsQueryDAO is the data access object for metrics queries.

# Implement all DAOs
# Here is the list of all DAO interfaces in storage
IServiceInventoryCacheDAO
IServiceInstanceInventoryCacheDAO
IEndpointInventoryCacheDAO
INetworkAddressInventoryCacheDAO
IBatchDAO
StorageDAO
IRegisterLockDAO
ITopologyQueryDAO
IMetricsQueryDAO
ITraceQueryDAO
IMetadataQueryDAO
IAggregationQueryDAO
IAlarmQueryDAO
IHistoryDeleteDAO
IMetricsDAO
IRecordDAO
IRegisterDAO
ILogQueryDAO
ITopNRecordsQueryDAO
IBrowserLogQueryDAO

The class diagram shows four implementations of IMetricsQueryDAO: ES, ES7, InfluxDB, and SQL.

How the GraphQL engine is registered with the Jetty server

    // Register the GraphQL query handler with the Jetty server
    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {
        JettyHandlerRegister service = getManager().find(CoreModule.NAME)
                                                   .provider()
                                                   .getService(JettyHandlerRegister.class);
        service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
    }

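Looking at GraphQLQueryProvider more closely, it turns out to be the Provider class of QueryModule (the query module).

This confirms the statement from the Overview section:

The Skywalking query protocol is based on GraphQL by default. If needed, it can be replaced with a custom protocol by providing a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.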

    @Override
    public String name() {
        return "graphql";
    }

    @Override
    public Class<? extends ModuleDefine> module() {
        return QueryModule.class;
    }

The GraphQLQueryHandler registered in start() is listed in full here:

package org.apache.skywalking.oap.query.graphql;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.GraphQLError;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public class GraphQLQueryHandler extends JettyJsonHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);

    private static final String QUERY = "query";
    private static final String VARIABLES = "variables";
    private static final String DATA = "data";
    private static final String ERRORS = "errors";
    private static final String MESSAGE = "message";

    private final Gson gson = new Gson();
    private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {}.getType();

    private final String path;

    private final GraphQL graphQL;

    @Override
    public String pathSpec() {
        return path;
    }

    @Override
    protected JsonElement doGet(HttpServletRequest req) {
        throw new UnsupportedOperationException("GraphQL only supports POST method");
    }

    @Override
    protected JsonElement doPost(HttpServletRequest req) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
        String line;
        StringBuilder request = new StringBuilder();
        while ((line = reader.readLine()) != null) {
            request.append(line);
        }

        JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);

        return execute(requestJson.get(QUERY)
                                  .getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
    }

    private JsonObject execute(String request, Map<String, Object> variables) {
        try {
            ExecutionInput executionInput = ExecutionInput.newExecutionInput()
                                                          .query(request)
                                                          .variables(variables)
                                                          .build();
            // Run the query through the GraphQL engine
            ExecutionResult executionResult = graphQL.execute(executionInput);
            LOGGER.debug("Execution result is {}", executionResult);
            // Wrap the result in the response envelope
            Object data = executionResult.getData();
            List<GraphQLError> errors = executionResult.getErrors();
            JsonObject jsonObject = new JsonObject();
            if (data != null) {
                jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
            }

            if (CollectionUtils.isNotEmpty(errors)) {
                JsonArray errorArray = new JsonArray();
                errors.forEach(error -> {
                    JsonObject errorJson = new JsonObject();
                    errorJson.addProperty(MESSAGE, error.getMessage());
                    errorArray.add(errorJson);
                });
                jsonObject.add(ERRORS, errorArray);
            }
            return jsonObject;
        } catch (final Throwable e) {
            LOGGER.error(e.getMessage(), e);
            JsonObject jsonObject = new JsonObject();
            JsonArray errorArray = new JsonArray();
            JsonObject errorJson = new JsonObject();
            errorJson.addProperty(MESSAGE, e.getMessage());
            errorArray.add(errorJson);
            jsonObject.add(ERRORS, errorArray);
            return jsonObject;
        }
    }
}
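
The response envelope that execute() builds follows a simple rule: `data` is included only when the engine returned data, and `errors` only when there is at least one error, each reduced to its message. The sketch below restates that rule with plain JDK collections instead of Gson; ResponseEnvelopeSketch is a hypothetical name and the method is an illustration, not the handler's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ResponseEnvelopeSketch {
    // Build {"data": ..., "errors": [{"message": ...}]}, omitting whichever part is absent.
    static Map<String, Object> envelope(Object data, List<String> errorMessages) {
        Map<String, Object> response = new LinkedHashMap<>();
        if (data != null) {
            response.put("data", data);
        }
        if (errorMessages != null && !errorMessages.isEmpty()) {
            List<Map<String, String>> errors = new ArrayList<>();
            for (String message : errorMessages) {
                // singletonMap tolerates a null message, as an exception may carry none.
                errors.add(Collections.singletonMap("message", message));
            }
            response.put("errors", errors);
        }
        return response;
    }
}
```

This is why a client should check for the `errors` key even when the response carries `data`: both can be present in the same response.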

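The webapp gateway forwards GraphQL requests to the OAP

Up to and including v8.6.0 the gateway was Zuul; from v8.7.0 onward it was replaced with Spring Cloud Gateway. Since this is not the focus of this article, it is not covered further here.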

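Summary

By default the Skywalking query protocol is implemented with GraphQL, a general-purpose query language that lets clients conveniently select exactly the data they need.
For Skywalking, whose query patterns are relatively fixed and change infrequently, it is a good fit.

References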

  • Extend storage
