Skywalking 插件开发

概念

Span

Span 是分布式跟踪系统中一个重要且常用的概念. 可从 Google Dapper PaperOpenTracing 学习更多与 Span 相关的知识.

SkyWalking 从 2017 年开始支持 OpenTracing 和 OpenTracing-Java API, 我们的 Span 概念与论文和 OpenTracing 类似. 我们也扩展了 Span.

Span 有三种类型

1.1 EntrySpan

EntrySpan 代表服务提供者, 也是服务器端的端点. 作为一个 APM 系统, 我们的目标是应用服务器. 所以几乎所有的服务和 MQ-消费者 都是 EntrySpan。可以理解一个进程处理第一个span就是EntrySpan,意思为entiry span 进入服务span。

1.2 LocalSpan

LocalSpan 表示普通的 Java 方法, 它与远程服务无关, 也不是 MQ 生产者/消费者, 也不是服务(例如 HTTP 服务)提供者/消费者。所有本地方法调用都是localSpan,包括异步线程调用,线程池提交任务都是。

1.3 ExitSpan

ExitSpan 代表一个服务客户端或MQ的生产者, 在 SkyWalking 的早期命名为 LeafSpan. 例如 通过 JDBC 访问DB, 读取 Redis/Memcached 被归类为 ExitSpan.

X83TBT.png

上下文载体 (ContextCarrier)

为了实现分布式跟踪, 需要绑定跨进程的追踪, 并且上下文应该在整个过程中随之传播. 这就是 ContextCarrier 的职责.

以下是有关如何在 A -> B 分布式调用中使用 ContextCarrier 的步骤.

  1. 在客户端, 创建一个新的空的 ContextCarrier.
  2. 通过 ContextManager#createExitSpan 创建一个 ExitSpan 或者使用 ContextManager#inject 来初始化 ContextCarrier.
  3. ContextCarrier 所有信息放到请求头 (如 HTTP HEAD), 附件(如 Dubbo RPC 框架), 或者消息 (如 Kafka) 中,详情可以看官方给出跨进程传输协议sw8
  4. 通过服务调用, 将 ContextCarrier 传递到服务端.
  5. 在服务端, 在对应组件的头部, 附件或消息中获取 ContextCarrier 所有内容.
  6. 通过 ContestManager#createEntrySpan 创建 EntrySpan 或者使用 ContextManager#extract 将服务端和客户端的绑定.

让我们通过 Apache HttpComponent client 插件和 Tomcat 7 服务器插件演示, 步骤如下:

  1. 客户端 Apache HttpComponent client 插件
1
2
3
4
5
6
span = ContextManager.createExitSpan("/span/operation/name", contextCarrier, "ip:port");
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
httpRequest.setHeader(next.getHeadKey(), next.getHeadValue());
}
  1. 服务端 Tomcat 7 服务器插件
1
2
3
4
5
6
7
8
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}

span = ContextManager.createEntrySpan("/span/operation/name", contextCarrier);

上下文快照 (ContextSnapshot)

除了跨进程, 跨线程也是需要支持的, 例如异步线程(内存中的消息队列)和批处理在 Java 中很常见, 跨进程和跨线程十分相似, 因为都是需要传播上下文. 唯一的区别是, 不需要跨线程序列化.

以下是有关跨线程传播的三个步骤:

  1. 使用 ContextManager#capture 方法获取 ContextSnapshot 对象.
  2. 让子线程以任何方式, 通过方法参数或由现有参数携带来访问 ContextSnapshot
  3. 在子线程中使用 ContextManager#continued

跨进程Span传输原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CarrierItem implements Iterator<CarrierItem> {
private String headKey;
private String headValue;
private CarrierItem next;

public CarrierItem(String headKey, String headValue) {
this(headKey, headValue, null);
}

public CarrierItem(String headKey, String headValue, CarrierItem next) {
this.headKey = headKey;
this.headValue = headValue;
this.next = next;
}

public String getHeadKey() {
return headKey;
}

public String getHeadValue() {
return headValue;
}

public void setHeadValue(String headValue) {
this.headValue = headValue;
}

@Override
public boolean hasNext() {
return next != null;
}

@Override
public CarrierItem next() {
return next;
}

@Override
public void remove() {

}
}

