本文分享 ThreadLocal 遇到 Hystrix 时上下文信息传递的计划。

一、背景

笔者在业务开发中波及到应用 ThreadLocal 来寄存上下文链路中一些要害信息,其中一些业务实现对外部接口依赖,对这些依赖接口应用了Hystrix作熔断爱护,但在应用Hystrix作熔断爱护的办法中发现了获取 ThreadLocal 信息与预期不统一问题,本文旨在探讨如何解决这一问题。

二、ThreadLocal

在Java编程语言里ThreadLocal是用来不便开发人员在同一线程上下文中不同类、不同办法中共享信息的,ThreadLocal变量不受其余线程的影响,不同线程间互相隔离,也就是线程平安的。在理论的业务链路中从入口到具体的业务实现有时候须要共享某些通用信息,比方用户惟一标识、链路追踪惟一标识等,这些信息就能够应用ThreadLocal来存储实现,上面就是一个简略的同一链路中共享traceId的示例代码。

public class ThreadLocalUtil {     private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();     public static void setTraceId(String traceId) {        TRACE_ID.set(traceId);    }     public static String getTraceId() {        return TRACE_ID.get();    }     public static void clearTraceId() {        TRACE_ID.remove();    }}

三、Hystrix

在分布式环境中,每个零碎所依赖的内部服务不可避免的会呈现失败或超时的状况,Hystrix 通过减少对依赖服务的延时容错及失败容错逻辑,也就是所谓的「熔断」,以帮忙开发人员去灵便管制所依赖的分布式服务。

Hystrix通过隔离服务间的拜访点,阻断服务间的级联故障,并提供降级选项,这一切都是为了提供零碎整体的健壮性,在大规模分布式服务中,零碎的健壮性尤其重要。Hystrix具体的介绍能够看:Hystrix介绍

四、ThreadLocal遇上Hystrix

当业务链路中的具体实现有依赖内部服务,且作了相干熔断爱护,那么本文的两个配角就这么遇上了。

依据Hystrix的相干文档介绍咱们理解到,Hystrix提供两种线程隔离模式:信号量和线程池。

信号量模式下执行业务逻辑时处于同一线程上下文,而线程池模式则应用Hystrix提供的线程池去执行相干业务逻辑。在日常业务开发中更多须要熔断的是波及到内部网络IO调用的(如RPC调用),Hystrix存在的一个目标就是想缩小内部依赖的调用对服务容器线程的耗费,信号量模式显然不太适宜,因而咱们在绝大部分场景下应用的都是线程池模式,而Hystrix默认状况下启用的也是线程池模式。

本文想要解决的也正是在这种默认模式下才会有的问题:

1、InheritableThreadLocal

有人可能会想到是不是能够用InheritableThreadLocal去解决?

InheritableThreadLocal能够将以后线程中的线程变量信息共享到以后线程所创立的「子线程」中,但这边疏忽了一个很重要的信息,Hystrix中的线程模式底层应用的是本人保护的一个线程池,也就是其中的线程会呈现复用的状况,那么就会呈现每个线程所共享的信息都是之前首次获取到的「父线程」的共享信息,这显然不是咱们所期待的,所以InheritableThreadLocal被排除。

那么想要在Hystrix中解决这个问题怎么办?

优良的Hystrix曾经帮大家提供了相干解决方案,而且是插件化,按需定制。Hystrix的插件具体介绍请看这:Hystrix插件介绍,本文给大家介绍两种计划。

如何让ThreadLocal变量信息在HystrixCommand执行时能在Hystrix线程中正确的传递?

2、Concurrency Strategy

应用 HystrixConcurrencyStrategy插件能够来包装Hystrix线程所执行的办法,具体间接看示例代码:

public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {     @Override    public <T> Callable<T> wrapCallable(Callable<T> callable) {        String traceId = ThreadLocalUtil.getTraceId();        return () -> {            ThreadLocalUtil.setTraceId(traceId);            try {                return callable.call();            } finally {                ThreadLocalUtil.clearTraceId();            }        };    }}  // 业务代码中某处适合的中央注册下以后的策略插件HystrixPlugins.getInstance().registerConcurrencyStrategy(new MyHystrixConcurrencyStrategy());

应用这种形式非常简单,只有开发人员将本人关注的ThreadLocal值进行「复制」即可,那是不是应用这种形式就行了?

咱们留意到这种形式实质是针对HystrixCommand的run()办法(也就是加了@HystrixCommand注解的业务办法)拦挡解决,但它可能会超时或失败,那么就会去执行fallback办法,如果在 fallback办法中也想共享相干上下文信息,这时就无奈笼罩到这种场景了。

如果在你的业务中fallback不须要关注上下文信息这块的内容,那么上述这种计划就能够满足需要了,也很简略。但如果在fallback办法中也须要上下文信息,那么能够应用Hystrix提供的上面这种插件形式。

3、Command Execution Hook

应用HystrixCommandExecutionHook能够实现对Hystrix执行流程的齐全管制,你能够覆写它的一些要害节点的回调办法,以实现你的定制需要。想要更多的理解能够看下这:Command Execution Hook介绍,上面列举出HystrixCommandExecutionHook的一些罕用的要害办法:

在理解上述这些要害办法后,能够发现实现也很简略,只有在onStart()的时候「复制」下关注的上下文信息,而后在onExecutionStart()和onFallbackStart()两个办法开始执行前「粘贴」下关注的上下文信息,最初在作相应的清理行为,就能够满足需要了,示例代码如下所示:

public class MyHystrixHook extends HystrixCommandExecutionHook {         private String traceId;     @Override    public <T> void onStart(HystrixInvokable<T> commandInstance) {        copyTraceId();    }     @Override    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {        pasteTraceId();    }     @Override    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {        pasteTraceId();    }// 上面option1和option2抉择其中一种覆写就能够了//------------------------------------option1------------------------------------    @Override    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {        ThreadLocalUtil.clearTraceId();        super.onExecutionSuccess(commandInstance);    }     @Override    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {        ThreadLocalUtil.clearTraceId();        return super.onExecutionError(commandInstance, e);    }     @Override    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {        ThreadLocalUtil.clearTraceId();        super.onFallbackSuccess(commandInstance);    }          @Override    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {        ThreadLocalUtil.clearTraceId();        return super.onFallbackError(commandInstance, e);    }//------------------------------------option1------------------------------------ //------------------------------------option2------------------------------------        @Override    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {        ThreadLocalUtil.clearTraceId();        super.onSuccess(commandInstance);    }     @Override    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {        ThreadLocalUtil.clearTraceId();        return super.onError(commandInstance, failureType, e);    }//------------------------------------option2------------------------------------          private void copyTraceId() {        this.traceId = ThreadLocalUtil.getTraceId();    }     private void pasteTraceId() {        ThreadLocalUtil.setTraceId(traceId);    } } // 业务代码中某处适合的的中央注册下Hook插件HystrixPlugins.getInstance().registerCommandExecutionHook(new MyHystrixHook());

那是不是这样的实现形式就解决问题了?认真想下会不会有什么问题?

咱们晓得HystrixCommandExecutionHook插件注册后,所有HystrixCommand在被调用执行的时候都会通过这些覆写的办法,也就会呈现多线程覆写traceId,那么对于这个Hook下的traceId随时可能被扭转了。假如有这样场景:

  1. 调用者线程1上下文的traceId为"t1",在调用其依赖的Hystrix办法时,traceId被设为"t1"
  2. 同一时刻调用者线程2上下文的traceId为"t2",在调用其依赖的Hystrix办法时,也会触发更改traceId为"t2"
  3. 在hystrix线程1开始执行具体业务办法时,其想「粘贴」的traceId曾经被改成"t2",而不是初始调用者线程1时所设置"t1"

为了解决下面遇到的问题,Hystrix为开发人员提供了通过HystrixRequestContext和HystrixRequestVariableDefault这两个要害类解决。

HystrixRequestContext用于记录每次Hystrix申请的上下文信息,其中有两个要害信息:

static ThreadLocal<HystrixRequestContext> requestVariables: 用于记录每次HystrixCommand执行时的上下文。

ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state:用于记录上下文真正的数据。

HystrixRequestVariableDefault的用法有点似于ThreadLocal,提供了get(),set()办法,具体能力的实现借助于HystrixRequestContext。

HystrixCommandExecutionHook插件终极解决形式的实现的示例代码如下:

public class MyHystrixHook extends HystrixCommandExecutionHook {         private HystrixRequestVariableDefault<String> requestVariable = new HystrixRequestVariableDefault<>();     public <T> void onStart(HystrixInvokable<T> commandInstance) {        HystrixRequestContext.initializeContext();                copyTraceId();    }     @Override    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {        pasteTraceId();    }     @Override    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {        pasteTraceId();    }         @Override    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {        HystrixRequestContext.getContextForCurrentThread().shutdown();        super.onSuccess(commandInstance);    }     @Override    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {        HystrixRequestContext.getContextForCurrentThread().shutdown();        return super.onError(commandInstance, failureType, e);    }          private void copyTraceId() {        requestVariable.set(ThreadLocalUtil.getTraceId());    }     private void pasteTraceId() {        ThreadLocalUtil.setTraceId(requestVariable.get());    }}

在每次Hook执行onStart()办法的时候,须要先执行HystrixRequestContext的初始化操作,而后对关注的上下文信息进行「复制」,要害代码如下:

public void set(T value) {    HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));}

把关注的信息复制到一个线程相干的ConcurrentHashMap中了,依据后面对HystrixCommandExecutionHook的介绍咱们晓得,onStart()的时候以后线程为调用者线程;

在真正开始执行HystrixCommand业务方办法的时候,此时须要进行「粘贴」上下文信息,从requestVariable.get()获取,get操作要害代码如下:

public T get() {      if (HystrixRequestContext.getContextForCurrentThread() == null) {          throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");      }      ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;           // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap      LazyInitializer<?> v = variableMap.get(this);      if (v != null) {          return (T) v.get();      }       // 省略一部分      ....}

从代码能够看出get与set操作绝对应,也是从线程相干的ConcurrentHashMap获取相应的值,从前序介绍咱们也得悉以后线程是Hystrix提供的线程池线程,与调用者线程不是同一个线程,那么这个业务关注的上下文信息还能正确的传递到Hystrix线程中吗?通过测试它的确「神奇」的正确传递了,那到底是怎么做到的呢?

原来是Hystrix「默默」的帮咱们做了,通过调试咱们看到如下一段要害代码:

    this.actual = action;    // 调用者线程HystrixRequestContext信息    this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();     this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {         @Override        public Void call() throws Exception {            HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();            try {                // 帮咱们做了一步拷贝操作                HystrixRequestContext.setContextOnCurrentThread(parentThreadState);                // 开始真正的执行业务定义的办法,此时上下文信息曾经统一了                actual.call();                return null;            } finally {                HystrixRequestContext.setContextOnCurrentThread(existingState);            }        }    });}

在执行业务定义的HystrixCommand办法前,Hystrix封装的对象帮咱们把调用者线程的上下文信息「拷贝」过去了,其实这个解决的思路有点相似于咱们前一个插件HystrixConcurrencyStrategy。

五、总结

HystrixConcurrencyStrategy 和HystrixCommandExecutionHook两者插件形式大家能够依据理论状况去断定,如果确定不须要在fallback中关注上下文传递信息,那用前者就能够了,也很简便,但如果你想解决的更彻底点,那么用后一种形式就能够了。

作者:vivo 官网商城开发团队