乐趣区

关于springboot:用这4招优雅的实现Spring-Boot-异步线程间数据传递

Spring Boot 自定义线程池实现异步开发置信大家都理解,然而在理论开发中须要在父子线程之间传递一些数据,比方用户信息,链路信息等等

比方用户登录信息应用 ThreadLocal 寄存保障线程隔离,代码如下:

/**
 * @author 公众号:码猿技术专栏
 * @description 用户上下文信息
 */
public class OauthContext {private static  final  ThreadLocal<LoginVal> loginValThreadLocal=new ThreadLocal<>();

    public static  LoginVal get(){return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){loginValThreadLocal.set(loginVal);
    }
    public static void clear(){loginValThreadLocal.remove();
    }
}

那么子线程想要获取这个 LoginVal 如何做呢?

明天就来介绍几种优雅的形式实现 Spring Boot 外部的父子线程的数据传递。

1. 手动设置

每执行一次异步线程都要分为两步:

  1. 获取父线程的 LoginVal
  2. 将 LoginVal 设置到子线程,达到复用

代码如下:

public void handlerAsync() {
        //1\. 获取父线程的 loginVal
        LoginVal loginVal = OauthContext.get();
        log.info("父线程的值:{}",OauthContext.get());
        CompletableFuture.runAsync(()->{
            //2\. 设置子线程的值,复用
           OauthContext.set(loginVal);
           log.info("子线程的值:{}",OauthContext.get());
        });
    }

尽管可能实现目标,然而每次开异步线程都须要手动设置,反复代码太多,看了头疼,你认为优雅吗?

2. 线程池设置 TaskDecorator

TaskDecorator 是什么?官网 api 的大抵意思:这是一个执行回调办法的装璜器,次要利用于传递上下文,或者提供工作的监控 / 统计信息。

晓得有这么一个货色,如何去应用?

TaskDecorator 是一个接口,首先须要去实现它,代码如下:

/**
 * @author 公众号:码猿技术专栏
 * @description 上下文装璜器
 */
public class ContextTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // 获取父线程的 loginVal
        LoginVal loginVal = OauthContext.get();
        return () -> {
            try {
                // 将主线程的申请信息,设置到子线程中
                OauthContext.set(loginVal);
                // 执行子线程,这一步不要忘了
                runnable.run();} finally {
                // 线程完结,清空这些信息,否则可能造成内存透露
                OauthContext.clear();}
        };
    }
}

这里我只是设置了 LoginVal,理论开发中其余的共享数据,比方 SecurityContext,RequestAttributes….

TaskDecorator 须要联合线程池应用,理论开发中异步线程倡议应用线程池,只须要在对应的线程池配置一下,代码如下:

@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
        poolTaskExecutor.setCorePoolSize(xx);
        poolTaskExecutor.setMaxPoolSize(xx);
        // 设置线程沉闷工夫(秒)poolTaskExecutor.setKeepAliveSeconds(xx);
        // 设置队列容量
        poolTaskExecutor.setQueueCapacity(xx);
        // 设置 TaskDecorator,用于解决父子线程间的数据复用
        poolTaskExecutor.setTaskDecorator(new ContextTaskDecorator());
        poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 期待所有工作完结后再敞开线程池
        poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return poolTaskExecutor;
    }

此时业务代码就不须要去设置子线程的值,间接应用即可,代码如下:

public void handlerAsync() {log.info("父线程的用户信息:{}", OauthContext.get());
        // 执行异步工作,须要指定的线程池
        CompletableFuture.runAsync(()-> log.info("子线程的用户信息:{}", OauthContext.get()),taskExecutor);
    }

来看一下后果,如下图:

这里应用的是 CompletableFuture 执行异步工作,应用 @Async 这个注解同样是可行的。

留神 :无论应用何种形式,都须要指定线程池

3. InheritableThreadLocal

这种计划不倡议应用,InheritableThreadLocal 尽管可能实现父子线程间的复用,然而在线程池中应用会存在复用的问题,具体的能够看陈某之前的文章:微服务中应用阿里开源的 TTL,优雅的实现身份信息的线程间复用

这种计划应用也是非常简单,间接用 InheritableThreadLocal 替换 ThreadLocal 即可,代码如下:

/**
 * @author 公众号:码猿技术专栏
 * @description 用户上下文信息
 */