CarrierItem 类似Map key value的数据接口,通过一个单向连接将K/V连接起来。
看下 ContextCarrier.items()方法如何创建CarrierItem

1
2
3
4
5
6
7
8
9
10
public CarrierItem items() {
//内置一个 sw8-x key
SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);
//内置 sw8-correlation key
SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(
correlationContext, sw8ExtensionCarrierItem);
//内置 sw8 key
SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);
return new CarrierItemHead(sw8CarrierItem);
}

创建一个链接CarrierItemHead->SW8CarrierItem ->SW8CorrelationCarrierItem->SW8ExtensionCarrierItem
在看下上面tomcat7 遍历CarrierItem,调用key从http header获取值设置到对象内置值,这样就可以做到将上一个进程header 值设置到下一个进程里,在调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ContextCarrier deserialize(String text, HeaderVersion version) {
if (text == null) {
return this;
}
if (HeaderVersion.v3.equals(version)) {
String[] parts = text.split("-", 8);
if (parts.length == 8) {
try {
// parts[0] is sample flag, always trace if header exists.
this.traceId = Base64.decode2UTFString(parts[1]);
this.traceSegmentId = Base64.decode2UTFString(parts[2]);
this.spanId = Integer.parseInt(parts[3]);
this.parentService = Base64.decode2UTFString(parts[4]);
this.parentServiceInstance = Base64.decode2UTFString(parts[5]);
this.parentEndpoint = Base64.decode2UTFString(parts[6]);
this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);
} catch (IllegalArgumentException ignored) {

}
}
}
return this;
}

这样刚刚new 出来ContextCarrier就可以从上一个调用者上继承所有的属性,新创建span就可以跟上一个span 关联起来了了。

开发插件

知识点

追踪的基本方法是拦截 Java 方法, 使用字节码操作技术(byte-buddy)和 AOP 概念. SkyWalking 包装了字节码操作技术并追踪上下文的传播, 所以你只需要定义拦截点(换句话说就是 Spring 的切面)。

ClassInstanceMethodsEnhancePluginDefine定义了构造方法 Contructor 拦截点和 instance method 实例方法拦截点,主要有三个方法需要被重写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 /**
* 需要被拦截Class
* @return
*/
@Override
protected ClassMatch enhanceClass() {
return null;
}

/**
* 构造器切点
* @return
*/
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}

/**
* 方法切点
* @return InstanceMethodsInterceptPoint 里面会声明拦截按个方法
*/
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}

ClassMatch 以下有四种方法表示如何去匹配目标类:

  • NameMatch.byName, 通过类的全限定名(Fully Qualified Class Name, 即 包名 + . + 类名).
  • ClassAnnotationMatch.byClassAnnotationMatch, 根据目标类是否存在某些注解.
  • MethodAnnotationMatchbyMethodAnnotationMatch, 根据目标类的方法是否存在某些注解.
  • HierarchyMatch.byHierarchyMatch, 根据目标类的父类或接口

ClassStaticMethodsEnhancePluginDefine 定义了类方法 class 静态method 拦截点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class ClassStaticMethodsEnhancePluginDefine extends ClassEnhancePluginDefine {

/**
* 构造器切点
* @return null, means enhance no constructors.
*/
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}

/**
* 方法切点
* @return null, means enhance no instance methods.
*/
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return null;
}
}

InstanceMethodsInterceptPoint 普通方法接口切点有哪些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface InstanceMethodsInterceptPoint {
/**
* class instance methods matcher.
* 可以理解成功对Class 那些方法进行增强
* ElementMatcher 是bytebuddy 类库一个方法匹配器,里面封装了各种方法匹配
* @return methods matcher
*/
ElementMatcher<MethodDescription> getMethodsMatcher();

/**
* @return represents a class name, the class instance must instanceof InstanceMethodsAroundInterceptor.
* 返回一个拦截器全类名,所有拦截器必须实现 InstanceMethodsAroundInterceptor 接口
*/
String getMethodsInterceptor();

/**
* 是否要覆盖原方法入参
* @return
*/
boolean isOverrideArgs();
}

