共计 8436 个字符,预计需要花费 22 分钟才能阅读完成。
本文分享 ThreadLocal 遇到 Hystrix 时上下文信息传递的计划。
一、背景
笔者在业务开发中波及到应用 ThreadLocal 来寄存上下文链路中一些要害信息,其中一些业务实现对外部接口依赖,对这些依赖接口应用了 Hystrix 作熔断爱护,但在应用 Hystrix 作熔断爱护的办法中发现了获取 ThreadLocal 信息与预期不统一问题,本文旨在探讨如何解决这一问题。
二、ThreadLocal
在 Java 编程语言里 ThreadLocal 是用来不便开发人员在同一线程上下文中不同类、不同办法中共享信息的,ThreadLocal 变量不受其余线程的影响,不同线程间互相隔离,也就是线程平安的。在理论的业务链路中从入口到具体的业务实现有时候须要共享某些通用信息,比方用户惟一标识、链路追踪惟一标识等,这些信息就能够应用 ThreadLocal 来存储实现,上面就是一个简略的同一链路中共享 traceId 的示例代码。
public class ThreadLocalUtil {private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
public static void setTraceId(String traceId) {TRACE_ID.set(traceId);
}
public static String getTraceId() {return TRACE_ID.get();
}
public static void clearTraceId() {TRACE_ID.remove();
}
}
三、Hystrix
在分布式环境中,每个零碎所依赖的内部服务不可避免的会呈现失败或超时的状况,Hystrix 通过减少对依赖服务的延时容错及失败容错逻辑,也就是所谓的「熔断」,以帮忙开发人员去灵便管制所依赖的分布式服务。
Hystrix 通过隔离服务间的拜访点,阻断服务间的级联故障,并提供降级选项,这一切都是为了提供零碎整体的健壮性,在大规模分布式服务中,零碎的健壮性尤其重要。Hystrix 具体的介绍能够看:Hystrix 介绍
四、ThreadLocal 遇上 Hystrix
当业务链路中的具体实现有依赖内部服务,且作了相干熔断爱护,那么本文的两个配角就这么遇上了。
依据 Hystrix 的相干文档介绍咱们理解到,Hystrix 提供两种线程隔离模式:信号量和线程池。
信号量模式下执行业务逻辑时处于同一线程上下文,而线程池模式则应用 Hystrix 提供的线程池去执行相干业务逻辑。在日常业务开发中更多须要熔断的是波及到内部网络 IO 调用的(如 RPC 调用),Hystrix 存在的一个目标就是想缩小内部依赖的调用对服务容器线程的耗费,信号量模式显然不太适宜,因而咱们在绝大部分场景下应用的都是线程池模式,而 Hystrix 默认状况下启用的也是线程池模式。
本文想要解决的也正是在这种默认模式下才会有的问题:
1、InheritableThreadLocal
有人可能会想到是不是能够用 InheritableThreadLocal 去解决?
InheritableThreadLocal 能够将以后线程中的线程变量信息共享到以后线程所创立的「子线程」中,但这边疏忽了一个很重要的信息,Hystrix 中的线程模式底层应用的是本人保护的一个线程池,也就是其中的线程会呈现复用的状况,那么就会呈现每个线程所共享的信息都是之前首次获取到的「父线程」的共享信息,这显然不是咱们所期待的,所以 InheritableThreadLocal 被排除。
那么想要在 Hystrix 中解决这个问题怎么办?
优良的 Hystrix 曾经帮大家提供了相干解决方案,而且是插件化,按需定制。Hystrix 的插件具体介绍请看这:Hystrix 插件介绍,本文给大家介绍两种计划。
如何让 ThreadLocal 变量信息在 HystrixCommand 执行时能在 Hystrix 线程中正确的传递?
2、Concurrency Strategy
应用 HystrixConcurrencyStrategy 插件能够来包装 Hystrix 线程所执行的办法,具体间接看示例代码:
public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {String traceId = ThreadLocalUtil.getTraceId();
return () -> {ThreadLocalUtil.setTraceId(traceId);
try {return callable.call();
} finally {ThreadLocalUtil.clearTraceId();
}
};
}
}
// 业务代码中某处适合的中央注册下以后的策略插件
HystrixPlugins.getInstance().registerConcurrencyStrategy(new MyHystrixConcurrencyStrategy());
应用这种形式非常简单,只有开发人员将本人关注的 ThreadLocal 值进行「复制」即可,那是不是应用这种形式就行了?
咱们留意到这种形式实质是针对 HystrixCommand 的 run() 办法(也就是加了 @HystrixCommand 注解的业务办法)拦挡解决,但它可能会超时或失败,那么就会去执行 fallback 办法,如果在 fallback 办法中也想共享相干上下文信息,这时就无奈笼罩到这种场景了。
如果在你的业务中 fallback 不须要关注上下文信息这块的内容,那么上述这种计划就能够满足需要了,也很简略。但如果在 fallback 办法中也须要上下文信息,那么能够应用 Hystrix 提供的上面这种插件形式。
3、Command Execution Hook
应用 HystrixCommandExecutionHook 能够实现对 Hystrix 执行流程的齐全管制,你能够覆写它的一些要害节点的回调办法,以实现你的定制需要。想要更多的理解能够看下这:Command Execution Hook 介绍, 上面列举出 HystrixCommandExecutionHook 的一些罕用的要害办法:
在理解上述这些要害办法后,能够发现实现也很简略,只有在 onStart() 的时候「复制」下关注的上下文信息,而后在 onExecutionStart() 和 onFallbackStart() 两个办法开始执行前「粘贴」下关注的上下文信息,最初在作相应的清理行为,就能够满足需要了,示例代码如下所示:
public class MyHystrixHook extends HystrixCommandExecutionHook {
private String traceId;
@Override
public <T> void onStart(HystrixInvokable<T> commandInstance) {copyTraceId();
}
@Override
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {pasteTraceId();
}
@Override
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {pasteTraceId();
}
// 上面 option1 和 option2 抉择其中一种覆写就能够了
//------------------------------------option1------------------------------------
@Override
public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {ThreadLocalUtil.clearTraceId();
super.onExecutionSuccess(commandInstance);
}
@Override
public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {ThreadLocalUtil.clearTraceId();
return super.onExecutionError(commandInstance, e);
}
@Override
public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {ThreadLocalUtil.clearTraceId();
super.onFallbackSuccess(commandInstance);
}
@Override
public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {ThreadLocalUtil.clearTraceId();
return super.onFallbackError(commandInstance, e);
}
//------------------------------------option1------------------------------------
//------------------------------------option2------------------------------------
@Override
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {ThreadLocalUtil.clearTraceId();
super.onSuccess(commandInstance);
}
@Override
public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {ThreadLocalUtil.clearTraceId();
return super.onError(commandInstance, failureType, e);
}
//------------------------------------option2------------------------------------
private void copyTraceId() {this.traceId = ThreadLocalUtil.getTraceId();
}
private void pasteTraceId() {ThreadLocalUtil.setTraceId(traceId);
}
}
// 业务代码中某处适合的的中央注册下 Hook 插件
HystrixPlugins.getInstance().registerCommandExecutionHook(new MyHystrixHook());
那是不是这样的实现形式就解决问题了?认真想下会不会有什么问题?
咱们晓得 HystrixCommandExecutionHook 插件注册后,所有 HystrixCommand 在被调用执行的时候都会通过这些覆写的办法,也就会呈现多线程覆写 traceId,那么对于这个 Hook 下的 traceId 随时可能被扭转了。假如有这样场景:
- 调用者线程 1 上下文的 traceId 为 ”t1″,在调用其依赖的 Hystrix 办法时,traceId 被设为 ”t1″
- 同一时刻调用者线程 2 上下文的 traceId 为 ”t2″,在调用其依赖的 Hystrix 办法时,也会触发更改 traceId 为 ”t2″
- 在 hystrix 线程 1 开始执行具体业务办法时,其想「粘贴」的 traceId 曾经被改成 ”t2″,而不是初始调用者线程 1 时所设置 ”t1″
为了解决下面遇到的问题,Hystrix 为开发人员提供了通过 HystrixRequestContext 和 HystrixRequestVariableDefault 这两个要害类解决。
HystrixRequestContext 用于记录每次 Hystrix 申请的上下文信息,其中有两个要害信息:
static ThreadLocal<HystrixRequestContext> requestVariables: 用于记录每次 HystrixCommand 执行时的上下文。
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state:用于记录上下文真正的数据。
HystrixRequestVariableDefault 的用法有点似于 ThreadLocal,提供了 get(),set() 办法,具体能力的实现借助于 HystrixRequestContext。
HystrixCommandExecutionHook 插件终极解决形式的实现的示例代码如下:
public class MyHystrixHook extends HystrixCommandExecutionHook {private HystrixRequestVariableDefault<String> requestVariable = new HystrixRequestVariableDefault<>();
public <T> void onStart(HystrixInvokable<T> commandInstance) {HystrixRequestContext.initializeContext();
copyTraceId();}
@Override
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {pasteTraceId();
}
@Override
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {pasteTraceId();
}
@Override
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {HystrixRequestContext.getContextForCurrentThread().shutdown();
super.onSuccess(commandInstance);
}
@Override
public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {HystrixRequestContext.getContextForCurrentThread().shutdown();
return super.onError(commandInstance, failureType, e);
}
private void copyTraceId() {requestVariable.set(ThreadLocalUtil.getTraceId());
}
private void pasteTraceId() {ThreadLocalUtil.setTraceId(requestVariable.get());
}
}
在每次 Hook 执行 onStart() 办法的时候,须要先执行 HystrixRequestContext 的初始化操作,而后对关注的上下文信息进行「复制」,要害代码如下:
public void set(T value) {HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
}
把关注的信息复制到一个线程相干的 ConcurrentHashMap 中了,依据后面对 HystrixCommandExecutionHook 的介绍咱们晓得,onStart() 的时候以后线程为调用者线程;
在真正开始执行 HystrixCommand 业务方办法的时候,此时须要进行「粘贴」上下文信息,从 requestVariable.get() 获取,get 操作要害代码如下:
public T get() {if (HystrixRequestContext.getContextForCurrentThread() == null) {throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
// short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
LazyInitializer<?> v = variableMap.get(this);
if (v != null) {return (T) v.get();}
// 省略一部分
....
}
从代码能够看出 get 与 set 操作绝对应,也是从线程相干的 ConcurrentHashMap 获取相应的值,从前序介绍咱们也得悉以后线程是 Hystrix 提供的线程池线程,与调用者线程不是同一个线程,那么这个业务关注的上下文信息还能正确的传递到 Hystrix 线程中吗?通过测试它的确「神奇」的正确传递了,那到底是怎么做到的呢?
原来是 Hystrix「默默」的帮咱们做了,通过调试咱们看到如下一段要害代码:
this.actual = action;
// 调用者线程 HystrixRequestContext 信息
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// 帮咱们做了一步拷贝操作
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// 开始真正的执行业务定义的办法,此时上下文信息曾经统一了
actual.call();
return null;
} finally {HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
});
}
在执行业务定义的 HystrixCommand 办法前,Hystrix 封装的对象帮咱们把调用者线程的上下文信息「拷贝」过去了,其实这个解决的思路有点相似于咱们前一个插件 HystrixConcurrencyStrategy。
五、总结
HystrixConcurrencyStrategy 和 HystrixCommandExecutionHook 两者插件形式大家能够依据理论状况去断定,如果确定不须要在 fallback 中关注上下文传递信息,那用前者就能够了,也很简便,但如果你想解决的更彻底点,那么用后一种形式就能够了。
作者:vivo 官网商城开发团队