关于java:Skywalking-插件开发

26次阅读

共计 15412 个字符,预计需要花费 39 分钟才能阅读完成。

本文出处 shenyifengtk.github.io 转载请阐明

概念

Span

Span 是分布式跟踪零碎中一个重要且罕用的概念. 可从 Google Dapper Paper 和 OpenTracing 学习更多与 Span 相干的常识.

SkyWalking 从 2017 年开始反对 OpenTracing 和 OpenTracing-Java API, 咱们的 Span 概念与论文和 OpenTracing 相似. 咱们也扩大了 Span.

Span 有三种类型

1.1 EntrySpan

EntrySpan 代表服务提供者, 也是服务器端的端点. 作为一个 APM 零碎, 咱们的指标是应用服务器. 所以简直所有的服务和 MQ- 消费者 都是 EntrySpan。能够了解一个过程解决第一个 span 就是 EntrySpan,意思为 entiry span 进入服务 span。

1.2 LocalSpan

LocalSpan 示意一般的 Java 办法, 它与近程服务无关, 也不是 MQ 生产者 / 消费者, 也不是服务(例如 HTTP 服务)提供者 / 消费者。所有本地办法调用都是 localSpan, 包含异步线程调用,线程池提交工作都是。

1.3 ExitSpan

ExitSpan 代表一个服务客户端或 MQ 的生产者, 在 SkyWalking 的晚期命名为 LeafSpan. 例如 通过 JDBC 拜访 DB, 读取 Redis/Memcached 被归类为 ExitSpan.

上下文载体 (ContextCarrier)

为了实现分布式跟踪, 须要绑定跨过程的追踪, 并且上下文应该在整个过程中随之流传. 这就是 ContextCarrier 的职责.

以下是无关如何在 A -> B 分布式调用中应用 ContextCarrier 的步骤.

  1. 在客户端, 创立一个新的空的 ContextCarrier.
  2. 通过 ContextManager#createExitSpan 创立一个 ExitSpan 或者应用 ContextManager#inject 来初始化 ContextCarrier.
  3. 将 ContextCarrier 所有信息放到申请头 (如 HTTP HEAD), 附件(如 Dubbo RPC 框架), 或者音讯 (如 Kafka) 中, 详情能够看官网给出跨过程传输协定 sw8
  4. 通过服务调用, 将 ContextCarrier 传递到服务端.
  5. 在服务端, 在对应组件的头部, 附件或音讯中获取 ContextCarrier 所有内容.
  6. 通过 ContestManager#createEntrySpan 创立 EntrySpan 或者应用 ContextManager#extract 将服务端和客户端的绑定.

让咱们通过 Apache HttpComponent client 插件和 Tomcat 7 服务器插件演示, 步骤如下:

  1. 客户端 Apache HttpComponent client 插件
span = ContextManager.createExitSpan("/span/operation/name", contextCarrier, "ip:port");
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {next = next.next();
    httpRequest.setHeader(next.getHeadKey(), next.getHeadValue());
}
  1. 服务端 Tomcat 7 服务器插件
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {next = next.next();
    next.setHeadValue(request.getHeader(next.getHeadKey()));
}

span = ContextManager.createEntrySpan("/span/operation/name", contextCarrier);

上下文快照 (ContextSnapshot)

除了跨过程, 跨线程也是须要反对的, 例如异步线程(内存中的音讯队列)和批处理在 Java 中很常见, 跨过程和跨线程十分相似, 因为都是须要流传上下文. 惟一的区别是, 不须要跨线程序列化.

以下是无关跨线程流传的三个步骤:

  1. 应用 ContextManager#capture 办法获取 ContextSnapshot 对象.
  2. 让子线程以任何形式, 通过办法参数或由现有参数携带来拜访 ContextSnapshot
  3. 在子线程中应用 ContextManager#continued

跨过程 Span 传输原理

public class CarrierItem implements Iterator<CarrierItem> {
    private String headKey;
    private String headValue;
    private CarrierItem next;

    public CarrierItem(String headKey, String headValue) {this(headKey, headValue, null);
    }

    public CarrierItem(String headKey, String headValue, CarrierItem next) {
        this.headKey = headKey;
        this.headValue = headValue;
        this.next = next;
    }

    public String getHeadKey() {return headKey;}

    public String getHeadValue() {return headValue;}

    public void setHeadValue(String headValue) {this.headValue = headValue;}

    @Override
    public boolean hasNext() {return next != null;}

    @Override
    public CarrierItem next() {return next;}

    @Override
    public void remove() {}
}

