关于美团:一次找回TraceId的问题分析与过程思考

66次阅读

共计 19996 个字符,预计需要花费 50 分钟才能阅读完成。

用好中间件是每一个开发人员的基本功,一个业余的开发人员,谋求的不仅是中间件的日常应用,还要探索这背地的设计初衷和底层逻辑,进而保障咱们的零碎运行更加稳固,让开发工作更加高效。联合这一主题,本文从一次线上告警问题登程,通过第一工夫定位问题的根本原因,进而引出 Google Dapper 与 MTrace(美团外部自研)这类分布式链路追踪零碎的设计思维和实现路径,再回到问题实质深刻 @Async 的源码剖析底层的异步逻辑和实现特点,并给出 MTrace 跨线程传递生效的起因和解决方案,最初梳理目前支流的分布式跟踪零碎的现状,并联合开发人员日常应用中间件的场景提出一些思考和总结。

1. 问题背景和思考

1.1 问题背景

在一次排查线上告警的过程中,忽然发现一个链路信息有点不同寻常(这里仅展现测试复现的内容):

在机器中能够分明的发现“2022-08-02 19:26:34.952 DXMsgRemoteService”这一行日志信息并没有携带 TraceId,导致调用链路信息戛然而止,无奈追踪过后的调用状况。

1.2 问题复现和思考

在解决完线上告警后,咱们开始剖析“失落”的 TraceId 到底去了哪里?首先在代码中定位 TraceId 没有追踪到的局部,发现问题呈现在一个 @Async 注解下的办法,删除无关的业务信息代码,并减少 MTrace 埋点办法后的复现代码如下:

@SpringBootTest
@RunWith(SpringRunner.class)
@EnableAsync
public class DemoServiceTest extends TestCase {
    @Resource
        private DemoService demoService;
    @Test
        public void testTestAsy() {Tracer.serverRecv("test");
        String mainThreadName = Thread.currentThread().getName();
        long mainThreadId = Thread.currentThread().getId();
        System.out.println("------We got main thread:"+ mainThreadName + "-" +  mainThreadId + "Trace Id:" + Tracer.id() + "----------");
        demoService.testAsy();}
}
@Component
public class DemoService {
    @Async
        public void testAsy(){String asyThreadName = Thread.currentThread().getName();
        long asyThreadId = Thread.currentThread().getId();
        System.out.println("======Async====");
        System.out.println("------We got asy thread:"+ asyThreadName + "-" +  asyThreadId + "Trace Id:" + Tracer.id() + "----------");
    }
}

运行这段代码后,咱们看看控制台理论的输入后果:

------We got main thread: main - 1  Trace Id: -5292097998940230785----------
======Async====
------We got asy thread: SimpleAsyncTaskExecutor-1 - 630  Trace Id: null----------

至此咱们能够发现 TraceId 是在 @Async 异步传递的过程中产生失落景象,明确了造成这一景象的起因后,咱们开始思考:

  • MTrace(美团外部自研的分布式链路追踪零碎)这类分布式链路追踪零碎是如何设计的?
  • @Async 异步办法是如何实现的?
  • InheritableThreadLocal、TransmittableThreadLocal 和 TransmissibleThreadLocal 有什么区别?
  • 为什么 MTrace 的跨线程传递计划“生效”了?
  • 如何解决 @Async 场景下“弄丢”TraceId 的问题?
  • 目前有哪些分布式链路追踪零碎?它们又是如何解决跨线程传递问题的?

2. 深度剖析

2.1 MTrace 与 Google Dapper

MTrace 是美团参考 Google Dapper 对服务间调用链信息收集和整顿的分布式链路追踪零碎,目标是帮忙开发人员剖析零碎各项性能和疾速排查告警问题。要想理解 MTrace 是如何设计分布式链路追踪零碎的,首先看看 Google Dapper 是如何在大型分布式环境下实现分布式链路追踪。咱们先来看看下图一个残缺的分布式申请:

用户发送一个申请到前端 A,而后申请散发到两个不同的中间层服务 B 和 C,服务 B 在解决完申请后将后果返回,同时服务 C 须要持续调用后端服务 D 和 E 再将解决后的申请后果进行返回,最初由前端 A 汇总来响应用户的这次申请。

