之所以存在线程池是基于以下两个起因:
- 线程的创立和销毁是须要有资源耗费的,多线程环境下频繁创立、销毁线程会影响零碎性能
- 对于一个须要频繁创立工作、线程的利用来说,创立的工作数、线程数须要受到管制或治理
有了线程池,尤其是相似ThreadPoolExecutor这种能够通过参数调整其行为的线程池,能够近乎完满的解决上述两个问题。
线程池工作原理
简略来说线程池的工作原理就是:提前或者在执行工作的时候创立线程,执行完工作之后不销毁线程而是将线程偿还到线程池中,后续有工作提交上来之后就能够不再创立线程、而是由线程池中闲暇的线程执行工作。
这样一来就能够防止频繁创立和销毁线程,并且也能够控制线程池中线程的数量,同时如果提交工作的速度太快、线程池中的线程来不及执行工作的话,能够将工作放在队列中期待,等后面的工作执行实现、线程偿还到线程池中之后,再从队列中获取工作继续执行。
其实以上就是ThreadPoolExecutor性能的简略形容。
当然ThreadPoolExecutor的性能要比这个形容弱小的多也简单的多。咱们就从以下几个方面来详细分析一下ThreadPoolExecutor的性能和底层原理:
- 根本属性
- 工作队列
- 创立线程池
- 提交工作
- 执行工作
- 回绝工作
- 钩子函数
根本属性
corePoolSize&maximumPoolSize:ThreadPoolExecutor有外围线程数(corePoolSize)和最大线程数(maximumPoolSize)的概念,新工作提交后,如果以后线程数小于corePoolSize,即便线程池中有闲暇线程,ThreadPoolExecutor也会立刻创立一个线程去执行工作。如果以后线程数大于corePoolSize且小于maximumPoolSize,则只有队列满的状况下才会创立线程、否则工作入队列排队。
ctl:绑定了状态runState和线程数workerCount两个属性的AtomicInteger变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
了解ctl的工作原理是读懂ThreadPoolExecutor源码的必要前提。
先看两个辅助变量:
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;
COUNT_BITS是Integer.size-3=31-3=29,CAPACITY是1向左位移29位后减1,用二进制示意就是:
0001 1111 1111 1111 1111 1111 1111 1111
~CAPACITY用二进制示意就是:
1110 0000 0000 0000 0000 0000 0000 0000
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
runState有RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED等5个状态,压入ctl示意的时候全副要左移29位。意思是:ctl是依照2进制位来表白含意的,高位的3位用来示意状态runstate,低位的29位用来示意线程数workerCount。
ctl通过ctlOf函数(runState的理论状态值左移29位,挪动到高3位后,和workerCount按位或操作)失去。
runStateOf函数:ctl和~CAPACITY进行按位与,~CAPACITY的二级制示意为:
1110 0000 0000 0000 0000 0000 0000 0000
按位与操作失去的就是ctl的高3位,对应的就是runState。
workerCountOf函数:ctl和CAPACITY进行按位与,CAPACITY的二进制示意为:
0001 1111 1111 1111 1111 1111 1111 1111
按位与失去的就是低29位,对应的就是workerCount。
Keep-alive:如果以后线程数超过了corePoolSize,那么超出的线程如果闲暇工夫超过了keep-alive会被回收(terminate)。外围线程是没有超时概念也不会被回收的,然而能够通过设置allowCoreThreadTimeOut为true,使得外围线程也受到参数Keep-alive管制从而被回收。
工作队列
创立线程池的时候,通过ThreadPoolExutor构造方法指定工作队列,能够反对任何BlockingQueue。通过Executors工具创立ThreadPoolExutor的话,反对SynchronousQueue、LinkedBlockingQueue和 ArrayBlockingQueue三种阻塞队列。
工作队列是ThreadPoolExutor的重要参数,与corePoolSize和MaxPoolSize配合应用会创立出体现齐全不同的线程池:有界还是无界队列会影响到线程池接管工作的能力或体现,FIFO还是LIFO会影响到工作执行程序,等等。
创立线程池
ThreadPoolExecutor提供了4个构造方法,然而如果你不打算替换默认的ThreadFactory和RejectedExecutionHandler的话,最罕用的构造方法其实只有一个:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
调用到另外一个构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
从构造方法中咱们能够看到,ThreadPoolExecutor创立后除了设置重要属性之外啥也没干,外围线程也并没有启动!
提交工作
默认状况下,ThreadPoolExecutor提交工作的过程也同时是创立线程的过程,因为缺省状况下ThreadPoolExecutor创立的时候并不创立线程。
ThreadPoolExecutor实现了Executor接口,Executor通过其惟一办法execute来提交工作。
ThreadPoolExecutor的execute办法接管一个Runable参数作为工作,依照如下逻辑实现工作的提交:
- 如果线程池的线程数量小于corePoolSize,则通过addWorker(command,true)创立并启动一个新线程来执行工作
- 否则,尝试将工作退出队列,如果胜利,再次查看线程池状态,如果线程池曾经进行运行则工作出队,回绝工作。再次查看如果以后线程数为0则调用addWorker(null,false)创立新线程
- 否则,超出外围线程数且队列满,如果尚未超出最大线程数则通过addWorker(command,false)创立新线程,否则超出最大线程数,回绝工作
源码比较简单,须要留神的是退出队列局部:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
调用阻塞队列的非阻塞办法offer入队,想想为啥不必阻塞办法?
线程启动机会
默认状况下ThreadPoolExecutor创立之后不会启动任何线程,包含外围线程。所有线程都是在工作提交后启动。
能够通过调用prestartCoreThread()或prestartAllCoreThreads()随时启动一个或所有外围线程。
启动线程#addWorker
线程通过addWorker(Runnable firstTask, boolean core)办法启动。firstTask是该线程的第一个工作,firstTask=null示意只启动线程、无工作。core示意要启动的是外围线程、还是一般线程,用来判断线程数是否已达下限。
addWorker办法首先做必要的合法性判断:以后线程池状态,线程数是否已达下限等,满足启动条件则更新以后线程数WorkerCount。
而后创立线程对象Worker,取得锁,加锁操作:Worker退出workers缓存(worker存储以后线程池的所有尚未执行工作的线程)......操作实现之后,解锁,并启动线程。
Worker对象
Worker是实现了Runnable接口的外部类,次要属性:
- thread:线程池的线程本尊
- firstTask: 线程创立时绑定的工作,该线程如果是工作提交的时候创立的,firstTask就是被提交的工作,如果线程创立胜利,则firstTask具备优先执行权
- completedTasks:以后线程实现执行的工作数
初始化办法通过线程工厂创立一个线程:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
this作为Runnable参数传递给Thread的构造方法,线程启动的时候就回调this的run办法,所以Worker的run办法就是线程池中的线程执行工作的入口办法。
执行工作Worker.run
run办法调用了runWorker办法:
public void run() { runWorker(this); }
持续跟踪runWorker办法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //无限执行firstTask,firstTask为空的话通过getTask()从队列中获取task while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //调用工作执行前钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { //执行工作 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //调用工作执行后钩子函数 afterExecute(task, thrown); } } finally { //开释工作 task = null; //以后线程执行工作数更新 +1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //线程池相干数据更新 processWorkerExit(w, completedAbruptly); } }
从源码能够看到,Worker的firstTask会失去以后线程的优先执行,因为代码中获取并执行工作的循环条件中的task的初始值就是Worker的firstTask:
while (task != null || (task = getTask()) != null) {
firstTask执行实现之后,开释工作(task=null)。线程持续运行,下次循环时会通过getTask办法从队列获取工作。这个动作相当于:线程执行完一个工作之后并没有完结或销毁,而是交还给线程池,通过getTask持续从队列领工作,领到工作后继续执行。
getTask办法
getTask办法从队列获取排队期待执行的工作。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. //如果以后线程池已进行,或者处于SHUTDOWN状态且队列为空则返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //线程数 int wc = workerCountOf(c); // Are workers subject to culling? //是否须要限定工夫 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果超过最大线程数且队列空,或期待超时且(线程数>1或者队列空),返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果须要限时,则通过限时参数从阻塞队列获取工作,否则不须要限时的话,阻塞直到获取到工作 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //从队列拿到工作就返回该工作 if (r != null) return r; //否则没有拿到工作,设置超时 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
咱们发现线程池的keepAliveTime参数就是在这个getTask办法中失效的。
如果线程数小于外围线程数,并且allowCoreThreadTimeOut设置为false的话,线程不受期待工作时长的限度,则采纳阻塞队列的take办法、无限期期待直到能够从队列中获取工作。
如果线程数大于外围线程数,或者参数设置外围线程也须要受到超时管制,就会设置获取工作的限时时长为keepAliveTime,如果在keepAliveTime工夫范畴内依然没有从阻塞队列中拿到工作,则返回null。
超过keepAliveTime时长没拿到工作将导致在runWorker办法的while循环满足完结条件而退出循环:
while (task != null || (task = getTask()) != null) { //获取工作... } finally { processWorkerExit(w, completedAbruptly); }
退出循环后,调用processWorkerExit办法完结线程、退出线程池。
keepAliveTime=0L的状况
如果设置keepAliveTime=0L,并且线程数超出外围线程数,会是什么状况?
上面这一行代码交代的清清楚楚了:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
调用poll办法、等待时间为0去阻塞队列获取工作,能够去看一下阻塞队列的限定期待时长的poll办法的源码,期待时长为0则成果等同于非阻塞办法:获取不到数据立即返回null。
所以keepAliveTime=0L示意:超出外围线程数后,在执行完工作之后容许闲暇工夫为0!即:如果没有新的工作提交上来的话,只保留corePoolSize个线程持续留在线程池期待工作,其余线程立刻销毁、退出线程池。
回绝工作
采纳有界阻塞队列的线程池,在队列已满、且超出最大线程数后提交上来的工作会被回绝,回绝后的解决形式由ThreadPoolExecutor的回绝策略RejectedExecutionHandler决定。回绝策略在ThreadPoolExecutor创立时指定。次要包含:
- AbortPolicy:间接抛异样RejectedExecutionException
- CallerRunsPolicy:调用方解决,即交给提交execute办法的线程本人执行工作
- DiscardPolicy:间接扔掉该工作
- DiscardOldestPolicy:扔掉队列中等待时间最久的工作,执行当前任务
- 自定义:实现RejectedExecutionHandler接口,自定义回绝策略
钩子函数
工作执行前和工作执行后别离调用beforeExecute/afterExecute办法,这两个办法在ThreadPoolExecutor中默认都是哑实现,什么都没做。如果你既想要采纳ThreadPoolExecutor作为线程池、又想在工作执行前后做额定的动作,能够继承ThreadPoolExecutor并笼罩他的beforeExecute/afterExecute办法。
小结
线程池ThreadPoolExecutor源码剖析实现,其实从利用层面来讲,绝大部分线程池需要都能够通过ThreadPoolExecutor失去满足,而且能够利用Executors工具类创立满足各种不同场景的线程池。
Thanks a lot!
上一篇 BlockingQueue - ArrayBlockingQueue