CarrierItem 相似 Map key value 的数据接口,通过一个单向连贯将 K / V 连接起来。
看下 ContextCarrier.items()办法如何创立 CarrierItem

    public CarrierItem items() {
       // 内置一个 sw8-x key
        SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null); 
       // 内置  sw8-correlation key
        SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(correlationContext, sw8ExtensionCarrierItem);
       // 内置 sw8 key 
        SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);
        return new CarrierItemHead(sw8CarrierItem);
    }

创立一个链接 CarrierItemHead->SW8CarrierItem ->SW8CorrelationCarrierItem->SW8ExtensionCarrierItem
在看下下面 tomcat7 遍历 CarrierItem,调用 key 从 http header 获取值设置到对象内置值,这样就能够做到将上一个过程 header 值设置到下一个过程里,在调用

    ContextCarrier deserialize(String text, HeaderVersion version) {if (text == null) {return this;}
        if (HeaderVersion.v3.equals(version)) {String[] parts = text.split("-", 8);
            if (parts.length == 8) {
                try {// parts[0] is sample flag, always trace if header exists.
                    this.traceId = Base64.decode2UTFString(parts[1]);
                    this.traceSegmentId = Base64.decode2UTFString(parts[2]);
                    this.spanId = Integer.parseInt(parts[3]);
                    this.parentService = Base64.decode2UTFString(parts[4]);
                    this.parentServiceInstance = Base64.decode2UTFString(parts[5]);
                    this.parentEndpoint = Base64.decode2UTFString(parts[6]);
                    this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);
                } catch (IllegalArgumentException ignored) {}}
        }
        return this;
    }

这样刚刚 new 进去 ContextCarrier 就能够从上一个调用者上继承所有的属性,新创建 span 就能够跟上一个 span 关联起来了了。

开发插件

知识点

追踪的根本办法是拦挡 Java 办法, 应用字节码操作技术(byte-buddy)和 AOP 概念. SkyWalking 包装了字节码操作技术并追踪上下文的流传, 所以你只须要定义拦挡点(换句话说就是 Spring 的切面)。

ClassInstanceMethodsEnhancePluginDefine 定义了构造方法 Contructor 拦挡点和 instance method 实例办法拦挡点,次要有三个办法须要被重写

     /**
     * 须要被拦挡 Class
     * @return
     */
    @Override
    protected ClassMatch enhanceClass() {return null;}

    /**
     * 结构器切点
     * @return
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {return new ConstructorInterceptPoint[0];
    }

    /**
     * 办法切点
     * @return InstanceMethodsInterceptPoint 外面会申明拦挡按个办法
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {return new InstanceMethodsInterceptPoint[0];
    }

ClassMatch 以下有四种办法示意如何去匹配指标类:

  • NameMatch.byName, 通过类的全限定名(Fully Qualified Class Name, 即 包名 + . + 类名).
  • ClassAnnotationMatch.byClassAnnotationMatch, 依据指标类是否存在某些注解.
  • MethodAnnotationMatchbyMethodAnnotationMatch, 依据指标类的办法是否存在某些注解.
  • HierarchyMatch.byHierarchyMatch, 依据指标类的父类或接口

ClassStaticMethodsEnhancePluginDefine 定义了类办法 class 动态 method 拦挡点。

public abstract class ClassStaticMethodsEnhancePluginDefine extends ClassEnhancePluginDefine {

    /**
     * 结构器切点
     * @return null, means enhance no constructors.
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {return null;}

    /**
     * 办法切点
     * @return null, means enhance no instance methods.
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {return null;}
}

InstanceMethodsInterceptPoint 一般办法接口切点有哪些办法

public interface InstanceMethodsInterceptPoint {
    /**
     * class instance methods matcher.
     *  能够了解胜利对 Class 那些办法进行加强
     *  ElementMatcher 是 bytebuddy 类库一个办法匹配器,外面封装了各种办法匹配
     * @return methods matcher
     */
    ElementMatcher<MethodDescription> getMethodsMatcher();

    /**
     * @return represents a class name, the class instance must instanceof InstanceMethodsAroundInterceptor.
     *  返回一个拦截器全类名,所有拦截器必须实现    InstanceMethodsAroundInterceptor 接口
     */
    String getMethodsInterceptor();

    /**
     *  是否要笼罩原办法入参
     * @return
     */
    boolean isOverrideArgs();}

在看下拦截器有那些办法

/**
 * A interceptor, which intercept method's invocation. The target methods will be defined in {@link
 * ClassEnhancePluginDefine}'s subclass, most likely in {@link ClassInstanceMethodsEnhancePluginDefine}
 */
public interface InstanceMethodsAroundInterceptor {
    /**
     * called before target method invocation.
     * 前置告诉
     * @param result change this result, if you want to truncate the method.
     */
    void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable;