回顾这次残缺的申请咱们不难发现,要想直观牢靠的追踪多项服务的分布式申请,咱们最关注的是每组客户端和服务端之间的申请响应以及响应耗时,因而,Google Dapper 采取对每一个申请和响应设置标识符和工夫戳的形式实现链路追踪,基于这一设计思维的根本追踪树模型如下图所示:

追踪树模型由 span 组成,其中每个 span 蕴含 span name、span id、parent id 和 trace id,进一步剖析跟踪树模型中各个 span 之间的调用关系能够发现,其中没有 parent id 且 span id 为 1 代表根服务调用,span id 越小代表服务在调用链的过程中离根服务就越近,将模型中各个绝对独立的 span 分割在一起就形成了一次残缺的链路调用记录,咱们再持续深刻看看 span 外部的细节信息:

除了最根本的 span name、span id 和 parent id 之外,Annotations 扮演着重要的角色,Annotations 包含 \<Strat>、Client Send、Server Recv、Server Send、Client Recv 和 \<End> 这些注解,记录了 RPC 申请中 Client 发送申请到 Server 的解决响应工夫戳信息,其中 foo 注解代表能够自定义的业务数据,这些也会一并记录到 span 中,提供给开发人员记录业务信息;在这当中有 64 位整数形成的 trace id 作为全局的惟一标识存储在 span 中。

至此咱们曾经理解到,Google Dapper 次要是在每个申请中配置 span 信息来实现对分布式系统的追踪,那么又是用什么形式在分布式申请中植入这些追踪信息呢?

为满足低损耗、利用通明和大范畴部署的设计指标,Google Dapper 反对利用开发者依赖于大量通用组件库,实现简直零投入的老本对分布式链路进行追踪,当一个服务线程在链路中调用其余服务之前,会在 ThreadLocal 中保留本次跟踪的上下文信息,次要包含一些轻量级且易复制的信息(相似 spand id 和 trace id),当服务线程收到响应之后,利用开发者能够通过回调函数进行服务信息日志打印。

MTrace 是美团参考 Google Dapper 的设计思路并联合本身业务进行了改良和欠缺后的自研产品,具体的实现流程这里就不再赘述了,咱们重点看看 MTrace 做了哪些改良:

  • 在美团的各个中间件中埋点,来采集产生调用的调用时长和调用后果等信息,埋点的上下文次要包含传递信息、调用信息、机器相干信息和自定义信息,各个调用链路之间有一个全局且惟一的变量 TraceId 来记录一次残缺的调用状况和追踪数据。
  • 在网络间的数据传递中,MTrace 次要传递应用 UUID 异或生成的 TraceId 和示意层级和前后关系的 SpanId,反对批量压缩上报、TraceId 做聚合和 SpanId 构建状态。
  • 目前,产品曾经笼罩到 RPC 服务、HTTP 服务、MySQL、Cache 缓存和 MQ,根本实现了全笼罩。
  • MTrace 反对跨线程传递和代理来优化埋点形式,加重开发人员的应用老本。

2.2 @Async 的异步过程追溯

从 Spring3 开始提供了 @Async 注解,该注解的应用须要留神以下几点:

  1. 须要在配置类上减少 @EnableAsync 注解;
  2. @Async 注解能够标记一个异步执行的办法,也能够用来标记一个类表明该类的所有办法都是异步执行;
  3. 能够在 @Async 中自定义执行器。

咱们以 @EnableAsync 为入口开始剖析异步过程,除了根本的配置办法外,咱们重点关注下配置类 AsyncConfigurationSelector 的外部逻辑,因为默认条件下咱们应用 JDK 接口代理,这里重点看看 ProxyAsyncConfiguration 类的代码逻辑:

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // 新建一个异步注解 bean 后置处理器
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // 如果 @EnableAsync 注解中有自定义 annotation 配置则进行设置
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        if (this.executor != null) {
            // 设置线程处理器
            bpp.setExecutor(this.executor);
        }
        if (this.exceptionHandler != null) {
            // 设置异样处理器
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        // 设置是否须要创立 CGLIB 子类代理,默认为 false
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        // 设置异步注解 bean 处理器应该遵循的执行程序,默认最低的优先级
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}

ProxyAsyncConfiguration 继承了父类 AbstractAsyncConfiguration 的办法,重点定义了一个 AsyncAnnotationBeanPostProcessor 的异步注解 bean 后置处理器。看到这里咱们能够晓得,@Async 次要是通过后置处理器生成一个代理对象来实现异步的执行逻辑,接下来咱们重点关注 AsyncAnnotationBeanPostProcessor 是如何实现异步的:

