本文从线程池和线程变量的原理和应用登程,联合实例给出最佳应用实际,帮忙各开发人员构建出稳固、高效的java应用服务。

背景

随着计算技术的一直倒退,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐步面临微小的物理瓶颈,通过多核处理器技术来晋升服务器的性能成为晋升算力的次要方向。

在服务器畛域,基于java构建的后端服务器占据着领先地位,因而,把握java并发编程技术,充分利用CPU的并发解决能力是一个开发人员必修的基本功,本文联合线程池源码和实际,简要介绍了线程池和线程变量的应用。

线程池概述

▐ 什么是线程池

线程池是一种“池化”的线程应用模式,通过创立肯定数量的线程,让这些线程处于就绪状态来进步零碎响应速度,在线程应用实现后偿还到线程池来达到反复利用的指标,从而升高系统资源的耗费。

▐ 为什么要应用线程池

总体来说,线程池有如下的劣势:

升高资源耗费。通过反复利用已创立的线程升高线程创立和销毁造成的耗费。

进步响应速度。当工作达到时,工作能够不须要等到线程创立就能立刻执行。

进步线程的可管理性。线程是稀缺资源,如果无限度的创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行对立的调配,调优和监控。

线程池的应用

▐ 线程池创立&外围参数设置

在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit timeUnit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler)

能够通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创立一个线程池。

corePoolSize参数

在构造函数中,corePoolSize为线程池外围线程数。默认状况下,外围线程会始终存活,然而当将allowCoreThreadTimeout设置为true时,外围线程超时也会回收。

maximumPoolSize参数

在构造函数中,maximumPoolSize为线程池所能包容的最大线程数。

keepAliveTime参数

在构造函数中,keepAliveTime示意线程闲置超时时长。如果线程闲置工夫超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,外围线程也会超时回收。

timeUnit参数

在构造函数中,timeUnit示意线程闲置超时时长的工夫单位。罕用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

blockingQueue参数

在构造函数中,blockingQueue示意工作队列,线程池工作队列的罕用实现类有:

ArrayBlockingQueue :一个数组实现的有界阻塞队列,此队列依照FIFO的准则对元素进行排序,反对偏心拜访队列。

LinkedBlockingQueue :一个由链表构造组成的可选有界阻塞队列,如果不指定大小,则应用Integer.MAX_VALUE作为队列大小,依照FIFO的准则对元素进行排序。

PriorityBlockingQueue :一个反对优先级排序的无界阻塞队列,默认状况下采纳天然顺序排列,也能够指定Comparator。

DelayQueue:一个反对延时获取元素的无界阻塞队列,创立元素时能够指定多久当前能力从队列中获取以后元素,罕用于缓存零碎设计与定时任务调度等。

SynchronousQueue:一个不存储元素的阻塞队列。存入操作必须期待获取操作,反之亦然。

LinkedTransferQueue:一个由链表构造组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer办法,该办法在有消费者期待接管元素时会立刻将元素传递给消费者。

LinkedBlockingDeque:一个由链表构造组成的双端阻塞队列,能够从队列的两端插入和删除元素。

threadFactory参数

在构造函数中,threadFactory示意线程工厂。用于指定为线程池创立新线程的形式,threadFactory能够设置线程名称、线程组、优先级等参数。如通过Google工具包能够设置线程池里的线程名:

new ThreadFactoryBuilder().setNameFormat("general-detail-batch-%d").build()

RejectedExecutionHandler参数在构造函数中,rejectedExecutionHandler示意回绝策略。当达到最大线程数且队列工作已满时须要执行的回绝策略,常见的回绝策略如下:

ThreadPoolExecutor.AbortPolicy:默认策略,当工作队列满时抛出RejectedExecutionException异样。

ThreadPoolExecutor.DiscardPolicy:抛弃掉不能执行的新工作,不抛任何异样。

ThreadPoolExecutor.CallerRunsPolicy:当工作队列满时应用调用者的线程间接执行该工作。

ThreadPoolExecutor.DiscardOldestPolicy:当工作队列满时抛弃阻塞队列头部的工作(即最老的工作),而后增加当前任务。

▐  线程池状态转移图

ThreadPoolExecutor线程池有如下几种状态:

RUNNING:运行状态,承受新工作,继续解决工作队列里的工作;

SHUTDOWN:不再承受新工作,但要解决工作队列里的工作;