在看下拦截器有那些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* A interceptor, which intercept method's invocation. The target methods will be defined in {@link
* ClassEnhancePluginDefine}'s subclass, most likely in {@link ClassInstanceMethodsEnhancePluginDefine}
*/
public interface InstanceMethodsAroundInterceptor {
/**
* called before target method invocation.
* 前置通知
* @param result change this result, if you want to truncate the method.
*/
void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable;

/**
* called after target method invocation. Even method's invocation triggers an exception.
* 后置通知
* @param ret the method's original return value. May be null if the method triggers an exception.
* @return the method's actual return value.
*/
Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable;

/**
* called when occur exception.
* 异常通知
* @param t the exception occur.
*/
void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t);
}

开发Skywalking实战

项目maven环境配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>tk.shenyifeng</groupId>
<artifactId>skywalking-plugin</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<skywalking.version>8.10.0</skywalking.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>${skywalking.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>java-agent-util</artifactId>
<version>${skywalking.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>

<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<relocations>
<relocation>
<pattern>net.bytebuddy</pattern>
<shadedPattern>org.apache.skywalking.apm.dependencies.net.bytebuddy</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

为了更有代表性一些,使用Skywalking官方开发的ES插件来做一个例子。为了兼容不同版本框架,Skywalking 官方使用witnessClasses,当前框架Jar存在这个Class就会任务是某个版本、同样witnessMethods当Class存在某个Method。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class AdapterActionFutureInstrumentation extends ClassEnhancePluginDefine {

@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}

@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("actionGet"); //拦截方法
}

@Override
public String getMethodsInterceptor() { //拦截器全类名
return "org.apache.skywalking.apm.plugin.elasticsearch.v7.interceptor.AdapterActionFutureActionGetMethodsInterceptor";
}

@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}

@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[0];
}

@Override
protected ClassMatch enhanceClass() { //增强Class
return byName("org.elasticsearch.action.support.AdapterActionFuture");
}

@Override
protected String[] witnessClasses() {//ES7 存在Class
return new String[] {"org.elasticsearch.transport.TaskTransportChannel"};
}

@Override
protected List<WitnessMethod> witnessMethods() { //ES7 SearchHits 存在方法
return Collections.singletonList(new WitnessMethod(
"org.elasticsearch.search.SearchHits",
named("getTotalHits").and(takesArguments(0)).and(returns(named("org.apache.lucene.search.TotalHits")))
));
}
}

创建一个给定类名的拦截器,实现InstanceMethodsAroundInterceptor接口。创建一个EntrySpan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor {

private static boolean IS_SERVLET_GET_STATUS_METHOD_EXIST;
private static final String SERVLET_RESPONSE_CLASS = "javax.servlet.http.HttpServletResponse";
private static final String GET_STATUS_METHOD = "getStatus";

static {
IS_SERVLET_GET_STATUS_METHOD_EXIST = MethodUtil.isMethodExist(
TomcatInvokeInterceptor.class.getClassLoader(), SERVLET_RESPONSE_CLASS, GET_STATUS_METHOD);
}

/**
* * The {@link TraceSegment#ref} of current trace segment will reference to the trace segment id of the previous
* level if the serialized context is not null.
*
* @param result change this result, if you want to truncate the method.
*/
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Request request = (Request) allArguments[0];
ContextCarrier contextCarrier = new ContextCarrier();

CarrierItem next = contextCarrier.items();
//如果 HTTP 请求头中有符合sw8 传输协议的请求头则 取出来设置到上下文ContextCarrier
while (next.hasNext()) {
next = next.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
String operationName = String.join(":", request.getMethod(), request.getRequestURI());
AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);//关联起来
Tags.URL.set(span, request.getRequestURL().toString()); //添加 span 参数
Tags.HTTP.METHOD.set(span, request.getMethod());
span.setComponent(ComponentsDefine.TOMCAT);
SpanLayer.asHttp(span);

if (TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS) {
collectHttpParam(request, span);
}
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Request request = (Request) allArguments[0];
HttpServletResponse response = (HttpServletResponse) allArguments[1];

AbstractSpan span = ContextManager.activeSpan();
if (IS_SERVLET_GET_STATUS_METHOD_EXIST && response.getStatus() >= 400) {
span.errorOccurred();
Tags.HTTP_RESPONSE_STATUS_CODE.set(span, response.getStatus());
}
// Active HTTP parameter collection automatically in the profiling context.
if (!TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS && span.isProfiling()) {
collectHttpParam(request, span);
}
ContextManager.getRuntimeContext().remove(Constants.FORWARD_REQUEST_FLAG);
ContextManager.stopSpan();
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan span = ContextManager.activeSpan();
span.log(t);
}