从类图中咱们能够直观地看到 AsyncAnnotationBeanPostProcessor 同时实现了 BeanFactoryAware 的接口,因而咱们进入 setBeanFactory() 办法,能够看到对 AsyncAnnotationAdvisor 异步注解切面进行了结构,再接着进入 AsyncAnnotationAdvisor 的 buildAdvice() 办法中能够看 AsyncExecutionInterceptor 类,再看类图发现 AsyncExecutionInterceptor 实现了 MethodInterceptor 接口,而 MethodInterceptor 是 AOP 中切入点的处理器,对于 interceptor 类型的对象,处理器中最终被调用的是 invoke 办法,所以咱们重点看看 invoke 的代码逻辑:

public Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
  // 首先获取到一个线程池
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }
  // 封装 Callable 对象到线程池执行
    Callable<Object> task = () -> {
        try {Object result = invocation.proceed();
            if (result instanceof Future) {return ((Future<?>) result).get();}
        }
        catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };
  // 工作提交到线程池
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

咱们再接着看看 @Async 用了什么线程池,重点关注 determineAsyncExecutor 办法中 getExecutorQualifier 指定获取的默认线程池是哪一个:

@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {Executor defaultExecutor = super.getDefaultExecutor(beanFactory);   
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); // 其中默认线程池是 SimpleAsyncTaskExecutor
}

至此,咱们理解到在未指定线程池的状况下调用被标记为 @Async 的办法时,Spring 会主动创立 SimpleAsyncTaskExecutor 线程池来执行该办法,从而实现异步执行过程。

2.3.“失落”TraceId 的起因

回顾咱们之前对 MTrace 的学习和理解,TraceId 等信息是在 ThreadLocal 中进行传递和保留,那么当异步办法切换线程的时候,就会呈现下图中上下文信息传递失落的问题:

上面咱们探索一下 ThreadLocal 有哪些跨线程传递计划?MTrace 又提供哪些跨线程传递计划?SimpleAsyncTaskExecutor 又有什么不一样?逐渐找到“失落”TraceId 的起因。

2.3.1 InheritableThreadLocal、TransmittableThreadLocal 和 TransmissibleThreadLocal

在后面的剖析中,咱们发现跨线程场景下上下文信息是保留在 ThreadLocal 中产生失落,那么咱们接下来看看 ThreadLocal 的特点及其延长进去的类,是否能够解决这一问题:

  • ThreadLocal 次要是为每个 ThreadLocal 对象创立一个 ThreadLocalMap 来保留对象和线程中的值的映射关系。当创立一个 ThreadLocal 对象时会调用 get() 或 set() 办法,在以后线程的中查找这个 ThreadLocal 对象对应的 Entry 对象,如果存在,就获取或设置 Entry 中的值;否则,在 ThreadLocalMap 中创立一个新的 Entry 对象。ThreadLocal 类的实例被多个线程共享,每个线程都领有本人的 ThreadLocalMap 对象,存储着本人线程中的所有 ThreadLocal 对象的键值对。ThreadLocal 的实现比较简单,但须要留神的是,如果使用不当,可能会呈现内存透露问题,因为 ThreadLocalMap 中的 Entry 对象并不会主动删除。
  • InheritableThreadLocal 的实现形式和 ThreadLocal 相似,但不同之处在于,当一个线程创立子线程时会调用 init() 办法:
private void init(ThreadGroup g, Runnable target, String name,long stackSize, AccessControlContext acc,Boolean inheritThreadLocals) {if (inheritThreadLocals && parent.inheritableThreadLocals != null)
  // 拷贝父线程的变量
    this.inheritableThreadLocals =ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);    
    this.stackSize = stackSize;
    tid = nextThreadID();}

这意味着子线程能够拜访父线程中的 InheritableThreadLocal 实例,而且在子线程中调用 set() 办法时,会在子线程本人的 inheritableThreadLocals 字段中创立一个新的 Entry 对象,而不会影响父线程中的 Entry 对象。同时,依据源码咱们也能够看到 Thread 的 init() 办法是在线程构造方法中拷贝的,在线程复用的线程池中是没有方法应用的。

  • TransmittableThreadLocal 是阿里巴巴提供的解决跨线程传递上下文的 InheritableThreadLocal 子类,引入了 holder 来保留须要在线程间进行传递的变量,大抵流程咱们能够参考上面给出的时序图剖析:

步骤能够总结为:① 装璜 Runnable,将主线程的 TTL 传入到 TtlRunnable 的构造方法中;② 将子线程的 TTL 的值进行备份,将主线程的 TTL 设置到子线程中(value 是对象援用,可能存在线程平安问题);③ 执行子线程逻辑;④ 删除子线程新增的 TTL,将备份还原从新设置到子线程的 TTL 中,从而保障了 ThreadLocal 的值在多线程环境下的传递性。

TransmittableThreadLocal 尽管解决了 InheritableThreadLocal 的继承问题,然而因为须要在序列化和反序列化时对 ThreadLocalMap 进行解决,会减少对象创立和序列化的老本,并且须要反对的序列化框架较少,不够灵便。

  • TransmissibleThreadLocal 是继承了 InheritableThreadLocal 类并重写了 get()、set() 和 remove() 办法,TransmissibleThreadLocal 的实现形式和 TransmittableThreadLocal 相似,次要的执行逻辑在 Transmitter 的 capture() 办法复制 holder 中的变量,replay() 办法过滤非父线程的 holder 的变量,restore() 来复原通过 replay() 过滤后 holder 的变量:
public class TransmissibleThreadLocal<T> extends InheritableThreadLocal<T> {
    public static class Transmitter {public static Object capture() {Map<TransmissibleThreadLocal<?>, Object> captured = new HashMap<TransmissibleThreadLocal<?>, Object>();
      // 获取所有存储在 holder 中的变量
            for (TransmissibleThreadLocal<?> threadLocal : holder.get().keySet()) {captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }
        public static Object replay(Object captured) {@SuppressWarnings("unchecked")
            Map<TransmissibleThreadLocal<?>, Object> capturedMap = (Map<TransmissibleThreadLocal<?>, Object>) captured;
            Map<TransmissibleThreadLocal<?>, Object> backup = new HashMap<TransmissibleThreadLocal<?>, Object>();
            for (Iterator<? extends Map.Entry<TransmissibleThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();iterator.hasNext();) {Map.Entry<TransmissibleThreadLocal<?>, ?> next = iterator.next();
                TransmissibleThreadLocal<?> threadLocal = next.getKey();
                // backup
                backup.put(threadLocal, threadLocal.get());
                // clear the TTL value only in captured
                // avoid extra TTL value in captured, when run task.
        // 过滤非传递的变量
                if (!capturedMap.containsKey(threadLocal)) {iterator.remove();
                    threadLocal.superRemove();}
            }
            // set value to captured TTL
            for (Map.Entry<TransmissibleThreadLocal<?>, Object> entry : capturedMap.entrySet()) {@SuppressWarnings("unchecked")
                TransmissibleThreadLocal<Object> threadLocal = (TransmissibleThreadLocal<Object>) entry.getKey();
                threadLocal.set(entry.getValue());
            }
            // call beforeExecute callback
            doExecuteCallback(true);
            return backup;
        }
        public static void restore(Object backup) {@SuppressWarnings("unchecked")
            Map<TransmissibleThreadLocal<?>, Object> backupMap = (Map<TransmissibleThreadLocal<?>, Object>) backup;
            // call afterExecute callback
            doExecuteCallback(false);
            for (Iterator<? extends Map.Entry<TransmissibleThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                                         iterator.hasNext();) {Map.Entry<TransmissibleThreadLocal<?>, ?> next = iterator.next();
                TransmissibleThreadLocal<?> threadLocal = next.getKey();
                // clear the TTL value only in backup
                // avoid the extra value of backup after restore
                if (!backupMap.containsKey(threadLocal)) {iterator.remove();
                    threadLocal.superRemove();}
            }
            // restore TTL value
            for (Map.Entry<TransmissibleThreadLocal<?>, Object> entry : backupMap.entrySet()) {@SuppressWarnings("unchecked")
                TransmissibleThreadLocal<Object> threadLocal = (TransmissibleThreadLocal<Object>) entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }
    }
}

TransmissibleThreadLocal 岂但能够解决跨线程的传递问题,还能保障子线程和主线程之间的隔离,然而目前跨线程拷贝 span 数据时,采纳浅拷贝有失落数据的危险。最初,咱们能够依据下表综合比照:

思考到 TransmittableThreadLocal 并非规范的 Java API,而是第三方库提供的,存在与其它库的兼容性问题,无形中减少了代码的复杂性和应用难度。因而,MTrace 抉择自定义实现的 TransmissibleThreadLocal 类能够不便地在跨线程和跨服务的状况下传递追踪信息,通明主动实现所有异步执行上下文的可定制、规范化的捕获传递,使得整个跟踪信息更加残缺和精确。

2.3.2 Mtrace 的跨线程传递计划

这一问题 MTrace 其实曾经提供解决方案,次要的设计思路是在子线程初始化 Runnable 对象的时候首先会去父线程的 ThreadLocal 中拿到保留的 trace 信息,而后作为参数传递给子线程,子线程在初始化的时候设置 trace 信息来防止失落。上面咱们看看具体实现。

父线程新建工作时捕获所有 TransmissibleThreadLocal 中的变量信息,如下图所示:

子线程执行工作时复制父线程捕获的 TransmissibleThreadLocal 变量信息,并返回备份的 TransmissibleThreadLocal 变量信息,如下图所示:

在子线程执行完业务流程后会复原之前备份的 TransmissibleThreadLocal 变量信息,如下图所示:

这种计划能够解决跨线程传递上下文失落的问题,然而须要代码层面的开发会减少开发人员的工作量,对于一个分布式追踪零碎而言并不是最优解:

TraceRunnable command = new TraceRunnable(runnable);
newThread(command).start();
executorService.execute(command);

因而,MTrace 同时提供无侵入形式的 javaagent&instrument 技术,能够简略了解成一个类加载时的 AOP 性能,只有在 JVM 参数增加 javaagent 的配置,不须要润饰 Runnable 或是线程池的代码,就能够在启动时加强实现跨线程传递问题。

回归到本次的问题中来,目前应用的 MDP 自身就曾经集成了 MTrace-agent 的模式,然而为什么还是会“弄丢”TraceId 呢?查看 MTrace 的 ThreadPoolTransformer 类和 ForkJoinPoolTransformer 类咱们能够晓得,MTrace 批改了 ThreadPoolExecutor 类、ScheduledThreadPoolExecutor 类和 ForkJoinTask 类的字节码,顺着这个思路咱们再看看 @Async 用到的 SimpleAsyncTaskExecutor 线程池是怎么一回事。

2.3.3 SimpleAsyncTaskExecutor 是怎么一回事

咱们先深刻 SimpleAsyncTaskExecutor 的代码中,看看执行逻辑:

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable {
    private ThreadFactory threadFactory;
    public void execute(Runnable task, long startTimeout) {Assert.notNull(task, "Runnable must not be null");
    //isThrottleActive 是否开启限流(默认 concurrencyLimit=-1,不开启限流)if(this.isThrottleActive() && startTimeout > 0L) {this.concurrencyThrottle.beforeAccess();
            this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));
            this.concurrencyThrottle.beforeAccess();
            this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));
            this.concurrencyThrottle.beforeAccess();
            this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));
        } else {this.doExecute(task);
        }
    }
    protected void doExecute(Runnable task) {
    // 没有线程工厂的话默认创立线程
        Thread thread = this.threadFactory != null?this.threadFactory.newThread(task):this.createThread(task);        
        thread.start();}
    public Thread createThread(Runnable runnable) {
    // 和线程池不同,每次都是创立新的线程
        Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(isDaemon());
        return thread;
    }
}

