本文出处shenyifengtk.github.io 转载请阐明
概念
Span
Span 是分布式跟踪零碎中一个重要且罕用的概念. 可从 Google Dapper Paper 和 OpenTracing 学习更多与 Span 相干的常识.
SkyWalking 从 2017 年开始反对 OpenTracing 和 OpenTracing-Java API, 咱们的 Span 概念与论文和 OpenTracing 相似. 咱们也扩大了 Span.
Span 有三种类型
1.1 EntrySpan
EntrySpan 代表服务提供者, 也是服务器端的端点. 作为一个 APM 零碎, 咱们的指标是应用服务器. 所以简直所有的服务和 MQ-消费者 都是 EntrySpan。能够了解一个过程解决第一个span就是EntrySpan,意思为entiry span 进入服务span。
1.2 LocalSpan
LocalSpan 示意一般的 Java 办法, 它与近程服务无关, 也不是 MQ 生产者/消费者, 也不是服务(例如 HTTP 服务)提供者/消费者。所有本地办法调用都是localSpan,包含异步线程调用,线程池提交工作都是。
1.3 ExitSpan
ExitSpan 代表一个服务客户端或MQ的生产者, 在 SkyWalking 的晚期命名为 LeafSpan
. 例如 通过 JDBC 拜访DB, 读取 Redis/Memcached 被归类为 ExitSpan.
上下文载体 (ContextCarrier)
为了实现分布式跟踪, 须要绑定跨过程的追踪, 并且上下文应该在整个过程中随之流传. 这就是 ContextCarrier 的职责.
以下是无关如何在 A -> B
分布式调用中应用 ContextCarrier 的步骤.
- 在客户端, 创立一个新的空的
ContextCarrier
. - 通过
ContextManager#createExitSpan
创立一个 ExitSpan 或者应用ContextManager#inject
来初始化ContextCarrier
. - 将
ContextCarrier
所有信息放到申请头 (如 HTTP HEAD), 附件(如 Dubbo RPC 框架), 或者音讯 (如 Kafka) 中,详情能够看官网给出跨过程传输协定sw8 - 通过服务调用, 将
ContextCarrier
传递到服务端. - 在服务端, 在对应组件的头部, 附件或音讯中获取
ContextCarrier
所有内容. - 通过
ContestManager#createEntrySpan
创立 EntrySpan 或者应用ContextManager#extract
将服务端和客户端的绑定.
让咱们通过 Apache HttpComponent client 插件和 Tomcat 7 服务器插件演示, 步骤如下:
- 客户端 Apache HttpComponent client 插件
span = ContextManager.createExitSpan("/span/operation/name", contextCarrier, "ip:port");CarrierItem next = contextCarrier.items();while (next.hasNext()) { next = next.next(); httpRequest.setHeader(next.getHeadKey(), next.getHeadValue());}
- 服务端 Tomcat 7 服务器插件
ContextCarrier contextCarrier = new ContextCarrier();CarrierItem next = contextCarrier.items();while (next.hasNext()) { next = next.next(); next.setHeadValue(request.getHeader(next.getHeadKey()));}span = ContextManager.createEntrySpan("/span/operation/name", contextCarrier);
上下文快照 (ContextSnapshot)
除了跨过程, 跨线程也是须要反对的, 例如异步线程(内存中的音讯队列)和批处理在 Java 中很常见, 跨过程和跨线程十分相似, 因为都是须要流传上下文. 惟一的区别是, 不须要跨线程序列化.
以下是无关跨线程流传的三个步骤:
- 应用
ContextManager#capture
办法获取 ContextSnapshot 对象. - 让子线程以任何形式, 通过办法参数或由现有参数携带来拜访 ContextSnapshot
- 在子线程中应用
ContextManager#continued
。
跨过程Span传输原理
public class CarrierItem implements Iterator<CarrierItem> { private String headKey; private String headValue; private CarrierItem next; public CarrierItem(String headKey, String headValue) { this(headKey, headValue, null); } public CarrierItem(String headKey, String headValue, CarrierItem next) { this.headKey = headKey; this.headValue = headValue; this.next = next; } public String getHeadKey() { return headKey; } public String getHeadValue() { return headValue; } public void setHeadValue(String headValue) { this.headValue = headValue; } @Override public boolean hasNext() { return next != null; } @Override public CarrierItem next() { return next; } @Override public void remove() { }}
CarrierItem 相似Map key value的数据接口,通过一个单向连贯将K/V连接起来。
看下 ContextCarrier.items()办法如何创立CarrierItem
public CarrierItem items() { //内置一个 sw8-x key SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null); //内置 sw8-correlation key SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem( correlationContext, sw8ExtensionCarrierItem); //内置 sw8 key SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem); return new CarrierItemHead(sw8CarrierItem); }
创立一个链接CarrierItemHead->SW8CarrierItem ->SW8CorrelationCarrierItem->SW8ExtensionCarrierItem
在看下下面tomcat7 遍历CarrierItem,调用key从http header获取值设置到对象内置值,这样就能够做到将上一个过程header 值设置到下一个过程里,在调用
ContextCarrier deserialize(String text, HeaderVersion version) { if (text == null) { return this; } if (HeaderVersion.v3.equals(version)) { String[] parts = text.split("-", 8); if (parts.length == 8) { try { // parts[0] is sample flag, always trace if header exists. this.traceId = Base64.decode2UTFString(parts[1]); this.traceSegmentId = Base64.decode2UTFString(parts[2]); this.spanId = Integer.parseInt(parts[3]); this.parentService = Base64.decode2UTFString(parts[4]); this.parentServiceInstance = Base64.decode2UTFString(parts[5]); this.parentEndpoint = Base64.decode2UTFString(parts[6]); this.addressUsedAtClient = Base64.decode2UTFString(parts[7]); } catch (IllegalArgumentException ignored) { } } } return this; }
这样刚刚new 进去ContextCarrier就能够从上一个调用者上继承所有的属性,新创建span就能够跟上一个span 关联起来了了。
开发插件
知识点
追踪的根本办法是拦挡 Java 办法, 应用字节码操作技术(byte-buddy)和 AOP 概念. SkyWalking 包装了字节码操作技术并追踪上下文的流传, 所以你只须要定义拦挡点(换句话说就是 Spring 的切面)。
ClassInstanceMethodsEnhancePluginDefine
定义了构造方法 Contructor 拦挡点和 instance method 实例办法拦挡点,次要有三个办法须要被重写
/** * 须要被拦挡Class * @return */ @Override protected ClassMatch enhanceClass() { return null; } /** * 结构器切点 * @return */ @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[0]; } /** * 办法切点 * @return InstanceMethodsInterceptPoint 外面会申明拦挡按个办法 */ @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[0]; }
ClassMatch 以下有四种办法示意如何去匹配指标类:
NameMatch.byName
, 通过类的全限定名(Fully Qualified Class Name, 即 包名 + . + 类名).ClassAnnotationMatch.byClassAnnotationMatch
, 依据指标类是否存在某些注解.MethodAnnotationMatchbyMethodAnnotationMatch
, 依据指标类的办法是否存在某些注解.HierarchyMatch.byHierarchyMatch
, 依据指标类的父类或接口
ClassStaticMethodsEnhancePluginDefine
定义了类办法 class 动态method 拦挡点。
public abstract class ClassStaticMethodsEnhancePluginDefine extends ClassEnhancePluginDefine { /** * 结构器切点 * @return null, means enhance no constructors. */ @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return null; } /** * 办法切点 * @return null, means enhance no instance methods. */ @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return null; }}
InstanceMethodsInterceptPoint
一般办法接口切点有哪些办法
public interface InstanceMethodsInterceptPoint { /** * class instance methods matcher. * 能够了解胜利对Class 那些办法进行加强 * ElementMatcher 是bytebuddy 类库一个办法匹配器,外面封装了各种办法匹配 * @return methods matcher */ ElementMatcher<MethodDescription> getMethodsMatcher(); /** * @return represents a class name, the class instance must instanceof InstanceMethodsAroundInterceptor. * 返回一个拦截器全类名,所有拦截器必须实现 InstanceMethodsAroundInterceptor 接口 */ String getMethodsInterceptor(); /** * 是否要笼罩原办法入参 * @return */ boolean isOverrideArgs();}
在看下拦截器有那些办法
/** * A interceptor, which intercept method's invocation. The target methods will be defined in {@link * ClassEnhancePluginDefine}'s subclass, most likely in {@link ClassInstanceMethodsEnhancePluginDefine} */public interface InstanceMethodsAroundInterceptor { /** * called before target method invocation. * 前置告诉 * @param result change this result, if you want to truncate the method. */ void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable; /** * called after target method invocation. Even method's invocation triggers an exception. * 后置告诉 * @param ret the method's original return value. May be null if the method triggers an exception. * @return the method's actual return value. */ Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable; /** * called when occur exception. * 异样告诉 * @param t the exception occur. */ void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t);}
开发Skywalking实战
我的项目maven环境配置
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>tk.shenyifeng</groupId> <artifactId>skywalking-plugin</artifactId> <version>1.0-SNAPSHOT</version> <properties> <skywalking.version>8.10.0</skywalking.version> </properties> <dependencies> <dependency> <groupId>org.apache.skywalking</groupId> <artifactId>apm-agent-core</artifactId> <version>${skywalking.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.skywalking</groupId> <artifactId>java-agent-util</artifactId> <version>${skywalking.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> <plugin> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> <createDependencyReducedPom>true</createDependencyReducedPom> <createSourcesJar>true</createSourcesJar> <shadeSourcesContent>true</shadeSourcesContent> <relocations> <relocation> <pattern>net.bytebuddy</pattern> <shadedPattern>org.apache.skywalking.apm.dependencies.net.bytebuddy</shadedPattern> </relocation> </relocations> </configuration> </execution> </executions> </plugin> </plugins> </build></project>
为了更有代表性一些,应用Skywalking官网开发的ES插件来做一个例子。为了兼容不同版本框架,Skywalking 官网应用witnessClasses,以后框架Jar存在这个Class就会工作是某个版本、同样witnessMethods当Class存在某个Method。
public class AdapterActionFutureInstrumentation extends ClassEnhancePluginDefine { @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[0]; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[] { new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named("actionGet"); //拦挡办法 } @Override public String getMethodsInterceptor() { //拦截器全类名 return "org.apache.skywalking.apm.plugin.elasticsearch.v7.interceptor.AdapterActionFutureActionGetMethodsInterceptor"; } @Override public boolean isOverrideArgs() { return false; } } }; } @Override public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() { return new StaticMethodsInterceptPoint[0]; } @Override protected ClassMatch enhanceClass() { //加强Class return byName("org.elasticsearch.action.support.AdapterActionFuture"); } @Override protected String[] witnessClasses() {//ES7 存在Class return new String[] {"org.elasticsearch.transport.TaskTransportChannel"}; } @Override protected List<WitnessMethod> witnessMethods() { //ES7 SearchHits 存在办法 return Collections.singletonList(new WitnessMethod( "org.elasticsearch.search.SearchHits", named("getTotalHits").and(takesArguments(0)).and(returns(named("org.apache.lucene.search.TotalHits"))) )); }}
创立一个给定类名的拦截器,实现InstanceMethodsAroundInterceptor
接口。创立一个EntrySpan
public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor { private static boolean IS_SERVLET_GET_STATUS_METHOD_EXIST; private static final String SERVLET_RESPONSE_CLASS = "javax.servlet.http.HttpServletResponse"; private static final String GET_STATUS_METHOD = "getStatus"; static { IS_SERVLET_GET_STATUS_METHOD_EXIST = MethodUtil.isMethodExist( TomcatInvokeInterceptor.class.getClassLoader(), SERVLET_RESPONSE_CLASS, GET_STATUS_METHOD); } /** * * The {@link TraceSegment#ref} of current trace segment will reference to the trace segment id of the previous * level if the serialized context is not null. * * @param result change this result, if you want to truncate the method. */ @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { Request request = (Request) allArguments[0]; ContextCarrier contextCarrier = new ContextCarrier(); CarrierItem next = contextCarrier.items(); //如果 HTTP 申请头中有合乎sw8 传输协定的申请头则 取出来设置到上下文ContextCarrier while (next.hasNext()) { next = next.next(); next.setHeadValue(request.getHeader(next.getHeadKey())); } String operationName = String.join(":", request.getMethod(), request.getRequestURI()); AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);//关联起来 Tags.URL.set(span, request.getRequestURL().toString()); //增加 span 参数 Tags.HTTP.METHOD.set(span, request.getMethod()); span.setComponent(ComponentsDefine.TOMCAT); SpanLayer.asHttp(span); if (TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS) { collectHttpParam(request, span); } } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { Request request = (Request) allArguments[0]; HttpServletResponse response = (HttpServletResponse) allArguments[1]; AbstractSpan span = ContextManager.activeSpan(); if (IS_SERVLET_GET_STATUS_METHOD_EXIST && response.getStatus() >= 400) { span.errorOccurred(); Tags.HTTP_RESPONSE_STATUS_CODE.set(span, response.getStatus()); } // Active HTTP parameter collection automatically in the profiling context. if (!TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS && span.isProfiling()) { collectHttpParam(request, span); } ContextManager.getRuntimeContext().remove(Constants.FORWARD_REQUEST_FLAG); ContextManager.stopSpan(); return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { AbstractSpan span = ContextManager.activeSpan(); span.log(t); } private void collectHttpParam(Request request, AbstractSpan span) { final Map<String, String[]> parameterMap = new HashMap<>(); final org.apache.coyote.Request coyoteRequest = request.getCoyoteRequest(); final Parameters parameters = coyoteRequest.getParameters(); for (final Enumeration<String> names = parameters.getParameterNames(); names.hasMoreElements(); ) { final String name = names.nextElement(); parameterMap.put(name, parameters.getParameterValues(name)); } if (!parameterMap.isEmpty()) { String tagValue = CollectionUtil.toString(parameterMap); tagValue = TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD) : tagValue; Tags.HTTP.PARAMS.set(span, tagValue); } }}
开发实现拦截器后,肯定要在类门路上增加skywalking-plugin.def
文件,将开发后的全类名增加到配置。
xxxName = tk.shenyifeng.skywalking.plugin.RepladInstrumentation
如果jar 外面没有这个文件,插件不会被Skywalking加载的。
最初将打包的jar 放到Skywalking的plugin或者activations目录就能够了。
xml配置插件
<?xml version="1.0" encoding="UTF-8"?><enhanced> <class class_name="test.apache.skywalking.apm.testcase.customize.service.TestService1"> <method method="staticMethod()" operation_name="/is_static_method" static="true"></method> <method method="staticMethod(java.lang.String,int.class,java.util.Map,java.util.List,[Ljava.lang.Object;)" operation_name="/is_static_method_args" static="true"> <operation_name_suffix>arg[0]</operation_name_suffix> <operation_name_suffix>arg[1]</operation_name_suffix> <operation_name_suffix>arg[3].[0]</operation_name_suffix> <tag key="tag_1">arg[2].['k1']</tag> <tag key="tag_2">arg[4].[1]</tag> <log key="log_1">arg[4].[2]</log> </method> <method method="method()" static="false"></method> <method method="method(java.lang.String,int.class)" operation_name="/method_2" static="false"> <operation_name_suffix>arg[0]</operation_name_suffix> <tag key="tag_1">arg[0]</tag> <log key="log_1">arg[1]</log> </method> <method method="method(test.apache.skywalking.apm.testcase.customize.model.Model0,java.lang.String,int.class)" operation_name="/method_3" static="false"> <operation_name_suffix>arg[0].id</operation_name_suffix> <operation_name_suffix>arg[0].model1.name</operation_name_suffix> <operation_name_suffix>arg[0].model1.getId()</operation_name_suffix> <tag key="tag_os">arg[0].os.[1]</tag> <log key="log_map">arg[0].getM().['k1']</log> </method> <method method="retString(java.lang.String)" operation_name="/retString" static="false"> <tag key="tag_ret">returnedObj</tag> <log key="log_map">returnedObj</log> </method> <method method="retModel0(test.apache.skywalking.apm.testcase.customize.model.Model0)" operation_name="/retModel0" static="false"> <tag key="tag_ret">returnedObj.model1.id</tag> <log key="log_map">returnedObj.model1.getId()</log> </method> </class> </enhanced>
通过xml配置能够省去编写Java代码,打包jar步骤。
xml规定
配置 | 阐明 |
---|---|
class_name | 须要被加强Class |
method | 须要被加强Method,反对参数定义 |
operation_name | 操作名称 |
operation_name_suffix | 操作后缀,用于生成动静operation_name |
tag | 将在local span中增加一个tag。key的值须要在XML节点上示意 |
log | 将在local span中增加一个log。key的值须要在XML节点上示意 |
arg[n] | 示意输出的参数值。比方args[0]示意第一个参数 |
.[n] | 当正在被解析的对象是Array或List,你能够用这个表达式失去对应index上的对象 |
.['key'] | 当正在被解析的对象是Map, 你能够用这个表达式失去map的key |
在配置文件agent.config中增加配置:
plugin.customize.enhance_file=customize_enhance.xml的绝对路径
援用材料
https://www.itmuch.com/skywal...
https://skyapm.github.io/docu...