乐趣区

关于java:合理使用线程池以及线程变量

本文从线程池和线程变量的原理和应用登程,联合实例给出最佳应用实际,帮忙各开发人员构建出稳固、高效的 java 应用服务。

背景

随着计算技术的一直倒退,3 纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐步面临微小的物理瓶颈,通过多核处理器技术来晋升服务器的性能成为晋升算力的次要方向。

在服务器畛域,基于 java 构建的后端服务器占据着领先地位,因而,把握 java 并发编程技术,充分利用 CPU 的并发解决能力是一个开发人员必修的基本功,本文联合线程池源码和实际,简要介绍了线程池和线程变量的应用。

线程池概述

▐ 什么是线程池

线程池是一种“池化”的线程应用模式,通过创立肯定数量的线程,让这些线程处于就绪状态来进步零碎响应速度,在线程应用实现后偿还到线程池来达到反复利用的指标,从而升高系统资源的耗费。

▐ 为什么要应用线程池

总体来说,线程池有如下的劣势:

升高资源耗费。通过反复利用已创立的线程升高线程创立和销毁造成的耗费。

进步响应速度。当工作达到时,工作能够不须要等到线程创立就能立刻执行。

进步线程的可管理性。线程是稀缺资源,如果无限度的创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行对立的调配,调优和监控。

线程池的应用

▐ 线程池创立 & 外围参数设置

在 java 中,线程池的实现类是 ThreadPoolExecutor,构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit timeUnit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

能够通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创立一个线程池。

corePoolSize 参数

在构造函数中,corePoolSize 为线程池外围线程数。默认状况下,外围线程会始终存活,然而当将 allowCoreThreadTimeout 设置为 true 时,外围线程超时也会回收。

maximumPoolSize 参数

在构造函数中,maximumPoolSize 为线程池所能包容的最大线程数。

keepAliveTime 参数

在构造函数中,keepAliveTime 示意线程闲置超时时长。如果线程闲置工夫超过该时长,非核心线程就会被回收。如果将 allowCoreThreadTimeout 设置为 true 时,外围线程也会超时回收。

timeUnit 参数

在构造函数中,timeUnit 示意线程闲置超时时长的工夫单位。罕用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

blockingQueue 参数

在构造函数中,blockingQueue 示意工作队列,线程池工作队列的罕用实现类有:

ArrayBlockingQueue:一个数组实现的有界阻塞队列,此队列依照 FIFO 的准则对元素进行排序,反对偏心拜访队列。

LinkedBlockingQueue:一个由链表构造组成的可选有界阻塞队列,如果不指定大小,则应用 Integer.MAX_VALUE 作为队列大小,依照 FIFO 的准则对元素进行排序。

PriorityBlockingQueue:一个反对优先级排序的无界阻塞队列,默认状况下采纳天然顺序排列,也能够指定 Comparator。

DelayQueue:一个反对延时获取元素的无界阻塞队列,创立元素时能够指定多久当前能力从队列中获取以后元素,罕用于缓存零碎设计与定时任务调度等。

SynchronousQueue:一个不存储元素的阻塞队列。存入操作必须期待获取操作,反之亦然。

LinkedTransferQueue:一个由链表构造组成的无界阻塞队列,与 LinkedBlockingQueue 相比多了 transfer 和 tryTranfer 办法,该办法在有消费者期待接管元素时会立刻将元素传递给消费者。

LinkedBlockingDeque:一个由链表构造组成的双端阻塞队列,能够从队列的两端插入和删除元素。

threadFactory 参数

在构造函数中,threadFactory 示意线程工厂。用于指定为线程池创立新线程的形式,threadFactory 能够设置线程名称、线程组、优先级等参数。如通过 Google 工具包能够设置线程池里的线程名:

new ThreadFactoryBuilder().setNameFormat("general-detail-batch-%d").build()

RejectedExecutionHandler 参数在构造函数中,rejectedExecutionHandler 示意回绝策略。当达到最大线程数且队列工作已满时须要执行的回绝策略,常见的回绝策略如下:

ThreadPoolExecutor.AbortPolicy:默认策略,当工作队列满时抛出 RejectedExecutionException 异样。

ThreadPoolExecutor.DiscardPolicy:抛弃掉不能执行的新工作,不抛任何异样。

ThreadPoolExecutor.CallerRunsPolicy:当工作队列满时应用调用者的线程间接执行该工作。

ThreadPoolExecutor.DiscardOldestPolicy:当工作队列满时抛弃阻塞队列头部的工作(即最老的工作),而后增加当前任务。

▐  线程池状态转移图

ThreadPoolExecutor 线程池有如下几种状态:

RUNNING:运行状态,承受新工作,继续解决工作队列里的工作;

SHUTDOWN:不再承受新工作,但要解决工作队列里的工作;

STOP:不再承受新工作,不再解决工作队列里的工作,中断正在进行中的工作;

TIDYING:示意线程池正在进行运作,停止所有工作,销毁所有工作线程,当线程池执行 terminated()办法时进入 TIDYING 状态;

TERMINATED:示意线程池已进行运作,所有工作线程已被销毁,所有工作已被清空或执行结束,terminated()办法执行实现;

▐ 线程池任务调度机制

线程池提交一个工作时任务调度的次要步骤如下:

当线程池里存活的外围线程数小于 corePoolSize 外围线程数参数的值时,线程池会创立一个外围线程去解决提交的工作;

如果线程池外围线程数已满,即线程数曾经等于 corePoolSize,新提交的工作会被尝试放进工作队列 workQueue 中期待执行;

当线程池外面存活的线程数曾经等于 corePoolSize 了,且工作队列 workQueue 已满,再判断以后线程数是否已达到 maximumPoolSize,即最大线程数是否已满,如果没达到,创立一个非核心线程执行提交的工作;

如果以后的线程数已达到了 maximumPoolSize,还有新的工作提交过去时,执行回绝策略进行解决。

外围代码如下:

public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

▐ Tomcat 线程池剖析

Tomcat 申请处理过程

Tomcat 的整体架构蕴含连接器和容器两大部分,其中连接器负责与内部通信,容器负责外部逻辑解决。在连接器中:

应用 ProtocolHandler 接口来封装 I / O 模型和应用层协定的差别,其中 I / O 模型能够抉择非阻塞 I /O、异步 I / O 或 APR,应用层协定能够抉择 HTTP、HTTPS 或 AJP。ProtocolHandler 将 I / O 模型和应用层协定进行组合,让 EndPoint 只负责字节流的收发,Processor 负责将字节流解析为 Tomcat Request/Response 对象,实现功能模块的高内聚和低耦合,ProtocolHandler 接口继承关系如下图示。

通过适配器 Adapter 将 Tomcat Request 对象转换为规范的 ServletRequest 对象。

Tomcat 为了实现申请的疾速响应,应用线程池来进步申请的解决能力。上面咱们以 HTTP 非阻塞 I / O 为例对 Tomcat 线程池进行简要的剖析。

Tomcat 线程池创立

在 Tomcat 中,通过 AbstractEndpoint 类提供底层的网络 I / O 的解决,若用户没有配置自定义公共线程池,则 AbstractEndpoint 通过 createExecutor 办法来创立 Tomcat 默认线程池。

外围局部代码如下:

public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent((ThreadPoolExecutor) executor);
    }

其中,TaskQueue、ThreadPoolExecutor 别离为 Tomcat 自定义工作队列、线程池实现。

Tomcat 自定义 ThreadPoolExecutor

Tomcat 自定义线程池继承于 java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计曾经提交但尚未实现的工作数量(submittedCount),包含曾经在队列中的工作和曾经交给工作线程但还未开始执行的工作。