    /**
     * called after target method invocation. Even method's invocation triggers an exception.
     * 后置告诉
     * @param ret the method's original return value. May be null if the method triggers an exception.
     * @return the method's actual return value.
     */
    Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable;

    /**
     * called when occur exception.
     * 异样告诉
     * @param t the exception occur.
     */
    void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t);
}

开发 Skywalking 实战

我的项目 maven 环境配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tk.shenyifeng</groupId>
    <artifactId>skywalking-plugin</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <skywalking.version>8.10.0</skywalking.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>apm-agent-core</artifactId>
            <version>${skywalking.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>java-agent-util</artifactId>
            <version>${skywalking.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <createSourcesJar>true</createSourcesJar>
                            <shadeSourcesContent>true</shadeSourcesContent>
                            <relocations>
                                <relocation>
                                    <pattern>net.bytebuddy</pattern>
                                    <shadedPattern>org.apache.skywalking.apm.dependencies.net.bytebuddy</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

为了更有代表性一些,应用 Skywalking 官网开发的 ES 插件来做一个例子。为了兼容不同版本框架,Skywalking 官网应用 witnessClasses,以后框架 Jar 存在这个 Class 就会工作是某个版本、同样 witnessMethods 当 Class 存在某个 Method。

public class AdapterActionFutureInstrumentation extends ClassEnhancePluginDefine {

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {return new ConstructorInterceptPoint[0];
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {return new InstanceMethodsInterceptPoint[] {new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {return named("actionGet"); // 拦挡办法
                }

                @Override
                public String getMethodsInterceptor() {  // 拦截器全类名
                    return "org.apache.skywalking.apm.plugin.elasticsearch.v7.interceptor.AdapterActionFutureActionGetMethodsInterceptor";
                }

                @Override
                public boolean isOverrideArgs() {return false;}
            }
        };
    }

    @Override
    public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {return new StaticMethodsInterceptPoint[0];
    }

    @Override
    protected ClassMatch enhanceClass() { // 加强 Class
        return byName("org.elasticsearch.action.support.AdapterActionFuture");
    }

    @Override
    protected String[] witnessClasses() {//ES7 存在 Class
        return new String[] {"org.elasticsearch.transport.TaskTransportChannel"};
    }

    @Override
    protected List<WitnessMethod> witnessMethods() { //ES7 SearchHits 存在办法
        return Collections.singletonList(new WitnessMethod(
            "org.elasticsearch.search.SearchHits",
          named("getTotalHits").and(takesArguments(0)).and(returns(named("org.apache.lucene.search.TotalHits")))
        ));
    }
}

创立一个给定类名的拦截器,实现 InstanceMethodsAroundInterceptor 接口。创立一个 EntrySpan

public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor {

    private static boolean IS_SERVLET_GET_STATUS_METHOD_EXIST;
    private static final String SERVLET_RESPONSE_CLASS = "javax.servlet.http.HttpServletResponse";
    private static final String GET_STATUS_METHOD = "getStatus";

    static {
        IS_SERVLET_GET_STATUS_METHOD_EXIST = MethodUtil.isMethodExist(TomcatInvokeInterceptor.class.getClassLoader(), SERVLET_RESPONSE_CLASS, GET_STATUS_METHOD);
    }

    /**
     * * The {@link TraceSegment#ref} of current trace segment will reference to the trace segment id of the previous
     * level if the serialized context is not null.
     *
     * @param result change this result, if you want to truncate the method.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {Request request = (Request) allArguments[0];
        ContextCarrier contextCarrier = new ContextCarrier();

        CarrierItem next = contextCarrier.items();
       // 如果 HTTP 申请头中有合乎 sw8 传输协定的申请头则 取出来设置到上下文 ContextCarrier
        while (next.hasNext()) {next = next.next();
            next.setHeadValue(request.getHeader(next.getHeadKey()));
        }
        String operationName =  String.join(":", request.getMethod(), request.getRequestURI());
        AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);// 关联起来
        Tags.URL.set(span, request.getRequestURL().toString()); // 增加 span 参数
        Tags.HTTP.METHOD.set(span, request.getMethod());
        span.setComponent(ComponentsDefine.TOMCAT);
        SpanLayer.asHttp(span);

        if (TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS) {collectHttpParam(request, span);
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {Request request = (Request) allArguments[0];
        HttpServletResponse response = (HttpServletResponse) allArguments[1];

        AbstractSpan span = ContextManager.activeSpan();
        if (IS_SERVLET_GET_STATUS_METHOD_EXIST && response.getStatus() >= 400) {span.errorOccurred();
            Tags.HTTP_RESPONSE_STATUS_CODE.set(span, response.getStatus());
        }
        // Active HTTP parameter collection automatically in the profiling context.
        if (!TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS && span.isProfiling()) {collectHttpParam(request, span);
        }
        ContextManager.getRuntimeContext().remove(Constants.FORWARD_REQUEST_FLAG);
        ContextManager.stopSpan();
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {AbstractSpan span = ContextManager.activeSpan();
        span.log(t);
    }

    private void collectHttpParam(Request request, AbstractSpan span) {final Map<String, String[]> parameterMap = new HashMap<>();
        final org.apache.coyote.Request coyoteRequest = request.getCoyoteRequest();
        final Parameters parameters = coyoteRequest.getParameters();
        for (final Enumeration<String> names = parameters.getParameterNames(); names.hasMoreElements();) {final String name = names.nextElement();
            parameterMap.put(name, parameters.getParameterValues(name));
        }

        if (!parameterMap.isEmpty()) {String tagValue = CollectionUtil.toString(parameterMap);
            tagValue = TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD > 0 ?
                StringUtil.cut(tagValue, TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD) :
                tagValue;
            Tags.HTTP.PARAMS.set(span, tagValue);
        }
    }
}

开发实现拦截器后,肯定要在类门路上增加 skywalking-plugin.def 文件,将开发后的全类名增加到配置。

xxxName = tk.shenyifeng.skywalking.plugin.RepladInstrumentation

如果 jar 外面没有这个文件,插件不会被 Skywalking 加载的。
最初将打包的 jar 放到 Skywalking 的 plugin 或者 activations 目录就能够了。

xml 配置插件

<?xml version="1.0" encoding="UTF-8"?>
<enhanced>
    <class class_name="test.apache.skywalking.apm.testcase.customize.service.TestService1">
        <method method="staticMethod()" operation_name="/is_static_method" static="true"></method>
        <method method="staticMethod(java.lang.String,int.class,java.util.Map,java.util.List,[Ljava.lang.Object;)"
                operation_name="/is_static_method_args" static="true">
            <operation_name_suffix>arg[0]</operation_name_suffix>
            <operation_name_suffix>arg[1]</operation_name_suffix>
            <operation_name_suffix>arg[3].[0]</operation_name_suffix>
            <tag key="tag_1">arg[2].['k1']</tag>
            <tag key="tag_2">arg[4].[1]</tag>
            <log key="log_1">arg[4].[2]</log>
        </method>
        <method method="method()" static="false"></method>
        <method method="method(java.lang.String,int.class)" operation_name="/method_2" static="false">
            <operation_name_suffix>arg[0]</operation_name_suffix>
            <tag key="tag_1">arg[0]</tag>
            <log key="log_1">arg[1]</log>
        </method>
        <method
            method="method(test.apache.skywalking.apm.testcase.customize.model.Model0,java.lang.String,int.class)"
            operation_name="/method_3" static="false">
            <operation_name_suffix>arg[0].id</operation_name_suffix>
            <operation_name_suffix>arg[0].model1.name</operation_name_suffix>
            <operation_name_suffix>arg[0].model1.getId()</operation_name_suffix>
            <tag key="tag_os">arg[0].os.[1]</tag>
            <log key="log_map">arg[0].getM().['k1']</log>
        </method>
        <method method="retString(java.lang.String)" operation_name="/retString" static="false">
            <tag key="tag_ret">returnedObj</tag>
            <log key="log_map">returnedObj</log>
        </method>
        <method method="retModel0(test.apache.skywalking.apm.testcase.customize.model.Model0)"
          operation_name="/retModel0" static="false">
            <tag key="tag_ret">returnedObj.model1.id</tag>
            <log key="log_map">returnedObj.model1.getId()</log>
        </method>
    </class>
    
</enhanced>

通过 xml 配置能够省去编写 Java 代码,打包 jar 步骤。
xml 规定

配置 阐明
class_name 须要被加强 Class
method 须要被加强 Method, 反对参数定义
operation_name 操作名称
operation_name_suffix 操作后缀,用于生成动静 operation_name
tag 将在 local span 中增加一个 tag。key 的值须要在 XML 节点上示意
log 将在 local span 中增加一个 log。key 的值须要在 XML 节点上示意
arg[n] 示意输出的参数值。比方 args[0]示意第一个参数
.[n] 当正在被解析的对象是 Array 或 List,你能够用这个表达式失去对应 index 上的对象
.[‘key’] 当正在被解析的对象是 Map, 你能够用这个表达式失去 map 的 key

在配置文件 agent.config 中增加配置:

plugin.customize.enhance_file=customize_enhance.xml 的绝对路径


援用材料
https://www.itmuch.com/skywal…
https://skyapm.github.io/docu…

正文完
 0