看到这里咱们能够得出以下几个个性:

  • SimpleAsyncTaskExecutor 每次执行提交给它的工作时,会启动新的线程,并不是严格意义上的线程池,达不到线程复用的性能。
  • 容许开发者管制并发线程的下限(concurrencyLimit)起到肯定的资源节流作用,但默认 concurrencyLimit 取值为 -1,即不启用资源节流,有引发内存透露的危险。
  • 阿里技术编码规约要求用 ThreadPoolExecutor 的形式来创立线程池,躲避资源耗尽的危险。

联合之前说过的 MTrace 线程池代理模型,咱们持续再来看看 SimpleAsyncTaskExecutor 的类图:

能够发现,其继承了 spring 的 TaskExecutor 接口,其实质是 java.util.concurrent.Executor,联合咱们这次“失落”的 TraceId 问题来看,咱们曾经找到了 Mtrace 的跨线程传递计划“生效”的起因:尽管 MTrace 曾经通过 javaagent&instrument 技术能够实现 Trace 信息跨线程传递,然而目前只笼罩到 ThreadPoolExecutor 类、ScheduledThreadPoolExecutor 类和 ForkJoinTask 类的字节码,而 @Async 在未指定线程池的状况下默认会启用 SimpleAsyncTaskExecutor,其本质是 java.util.concurrent.Executor 没有被笼罩到,就会造成 ThreadLocal 中的 get 办法获取信息为空,导致最终 TraceId 传递失落。