/**
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
 * If a RejectedExecutionHandler is not specified a default one will be configured
 * and that one will always throw a RejectedExecutionException
 *
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    /**
     * The number of tasks submitted but not yet finished. This includes tasks
     * in the queue and tasks that have been handed to a worker thread but the
     * latter did not start executing the task yet.
     * This number is always greater or equal to {@link #getActiveCount()}.
     */
    // 新增的 submittedCount 成员变量,用于统计已提交但还未实现的工作数
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    // 构造函数
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        // 预启动所有外围线程
        prestartAllCoreThreads();}

}

Tomcat 在自定义线程池 ThreadPoolExecutor 中重写了 execute()办法,并实现对提交执行的工作进行 submittedCount 加一。Tomcat 在自定义 ThreadPoolExecutor 中,当线程池抛出 RejectedExecutionException 异样后,会调用 force()办法再次向 TaskQueue 中进行增加工作的尝试。如果增加失败,则 submittedCount 减一后,再抛出 RejectedExecutionException。

@Override
    public void execute(Runnable command) {execute(command,0,TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();
        try {super.execute(command);
        } catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();
                try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

Tomcat 自定义工作队列

在 Tomcat 中从新定义了一个阻塞队列 TaskQueue,它继承于 LinkedBlockingQueue。在 Tomcat 中,外围线程数默认值为 10,最大线程数默认为 200,为了防止线程达到外围线程数后后续工作放入队列期待,Tomcat 通过自定义工作队列 TaskQueue 重写 offer 办法实现了外围线程池数达到配置数后线程的创立。

具体地,从线程池任务调度机制实现可知,当 offer 办法返回 false 时,线程池将尝试创立新新线程,从而实现工作的疾速响应。TaskQueue 外围实现代码如下:

/**
 * As task queue specifically designed to run with a thread pool executor. The
 * task queue is optimised to properly utilize threads within a thread pool
 * executor. If you use a normal queue, the executor will spawn threads when
 * there are idle threads and you wont be able to force items onto the queue
 * itself.
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
        // 1. parent 为线程池,Tomcat 中为自定义线程池实例
      //we can't do any checks
        if (parent==null) return super.offer(o);
        // 2. 当线程数达到最大线程数时,新提交工作入队
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        // 3. 当提交的工作数小于线程池中已有的线程数时,即有闲暇线程,工作入队即可
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        // 4.【关键点】如果以后线程数量未达到最大线程数,间接返回 false,让线程池创立新线程
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        // 5. 最初的兜底,放入队列
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }   
}

Tomcat 自定义工作线程

Tomcat 中通过自定义工作线程 TaskThread 实现对每个线程创立工夫的记录;应用动态外部类 WrappingRunnable 对 Runnable 进行包装,用于对 StopPooledThreadException 异样类型的解决。

/**
 * A Thread implementation that records the time at which it was created.
 *
 */
public class TaskThread extends Thread {

    private final long creationTime;

    public TaskThread(ThreadGroup group, Runnable target, String name) {super(group, new WrappingRunnable(target), name);
        this.creationTime = System.currentTimeMillis();}


    /**
     * Wraps a {@link Runnable} to swallow any {@link StopPooledThreadException}
     * instead of letting it go and potentially trigger a break in a debugger.
     */
    private static class WrappingRunnable implements Runnable {
        private Runnable wrappedRunnable;
        WrappingRunnable(Runnable wrappedRunnable) {this.wrappedRunnable = wrappedRunnable;}
        @Override
        public void run() {
            try {wrappedRunnable.run();
            } catch(StopPooledThreadException exc) {
                //expected : we just swallow the exception to avoid disturbing
                //debuggers like eclipse's
                log.debug("Thread exiting on purpose", exc);
            }
        }

    }

}

思考 & 小结

Tomcat 为什么要自定义线程池和工作队列实现?

JUC 原生线程池在提交工作时,当工作线程数达到外围线程数后,持续提交工作会尝试将工作放入阻塞队列中,只有以后运行线程数未达到最大设定值且在工作队列工作满后,才会持续创立新的工作线程来解决工作,因而 JUC 原生线程池无奈满足 Tomcat 疾速响应的诉求。

Tomcat 为什么应用无界队列?

Tomcat 在 EndPoint 中通过 acceptCount 和 maxConnections 两个参数来防止过多申请积压。其中 maxConnections 为 Tomcat 在任意时刻接管和解决的最大连接数,当 Tomcat 接管的连接数达到 maxConnections 时,Acceptor 不会读取 accept 队列中的连贯;这时 accept 队列中的线程会始终阻塞着,直到 Tomcat 接管的连接数小于 maxConnections(maxConnections 默认为 10000,如果设置为 -1,则连接数不受限制)。acceptCount 为 accept 队列的长度,当 accept 队列中连贯的个数达到 acceptCount 时,即队列满,此时进来的申请一律被回绝,默认值是 100(基于 Tomcat 8.5.43 版本)。因而,通过 acceptCount 和 maxConnections 两个参数作用后,Tomcat 默认的无界工作队列通常不会造成 OOM。

/**
 * Allows the server developer to specify the acceptCount (backlog) that
 * should be used for server sockets. By default, this value
 * is 100.
 */
private int acceptCount = 100;

private int maxConnections = 10000;

▐ 最佳实际

防止用 Executors 的创立线程池

Executors 罕用办法有以下几个:

newCachedThreadPool():创立一个可缓存的线程池,调用 execute 将重用以前结构的线程(如果线程可用)。如果没有可用的线程,则创立一个新线程并增加到线程池中。终止并从缓存中移除那些已有 60 秒钟未被应用的线程。CachedThreadPool 实用于并发执行大量短期耗时短的工作,或者负载较轻的服务器;

newFiexedThreadPool(int nThreads):创立固定数目线程的线程池,线程数小于 nThreads 时,提交新的工作会创立新的线程,当线程数等于 nThreads 时,提交新的工作后工作会被退出到阻塞队列,正在执行的线程执行结束后从队列中取工作执行,FiexedThreadPool 实用于负载略重但工作不是特地多的场景,为了正当利用资源,须要限度线程数量;

newSingleThreadExecutor() 创立一个单线程化的 Executor,SingleThreadExecutor 实用于串行执行工作的场景,每个工作按程序执行,不须要并发执行;

newScheduledThreadPool(int corePoolSize) 创立一个反对定时及周期性的工作执行的线程池,少数状况下可用来代替 Timer 类。ScheduledThreadPool 中,返回了一个 ScheduledThreadPoolExecutor 实例,而 ScheduledThreadPoolExecutor 实际上继承了 ThreadPoolExecutor。从代码中能够看出,ScheduledThreadPool 基于 ThreadPoolExecutor,corePoolSize 大小为传入的 corePoolSize,maximumPoolSize 大小为 Integer.MAX_VALUE,超时工夫为 0,workQueue 为 DelayedWorkQueue。实际上 ScheduledThreadPool 是一个调度池,其实现了 schedule、scheduleAtFixedRate、scheduleWithFixedDelay 三个办法,能够实现提早执行、周期执行等操作;

newSingleThreadScheduledExecutor() 创立一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor;

newWorkStealingPool(int parallelism)返回一个 ForkJoinPool 实例,ForkJoinPool 次要用于实现“分而治之”的算法,适宜于计算密集型的工作。

Executors 类看起来性能比拟弱小、用起来还比拟不便,但存在如下弊病:

FiexedThreadPool 和 SingleThreadPool 工作队列长度为 Integer.MAX_VALUE,可能会沉积大量的申请,从而导致 OOM;

CachedThreadPool 和 ScheduledThreadPool 容许创立的线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM;

应用线程时,能够间接调用 ThreadPoolExecutor 的构造函数来创立线程池,并依据业务理论场景来设置 corePoolSize、blockingQueue、RejectedExecuteHandler 等参数。

防止应用部分线程池

应用部分线程池时,若工作执行完后没有执行 shutdown()办法或有其余不当援用,极易造成系统资源耗尽。

正当设置线程池参数

在工程实际中,通常应用下述公式来计算外围线程数:

nThreads=(w+c)/cnu=(w/c+1)nu

其中,w 为等待时间,c 为计算工夫,n 为 CPU 外围数(通常可通过 Runtime.getRuntime().availableProcessors()办法获取),u 为 CPU 指标利用率(取值区间为[0, 1]);在最大化 CPU 利用率的状况下,当解决的工作为计算密集型工作时,即等待时间 w 为 0,此时外围线程数等于 CPU 外围数。

上述计算公式是现实状况下的倡议外围线程数,而不同零碎 / 利用在运行不同的工作时可能会有肯定的差别,因而最佳线程数参数还须要依据工作的理论运行状况和压测体现进行微调。

减少异样解决

为了更好地发现、剖析和解决问题,倡议在应用多线程时减少对异样的解决,异样解决通常有下述计划:

在工作代码处减少 try…catch 异样解决

如果应用的 Future 形式,则可通过 Future 对象的 get 办法接管抛出的异样

为工作线程设置 setUncaughtExceptionHandler,在 uncaughtException 办法中解决异样

优雅敞开线程池

public void destroy() {
        try {poolExecutor.shutdown();
            if (!poolExecutor.awaitTermination(AWAIT_TIMEOUT, TimeUnit.SECONDS)) {poolExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // 如果以后线程被中断,从新勾销所有工作
            pool.shutdownNow();
            // 放弃中断状态
            Thread.currentThread().interrupt();
        }
    }

为了实现优雅停机的指标,咱们该当先调用 shutdown 办法,调用这个办法也就意味着,这个线程池不会再接管任何新的工作,然而曾经提交的工作还会继续执行。之后咱们还该当调用 awaitTermination 办法,这个办法能够设定线程池在敞开之前的最大超时工夫,如果在超时工夫完结之前线程池可能失常敞开则会返回 true,否则,超时会返回 false。通常咱们须要依据业务场景预估一个正当的超时工夫,而后调用该办法。

如果 awaitTermination 办法返回 false,但又心愿尽可能在线程池敞开之后再做其余资源回收工作,能够思考再调用一下 shutdownNow 办法,此时队列中所有尚未被解决的工作都会被抛弃,同时会设置线程池中每个线程的中断标记位。shutdownNow 并不保障肯定能够让正在运行的线程进行工作,除非提交给线程的工作可能正确响应中断。

鹰眼上下文参数传递

/**
* 在主线程中,开启鹰眼异步模式,并将 ctx 传递给多线程工作
**/
// 避免鹰眼链路失落,须要传递
RpcContext_inner ctx = EagleEye.getRpcContext();
// 开启异步模式
ctx.setAsyncMode(true);


/**
* 在线程池工作线程中,设置鹰眼 rpc 环境
**/
private void runTask() {
    try {EagleEye.setRpcContext(ctx);
        // do something...

    } catch (Exception e) {log.error("requestError, params: {}", this.params, e);
    } finally {
        // 判断当前任务是否是主线程在运行,当 Rejected 策略为 CallerRunsPolicy 的时候,核查以后线程
        if (mainThread != Thread.currentThread()) {EagleEye.clearRpcContext();
        }
    }

}

ThreadLocal 线程变量概述

▐ 什么是 ThreadLocal

ThreadLocal 类提供了线程本地变量(thread-local variables),这些变量不同于一般的变量,拜访线程本地变量的每个线程(通过其 get 或 set 办法)都有其本人的独立初始化的变量正本,因而 ThreadLocal 没有多线程竞争的问题,不须要独自进行加锁。

▐ ThreadLocal 应用场景

每个线程都须要有属于本人的实例数据(线程隔离);

框架跨层数据的传递;

须要参数全局传递的简单调用链路的场景;

数据库连贯的治理,在 AOP 的各种嵌套调用中保障事务的一致性;

ThreadLocal 的原理与实际

对于 ThreadLocal 而言,罕用的办法有 get/set/initialValue 3 个办法。

家喻户晓,在 java 中 SimpleDateFormat 有线程平安问题,为了平安地应用 SimpleDateFormat,除了 1)创立 SimpleDateFormat 局部变量;和 2)加同步锁 两种计划外,咱们还能够应用 3)ThreadLocal 的计划:

/**
* 应用 ThreadLocal 定义一个全局的 SimpleDateFormat
*/
private static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal = new
ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
// 用法
String dateString = simpleDateFormatThreadLocal.get().format(calendar.getTime());

▐ ThreadLocal 原理

Thread 外部保护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。

threadLocal.get()办法

/**
     * Returns the value in the current thread's copy of this
     * thread-local variable.  If the variable has no value for the
     * current thread, it is first initialized to the value returned
     * by an invocation of the {@link #initialValue} method.
     *
     * @return the current thread's value of this thread-local
     */
public T get() {
    // 1. 获取以后线程
    Thread t = Thread.currentThread();
    // 2. 获取以后线程外部的 ThreadLocalMap 变量 t.threadLocals;
    ThreadLocalMap map = getMap(t);
    // 3. 判断 map 是否为 null
    if (map != null) {
        // 4. 应用以后 threadLocal 变量获取 entry
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 5. 判断 entry 是否为 null
        if (e != null) {
            // 6. 返回 Entry.value
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 7. 如果 map/entry 为 null 设置初始值
    return setInitialValue();}

 /**
     * Variant of set() to establish initialValue. Used instead
     * of set() in case user has overridden the set() method.
     *
     * @return the initial value
     */
private T setInitialValue() {
    // 1. 初始化 value,如果重写就用重写后的 value,默认 null
    T value = initialValue();
    // 2. 获取以后线程
    Thread t = Thread.currentThread();
    // 3. 获取以后线程外部的 ThreadLocalMap 变量
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 4. 不为 null 就 set, key: threadLocal, value: value
        map.set(this, value);
    else
        // 5. map 若为 null 则创立 ThreadLocalMap 对象
        createMap(t, value);
    return value;
}

/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {t.threadLocals = new ThreadLocalMap(this, firstValue);
}

/**
 * Construct a new map initially containing (firstKey, firstValue).
 * ThreadLocalMaps are constructed lazily, so we only create
 * one when we have at least one entry to put in it.
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 1. 初始化 entry 数组,size: 16
    table = new Entry[INITIAL_CAPACITY];
    // 2. 计算 value 的 index
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    // 3. 在对应 index 地位赋值
    table[i] = new Entry(firstKey, firstValue);
    // 4. entry size
    size = 1;
    // 5. 设置 threshold: threshold = len * 2 / 3;
    setThreshold(INITIAL_CAPACITY);
}
/**
 * Set the resize threshold to maintain at worst a 2/3 load factor.
 */
private void setThreshold(int len) {threshold = len * 2 / 3;}

threadLocal.set()办法

/**
     * Sets the current thread's copy of this thread-local variable
     * to the specified value.  Most subclasses will have no need to
     * override this method, relying solely on the {@link #initialValue}
     * method to set the values of thread-locals.
     *
     * @param value the value to be stored in the current thread's copy of
     *        this thread-local.
     */
    public void set(T value) {
        // 1. 获取以后线程
        Thread t = Thread.currentThread();
        // 2. 获取以后线程外部的 ThreadLocalMap 变量
        ThreadLocalMap map = getMap(t);
        if (map != null)
            // 3. 设置 value
            map.set(this, value);
        else
            // 4. 若 map 为 null 则创立 ThreadLocalMap
            createMap(t, value);
    }

ThreadLocalMap

从 JDK 源码可见,ThreadLocalMap 中的 Entry 是弱援用类型的,这就意味着如果这个 ThreadLocal 只被这个 Entry 援用,而没有被其余对象强援用时,就会在下一次 GC 的时候回收掉。

static class ThreadLocalMap {

        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {super(k);
                value = v;
            }
        }
    
    // ...
}

▐ ThreadLocal 示例

鹰眼链路 ThreadLocal 的应用

EagleEye(鹰眼)作为全链路监控零碎在团体外部被宽泛应用,traceId、rpcId、压测标等信息存储在 EagleEye 的 ThreadLocal 变量中,并在 HSF/Dubbo 服务调用间进行传递。EagleEye 通过 Filter 将数据初始化到 ThreadLocal 中,局部相干代码如下:

EagleEyeHttpRequest eagleEyeHttpRequest = this.convertHttpRequest(httpRequest);
// 1. 初始化,将 traceId、rpcId 等数据存储到鹰眼的 ThreadLocal 变量中
EagleEyeRequestTracer.startTrace(eagleEyeHttpRequest, false);

try {chain.doFilter(httpRequest, httpResponse);
} finally {
    // 2. 清理 ThreadLocal 变量值
    EagleEyeRequestTracer.endTrace(this.convertHttpResponse(httpResponse));
}

在 EagleEyeFilter 中,通过 EagleEyeRequestTracer.startTrace 办法进行初始化,在前置入参转换后,通过 startTrace 重载办法将鹰眼上下文参数存入 ThreadLocal 中,相干代码如下:

EagleEyeFilter 在 finally 代码块中,通过 EagleEyeRequestTracer.endTrace 办法完结调用链,通过 clear 办法将 ThreadLocal 中的数据进行清理,相干代码实现如下:

Bad case:XX 我的项目权利支付失败问题

在某权利支付原有链路中,通过 app 关上一级页面后能力发动权利支付申请,申请通过淘系无线网关 (Mtop) 后达到服务端,服务端通过 mtop sdk 获取以后会话信息。

在 XX 我的项目中,对权利支付链路进行了降级革新,在一级页面申请时,通过服务端同时发动权利支付申请。具体地,服务端在解决一级页面申请时,同时通过调用 hsf/dubbo 接口来进行权利支付,因而在发动 rpc 调用时须要携带用户以后会话信息,在服务提供端将会话信息进行提取并注入到 mtop 上下文,从而能力通过 mtop sdk 获取到会话 id 等信息。某开发同学在实现时,因 ThreadLocal 使用不当造成下述问题:

问题 1:因 ThreadLocal 初始化机会不当,造成获取不到会话信息,进而导致权利支付失败;

问题 2:申请实现时,因未清理 ThreadLocal 中的变量值,导致脏数据;

【问题 1:权利支付失败剖析】

在权利支付服务中,该利用构建了一套高效和线程平安的依赖注入框架,基于该框架的业务逻辑模块通常形象为 xxxModule 模式,Module 间为网状依赖关系,框架会按依赖关系主动调用 init 办法(其中,被依赖的 module 的 init 办法先执行)。

在利用中,权利支付接口的主入口为 CommonXXApplyModule 类,CommonXXApplyModule 依赖 XXSessionModule。当申请来长期,会按依赖关系顺次调用 init 办法,因而 XXSessionModule 的 init 办法会优先执行;而开发同学在 CommonXXApplyModule 类中的 init 办法中通过调用 recoverMtopContext()办法来冀望复原 mtop 上下文,因 recoverMtopContext()办法的调用机会过晚,从而导致 XXSessionModule 模块获取不到正确的会话 id 等信息而导致权利支付失败。

【问题 2:脏数据分析】

权利支付服务在解决申请时,若以后线程已经解决过权利支付申请,因 ThreadLocal 变量值未被清理,此时 XXSessionModule 通过 mtop SDK 获取会话信息时失去的是前一次申请的会话信息,从而造成脏数据。

【解决方案】

在依赖注入框架入口处 AbstractGate#visit(或在 XXSessionModule 中)通过 recoverMtopContext 办法注入 mtop 上下文信息,并在入口办法的 finally 代码块清理以后申请的 threadlocal 变量值。

▐ 思考 & 小结

ThreadLocalMap 中的 Entry 为什么要设计为弱援用类型?

若应用强援用类型,则 threadlocal 的援用链为:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal 对象)和 value;在这种场景下,只有这个线程还在运行(如线程池场景),若不调用 remove 办法,则该对象及关联的所有强援用对象都不会被垃圾回收器回收。