public class OauthContext {private static  final  InheritableThreadLocal<LoginVal> loginValThreadLocal=new InheritableThreadLocal<>();

    public static  LoginVal get(){return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){loginValThreadLocal.set(loginVal);
    }
    public static void clear(){loginValThreadLocal.remove();
    }
}

4. TransmittableThreadLocal

TransmittableThreadLocal 是阿里开源的工具,补救了 InheritableThreadLocal 的缺点,在应用线程池等会池化复用线程的执行组件状况下,提供 ThreadLocal 值的传递性能,解决异步执行时上下文传递的问题。

应用起来也是非常简单,增加依赖如下:

<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>transmittable-thread-local</artifactId>
 <version>2.14.2</version>
</dependency>

OauthContext 革新代码如下:

/**
 * @author 公众号:码猿技术专栏
 * @description 用户上下文信息
 */
public class OauthContext {private static  final TransmittableThreadLocal<LoginVal> loginValThreadLocal=new TransmittableThreadLocal<>();

    public static  LoginVal get(){return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){loginValThreadLocal.set(loginVal);
    }
    public static void clear(){loginValThreadLocal.remove();
    }
}

对于 TransmittableThreadLocal 想深刻理解其原理能够看陈某之前的文章:微服务中应用阿里开源的 TTL,优雅的实现身份信息的线程间复用,利用还是十分宽泛的

TransmittableThreadLocal 原理

从定义来看,TransimittableThreadLocal 继承于 InheritableThreadLocal,并实现 TtlCopier 接口,它外面只有一个 copy 办法。所以次要是对 InheritableThreadLocal 的扩大。

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> 

在 TransimittableThreadLocal 中增加 holder 属性。这个属性的作用就是被标记为具备线程传递资格的对象都会被增加到这个对象中。

要标记一个类,比拟容易想到的形式,就是给这个类新增一个 Type 字段,还有一个办法就是将具备这种类型的的对象都增加到一个动态全局汇合中。之后应用时,这个汇合里的所有值都具备这个标记。

// 1\. holder 自身是一个 InheritableThreadLocal 对象
// 2\. 这个 holder 对象的 value 是 WeakHashMap<TransmittableThreadLocal<Object>, ?>
//   2.1 WeekHashMap 的 value 总是 null, 且不可能被应用。//    2.2 WeekHasshMap 反对 value=null
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  }

  /**
   * 重写了 childValue 办法,实现上间接将父线程的属性作为子线程的本地变量对象。*/
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
  }
};

利用代码是通过 TtlExecutors 工具类对线程池对象进行包装。工具类只是简略的判断,输出的线程池是否曾经被包装过、非空校验等,而后返回包装类 ExecutorServiceTtlWrapper。依据不同的线程池类型,有不同和的包装类。

@Nullable
public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) {if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) {return executorService;}
  return new ExecutorServiceTtlWrapper(executorService);
}

进入包装类 ExecutorServiceTtlWrapper。能够留神到不论是通过 ExecutorServiceTtlWrapper#submit 办法或者是 ExecutorTtlWrapper#execute 办法,都会将线程对象包装成 TtlCallable 或者 TtlRunnable,用于在真正执行 run 办法前做一些业务逻辑。

/**
 * 在 ExecutorServiceTtlWrapper 实现 submit 办法
 */
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {return executorService.submit(TtlCallable.get(task));
}

/**
 * 在 ExecutorTtlWrapper 实现 execute 办法
 */
@Override
public void execute(@NonNull Runnable command) {executor.execute(TtlRunnable.get(command));
}

所以,重点的外围逻辑应该是在 TtlCallable#call() 或者 TtlRunnable#run() 中。以下以 TtlCallable 为例,TtlRunnable 同理相似。在剖析 call() 办法之前,先看一个类 Transmitter

public static class Transmitter {
  /**
    * 捕捉以后线程中的是所有 TransimittableThreadLocal 和注册 ThreadLocal 的值。*/
  @NonNull
  public static Object capture() {return new Snapshot(captureTtlValues(), captureThreadLocalValues());
  }