private void collectHttpParam(Request request, AbstractSpan span) {
final Map<String, String[]> parameterMap = new HashMap<>();
final org.apache.coyote.Request coyoteRequest = request.getCoyoteRequest();
final Parameters parameters = coyoteRequest.getParameters();
for (final Enumeration<String> names = parameters.getParameterNames(); names.hasMoreElements(); ) {
final String name = names.nextElement();
parameterMap.put(name, parameters.getParameterValues(name));
}

if (!parameterMap.isEmpty()) {
String tagValue = CollectionUtil.toString(parameterMap);
tagValue = TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD > 0 ?
StringUtil.cut(tagValue, TomcatPluginConfig.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD) :
tagValue;
Tags.HTTP.PARAMS.set(span, tagValue);
}
}
}

开发完成拦截器后,一定要在类路径上添加skywalking-plugin.def文件,将开发后的全类名添加到配置。

xxxName = tk.shenyifeng.skywalking.plugin.RepladInstrumentation

如果jar 里面没有这个文件,插件不会被Skywalking加载的。
最后将打包的jar 放到Skywalking的plugin或者activations目录就可以了。

xml配置插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>
<enhanced>
<class class_name="test.apache.skywalking.apm.testcase.customize.service.TestService1">
<method method="staticMethod()" operation_name="/is_static_method" static="true"></method>
<method method="staticMethod(java.lang.String,int.class,java.util.Map,java.util.List,[Ljava.lang.Object;)"
operation_name="/is_static_method_args" static="true">
<operation_name_suffix>arg[0]</operation_name_suffix>
<operation_name_suffix>arg[1]</operation_name_suffix>
<operation_name_suffix>arg[3].[0]</operation_name_suffix>
<tag key="tag_1">arg[2].['k1']</tag>
<tag key="tag_2">arg[4].[1]</tag>
<log key="log_1">arg[4].[2]</log>
</method>
<method method="method()" static="false"></method>
<method method="method(java.lang.String,int.class)" operation_name="/method_2" static="false">
<operation_name_suffix>arg[0]</operation_name_suffix>
<tag key="tag_1">arg[0]</tag>
<log key="log_1">arg[1]</log>
</method>
<method
method="method(test.apache.skywalking.apm.testcase.customize.model.Model0,java.lang.String,int.class)"
operation_name="/method_3" static="false">
<operation_name_suffix>arg[0].id</operation_name_suffix>
<operation_name_suffix>arg[0].model1.name</operation_name_suffix>
<operation_name_suffix>arg[0].model1.getId()</operation_name_suffix>
<tag key="tag_os">arg[0].os.[1]</tag>
<log key="log_map">arg[0].getM().['k1']</log>
</method>
<method method="retString(java.lang.String)" operation_name="/retString" static="false">
<tag key="tag_ret">returnedObj</tag>
<log key="log_map">returnedObj</log>
</method>
<method method="retModel0(test.apache.skywalking.apm.testcase.customize.model.Model0)"
operation_name="/retModel0" static="false">
<tag key="tag_ret">returnedObj.model1.id</tag>
<log key="log_map">returnedObj.model1.getId()</log>
</method>
</class>

</enhanced>

通过xml配置可以省去编写Java代码,打包jar步骤

配置 说明
class_name 需要被增强Class
method 需要被增强Method,支持参数定义
operation_name 操作名称
operation_name_suffix 操作后缀,用于生成动态operation_name
tag 将在local span中添加一个tag。key的值需要在XML节点上表示
log 将在local span中添加一个log。key的值需要在XML节点上表示
arg[n] 表示输入的参数值。比如args[0]表示第一个参数
.[n] 当正在被解析的对象是Array或List,你可以用这个表达式得到对应index上的对象
.[‘key’] 当正在被解析的对象是Map, 你可以用这个表达式得到map的key

在配置文件agent.config中添加配置:

plugin.customize.enhance_file=customize_enhance.xml的绝对路径


引用资料
https://www.itmuch.com/skywalking/apm-customize-enhance-plugin/
https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.1.0/guides/Java-Plugin-Development-Guide.html