应用 static 和不应用 static 润饰 threadlocal 变量有和区别?

若应用 static 关键字进行润饰,则一个线程仅对应一个线程变量;否则,threadlocal 语义变为 perThread-perInstance,容易引发内存透露,如下述示例:

public class ThreadLocalTest {
    public static class ThreadLocalDemo {private ThreadLocal<String> threadLocalHolder = new ThreadLocal();

        public void setValue(String value) {threadLocalHolder.set(value);
        }

        public String getValue() {return threadLocalHolder.get();
        }
    }

    public static void main(String[] args) {
        int count = 3;
        List<ThreadLocalDemo> list = new LinkedList<>();
        for (int i = 0; i < count; i++) {ThreadLocalDemo demo = new ThreadLocalDemo();
            demo.setValue("demo-" + i);
            list.add(demo);
        }
        System.out.println();}
}

在上述 main 办法第 22 行 debug,可见线程的 threadLocals 变量中有 3 个 threadlocal 实例。在工程实际中,应用 threadlocal 时通常冀望一个线程只有一个 threadlocal 实例,因而,若不应用 static 润饰,冀望的语义产生了变动,同时易引起内存透露。

▐ 最佳实际

ThreadLocal 变量值初始化和清理倡议成对呈现

如果不执行清理操作,则可能会呈现:

内存透露:因为 ThreadLocalMap 的中 key 是弱援用,而 Value 是强援用。这就导致了一个问题,ThreadLocal 在没有内部对象强援用时,产生 GC 时弱援用 Key 会被回收,而 Value 不会回收,从而 Entry 外面的元素呈现 <null,value> 的状况。如果创立 ThreadLocal 的线程始终继续运行,那么这个 Entry 对象中的 value 就有可能始终得不到回收,这样可能会导致内存泄露。

脏数据:因为线程复用,在用户 1 申请时,可能保留了业务数据在 ThreadLocal 中,若不清理,则用户 2 的申请进来时,可能会读到用户 1 的数据。

倡议应用 try…finally 进行清理。

ThreadLocal 变量倡议应用 static 进行润饰

咱们在应用 ThreadLocal 时,通常冀望的语义是 perThread,若不应用 static 进行润饰,则语义变为 perThread-perInstance;在线程池场景下,若不必 static 进行润饰,创立的线程相干实例可能会达到 M * N 个(其中 M 为线程数,N 为对应类的实例数),易造成内存透露(https://errorprone.info/bugpa…)。

审慎应用 ThreadLocal.withInitial

在利用中,审慎应用 ThreadLocal.withInitial(Supplier<? extends S> supplier)这个工厂办法创立 ThreadLocal 对象,一旦不同线程的 ThreadLocal 应用了同一个 Supplier 对象,那么隔离也就无从谈起了,如:

// 反例,实际上应用了共享对象 obj 而并未隔离,private static ThreadLocal<Obj> threadLocal = ThreadLocal.withIntitial(() -> obj);

总结

在 java 工程实际中,线程池和线程变量被宽泛应用,因线程池和线程变量的不当应用常常造成平安生产事变,因而,正确应用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的应用登程,简要介绍了线程池和线程变量的原理和应用实际,各开发人员可联合最佳实际和理论利用场景,正确地应用线程和线程变量,构建出稳固、高效的 java 应用服务。

团队介绍

咱们来自大淘宝技术 - 淘宝交易平台,负责淘宝业务的商品详情、购物车、下单、订单、物流、退款等从购前、购中到购后履约的根底链路相干业务。这里有百亿级别的数据、有超过百万 QPS 的高并发流量、有丰盛的业务场景,服务于 10 亿级的消费者,撑持淘宝天猫前后端各种行业的根底业务、玩法、规定及业务拓展。这里有微小的挑战等着你来,若有趣味可将简历发至 lican.lc@alibaba-inc.com,期待您的退出!

退出移动版