Dissecting the Skywalking query protocol through a Metrics query example
Overview
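The Skywalking query protocol is based on GraphQL by default. If needed, it can be extended with a custom implementation: provide a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.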
Capturing a request sent by the Skywalking UI
- Request path
POST http://127.0.0.1:8080/graphql
- Request body
{
  "query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\n label\n values {\n values {value}\n }\n }}",
  "variables": {
    "duration": {
      "start": "2021-07-03 1320",
      "end": "2021-07-03 1321",
      "step": "MINUTE"
    },
    "condition": {
      "name": "instance_jvm_thread_runnable_thread_count",
      "entity": {
        "scope": "ServiceInstance",
        "serviceName": "business-zone::projectA",
        "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
        "normal": true
      }
    }
  }
}
- Response
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          { "value": 22 },
          { "value": 22 }
        ]
      }
    }
  }
}
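The captured request is an ordinary HTTP POST whose JSON body carries the query text in `query` and the parameters in `variables`. As a rough sketch of how a client other than the UI could assemble the same request, the following uses only the JDK's `java.net.http` API. The class and method names (`GraphqlRequestSketch`, `jsonEscape`, `buildBody`, `buildRequest`) are hypothetical helpers made up for illustration and are not part of Skywalking; the `Content-Type` header is an assumption about what the server expects.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class GraphqlRequestSketch {

    // Escape a string so that it can be embedded inside a JSON string literal.
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the generic JSON escape form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Build the body: the query as a JSON string, the variables as an already serialized JSON object.
    static String buildBody(String query, String variablesJson) {
        return "{\"query\":\"" + jsonEscape(query) + "\",\"variables\":" + variablesJson + "}";
    }

    // Build (but do not send) the POST request for the given endpoint.
    static HttpRequest buildRequest(String url, String body) {
        return HttpRequest.newBuilder(URI.create(url))
                          .header("Content-Type", "application/json")
                          .POST(HttpRequest.BodyPublishers.ofString(body))
                          .build();
    }
}
```

Sending the request would then be a matter of passing the result of `buildRequest("http://127.0.0.1:8080/graphql", body)` to `java.net.http.HttpClient#send`.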
Finding the corresponding GraphQL definition in the Skywalking source code
Open the oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol directory and search for readMetricsValues, the keyword used in the query template of the request body.
The matching definition is in oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls:
extend type Query {
    # etc...
    # Read time-series values in the duration of required metrics
    readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
    # etc...
}
Input parameter definitions
input MetricsCondition {
    # Metrics name, which should be defined in OAL script
    # Such as:
    # Endpoint_avg = from(Endpoint.latency).avg()
    # Then, `Endpoint_avg`
    name: String!
    # Follow entity definition description.
    entity: Entity!
}

input Entity {
    # 1. scope=All, no name is required.
    # 2. scope=Service, ServiceInstance and Endpoint, set necessary serviceName/serviceInstanceName/endpointName
    # 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
    #    serviceName/serviceInstanceName/endpointName is/are the source(s)
    #    destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
    #    set necessary names of sources and destinations.
    scope: Scope!
    serviceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    normal: Boolean
    serviceInstanceName: String
    endpointName: String
    destServiceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    destNormal: Boolean
    destServiceInstanceName: String
    destEndpointName: String
}

# The Duration defines the start and end time for each query operation.
# Fields: `start` and `end`
#   represents the time span. And each of them matches the step.
#   ref https://www.ietf.org/rfc/rfc3339.txt
#   The time formats are
#       `SECOND` step: yyyy-MM-dd HHmmss
#       `MINUTE` step: yyyy-MM-dd HHmm
#       `HOUR` step: yyyy-MM-dd HH
#       `DAY` step: yyyy-MM-dd
#       `MONTH` step: yyyy-MM
# Field: `step`
#   represents the accurate time point.
# e.g.
#   if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
#   then
#       metrics from the following time points expected
#       2017-11-08 9:00 -> 2017-11-08 19:00
#       there are 11 time points (hours) in the time span.
input Duration {
    start: String!
    end: String!
    step: Step!
}

enum Step {
    DAY
    HOUR
    MINUTE
    SECOND
}
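The comment on `Duration` says that `start` and `end` must use the time format that matches `step`. A minimal sketch of producing those strings on the client side, assuming the patterns listed in that comment map directly onto `java.time` format patterns; `DurationFormatSketch` and its nested `Step` enum are hypothetical names for this illustration, not Skywalking classes.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class DurationFormatSketch {

    // One pattern per Step value, copied from the schema comment on Duration.
    enum Step {
        DAY("yyyy-MM-dd"),
        HOUR("yyyy-MM-dd HH"),
        MINUTE("yyyy-MM-dd HHmm"),
        SECOND("yyyy-MM-dd HHmmss");

        final DateTimeFormatter formatter;

        Step(String pattern) {
            this.formatter = DateTimeFormatter.ofPattern(pattern);
        }
    }

    // Format a point in time as a `start` or `end` value for the given step.
    static String format(LocalDateTime time, Step step) {
        return time.format(step.formatter);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2021, 7, 3, 13, 20);
        System.out.println(format(t, Step.MINUTE)); // 2021-07-03 1320
    }
}
```

With `Step.MINUTE` this reproduces the `"start": "2021-07-03 1320"` value seen in the captured request.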
Return type definitions
type MetricsValues {
    # Could be null if no label assigned in the query condition
    label: String
    # Values of this label value.
    values: IntValues
}

type IntValues {
    values: [KVInt!]!
}

type KVInt {
    id: ID!
    # This is the value, the caller must understand the Unit.
    # Such as:
    # 1. If ask for cpm metric, the unit and result should be count.
    # 2. If ask for response time (p99 or avg), the unit should be millisecond.
    value: Long!
}
Verifying the Skywalking UI request with the GraphQL IDEA plugin
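Using the approach described in the section "GraphQL in Skywalking", simulate the request that the front end sent in the section "Capturing a request sent by the Skywalking UI".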
- Request template
query queryData($condition: MetricsCondition!, $duration: Duration!) {
    readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
        label
        values {
            values { id value }
        }
    }
}
- Request variables
{
  "duration": {
    "start": "2021-07-03 1400",
    "end": "2021-07-03 1401",
    "step": "MINUTE"
  },
  "condition": {
    "name": "instance_jvm_thread_runnable_thread_count",
    "entity": {
      "scope": "ServiceInstance",
      "serviceName": "business-zone::projectA",
      "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
      "normal": true
    }
  }
}
- Response
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {
            "id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          },
          {
            "id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          }
        ]
      }
    }
  }
}
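The `id` values in this response look opaque, but they can be taken apart. Judging only from this sample (an inference, not something stated in the schema), an id seems to consist of a time bucket, a Base64-encoded service name followed by `.1`, and a Base64-encoded instance name, joined by `_`. A small sketch that decodes an id under that assumed layout; `MetricsIdSketch` and `parse` are hypothetical names, and reading the trailing `1` as the "normal" flag is likewise a guess.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class MetricsIdSketch {

    static String decodeBase64(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    // Returns {timeBucket, serviceName, flag, instanceName} for an id shaped like
    // <timeBucket>_<base64(serviceName)>.<flag>_<base64(instanceName)> (assumed layout).
    static String[] parse(String id) {
        String[] parts = id.split("_", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected id layout: " + id);
        }
        int dot = parts[1].lastIndexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Missing flag separator in: " + parts[1]);
        }
        String serviceName = decodeBase64(parts[1].substring(0, dot));
        String flag = parts[1].substring(dot + 1);
        String instanceName = decodeBase64(parts[2]);
        return new String[] {parts[0], serviceName, flag, instanceName};
    }

    public static void main(String[] args) {
        String id = "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_"
            + "ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=";
        for (String field : parse(id)) {
            System.out.println(field);
        }
    }
}
```

Decoding the first id yields the time bucket `202107031400` together with the same `serviceName` and `serviceInstanceName` that were sent in the request variables.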
PS: if you write the query inline instead of using a template with variables, the plugin offers code completion.
query queryData {
    readMetricsValues(
        duration: {start: "2021-07-03 1400", end: "2021-07-03 1401", step: MINUTE},
        condition: {
            name: "instance_jvm_thread_runnable_thread_count",
            entity: {
                scope: ServiceInstance,
                serviceName: "business-zone::projectA",
                serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
                normal: true
            }
        }
    ) {
        label
        values {
            values { id value }
        }
    }
}
How the GraphQL Schema files are loaded into the program
Search for metrics-v2.graphqls; the loading code is in oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java:
// Initialize the GraphQL engine
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
    GraphQLSchema schema = SchemaParser.newParser()
                                       // etc...
                                       .file("query-protocol/metrics-v2.graphqls")
                                       // MetricsQuery implements the com.coxautodev.graphql.tools.GraphQLQueryResolver interface
                                       .resolvers(new MetricsQuery(getManager()))
                                       // etc...
                                       .build()
                                       .makeExecutableSchema();
    this.graphQL = GraphQL.newGraphQL(schema).build();
}
In the class org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery, find the readMetricsValues method:
/**
 * Read time-series values in the duration of required metrics
 */
public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
    if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
        final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
        MetricsValues values = new MetricsValues();
        pointOfTimes.forEach(pointOfTime -> {
            String id = pointOfTime.id(
                condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY"
            );
            final KVInt kvInt = new KVInt();
            kvInt.setId(id);
            kvInt.setValue(0);
            values.getValues().addKVInt(kvInt);
        });
        return values;
    }
    return getMetricsQueryService().readMetricsValues(condition, duration);
}

private MetricsQueryService getMetricsQueryService() {
    if (metricsQueryService == null) {
        this.metricsQueryService = moduleManager.find(CoreModule.NAME)
                                                .provider()
                                                .getService(MetricsQueryService.class);
    }
    return metricsQueryService;
}
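When the metric name is unknown or the entity is invalid, the resolver above does not fail. It returns one zero-valued entry per time point in the requested duration. The sketch below imitates that idea with plain JDK classes. It is a simplified stand-in, not the real `Duration#assembleDurationPoints` or `PointOfTime#id`: it handles only the MINUTE step, and it assumes ids have the form `<timeBucket>_<entityId>`, as the sample response suggests. `ZeroFillSketch`, `minuteBuckets`, and `zeroFilledIds` are hypothetical names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class ZeroFillSketch {

    // Input format of Duration.start / Duration.end for the MINUTE step.
    private static final DateTimeFormatter INPUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HHmm");
    // Time bucket format as it appears at the front of the ids in the sample response.
    private static final DateTimeFormatter BUCKET = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    // Enumerate every minute from start to end, both inclusive.
    static List<String> minuteBuckets(String start, String end) {
        LocalDateTime current = LocalDateTime.parse(start, INPUT);
        LocalDateTime last = LocalDateTime.parse(end, INPUT);
        List<String> buckets = new ArrayList<>();
        while (!current.isAfter(last)) {
            buckets.add(current.format(BUCKET));
            current = current.plusMinutes(1);
        }
        return buckets;
    }

    // Build one id per time point; each of them would carry the value 0 in the fallback branch.
    static List<String> zeroFilledIds(String start, String end, String entityId) {
        List<String> ids = new ArrayList<>();
        for (String bucket : minuteBuckets(start, end)) {
            ids.add(bucket + "_" + entityId);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(zeroFilledIds("2021-07-03 1400", "2021-07-03 1401", "ILLEGAL_ENTITY"));
    }
}
```

The benefit of this design is that a chart in the UI still receives a complete series of points and can draw a flat line rather than handle an error.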
org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues
/**
 * Read time-series values in the duration of required metrics
 */
public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
    return getMetricQueryDAO().readMetricsValues(
        condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
}

private IMetricsQueryDAO getMetricQueryDAO() {
    if (metricQueryDAO == null) {
        metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
    }
    return metricQueryDAO;
}
According to the Extend storage documentation, IMetricsQueryDAO is the data access object for metrics queries:
# Implement all DAOs
# Here is the list of all DAO interfaces in storage
IServiceInventoryCacheDAO
IServiceInstanceInventoryCacheDAO
IEndpointInventoryCacheDAO
INetworkAddressInventoryCacheDAO
IBatchDAO
StorageDAO
IRegisterLockDAO
ITopologyQueryDAO
IMetricsQueryDAO
ITraceQueryDAO
IMetadataQueryDAO
IAggregationQueryDAO
IAlarmQueryDAO
IHistoryDeleteDAO
IMetricsDAO
IRecordDAO
IRegisterDAO
ILogQueryDAO
ITopNRecordsQueryDAO
IBrowserLogQueryDAO
The class diagram shows that IMetricsQueryDAO has four kinds of implementations: ES, ES7, InfluxDB, and SQL.
How the GraphQL engine is registered with the Jetty server
// Register the GraphQL query handler with the Jetty server
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
    JettyHandlerRegister service = getManager().find(CoreModule.NAME)
                                               .provider()
                                               .getService(JettyHandlerRegister.class);
    service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
}
Looking at GraphQLQueryProvider, it turns out to be the Provider class of QueryModule (the query module).
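This also confirms the statement made in the "Overview" section:
The Skywalking query protocol is based on GraphQL by default. If needed, it can be extended with a custom implementation: provide a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.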
@Override
public String name() {
    return "graphql";
}

@Override
public Class<? extends ModuleDefine> module() {
    return QueryModule.class;
}
package org.apache.skywalking.oap.query.graphql;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.GraphQLError;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public class GraphQLQueryHandler extends JettyJsonHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);

    private static final String QUERY = "query";
    private static final String VARIABLES = "variables";
    private static final String DATA = "data";
    private static final String ERRORS = "errors";
    private static final String MESSAGE = "message";

    private final Gson gson = new Gson();
    private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
    }.getType();

    private final String path;

    private final GraphQL graphQL;

    @Override
    public String pathSpec() {
        return path;
    }

    @Override
    protected JsonElement doGet(HttpServletRequest req) {
        throw new UnsupportedOperationException("GraphQL only supports POST method");
    }

    @Override
    protected JsonElement doPost(HttpServletRequest req) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
        String line;
        StringBuilder request = new StringBuilder();
        while ((line = reader.readLine()) != null) {
            request.append(line);
        }

        JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);

        return execute(requestJson.get(QUERY)
                                  .getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
    }

    private JsonObject execute(String request, Map<String, Object> variables) {
        try {
            ExecutionInput executionInput = ExecutionInput.newExecutionInput()
                                                          .query(request)
                                                          .variables(variables)
                                                          .build();
            // Run the query through the GraphQL engine
            ExecutionResult executionResult = graphQL.execute(executionInput);
            LOGGER.debug("Execution result is {}", executionResult);
            // Wrap the result for the response
            Object data = executionResult.getData();
            List<GraphQLError> errors = executionResult.getErrors();
            JsonObject jsonObject = new JsonObject();
            if (data != null) {
                jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
            }

            if (CollectionUtils.isNotEmpty(errors)) {
                JsonArray errorArray = new JsonArray();
                errors.forEach(error -> {
                    JsonObject errorJson = new JsonObject();
                    errorJson.addProperty(MESSAGE, error.getMessage());
                    errorArray.add(errorJson);
                });
                jsonObject.add(ERRORS, errorArray);
            }
            return jsonObject;
        } catch (final Throwable e) {
            LOGGER.error(e.getMessage(), e);
            JsonObject jsonObject = new JsonObject();
            JsonArray errorArray = new JsonArray();
            JsonObject errorJson = new JsonObject();
            errorJson.addProperty(MESSAGE, e.getMessage());
            errorArray.add(errorJson);
            jsonObject.add(ERRORS, errorArray);
            return jsonObject;
        }
    }
}
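The `execute` method above always answers with the same envelope: a `data` member when the engine produced data, and an `errors` array of `{message}` objects when something went wrong. To make that shape easier to see in isolation, here is a minimal sketch of the same wrapping logic using plain `java.util` collections in place of Gson. `EnvelopeSketch` and `envelope` are hypothetical names introduced for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EnvelopeSketch {

    // Mirror the handler's rules: "data" only when non-null, "errors" only when non-empty.
    static Map<String, Object> envelope(Object data, List<String> errorMessages) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (data != null) {
            result.put("data", data);
        }
        if (errorMessages != null && !errorMessages.isEmpty()) {
            List<Map<String, String>> errors = new ArrayList<>();
            for (String message : errorMessages) {
                // singletonMap tolerates a null message, which an exception may well have.
                errors.add(Collections.singletonMap("message", message));
            }
            result.put("errors", errors);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(envelope(Collections.singletonMap("value", 22), Collections.emptyList()));
        System.out.println(envelope(null, Collections.singletonList("boom")));
    }
}
```

A caller therefore has to check for `errors` explicitly, since a response may carry `data`, `errors`, or both.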
The Webapp gateway forwards GraphQL requests to the OAP
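In v8.6.0 and earlier the gateway was zuul; from v8.7.0 onward it was replaced with Spring Cloud Gateway. Since this is not the focus of this article, it is not covered further here.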
Summary
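The Skywalking query protocol is implemented by default with GraphQL, which is highly general-purpose, so clients can conveniently select exactly the data they need.
For query requirements like Skywalking's, whose shape is relatively fixed and changes infrequently, it is a good fit.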
References
Extend storage