STOP:不再承受新工作,不再解决工作队列里的工作,中断正在进行中的工作;

TIDYING:示意线程池正在进行运作,停止所有工作,销毁所有工作线程,当线程池执行terminated()办法时进入TIDYING状态;

TERMINATED:示意线程池已进行运作,所有工作线程已被销毁,所有工作已被清空或执行结束,terminated()办法执行实现;

▐ 线程池任务调度机制

线程池提交一个工作时任务调度的次要步骤如下:

当线程池里存活的外围线程数小于corePoolSize外围线程数参数的值时,线程池会创立一个外围线程去解决提交的工作;

如果线程池外围线程数已满,即线程数曾经等于corePoolSize,新提交的工作会被尝试放进工作队列workQueue中期待执行;

当线程池外面存活的线程数曾经等于corePoolSize了,且工作队列workQueue已满,再判断以后线程数是否已达到maximumPoolSize,即最大线程数是否已满,如果没达到,创立一个非核心线程执行提交的工作;

如果以后的线程数已达到了maximumPoolSize,还有新的工作提交过去时,执行回绝策略进行解决。

外围代码如下:

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }

▐ Tomcat线程池剖析

Tomcat申请处理过程

Tomcat 的整体架构蕴含连接器和容器两大部分,其中连接器负责与内部通信,容器负责外部逻辑解决。在连接器中:

应用 ProtocolHandler 接口来封装I/O模型和应用层协定的差别,其中I/O模型能够抉择非阻塞I/O、异步I/O或APR,应用层协定能够抉择HTTP、HTTPS或AJP。ProtocolHandler将I/O模型和应用层协定进行组合,让EndPoint只负责字节流的收发,Processor负责将字节流解析为Tomcat Request/Response对象,实现功能模块的高内聚和低耦合,ProtocolHandler接口继承关系如下图示。

通过适配器 Adapter 将Tomcat Request对象转换为规范的ServletRequest对象。

Tomcat为了实现申请的疾速响应,应用线程池来进步申请的解决能力。上面咱们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的剖析。

Tomcat线程池创立

在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的解决,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor办法来创立Tomcat默认线程池。

外围局部代码如下:

public void createExecutor() {        internalExecutor = true;        TaskQueue taskqueue = new TaskQueue();        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);        taskqueue.setParent( (ThreadPoolExecutor) executor);    }

其中,TaskQueue、ThreadPoolExecutor别离为Tomcat自定义工作队列、线程池实现。

Tomcat自定义ThreadPoolExecutor

Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计曾经提交但尚未实现的工作数量(submittedCount),包含曾经在队列中的工作和曾经交给工作线程但还未开始执行的工作。

/** * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient * {@link #getSubmittedCount()} method, to be used to properly handle the work queue. * If a RejectedExecutionHandler is not specified a default one will be configured * and that one will always throw a RejectedExecutionException * */public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {    /**     * The number of tasks submitted but not yet finished. This includes tasks     * in the queue and tasks that have been handed to a worker thread but the     * latter did not start executing the task yet.     * This number is always greater or equal to {@link #getActiveCount()}.     */    // 新增的submittedCount成员变量,用于统计已提交但还未实现的工作数    private final AtomicInteger submittedCount = new AtomicInteger(0);    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);    // 构造函数    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,            RejectedExecutionHandler handler) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);        // 预启动所有外围线程        prestartAllCoreThreads();    }}

Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()办法,并实现对提交执行的工作进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异样后,会调用force()办法再次向TaskQueue中进行增加工作的尝试。如果增加失败,则submittedCount减一后,再抛出RejectedExecutionException。

@Override    public void execute(Runnable command) {        execute(command,0,TimeUnit.MILLISECONDS);    }    public void execute(Runnable command, long timeout, TimeUnit unit) {        submittedCount.incrementAndGet();        try {            super.execute(command);        } catch (RejectedExecutionException rx) {            if (super.getQueue() instanceof TaskQueue) {                final TaskQueue queue = (TaskQueue)super.getQueue();                try {                    if (!queue.force(command, timeout, unit)) {                        submittedCount.decrementAndGet();                        throw new RejectedExecutionException("Queue capacity is full.");                    }                } catch (InterruptedException x) {                    submittedCount.decrementAndGet();                    throw new RejectedExecutionException(x);                }            } else {                submittedCount.decrementAndGet();                throw rx;            }        }    }

Tomcat自定义工作队列

在Tomcat中从新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,外围线程数默认值为10,最大线程数默认为200,为了防止线程达到外围线程数后后续工作放入队列期待,Tomcat通过自定义工作队列TaskQueue重写offer办法实现了外围线程池数达到配置数后线程的创立。

具体地,从线程池任务调度机制实现可知,当offer办法返回false时,线程池将尝试创立新新线程,从而实现工作的疾速响应。TaskQueue外围实现代码如下:

/** * As task queue specifically designed to run with a thread pool executor. The * task queue is optimised to properly utilize threads within a thread pool * executor. If you use a normal queue, the executor will spawn threads when * there are idle threads and you wont be able to force items onto the queue * itself. */public class TaskQueue extends LinkedBlockingQueue<Runnable> {    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {        if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected    }    @Override    public boolean offer(Runnable o) {        // 1. parent为线程池,Tomcat中为自定义线程池实例      //we can't do any checks        if (parent==null) return super.offer(o);        // 2. 当线程数达到最大线程数时,新提交工作入队        //we are maxed out on threads, simply queue the object        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);        // 3. 当提交的工作数小于线程池中已有的线程数时,即有闲暇线程,工作入队即可        //we have idle threads, just add it to the queue        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);        // 4. 【关键点】如果以后线程数量未达到最大线程数,间接返回false,让线程池创立新线程        //if we have less threads than maximum force creation of a new thread        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;        // 5. 最初的兜底,放入队列        //if we reached here, we need to add it to the queue        return super.offer(o);    }   }

Tomcat自定义工作线程

Tomcat中通过自定义工作线程TaskThread实现对每个线程创立工夫的记录;应用动态外部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异样类型的解决。

/** * A Thread implementation that records the time at which it was created. * */public class TaskThread extends Thread {    private final long creationTime;    public TaskThread(ThreadGroup group, Runnable target, String name) {        super(group, new WrappingRunnable(target), name);        this.creationTime = System.currentTimeMillis();    }    /**     * Wraps a {@link Runnable} to swallow any {@link StopPooledThreadException}     * instead of letting it go and potentially trigger a break in a debugger.     */    private static class WrappingRunnable implements Runnable {        private Runnable wrappedRunnable;        WrappingRunnable(Runnable wrappedRunnable) {            this.wrappedRunnable = wrappedRunnable;        }        @Override        public void run() {            try {                wrappedRunnable.run();            } catch(StopPooledThreadException exc) {                //expected : we just swallow the exception to avoid disturbing                //debuggers like eclipse's                log.debug("Thread exiting on purpose", exc);            }        }    }}

思考&小结

Tomcat为什么要自定义线程池和工作队列实现?

JUC原生线程池在提交工作时,当工作线程数达到外围线程数后,持续提交工作会尝试将工作放入阻塞队列中,只有以后运行线程数未达到最大设定值且在工作队列工作满后,才会持续创立新的工作线程来解决工作,因而JUC原生线程池无奈满足Tomcat疾速响应的诉求。

Tomcat为什么应用无界队列?

Tomcat在EndPoint中通过acceptCount和maxConnections两个参数来防止过多申请积压。其中maxConnections为Tomcat在任意时刻接管和解决的最大连接数,当Tomcat接管的连接数达到maxConnections时,Acceptor不会读取accept队列中的连贯;这时accept队列中的线程会始终阻塞着,直到Tomcat接管的连接数小于maxConnections(maxConnections默认为10000,如果设置为-1,则连接数不受限制)。acceptCount为accept队列的长度,当accept队列中连贯的个数达到acceptCount时,即队列满,此时进来的申请一律被回绝,默认值是100(基于Tomcat 8.5.43版本)。因而,通过acceptCount和maxConnections两个参数作用后,Tomcat默认的无界工作队列通常不会造成OOM。

/** * Allows the server developer to specify the acceptCount (backlog) that * should be used for server sockets. By default, this value * is 100. */private int acceptCount = 100;private int maxConnections = 10000;

▐ 最佳实际

防止用Executors 的创立线程池

Executors罕用办法有以下几个:

newCachedThreadPool():创立一个可缓存的线程池,调用 execute 将重用以前结构的线程(如果线程可用)。如果没有可用的线程,则创立一个新线程并增加到线程池中。终止并从缓存中移除那些已有 60 秒钟未被应用的线程。CachedThreadPool实用于并发执行大量短期耗时短的工作,或者负载较轻的服务器;

newFiexedThreadPool(int nThreads):创立固定数目线程的线程池,线程数小于nThreads时,提交新的工作会创立新的线程,当线程数等于nThreads时,提交新的工作后工作会被退出到阻塞队列,正在执行的线程执行结束后从队列中取工作执行,FiexedThreadPool实用于负载略重但工作不是特地多的场景,为了正当利用资源,须要限度线程数量;

newSingleThreadExecutor() 创立一个单线程化的 Executor,SingleThreadExecutor实用于串行执行工作的场景,每个工作按程序执行,不须要并发执行;

newScheduledThreadPool(int corePoolSize) 创立一个反对定时及周期性的工作执行的线程池,少数状况下可用来代替 Timer 类。ScheduledThreadPool中,返回了一个ScheduledThreadPoolExecutor实例,而ScheduledThreadPoolExecutor实际上继承了ThreadPoolExecutor。从代码中能够看出,ScheduledThreadPool基于ThreadPoolExecutor,corePoolSize大小为传入的corePoolSize,maximumPoolSize大小为Integer.MAX_VALUE,超时工夫为0,workQueue为DelayedWorkQueue。实际上ScheduledThreadPool是一个调度池,其实现了schedule、scheduleAtFixedRate、scheduleWithFixedDelay三个办法,能够实现提早执行、周期执行等操作;

newSingleThreadScheduledExecutor() 创立一个corePoolSize为1的ScheduledThreadPoolExecutor;

newWorkStealingPool(int parallelism)返回一个ForkJoinPool实例,ForkJoinPool 次要用于实现“分而治之”的算法,适宜于计算密集型的工作。

Executors类看起来性能比拟弱小、用起来还比拟不便,但存在如下弊病:

FiexedThreadPool和SingleThreadPool工作队列长度为Integer.MAX_VALUE,可能会沉积大量的申请,从而导致OOM;

CachedThreadPool和ScheduledThreadPool容许创立的线程数量为Integer.MAX_VALUE,可能会创立大量的线程,从而导致OOM;

应用线程时,能够间接调用 ThreadPoolExecutor 的构造函数来创立线程池,并依据业务理论场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。

防止应用部分线程池

应用部分线程池时,若工作执行完后没有执行shutdown()办法或有其余不当援用,极易造成系统资源耗尽。

正当设置线程池参数

在工程实际中,通常应用下述公式来计算外围线程数:

nThreads=(w+c)/cnu=(w/c+1)nu

其中,w为等待时间,c为计算工夫,n为CPU外围数(通常可通过 Runtime.getRuntime().availableProcessors()办法获取),u为CPU指标利用率(取值区间为[0, 1]);在最大化CPU利用率的状况下,当解决的工作为计算密集型工作时,即等待时间w为0,此时外围线程数等于CPU外围数。

上述计算公式是现实状况下的倡议外围线程数,而不同零碎/利用在运行不同的工作时可能会有肯定的差别,因而最佳线程数参数还须要依据工作的理论运行状况和压测体现进行微调。

减少异样解决

为了更好地发现、剖析和解决问题,倡议在应用多线程时减少对异样的解决,异样解决通常有下述计划:

在工作代码处减少try...catch异样解决

如果应用的Future形式,则可通过Future对象的get办法接管抛出的异样

为工作线程设置setUncaughtExceptionHandler,在uncaughtException办法中解决异样

优雅敞开线程池

public void destroy() {        try {            poolExecutor.shutdown();            if (!poolExecutor.awaitTermination(AWAIT_TIMEOUT, TimeUnit.SECONDS)) {                poolExecutor.shutdownNow();            }        } catch (InterruptedException e) {            // 如果以后线程被中断,从新勾销所有工作            pool.shutdownNow();            // 放弃中断状态            Thread.currentThread().interrupt();        }    }

为了实现优雅停机的指标,咱们该当先调用shutdown办法,调用这个办法也就意味着,这个线程池不会再接管任何新的工作,然而曾经提交的工作还会继续执行。之后咱们还该当调用awaitTermination办法,这个办法能够设定线程池在敞开之前的最大超时工夫,如果在超时工夫完结之前线程池可能失常敞开则会返回true,否则,超时会返回false。通常咱们须要依据业务场景预估一个正当的超时工夫,而后调用该办法。

如果awaitTermination办法返回false,但又心愿尽可能在线程池敞开之后再做其余资源回收工作,能够思考再调用一下shutdownNow办法,此时队列中所有尚未被解决的工作都会被抛弃,同时会设置线程池中每个线程的中断标记位。shutdownNow并不保障肯定能够让正在运行的线程进行工作,除非提交给线程的工作可能正确响应中断。

鹰眼上下文参数传递

/*** 在主线程中,开启鹰眼异步模式,并将ctx传递给多线程工作**/// 避免鹰眼链路失落,须要传递RpcContext_inner ctx = EagleEye.getRpcContext();// 开启异步模式ctx.setAsyncMode(true);/*** 在线程池工作线程中,设置鹰眼rpc环境**/private void runTask() {    try {        EagleEye.setRpcContext(ctx);        // do something...    } catch (Exception e) {        log.error("requestError, params: {}", this.params, e);    } finally {        // 判断当前任务是否是主线程在运行,当Rejected策略为CallerRunsPolicy的时候,核查以后线程        if (mainThread != Thread.currentThread()) {            EagleEye.clearRpcContext();        }    }}

ThreadLocal线程变量概述

▐ 什么是ThreadLocal

ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于一般的变量,拜访线程本地变量的每个线程(通过其get或set办法)都有其本人的独立初始化的变量正本,因而ThreadLocal没有多线程竞争的问题,不须要独自进行加锁。

▐ ThreadLocal应用场景

每个线程都须要有属于本人的实例数据(线程隔离);

框架跨层数据的传递;

须要参数全局传递的简单调用链路的场景;

数据库连贯的治理,在AOP的各种嵌套调用中保障事务的一致性;

ThreadLocal的原理与实际

对于ThreadLocal而言,罕用的办法有get/set/initialValue 3个办法。

家喻户晓,在java中SimpleDateFormat有线程平安问题,为了平安地应用SimpleDateFormat,除了1)创立SimpleDateFormat局部变量;和2)加同步锁 两种计划外,咱们还能够应用3)ThreadLocal的计划:

/*** 应用 ThreadLocal 定义一个全局的 SimpleDateFormat*/private static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal = newThreadLocal<SimpleDateFormat>() {@Overrideprotected SimpleDateFormat initialValue() {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");}};// 用法String dateString = simpleDateFormatThreadLocal.get().format(calendar.getTime());

▐ ThreadLocal原理

Thread 外部保护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。

threadLocal.get()办法

/**     * Returns the value in the current thread's copy of this     * thread-local variable.  If the variable has no value for the     * current thread, it is first initialized to the value returned     * by an invocation of the {@link #initialValue} method.     *     * @return the current thread's value of this thread-local     */public T get() {    // 1. 获取以后线程    Thread t = Thread.currentThread();    // 2. 获取以后线程外部的ThreadLocalMap变量t.threadLocals;    ThreadLocalMap map = getMap(t);    // 3. 判断map是否为null    if (map != null) {        // 4. 应用以后threadLocal变量获取entry        ThreadLocalMap.Entry e = map.getEntry(this);        // 5. 判断entry是否为null        if (e != null) {            // 6.返回Entry.value            @SuppressWarnings("unchecked")            T result = (T)e.value;            return result;        }    }    // 7. 如果map/entry为null设置初始值    return setInitialValue();} /**     * Variant of set() to establish initialValue. Used instead     * of set() in case user has overridden the set() method.     *     * @return the initial value     */private T setInitialValue() {    // 1. 初始化value,如果重写就用重写后的value,默认null    T value = initialValue();    // 2. 获取以后线程    Thread t = Thread.currentThread();    // 3. 获取以后线程外部的ThreadLocalMap变量    ThreadLocalMap map = getMap(t);    if (map != null)        // 4. 不为null就set, key: threadLocal, value: value        map.set(this, value);    else        // 5. map若为null则创立ThreadLocalMap对象        createMap(t, value);    return value;}/** * Create the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the map */void createMap(Thread t, T firstValue) {    t.threadLocals = new ThreadLocalMap(this, firstValue);}/** * Construct a new map initially containing (firstKey, firstValue). * ThreadLocalMaps are constructed lazily, so we only create * one when we have at least one entry to put in it. */ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {    // 1. 初始化entry数组,size: 16    table = new Entry[INITIAL_CAPACITY];    // 2. 计算value的index    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);    // 3. 在对应index地位赋值    table[i] = new Entry(firstKey, firstValue);    // 4. entry size    size = 1;    // 5. 设置threshold: threshold = len * 2 / 3;    setThreshold(INITIAL_CAPACITY);}/** * Set the resize threshold to maintain at worst a 2/3 load factor. */private void setThreshold(int len) {    threshold = len * 2 / 3;}

threadLocal.set()办法

/**     * Sets the current thread's copy of this thread-local variable     * to the specified value.  Most subclasses will have no need to     * override this method, relying solely on the {@link #initialValue}     * method to set the values of thread-locals.     *     * @param value the value to be stored in the current thread's copy of     *        this thread-local.     */    public void set(T value) {        // 1. 获取以后线程        Thread t = Thread.currentThread();        // 2. 获取以后线程外部的ThreadLocalMap变量        ThreadLocalMap map = getMap(t);        if (map != null)            // 3. 设置value            map.set(this, value);        else            // 4. 若map为null则创立ThreadLocalMap            createMap(t, value);    }

ThreadLocalMap

从JDK源码可见,ThreadLocalMap中的Entry是弱援用类型的,这就意味着如果这个ThreadLocal只被这个Entry援用,而没有被其余对象强援用时,就会在下一次GC的时候回收掉。

static class ThreadLocalMap {        /**         * The entries in this hash map extend WeakReference, using         * its main ref field as the key (which is always a         * ThreadLocal object).  Note that null keys (i.e. entry.get()         * == null) mean that the key is no longer referenced, so the         * entry can be expunged from table.  Such entries are referred to         * as "stale entries" in the code that follows.         */        static class Entry extends WeakReference<ThreadLocal<?>> {            /** The value associated with this ThreadLocal. */            Object value;            Entry(ThreadLocal<?> k, Object v) {                super(k);                value = v;            }        }        // ...}

▐ ThreadLocal示例

鹰眼链路ThreadLocal的应用

EagleEye(鹰眼)作为全链路监控零碎在团体外部被宽泛应用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,局部相干代码如下:

EagleEyeHttpRequest eagleEyeHttpRequest = this.convertHttpRequest(httpRequest);// 1. 初始化,将traceId、rpcId等数据存储到鹰眼的ThreadLocal变量中EagleEyeRequestTracer.startTrace(eagleEyeHttpRequest, false);try {    chain.doFilter(httpRequest, httpResponse);} finally {    // 2. 清理ThreadLocal变量值    EagleEyeRequestTracer.endTrace(this.convertHttpResponse(httpResponse));}

在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace办法进行初始化,在前置入参转换后,通过startTrace重载办法将鹰眼上下文参数存入ThreadLocal中,相干代码如下:


EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace办法完结调用链,通过clear办法将ThreadLocal中的数据进行清理,相干代码实现如下:

Bad case:XX我的项目权利支付失败问题

在某权利支付原有链路中,通过app关上一级页面后能力发动权利支付申请,申请通过淘系无线网关(Mtop)后达到服务端,服务端通过mtop sdk获取以后会话信息。

在XX我的项目中,对权利支付链路进行了降级革新,在一级页面申请时,通过服务端同时发动权利支付申请。具体地,服务端在解决一级页面申请时,同时通过调用hsf/dubbo接口来进行权利支付,因而在发动rpc调用时须要携带用户以后会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而能力通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:

问题1:因ThreadLocal初始化机会不当,造成获取不到会话信息,进而导致权利支付失败;

问题2:申请实现时,因未清理ThreadLocal中的变量值,导致脏数据;

【问题1:权利支付失败剖析】

在权利支付服务中,该利用构建了一套高效和线程平安的依赖注入框架,基于该框架的业务逻辑模块通常形象为xxxModule模式,Module间为网状依赖关系,框架会按依赖关系主动调用init办法(其中,被依赖的module 的init办法先执行)。

在利用中,权利支付接口的主入口为CommonXXApplyModule类,CommonXXApplyModule依赖XXSessionModule。当申请来长期,会按依赖关系顺次调用init办法,因而XXSessionModule的init办法会优先执行;而开发同学在CommonXXApplyModule类中的init办法中通过调用recoverMtopContext()办法来冀望复原mtop上下文,因recoverMtopContext()办法的调用机会过晚,从而导致XXSessionModule模块获取不到正确的会话id等信息而导致权利支付失败。

【问题2:脏数据分析】

权利支付服务在解决申请时,若以后线程已经解决过权利支付申请,因ThreadLocal变量值未被清理,此时XXSessionModule通过mtop SDK获取会话信息时失去的是前一次申请的会话信息,从而造成脏数据。

【解决方案】

在依赖注入框架入口处AbstractGate#visit(或在XXSessionModule中)通过recoverMtopContext办法注入mtop上下文信息,并在入口办法的finally代码块清理以后申请的threadlocal变量值。

▐ 思考&小结

ThreadLocalMap中的Entry为什么要设计为弱援用类型?

若应用强援用类型,则threadlocal的援用链为:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal对象)和value;在这种场景下,只有这个线程还在运行(如线程池场景),若不调用remove办法,则该对象及关联的所有强援用对象都不会被垃圾回收器回收。

应用static和不应用static润饰threadlocal变量有和区别?

若应用static关键字进行润饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存透露,如下述示例:

public class ThreadLocalTest {    public static class ThreadLocalDemo {        private ThreadLocal<String> threadLocalHolder = new ThreadLocal();        public void setValue(String value) {            threadLocalHolder.set(value);        }        public String getValue() {            return threadLocalHolder.get();        }    }    public static void main(String[] args) {        int count = 3;        List<ThreadLocalDemo> list = new LinkedList<>();        for (int i = 0; i < count; i++) {            ThreadLocalDemo demo = new ThreadLocalDemo();            demo.setValue("demo-" + i);            list.add(demo);        }        System.out.println();    }}

在上述main办法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实际中,应用threadlocal时通常冀望一个线程只有一个threadlocal实例,因而,若不应用static润饰,冀望的语义产生了变动,同时易引起内存透露。

▐ 最佳实际

ThreadLocal变量值初始化和清理倡议成对呈现

如果不执行清理操作,则可能会呈现:

内存透露:因为ThreadLocalMap的中key是弱援用,而Value是强援用。这就导致了一个问题,ThreadLocal在没有内部对象强援用时,产生GC时弱援用Key会被回收,而Value不会回收,从而Entry外面的元素呈现<null,value>的状况。如果创立ThreadLocal的线程始终继续运行,那么这个Entry对象中的value就有可能始终得不到回收,这样可能会导致内存泄露。

脏数据:因为线程复用,在用户1申请时,可能保留了业务数据在ThreadLocal中,若不清理,则用户2的申请进来时,可能会读到用户1的数据。

倡议应用try...finally 进行清理。

ThreadLocal变量倡议应用static进行润饰

咱们在应用ThreadLocal时,通常冀望的语义是perThread,若不应用static进行润饰,则语义变为perThread-perInstance;在线程池场景下,若不必static进行润饰,创立的线程相干实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存透露(https://errorprone.info/bugpa...)。

审慎应用ThreadLocal.withInitial

在利用中,审慎应用ThreadLocal.withInitial(Supplier<? extends S> supplier)这个工厂办法创立ThreadLocal对象,一旦不同线程的ThreadLocal应用了同一个Supplier对象,那么隔离也就无从谈起了,如:

// 反例,实际上应用了共享对象obj而并未隔离,private static ThreadLocal<Obj> threadLocal = ThreadLocal.withIntitial(() -> obj);

总结

在java工程实际中,线程池和线程变量被宽泛应用,因线程池和线程变量的不当应用常常造成平安生产事变,因而,正确应用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的应用登程,简要介绍了线程池和线程变量的原理和应用实际,各开发人员可联合最佳实际和理论利用场景,正确地应用线程和线程变量,构建出稳固、高效的java应用服务。

团队介绍

咱们来自大淘宝技术-淘宝交易平台,负责淘宝业务的商品详情、购物车、下单、订单、物流、退款等从购前、购中到购后履约的根底链路相干业务。这里有百亿级别的数据、有超过百万QPS的高并发流量、有丰盛的业务场景,服务于10亿级的消费者,撑持淘宝天猫前后端各种行业的根底业务、玩法、规定及业务拓展。这里有微小的挑战等着你来,若有趣味可将简历发至lican.lc@alibaba-inc.com,期待您的退出!