    /**
    * 捕捉 TransimittableThreadLocal 的值, 将 holder 中的所有值都增加到 HashMap 后返回。*/
  private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {ttl2Value.put(threadLocal, threadLocal.copyValue());
    }
    return ttl2Value;
  }

  /**
    * 捕捉注册的 ThreadLocal 的值, 也就是本来线程中的 ThreadLocal, 能够注册到 TTL 中,在
    * 进行线程池本地变量传递时也会被传递。*/
  private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){final ThreadLocal<Object> threadLocal = entry.getKey();
      final TtlCopier<Object> copier = entry.getValue();
      threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
  }

  /**
    * 将捕捉到的本地变量进行替换子线程的本地变量,并且返回子线程现有的本地变量正本 backup。* 用于在执行 run/call 办法之后,将本地变量正本复原。*/
  @NonNull
  public static Object replay(@NonNull Object captured) {final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), 
                        replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
  }

  /**
    * 替换 TransmittableThreadLocal
    */
  @NonNull
  private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
    // 创立正本 backup
    HashMap<TransmittableThreadLocal<Object>, Object> backup = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();) {TransmittableThreadLocal<Object> threadLocal = iterator.next();
      // 对以后线程的本地变量进行正本拷贝
      backup.put(threadLocal, threadLocal.get());

      // 若呈现调用线程中不存在某个线程变量,而线程池中线程有,则删除线程池中对应的本地变量
      if (!captured.containsKey(threadLocal)) {iterator.remove();
        threadLocal.superRemove();}
    }
    // 将捕捉的 TTL 值打入线程池获取到的线程 TTL 中。setTtlValuesTo(captured);
    // 是一个扩大点,调用 TTL 的 beforeExecute 办法。默认实现为空
    doExecuteCallback(true);
    return backup;
  }

  private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
    final HashMap<ThreadLocal<Object>, Object> backup = 
      new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {final ThreadLocal<Object> threadLocal = entry.getKey();
      backup.put(threadLocal, threadLocal.get());
      final Object value = entry.getValue();
      if (value == threadLocalClearMark) threadLocal.remove();
      else threadLocal.set(value);
    }
    return backup;
  }

  /**
    * 革除复线线程的所有 TTL 和 TL,并返回革除之气的 backup
    */
  @NonNull
  public static Object clear() {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();

    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal2Value.put(threadLocal, threadLocalClearMark);
    }
    return replay(new Snapshot(ttl2Value, threadLocal2Value));
  }

  /**
    * 还原
    */
  public static void restore(@NonNull Object backup) {final Snapshot backupSnapshot = (Snapshot) backup;
    restoreTtlValues(backupSnapshot.ttl2Value);
    restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
  }

  private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // 扩大点,调用 TTL 的 afterExecute
    doExecuteCallback(false);

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();) {TransmittableThreadLocal<Object> threadLocal = iterator.next();

      if (!backup.containsKey(threadLocal)) {iterator.remove();
        threadLocal.superRemove();}
    }

    // 将本地变量复原成备份版本
    setTtlValuesTo(backup);
  }

  private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {TransmittableThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }

  private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }

  /**
   * 快照类,保留 TTL 和 TL
   */
  private static class Snapshot {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;

    private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value,
                     HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
      this.ttl2Value = ttl2Value;
      this.threadLocal2Value = threadLocal2Value;
    }
  }

进入 TtlCallable#call() 办法。

@Override
public V call() throws Exception {Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterCall && 
      !capturedRef.compareAndSet(captured, null)) {throw new IllegalStateException("TTL value reference is released after call!");
  }
  // 调用 replay 办法将捕捉到的以后线程的本地变量,传递给线程池线程的本地变量,// 并且获取到线程池线程笼罩之前的本地变量正本。Object backup = replay(captured);
  try {
    // 线程办法调用
    return callable.call();} finally {
    // 应用正本进行复原。restore(backup);
  }
}

到这基本上线程池形式传递本地变量的外围代码曾经大略看完了。总的来说在创立 TtlCallable 对象是,调用 capture() 办法捕捉调用方的本地线程变量,在 call() 执行时,将捕捉到的线程变量,替换到线程池所对应获取到的线程的本地变量中,并且在执行实现之后,将其本地变量复原到调用之前。

总结

上述列举了 4 种计划,这里举荐计划 2 和计划 4,其中两种计划的毛病非常明显,理论开发中也是采纳的计划 2 或者计划 4

退出移动版