本文出处 shenyifengtk.github.io 转载请阐明
概念
Span
Span 是分布式跟踪零碎中一个重要且罕用的概念. 可从 Google Dapper Paper 和 OpenTracing 学习更多与 Span 相干的常识.
SkyWalking 从 2017 年开始反对 OpenTracing 和 OpenTracing-Java API, 咱们的 Span 概念与论文和 OpenTracing 相似. 咱们也扩大了 Span.
Span 有三种类型
1.1 EntrySpan
EntrySpan 代表服务提供者, 也是服务器端的端点. 作为一个 APM 零碎, 咱们的指标是应用服务器. 所以简直所有的服务和 MQ- 消费者 都是 EntrySpan。能够了解一个过程解决第一个 span 就是 EntrySpan,意思为 entiry span 进入服务 span。
1.2 LocalSpan
LocalSpan 示意一般的 Java 办法, 它与近程服务无关, 也不是 MQ 生产者 / 消费者, 也不是服务(例如 HTTP 服务)提供者 / 消费者。所有本地办法调用都是 localSpan, 包含异步线程调用,线程池提交工作都是。
1.3 ExitSpan
ExitSpan 代表一个服务客户端或 MQ 的生产者, 在 SkyWalking 的晚期命名为 LeafSpan
. 例如 通过 JDBC 拜访 DB, 读取 Redis/Memcached 被归类为 ExitSpan.
上下文载体 (ContextCarrier)
为了实现分布式跟踪, 须要绑定跨过程的追踪, 并且上下文应该在整个过程中随之流传. 这就是 ContextCarrier 的职责.
以下是无关如何在 A -> B
分布式调用中应用 ContextCarrier 的步骤.
- 在客户端, 创立一个新的空的
ContextCarrier
. - 通过
ContextManager#createExitSpan
创立一个 ExitSpan 或者应用ContextManager#inject
来初始化ContextCarrier
. - 将
ContextCarrier
所有信息放到申请头 (如 HTTP HEAD), 附件(如 Dubbo RPC 框架), 或者音讯 (如 Kafka) 中, 详情能够看官网给出跨过程传输协定 sw8 - 通过服务调用, 将
ContextCarrier
传递到服务端. - 在服务端, 在对应组件的头部, 附件或音讯中获取
ContextCarrier
所有内容. - 通过
ContestManager#createEntrySpan
创立 EntrySpan 或者应用ContextManager#extract
将服务端和客户端的绑定.
让咱们通过 Apache HttpComponent client 插件和 Tomcat 7 服务器插件演示, 步骤如下:
- 客户端 Apache HttpComponent client 插件
span = ContextManager.createExitSpan("/span/operation/name", contextCarrier, "ip:port");
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {next = next.next();
httpRequest.setHeader(next.getHeadKey(), next.getHeadValue());
}
- 服务端 Tomcat 7 服务器插件
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {next = next.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
span = ContextManager.createEntrySpan("/span/operation/name", contextCarrier);
上下文快照 (ContextSnapshot)
除了跨过程, 跨线程也是须要反对的, 例如异步线程(内存中的音讯队列)和批处理在 Java 中很常见, 跨过程和跨线程十分相似, 因为都是须要流传上下文. 惟一的区别是, 不须要跨线程序列化.
以下是无关跨线程流传的三个步骤:
- 应用
ContextManager#capture
办法获取 ContextSnapshot 对象. - 让子线程以任何形式, 通过办法参数或由现有参数携带来拜访 ContextSnapshot
- 在子线程中应用
ContextManager#continued
。
跨过程 Span 传输原理
public class CarrierItem implements Iterator<CarrierItem> {
private String headKey;
private String headValue;
private CarrierItem next;
public CarrierItem(String headKey, String headValue) {this(headKey, headValue, null);
}
public CarrierItem(String headKey, String headValue, CarrierItem next) {
this.headKey = headKey;
this.headValue = headValue;
this.next = next;
}
public String getHeadKey() {return headKey;}
public String getHeadValue() {return headValue;}
public void setHeadValue(String headValue) {this.headValue = headValue;}
@Override
public boolean hasNext() {return next != null;}
@Override
public CarrierItem next() {return next;}
@Override
public void remove() {}
}
CarrierItem 相似 Map key value 的数据接口,通过一个单向连贯将 K / V 连接起来。
看下 ContextCarrier.items()办法如何创立 CarrierItem
public CarrierItem items() {
// 内置一个 sw8-x key
SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);
// 内置 sw8-correlation key
SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(correlationContext, sw8ExtensionCarrierItem);
// 内置 sw8 key
SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);
return new CarrierItemHead(sw8CarrierItem);
}
创立一个链接 CarrierItemHead->SW8CarrierItem ->SW8CorrelationCarrierItem->SW8ExtensionCarrierItem
在看下下面 tomcat7 遍历 CarrierItem,调用 key 从 http header 获取值设置到对象内置值,这样就能够做到将上一个过程 header 值设置到下一个过程里,在调用
ContextCarrier deserialize(String text, HeaderVersion version) {if (text == null) {return this;}
if (HeaderVersion.v3.equals(version)) {String[] parts = text.split("-", 8);
if (parts.length == 8) {
try {// parts[0] is sample flag, always trace if header exists.
this.traceId = Base64.decode2UTFString(parts[1]);
this.traceSegmentId = Base64.decode2UTFString(parts[2]);
this.spanId = Integer.parseInt(parts[3]);
this.parentService = Base64.decode2UTFString(parts[4]);
this.parentServiceInstance = Base64.decode2UTFString(parts[5]);
this.parentEndpoint = Base64.decode2UTFString(parts[6]);
this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);
} catch (IllegalArgumentException ignored) {}}
}
return this;
}
这样刚刚 new 进去 ContextCarrier 就能够从上一个调用者上继承所有的属性,新创建 span 就能够跟上一个 span 关联起来了了。
开发插件
知识点
追踪的根本办法是拦挡 Java 办法, 应用字节码操作技术(byte-buddy)和 AOP 概念. SkyWalking 包装了字节码操作技术并追踪上下文的流传, 所以你只须要定义拦挡点(换句话说就是 Spring 的切面)。
ClassInstanceMethodsEnhancePluginDefine
定义了构造方法 Contructor 拦挡点和 instance method 实例办法拦挡点,次要有三个办法须要被重写
/**
* 须要被拦挡 Class
* @return
*/
@Override
protected ClassMatch enhanceClass() {return null;}
/**
* 结构器切点
* @return
*/
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {return new ConstructorInterceptPoint[0];
}
/**
* 办法切点
* @return InstanceMethodsInterceptPoint 外面会申明拦挡按个办法
*/
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {return new InstanceMethodsInterceptPoint[0];
}
ClassMatch 以下有四种办法示意如何去匹配指标类:
NameMatch.byName
, 通过类的全限定名(Fully Qualified Class Name, 即 包名 + . + 类名).ClassAnnotationMatch.byClassAnnotationMatch
, 依据指标类是否存在某些注解.MethodAnnotationMatchbyMethodAnnotationMatch
, 依据指标类的办法是否存在某些注解.HierarchyMatch.byHierarchyMatch
, 依据指标类的父类或接口
ClassStaticMethodsEnhancePluginDefine
定义了类办法 class 动态 method 拦挡点。
public abstract class ClassStaticMethodsEnhancePluginDefine extends ClassEnhancePluginDefine {
/**
* 结构器切点
* @return null, means enhance no constructors.
*/
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {return null;}
/**
* 办法切点
* @return null, means enhance no instance methods.
*/
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {return null;}
}
InstanceMethodsInterceptPoint
一般办法接口切点有哪些办法
public interface InstanceMethodsInterceptPoint {
/**
* class instance methods matcher.
* 能够了解胜利对 Class 那些办法进行加强
* ElementMatcher 是 bytebuddy 类库一个办法匹配器,外面封装了各种办法匹配
* @return methods matcher
*/
ElementMatcher<MethodDescription> getMethodsMatcher();
/**
* @return represents a class name, the class instance must instanceof InstanceMethodsAroundInterceptor.
* 返回一个拦截器全类名,所有拦截器必须实现 InstanceMethodsAroundInterceptor 接口
*/
String getMethodsInterceptor();
/**
* 是否要笼罩原办法入参
* @return
*/
boolean isOverrideArgs();}
在看下拦截器有那些办法
/**
* A interceptor, which intercept method's invocation. The target methods will be defined in {@link
* ClassEnhancePluginDefine}'s subclass, most likely in {@link ClassInstanceMethodsEnhancePluginDefine}
*/
public interface InstanceMethodsAroundInterceptor {
/**
* called before target method invocation.
* 前置告诉
* @param result change this result, if you want to truncate the method.
*/
void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable;
/**
* called after target method invocation. Even method's invocation triggers an exception.
* 后置告诉
* @param ret the method's original return value. May be null if the method triggers an exception.
* @return the method's actual return value.
*/
Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable;
/**
* called when occur exception.
* 异样告诉
* @param t the exception occur.
*/
void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t);
}
开发 Skywalking 实战
我的项目 maven 环境配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tk.shenyifeng</groupId>
<artifactId>skywalking-plugin</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<skywalking.version>8.10.0</skywalking.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>${skywalking.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>java-agent-util</artifactId>
<version>${skywalking.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<relocations>
<relocation>
<pattern>net.bytebuddy</pattern>
<shadedPattern>org.apache.skywalking.apm.dependencies.net.bytebuddy</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
为了更有代表性一些,应用 Skywalking 官网开发的 ES 插件来做一个例子。为了兼容不同版本框架,Skywalking 官网应用 witnessClasses,以后框架 Jar 存在这个 Class 就会工作是某个版本、同样 witnessMethods 当 Class 存在某个 Method。
public class AdapterActionFutureInstrumentation extends ClassEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {return new InstanceMethodsInterceptPoint[] {new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {return named("actionGet"); // 拦挡办法
}
@Override
public String getMethodsInterceptor() { // 拦截器全类名
return "org.apache.skywalking.apm.plugin.elasticsearch.v7.interceptor.AdapterActionFutureActionGetMethodsInterceptor";
}
@Override
public boolean isOverrideArgs() {return false;}
}
};
}
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {return new StaticMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() { // 加强 Class
return byName("org.elasticsearch.action.support.AdapterActionFuture");
}
@Override
protected String[] witnessClasses() {//ES7 存在 Class
return new String[] {"org.elasticsearch.transport.TaskTransportChannel"};
}
@Override
protected List<WitnessMethod> witnessMethods() { //ES7 SearchHits 存在办法
return Collections.singletonList(new WitnessMethod(
"org.elasticsearch.search.SearchHits",
named("getTotalHits").and(takesArguments(0)).and(returns(named("org.apache.lucene.search.TotalHits")))
));
}
}
创立一个给定类名的拦截器,实现 InstanceMethodsAroundInterceptor
接口。创立一个 EntrySpan
public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor {
private static boolean IS_SERVLET_GET_STATUS_METHOD_EXIST;
private static final String SERVLET_RESPONSE_CLASS = "javax.servlet.http.HttpServletResponse";
private static final String GET_STATUS_METHOD = "getStatus";
static {
IS_SERVLET_GET_STATUS_METHOD_EXIST = MethodUtil.isMethodExist(TomcatInvokeInterceptor.class.getClassLoader(), SERVLET_RESPONSE_CLASS, GET_STATUS_METHOD);
}
/**
* * The {@link TraceSegment#ref} of current trace segment will reference to the trace segment id of the previous
* level if the serialized context is not null.
*
* @param result change this result, if you want to truncate the method.
*/
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {Request request = (Request) allArguments[0];
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
// 如果 HTTP 申请头中有合乎 sw8 传输协定的申请头则 取出来设置到上下文 ContextCarrier
while (next.hasNext()) {next = next.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
String operationName = String.join(":", request.getMethod(), request.getRequestURI());
AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);// 关联起来
Tags.URL.set(span, request.getRequestURL().toString()); // 增加 span 参数
Tags.HTTP.METHOD.set(span, request.getMethod());
span.setComponent(ComponentsDefine.TOMCAT);
SpanLayer.asHttp(span);
if (TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS) {collectHttpParam(request, span);
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {Request request = (Request) allArguments[0];
HttpServletResponse response = (HttpServletResponse) allArguments[1];
AbstractSpan span = ContextManager.activeSpan();
if (IS_SERVLET_GET_STATUS_METHOD_EXIST && response.getStatus() >= 400) {span.errorOccurred();
Tags.HTTP_RESPONSE_STATUS_CODE.set(span, response.getStatus());
}
// Active HTTP parameter collection automatically in the profiling context.
if (!TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS && span.isProfiling()) {collectHttpParam(request, span);
}
ContextManager.getRuntimeContext().remove(Constants.FORWARD_REQUEST_FLAG);
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {AbstractSpan span = ContextManager.activeSpan();
span.log(t);
}
private void collectHttpParam(Request request, AbstractSpan span) {final Map<String, String[]> parameterMap = new HashMap<>();
final org.apache.coyote.Request coyoteRequest = request.getCoyoteRequest();
final Parameters parameters = coyoteRequest.getParameters();
for (final Enumeration<String> names = parameters.getParameterNames(); names.hasMoreElements();) {final String name = names.nextElement();
parameterMap.put(name, parameters.getParameterValues(name));
}
if (!parameterMap.isEmpty()) {String tagValue = CollectionUtil.toString(parameterMap);
tagValue = TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD > 0 ?
StringUtil.cut(tagValue, TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD) :
tagValue;
Tags.HTTP.PARAMS.set(span, tagValue);
}
}
}
开发实现拦截器后,肯定要在类门路上增加 skywalking-plugin.def
文件,将开发后的全类名增加到配置。
xxxName = tk.shenyifeng.skywalking.plugin.RepladInstrumentation
如果 jar 外面没有这个文件,插件不会被 Skywalking 加载的。
最初将打包的 jar 放到 Skywalking 的 plugin 或者 activations 目录就能够了。
xml 配置插件
<?xml version="1.0" encoding="UTF-8"?>
<enhanced>
<class class_name="test.apache.skywalking.apm.testcase.customize.service.TestService1">
<method method="staticMethod()" operation_name="/is_static_method" static="true"></method>
<method method="staticMethod(java.lang.String,int.class,java.util.Map,java.util.List,[Ljava.lang.Object;)"
operation_name="/is_static_method_args" static="true">
<operation_name_suffix>arg[0]</operation_name_suffix>
<operation_name_suffix>arg[1]</operation_name_suffix>
<operation_name_suffix>arg[3].[0]</operation_name_suffix>
<tag key="tag_1">arg[2].['k1']</tag>
<tag key="tag_2">arg[4].[1]</tag>
<log key="log_1">arg[4].[2]</log>
</method>
<method method="method()" static="false"></method>
<method method="method(java.lang.String,int.class)" operation_name="/method_2" static="false">
<operation_name_suffix>arg[0]</operation_name_suffix>
<tag key="tag_1">arg[0]</tag>
<log key="log_1">arg[1]</log>
</method>
<method
method="method(test.apache.skywalking.apm.testcase.customize.model.Model0,java.lang.String,int.class)"
operation_name="/method_3" static="false">
<operation_name_suffix>arg[0].id</operation_name_suffix>
<operation_name_suffix>arg[0].model1.name</operation_name_suffix>
<operation_name_suffix>arg[0].model1.getId()</operation_name_suffix>
<tag key="tag_os">arg[0].os.[1]</tag>
<log key="log_map">arg[0].getM().['k1']</log>
</method>
<method method="retString(java.lang.String)" operation_name="/retString" static="false">
<tag key="tag_ret">returnedObj</tag>
<log key="log_map">returnedObj</log>
</method>
<method method="retModel0(test.apache.skywalking.apm.testcase.customize.model.Model0)"
operation_name="/retModel0" static="false">
<tag key="tag_ret">returnedObj.model1.id</tag>
<log key="log_map">returnedObj.model1.getId()</log>
</method>
</class>
</enhanced>
通过 xml 配置能够省去编写 Java 代码,打包 jar 步骤。
xml 规定
配置 | 阐明 |
---|---|
class_name | 须要被加强 Class |
method | 须要被加强 Method, 反对参数定义 |
operation_name | 操作名称 |
operation_name_suffix | 操作后缀,用于生成动静 operation_name |
tag | 将在 local span 中增加一个 tag。key 的值须要在 XML 节点上示意 |
log | 将在 local span 中增加一个 log。key 的值须要在 XML 节点上示意 |
arg[n] | 示意输出的参数值。比方 args[0]示意第一个参数 |
.[n] | 当正在被解析的对象是 Array 或 List,你能够用这个表达式失去对应 index 上的对象 |
.[‘key’] | 当正在被解析的对象是 Map, 你能够用这个表达式失去 map 的 key |
在配置文件 agent.config 中增加配置:
plugin.customize.enhance_file=customize_enhance.xml 的绝对路径
援用材料
https://www.itmuch.com/skywal…
https://skyapm.github.io/docu…