3. 解决方案

实际上 @Async 反对咱们应用自定义的线程池,能够手动自定义 Configuration 来配置 ThreadPoolExecutor 线程池,而后在注解外面指定 bean 的名称,就能够切换到对应的线程池去,能够看看上面的代码:

@Configuration
public class ThreadPoolConfig {@Bean("taskExecutor")
        public Executor taskExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

而后在注解中标注这个线程池:

@SpringBootTest
@RunWith(SpringRunner.class)
@EnableAsync
public class DemoServiceTest extends TestCase {
    @Resource
      private DemoService demoService;
    @Test
      public void testTestAsy() {Tracer.serverRecv("test");
        String mainThreadName = Thread.currentThread().getName();
        long mainThreadId = Thread.currentThread().getId();
        System.out.println("------We got main thread:"+ mainThreadName + "-" +  mainThreadId + "Trace Id:" + Tracer.id() + "----------");
        demoService.testAsy();}
}
@Component
public class DemoService {@Async("taskExecutor")
      public void testAsy(){String asyThreadName = Thread.currentThread().getName();
        long asyThreadId = Thread.currentThread().getId();
        System.out.println("======Async====");
        System.out.println("------We got asy thread:"+ asyThreadName + "-" +  asyThreadId + "Trace Id:" + Tracer.id() + "----------");
    }
}

看看输入台的打印:

------We got main thread: main - 1  Trace Id: -3495543588231940494----------
======Async====
------We got asy thread: SimpleAsyncTaskExecutor-1 - 658  Trace Id: 3495543588231940494----------

最终,咱们能够通过这一形式“找回”在 @Async 注解下跨线程传递而“失落”的 TraceId。

4. 其余计划比照

分布式追踪零碎从诞生之际到有实质性的冲破,很大水平受到 Google Dapper 的影响,目前常见的分布式追踪零碎有 Twitter 的 Zipkin、SkyWalking、阿里的 EagleEye、PinPoint 和美团的 MTrace 等,这些大多都是基于 Google Dapper 的设计思维,思考到设计思路和架构特点,咱们重点介绍 Zipkin、SkyWalking 和 EagleEye 的根本框架和跨线程解决方案(以下内容次要起源官网及作者总结,仅供参考,不形成技术倡议)。

4.1 Zipkin

Zipkin 是由 Twitter 公司奉献开发的一款开源的分布式追踪零碎,官网提供有基于 Finagle 框架(Scala 语言)的接口,而其余框架的接口由社区奉献,目前能够反对 Java、Python、Ruby 和 C# 等支流开发语言和框架,其次要性能是汇集来自各个异构零碎的实时监控数据。次要由 4 个外围组件形成,如下图所示:

  • Collector:收集器组件,它次要用于解决从内部零碎发送过去的跟踪信息,将这些信息转换为 Zipkin 外部解决的 Span 格局,以反对后续的存储、剖析、展现等性能。
  • Storage:存储组件,它次要对解决收集器接管到的跟踪信息,默认会将这些信息存储起来,同时反对批改存储策略。
  • API:API 组件,它次要用来提供内部拜访接口,比方给客户端展现跟踪信息,或是外接零碎拜访以实现监控等。
  • UI:UI 组件,基于 API 组件实现的下层利用,通过 UI 组件用户能够不便而有直观地查问和剖析跟踪信息。

当用户发动一次调用的时候,Zipkin 的客户端会在入口处先记录这次申请相干的 trace 信息,而后在调用链路上传递 trace 信息并执行理论的业务流程,为避免追踪零碎发送提早与发送失败导致用户零碎的提早与中断,采纳异步的形式发送 trace 信息给 Zipkin Collector,Zipkin Server 在收到 trace 信息后,将其存储起来。随后 Zipkin 的 Web UI 会通过 API 拜访的形式从存储中将 trace 信息提取进去剖析并展现。

最初,咱们看看 Zipkin 的跨线程传递计划的优缺点:在单个线程的调用中 Zipkin 通过定义一个 ThreadLocal\<TraceContext> local 来实现在整个线程执行过程中获取雷同的 Trace 值,然而当新起一个线程的时候 ThreadLocal 就会生效,对于这种场景,Zipkin 对于不提交线程池的场景提供 InheritableThreadLocal\<TraceContext> 来解决父子线程 trace 信息传递失落的问题。

而对于 @Async 的应用场景,Zipkin 提供 CurrentTraceContext 类首先获取父线程的 trace 信息,而后将 trace 信息复制到子线程来,其基本思路和上文 MTrace 的统一,然而须要代码开发,具备较强的侵入性。

4.2 SkyWalking

SkyWalking 是 Apache 基金会上面的一个开源的应用程序性能监控零碎,提供了一种简便的形式来清晰地观测云原生和基于容器的分布式系统。具备反对多种语言探针;微内核 + 插件的架构;存储、集群治理和应用插件汇合都能够自由选择;反对告警;优良的可视化成果的特点。其次要由 4 个外围组件形成,如下图所示:

  • 探针 :基于不同的起源可能是不一样的,但作用都是收集数据,将数据格式化为 SkyWalking 实用的格局。
  • 平台后端 :反对数据聚合,数据分析以及驱动数据流从探针到用户界面的流程。剖析包含 Skywalking 原生追踪和性能指标以及第三方起源,包含 Istio、Envoy telemetry、Zipkin 追踪格式化等。
  • 存储 :通过凋谢的插件化的接口寄存 SkyWalking 数据。用户能够抉择一个既有的存储系统,如 ElasticSearch、H2 或 MySQL 集群(Sharding-Sphere 治理),也能够指定抉择实现一个存储系统。
  • UI:一个基于接口高度定制化的 Web 零碎,用户能够可视化查看和治理 SkyWalking 数据。

SkyWalking 的工作原理和 Zipkin 相似,然而相比拟于 Zipkin 接入零碎的形式,SkyWalking 应用了插件化 +javaagent 的模式来实现:通过虚拟机提供的用于批改代码的接口来动静退出打点的代码,如通过 javaagent premain 来批改 Java 类,在零碎运行时操作代码,让用户能够在不须要批改代码的状况下进行链路追踪,对业务的代码无侵入性,同时应用字节码操作技术(Byte-Buddy)和 AOP 概念来实现拦挡追踪上下文的 trace 信息,这样一来每个用户只须要依据本人的需用定义拦挡点,就能够实现对一些模块施行分布式追踪。

最初,咱们总结一下 SkyWalking 的跨线程传递计划的优缺点:和支流的分布式追踪零碎相似,SkyWalking 也是借助 ThreadLocal 来存储上下文信息,当遇到跨线程传输时也面临传递失落的场景,针对这一问题 SkyWalking 会在父线程调用 ContextManager.capture() 将 trace 信息保留到一个 ContextSnapshot 的实例中并返回,ContextSnapshott 则被附加到工作对象的特定属性中,那么当子线程解决工作对象的时会先取出 ContextSnapshott 对象,将其作为入参调用 ContextManager.continued(contextSnapshot) 来保留到子线程中。

整体思路其实和支流的分布式追踪零碎的类似,SkyWalking 目前只针对带有 @TraceCrossThread 注解的 Callable、Runnable 和 Supplier 这三种接口的实现类进行加强拦挡,通过应用 xxxWrapper.of 的包装形式,防止开发者须要大的代码改变。

4.3 EagleEye

EagleEye 阿里巴巴开源的利用性能监控工具,提供了多维度、实时、自动化的利用性能监控和剖析能力。它能够帮忙开发人员实时监控应用程序的性能指标、日志、异样信息等,并提供相应的性能剖析和报告,帮忙开发人员疾速定位和解决问题。次要由以下 5 局部组成:

  • 代理 :代理是鹰眼的数据采集组件,通过代理能够采集应用程序的性能指标、日志、异样信息等数据,并将其传输到鹰眼的存储和剖析组件中。代理反对多种协定,如 HTTP、Dubbo、RocketMQ、Kafka 等,可能满足不同场景下的数据采集需要。
  • 存储 :存储是鹰眼的数据存储组件,负责存储代理采集的数据,并提供高可用、高性能、高牢靠的数据存储服务。存储反对多种存储引擎,如 HBase、Elasticsearch、TiDB 等,能够依据理论状况进行抉择和配置。
  • 剖析 :剖析是鹰眼的数据分析组件,负责对代理采集的数据进行实时剖析和解决,并生成相应的监控指标和性能报告。剖析反对多种剖析引擎,如 Apache Flink、Apache Spark 等,能够依据理论状况进行抉择和配置。
  • 可视化 :可视化是鹰眼的数据展现组件,负责将剖析产生的监控指标和性能报告以图形化的形式展现进去,以便用户可能直观地理解零碎的运行状态和性能指标。
  • 告警 :告警是鹰眼的告警组件,负责依据用户的配置进行异样检测和告警,及时发现和解决零碎的异常情况,避免零碎呈现故障。

不同于 SkyWalking 的开源社区,EagleEye 重点面向阿里外部环境开发,针对海量实时监控的痛点,对底层的流计算、多维时序指标与交互体系等进行了大量优化,同时引入了时序检测、根因剖析、业务链路特色等技术,将问题发现与定位由被动转为被动。

EagleEye 采纳了 StreamLib 实时流式解决技术晋升流计算性能,对采集的数据进行实时剖析和解决,当监控一个电商网站时,能够实时地剖析用户拜访的日志数据,并依据剖析后果来优化网站的性能和用户体验;参考 Apache Flink 的 Snapshot 优化齐全度算法来保障监控零碎确定性;为了满足不同的个性化需要,把一些可复用的逻辑变成了“积木块”,让用户依照本人的需要,拼装流计算的 pipeline。

最初总结一下 EagleEye 的跨线程传递计划优缺点:EagleEye 的解决思路和大多数分布式追踪零碎统一,都是通过 javaagent 的形式批改线程池的实现,进而子线程能够获取到父线程到 trace 信息,不同于 SkyWalking 这种开源零碎采纳的字节码加强,EagleEye 大多数场景是外部应用,所以采纳间接编码的形式,保护和性能耗费方面也是十分有劣势的,但扩展性和开放性并不是十分敌对。

5. 总结

本文意在从日常工作中一个很轻微的问题登程,探索剖析背地的设计思维和底层起因,次要波及以下方面:

  • 抓住问题实质 :在业务零碎报警中抓住问题的外围代码并尝试再次复现问题,找到真正出问题的模块。
  • 深刻了解设计思维 :在查阅公司中间件的产品文档的根底上再持续追根溯源,学习业内领先者最开始的分布式链路追踪零碎的设计思维和实现路径。
  • 结合实际问题提出疑难 :联合理解到的分布式链路追踪零碎的实现流程和设计思维,回归到一开始咱们要解决的 TraceId 失落状况剖析是在什么环节呈现问题。
  • 浏览源码找到底层逻辑 :从 @Async 注解、SimpleAsyncTaskExecutor 和 ThreadLocal 类源码进行层层追踪,剖析底层真正的实现逻辑和特点。
  • 比照剖析找到解决方案 :剖析为什么 Mtrace 的跨线程传递计划“生效”了,找到起因提供解决方案并总结其余分布式追踪零碎。

从本文能够看出,中间件的呈现不仅为咱们保护零碎的稳固提供无力的反对,还曾经为应用中可能产生的问题提供了更高效的解决方案,作为开发人员在享受这一极大便当的同时,还是要沉下心来认真思考其中的实现逻辑和应用场景,如果只是一味的抬头应用不求甚解,那么在一些特定问题上往往会显得非常被动,无奈施展中间件真正的价值,甚至在没有中间件撑持时无奈高效的解决问题。

本文作者

李祯,美团到店事业群 / 充电宝业务部工程师。

参考资料

  • [1] Dapper, a Large-Scale Distributed Systems Tracing Infrastructure
  • [2] ThreadLocal
  • [3] Annotation Interface Async
  • [4] SkyWalking 8 官网中文文档
  • [5] Zipkin Architecture
  • [6] 阿里巴巴鹰眼技术解密

| 在美团公众号菜单栏对话框回复【2022 年货】、【2021 年货】、【2020 年货】、【2019 年货】、【2018 年货】、【2017 年货】等关键词,可查看美团技术团队历年技术文章合集。

| 本文系美团技术团队出品,著作权归属美团。欢送出于分享和交换等非商业目标转载或应用本文内容,敬请注明“内容转载自美团技术团队”。本文未经许可,不得进行商业性转载或者应用。任何商用行为,请发送邮件至 tech@meituan.com 申请受权。

正文完
 0