一、引言
海内商城从印度做起,缓缓的会有一些其余国家的诉求,这个时候须要咱们针对以后的商城做一个革新,能够撑持多个国家的商城,这里会波及多个问题,多语言,多国家,多时区,本地化等等。在多国家的状况下如何把辨认进去的国家信息传递上来,一层一层直到代码执行的最初一步。甚至还有一些多线程的场景须要解决。
二、背景技术
2.1 ThreadLocal
ThreadLocal是最容易想到了,入口辨认到国家信息后,丢进ThreadLocal,这样后续代码、redis、DB等做国家辨别的时候都能应用到。
这里先简略介绍一下ThreadLocal:
/** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value);} /** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue();} /** * Get the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @return the map */ThreadLocalMap getMap(Thread t) { return t.threadLocals;} /** * Get the entry associated with key. This method * itself handles only the fast path: a direct hit of existing * key. It otherwise relays to getEntryAfterMiss. This is * designed to maximize performance for direct hits, in part * by making this method readily inlinable. * * @param key the thread local object * @return the entry associated with key, or null if no such */private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e);}
- 每一个Thread线程都有属于本人的threadLocals(ThreadLocalMap),外面有一个弱援用的Entry(ThreadLocal,Object)。
- get办法首先通过Thread.currentThread失去以后线程,而后拿到线程的threadLocals(ThreadLocalMap),再从Entry中获得以后线程存储的value。
- set值的时候更改以后线程的threadLocals(ThreadLocalMap)中Entry对应的value值。
理论应用中除了同步办法之外,还有起异步线程解决的场景,这个时候就须要把ThreadLocal的内容从父线程传递给子线程,这个怎么办呢?
不急,Java 还有InheritableThreadLocal来帮咱们解决这个问题。
2.2 InheritableThreadLoca
public class InheritableThreadLocal<T> extends ThreadLocal<T> { /** * Computes the child's initial value for this inheritable thread-local * variable as a function of the parent's value at the time the child * thread is created. This method is called from within the parent * thread before the child is started. * <p> * This method merely returns its input argument, and should be overridden * if a different behavior is desired. * * @param parentValue the parent thread's value * @return the child thread's initial value */ protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); }}
- java.lang.Thread#init(java.lang.ThreadGroup, java.lang.Runnable, java.lang.String, long, java.security.AccessControlContext, boolean)
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
- InheritableThreadLocal操作的是inheritableThreadLocals这个变量,而不是ThreadLocal操作的threadLocals变量。
- 创立新线程的时候会查看父线程中parent.inheritableThreadLocals变量是否为null,如果不为null则复制一份parent.inheritableThreadLocals的数据到子线程的this.inheritableThreadLocals中去。
- 因为复写了getMap(Thread)和CreateMap()办法间接操作inheritableThreadLocals,这样就实现了在子线程中获取父线程ThreadLocal值。
当初在应用多线程的时候,都是通过线程池来做的,这个时候用InheritableThreadLocal能够吗?会有什么问题吗?先看下上面的代码的执行状况:
- test
static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); inheritableThreadLocal.set("i am a inherit parent"); executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); } }); TimeUnit.SECONDS.sleep(1); inheritableThreadLocal.set("i am a new inherit parent");// 设置新的值 executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); } });} i am a inherit parenti am a inherit parent public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); inheritableThreadLocal.set("i am a inherit parent"); executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); inheritableThreadLocal.set("i am a old inherit parent");// 子线程中设置新的值 } }); TimeUnit.SECONDS.sleep(1); inheritableThreadLocal.set("i am a new inherit parent");// 主线程设置新的值 executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); } });} i am a inherit parenti am a old inherit parent
这里看第一个执行后果,发现主线程第二次设置的值,没有改掉,还是第一次设置的值“i am a inherit parent”,这是什么起因呢?
再看第二个例子的执行后果,发现在第一个工作中设置的“i am a old inherit parent"的值,在第二个工作中打印进去了。这又是什么起因呢?
回过头来看看下面的源码,在线程池的状况下,第一次创立线程的时候会从父线程中copy inheritableThreadLocals中的数据,所以第一个工作胜利拿到了父线程设置的”i am a inherit parent“,第二个工作执行的时候复用了第一个工作的线程,并不会触发复制父线程中的inheritableThreadLocals操作,所以即便在主线程中设置了新的值,也会不失效。同时get()办法是间接操作inheritableThreadLocals这个变量的,所以就间接拿到了第一个工作设置的值。
那遇到线程池应该怎么办呢?
2.3 TransmittableThreadLocal
TransmittableThreadLocal(TTL)这个时候就派上用场了。这是阿里开源的一个组件,咱们来看看它怎么解决线程池的问题,先来一段代码,在下面的根底上批改一下,应用TransmittableThreadLocal。
static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();// 应用TransmittableThreadLocal public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService = TtlExecutors.getTtlExecutorService(executorService); // 用TtlExecutors装璜线程池 transmittableThreadLocal.set("i am a transmittable parent"); executorService.execute(new Runnable() { @Override public void run() { System.out.println(transmittableThreadLocal.get()); transmittableThreadLocal.set("i am a old transmittable parent");// 子线程设置新的值 } }); System.out.println(transmittableThreadLocal.get()); TimeUnit.SECONDS.sleep(1); transmittableThreadLocal.set("i am a new transmittable parent");// 主线程设置新的值 executorService.execute(new Runnable() { @Override public void run() { System.out.println(transmittableThreadLocal.get()); } });} i am a transmittable parenti am a transmittable parenti am a new transmittable parent
执行代码后发现,应用TransmittableThreadLocalTtlExecutors.getTtlExecutorService(executorService)装璜线程池之后,在每次调用工作的时,都会将以后的主线程的TransmittableThreadLocal数据copy到子线程外面,执行实现后,再革除掉。同时子线程外面的批改回到主线程时其实并没有失效。这样能够保障每次工作执行的时候都是互不干涉的。这是怎么做到的呢?来看源码。
- TtlExecutors和TransmittableThreadLocal源码
private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this.capturedRef = new AtomicReference<Object>(capture()); this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;} com.alibaba.ttl.TtlRunnable#run/** * wrap method {@link Runnable#run()}. */@Overridepublic void run() { Object captured = capturedRef.get();// 获取线程的ThreadLocalMap if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } Object backup = replay(captured);// 暂存以后子线程的ThreadLocalMap到backup try { runnable.run(); } finally { restore(backup);// 复原线程执行时被改版的Threadlocal对应的值 }} com.alibaba.ttl.TransmittableThreadLocal.Transmitter#replay /** * Replay the captured {@link TransmittableThreadLocal} values from {@link #capture()}, * and return the backup {@link TransmittableThreadLocal} values in current thread before replay. * * @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()} * @return the backup {@link TransmittableThreadLocal} values before replay * @see #capture() * @since 2.3.0 */public static Object replay(Object captured) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); // backup backup.put(threadLocal, threadLocal.get()); // clear the TTL value only in captured // avoid extra TTL value in captured, when run task. if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // set value to captured TTL for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : capturedMap.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); } // call beforeExecute callback doExecuteCallback(true); return backup;} com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore /** * Restore the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}. * * @param backup the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)} * @since 2.3.0 */public static void restore(Object backup) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // call afterExecute callback doExecuteCallback(false); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); // clear the TTL value only in backup // avoid the extra value of backup after restore if (!backupMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // restore TTL value for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : backupMap.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); }}
能够看下整个过程的残缺时序图:
OK,既然问题都解决了,来看看理论应用吧,有两种应用,先看第一种,波及HTTP申请、Dubbo申请和 job,采纳的是数据级别的隔离。
三、 TTL 在海内商城的理论利用
3.1 不分库,分数据行 + SpringMVC
用户 HTTP 申请,首先咱们要从url或者cookie中解析出国家编号,而后在TransmittableThreadLocal中寄存国家信息,在 MyBatis 的拦截器中读取国家数据,进行sql革新,最终操作指定的国家数据,多线程场景下用TtlExecutors包装原有自定义线程池,保障在应用线程池的时候可能正确将国家信息传递上来。
- HTTP 申请
public class ShopShardingHelperUtil { private static TransmittableThreadLocal<String> countrySet = new TransmittableThreadLocal<>(); /** * 获取threadLocal中设置的国家标记 * @return */ public static String getCountry() { return countrySet.get(); } /** * 设置threadLocal中设置的国家 */ public static void setCountry (String country) { countrySet.set(country.toLowerCase()); } /** * 革除标记 */ public static void clear () { countrySet.remove(); }} /** 拦截器对cookie和url综合判断国家信息,放入到TransmittableThreadLocal中 **/// 设置线程中的国家标记String country = localeContext.getLocale().getCountry().toLowerCase(); ShopShardingHelperUtil.setCountry(country); /** 自定义线程池,用TtlExecutors包装原有自定义线程池 **/public static Executor getExecutor() { if (executor == null) { synchronized (TransmittableExecutor.class) { if (executor == null) { executor = TtlExecutors.getTtlExecutor(initExecutor());// 用TtlExecutors装璜Executor,联合TransmittableThreadLocal解决异步线程threadlocal传递问题 } } } return executor;} /** 理论应用线程池的中央,间接调用执行即可**/TransmittableExecutor.getExecutor().execute(new BatchExeRunnable(param1,param2)); /** mybatis的Interceptor代码, 应用TransmittableThreadLocal的国家信息,革新原有sql,加上国家参数,在增删改查sql中辨别国家数据 **/public Object intercept(Invocation invocation) throws Throwable { StatementHandler statementHandler = (StatementHandler) invocation.getTarget(); BoundSql boundSql = statementHandler.getBoundSql(); String originalSql = boundSql.getSql(); Statement statement = (Statement) CCJSqlParserUtil.parse(originalSql); String threadCountry = ShopShardingHelperUtil.getCountry(); // 线程中的国家不为空才进行解决 if (StringUtils.isNotBlank(threadCountry)) { if (statement instanceof Select) { Select selectStatement = (Select) statement; VivoSelectVisitor vivoSelectVisitor = new VivoSelectVisitor(threadCountry); vivoSelectVisitor.init(selectStatement); } else if (statement instanceof Insert) { Insert insertStatement = (Insert) statement; VivoInsertVisitor vivoInsertVisitor = new VivoInsertVisitor(threadCountry); vivoInsertVisitor.init(insertStatement); } else if (statement instanceof Update) { Update updateStatement = (Update) statement; VivoUpdateVisitor vivoUpdateVisitor = new VivoUpdateVisitor(threadCountry); vivoUpdateVisitor.init(updateStatement); } else if (statement instanceof Delete) { Delete deleteStatement = (Delete) statement; VivoDeleteVisitor vivoDeleteVisitor = new VivoDeleteVisitor(threadCountry); vivoDeleteVisitor.init(deleteStatement); } Field boundSqlField = BoundSql.class.getDeclaredField("sql"); boundSqlField.setAccessible(true); boundSqlField.set(boundSql, statement.toString()); } else { logger.error("----------- intercept not-add-country sql.... ---------" + statement.toString()); } logger.info("----------- intercept query new sql.... ---------" + statement.toString()); // 调用办法,实际上就是拦挡的办法 Object result = invocation.proceed(); return result;}
对于 Dubbo 接口和无奈判断国家信息的 HTTP 接口,在入参局部减少国家信息参数,通过拦截器或者手动set国家信息到TransmittableThreadLocal。
对于定时工作 job,因为所有国家都须要执行,所以会把所有国家进行遍历执行,这也能够通过简略的注解来解决。
这个版本的革新,点检测试也根本通过了,自动化脚本验证也是没问题的,不过因为业务倒退问题最终没上线。
3.2 分库 + SpringBoot
后续在建设新的国家商城的时候,分库分表计划调整为每个国家独立数据库,同时整体开发框架降级到SpringBoot,咱们把这套计划做了降级,总体思路是一样的,只是在实现细节上略有不同。
SpringBoot 外面的异步个别通过@Async这个注解来实现,通过自定义线程池来包装,应用时在 HTTP 申请判断locale信息的写入国家信息,后续实现切DB的操作。
对于 Dubbo 接口和无奈判断国家信息的 HTTP 接口,在入参局部减少国家信息参数,通过拦截器或者手动set国家信息到TransmittableThreadLocal。
@Beanpublic ThreadPoolTaskExecutor threadPoolTaskExecutor(){ return TtlThreadPoolExecutors.getAsyncExecutor();} public class TtlThreadPoolExecutors { private static final String COMMON_BUSINESS = "COMMON_EXECUTOR"; public static final int QUEUE_CAPACITY = 20000; public static ExecutorService getExecutorService() { return TtlExecutorServiceMananger.getExecutorService(COMMON_BUSINESS); } public static ExecutorService getExecutorService(String threadGroupName) { return TtlExecutorServiceMananger.getExecutorService(threadGroupName); } public static ThreadPoolTaskExecutor getAsyncExecutor() { // 用TtlExecutors装璜Executor,联合TransmittableThreadLocal解决异步线程threadlocal传递问题 return getTtlThreadPoolTaskExecutor(initTaskExecutor()); } private static ThreadPoolTaskExecutor initTaskExecutor () { return initTaskExecutor(TtlThreadPoolFactory.DEFAULT_CORE_SIZE, TtlThreadPoolFactory.DEFAULT_POOL_SIZE, QUEUE_CAPACITY); } private static ThreadPoolTaskExecutor initTaskExecutor (int coreSize, int poolSize, int executorQueueCapacity) { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(coreSize); taskExecutor.setMaxPoolSize(poolSize); taskExecutor.setQueueCapacity(executorQueueCapacity); taskExecutor.setKeepAliveSeconds(120); taskExecutor.setAllowCoreThreadTimeOut(true); taskExecutor.setThreadNamePrefix("TaskExecutor-ttl"); taskExecutor.initialize(); return taskExecutor; } private static ThreadPoolTaskExecutor getTtlThreadPoolTaskExecutor(ThreadPoolTaskExecutor executor) { if (null == executor || executor instanceof ThreadPoolTaskExecutorWrapper) { return executor; } return new ThreadPoolTaskExecutorWrapper(executor); }} /** * @ClassName : LocaleContextHolder * @Description : 本地化信息上下文holder */public class LocalizationContextHolder { private static TransmittableThreadLocal<LocalizationContext> localizationContextHolder = new TransmittableThreadLocal<>(); private static LocalizationInfo defaultLocalizationInfo = new LocalizationInfo(); private LocalizationContextHolder(){} public static LocalizationContext getLocalizationContext() { return localizationContextHolder.get(); } public static void resetLocalizationContext () { localizationContextHolder.remove(); } public static void setLocalizationContext (LocalizationContext localizationContext) { if(localizationContext == null) { resetLocalizationContext(); } else { localizationContextHolder.set(localizationContext); } } public static void setLocalizationInfo (LocalizationInfo localizationInfo) { LocalizationContext localizationContext = getLocalizationContext(); String brand = (localizationContext instanceof BrandLocalizationContext ? ((BrandLocalizationContext) localizationContext).getBrand() : null); if(StringUtils.isNotEmpty(brand)) { localizationContext = new SimpleBrandLocalizationContext(localizationInfo, brand); } else if(localizationInfo != null) { localizationContext = new SimpleLocalizationContext(localizationInfo); } else { localizationContext = null; } setLocalizationContext(localizationContext); } public static void setDefaultLocalizationInfo(@Nullable LocalizationInfo localizationInfo) { LocalizationContextHolder.defaultLocalizationInfo = localizationInfo; } public static LocalizationInfo getLocalizationInfo () { LocalizationContext localizationContext = getLocalizationContext(); if(localizationContext != null) { LocalizationInfo localizationInfo = localizationContext.getLocalizationInfo(); if(localizationInfo != null) { return localizationInfo; } } return defaultLocalizationInfo; } public static String getCountry(){ return getLocalizationInfo().getCountry(); } public static String getTimezone(){ return getLocalizationInfo().getTimezone(); } public static String getBrand(){ return getBrand(getLocalizationContext()); } public static String getBrand(LocalizationContext localizationContext) { if(localizationContext == null) { return null; } if(localizationContext instanceof BrandLocalizationContext) { return ((BrandLocalizationContext) localizationContext).getBrand(); } throw new LocaleException("unsupported localizationContext type"); }} @Override public LocaleContext resolveLocaleContext(final HttpServletRequest request) { parseLocaleCookieIfNecessary(request); LocaleContext localeContext = new TimeZoneAwareLocaleContext() { @Override public Locale getLocale() { return (Locale) request.getAttribute(LOCALE_REQUEST_ATTRIBUTE_NAME); } @Override public TimeZone getTimeZone() { return (TimeZone) request.getAttribute(TIME_ZONE_REQUEST_ATTRIBUTE_NAME); } }; // 设置线程中的国家标记 setLocalizationInfo(request, localeContext.getLocale()); return localeContext; } private void setLocalizationInfo(HttpServletRequest request, Locale locale) { String country = locale!=null?locale.getCountry():null; String language = locale!=null?(locale.getLanguage() + "_" + locale.getVariant()):null; LocaleRequestMessage localeRequestMessage = localeRequestParser.parse(request); final String countryStr = country; final String languageStr = language; final String brandStr = localeRequestMessage.getBrand(); LocalizationContextHolder.setLocalizationContext(new BrandLocalizationContext() { @Override public String getBrand() { return brandStr; } @Override public LocalizationInfo getLocalizationInfo() { return LocalizationInfoAssembler.assemble(countryStr, languageStr); } }); }
对于定时工作job,因为所有国家都须要执行,所以会把所有国家进行遍历执行,这也能够通过简略的注解和AOP来解决。
四、总结
本文从业务拓展的角度论述了在简单业务场景下如何通过ThreadLocal,过渡到InheritableThreadLocal,再通过TransmittableThreadLocal解决理论业务问题。因为海内的业务在一直的摸索中后退,技术也在一直的摸索中演进,面对这种复杂多变的状况,咱们的应答策略是先做国际化,再做本地化,more global能力more local,多国家的隔离只是国际化最根本的终点,将来还有很多业务和技术等着咱们去挑战。
作者:vivo 官网商城开发团队