之所以存在线程池是基于以下两个起因:
- 线程的创立和销毁是须要有资源耗费的,多线程环境下频繁创立、销毁线程会影响零碎性能
- 对于一个须要频繁创立工作、线程的利用来说,创立的工作数、线程数须要受到管制或治理
有了线程池,尤其是相似ThreadPoolExecutor这种能够通过参数调整其行为的线程池,能够近乎完满的解决上述两个问题。
线程池工作原理
简略来说线程池的工作原理就是:提前或者在执行工作的时候创立线程,执行完工作之后不销毁线程而是将线程偿还到线程池中,后续有工作提交上来之后就能够不再创立线程、而是由线程池中闲暇的线程执行工作。
这样一来就能够防止频繁创立和销毁线程,并且也能够控制线程池中线程的数量,同时如果提交工作的速度太快、线程池中的线程来不及执行工作的话,能够将工作放在队列中期待,等后面的工作执行实现、线程偿还到线程池中之后,再从队列中获取工作继续执行。
其实以上就是ThreadPoolExecutor性能的简略形容。
当然ThreadPoolExecutor的性能要比这个形容弱小的多也简单的多。咱们就从以下几个方面来详细分析一下ThreadPoolExecutor的性能和底层原理:
- 根本属性
- 工作队列
- 创立线程池
- 提交工作
- 执行工作
- 回绝工作
- 钩子函数
根本属性
corePoolSize&maximumPoolSize:ThreadPoolExecutor有外围线程数(corePoolSize)和最大线程数(maximumPoolSize)的概念,新工作提交后,如果以后线程数小于corePoolSize,即便线程池中有闲暇线程,ThreadPoolExecutor也会立刻创立一个线程去执行工作。如果以后线程数大于corePoolSize且小于maximumPoolSize,则只有队列满的状况下才会创立线程、否则工作入队列排队。
ctl:绑定了状态runState和线程数workerCount两个属性的AtomicInteger变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
了解ctl的工作原理是读懂ThreadPoolExecutor源码的必要前提。
先看两个辅助变量:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
COUNT_BITS是Integer.size-3=31-3=29,CAPACITY是1向左位移29位后减1,用二进制示意就是:
0001 1111 1111 1111 1111 1111 1111 1111
~CAPACITY用二进制示意就是:
1110 0000 0000 0000 0000 0000 0000 0000
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
runState有RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED等5个状态,压入ctl示意的时候全副要左移29位。意思是:ctl是依照2进制位来表白含意的,高位的3位用来示意状态runstate,低位的29位用来示意线程数workerCount。
ctl通过ctlOf函数(runState的理论状态值左移29位,挪动到高3位后,和workerCount按位或操作)失去。
runStateOf函数:ctl和~CAPACITY进行按位与,~CAPACITY的二级制示意为:
1110 0000 0000 0000 0000 0000 0000 0000
按位与操作失去的就是ctl的高3位,对应的就是runState。
workerCountOf函数:ctl和CAPACITY进行按位与,CAPACITY的二进制示意为:
0001 1111 1111 1111 1111 1111 1111 1111
按位与失去的就是低29位,对应的就是workerCount。
Keep-alive:如果以后线程数超过了corePoolSize,那么超出的线程如果闲暇工夫超过了keep-alive会被回收(terminate)。外围线程是没有超时概念也不会被回收的,然而能够通过设置allowCoreThreadTimeOut为true,使得外围线程也受到参数Keep-alive管制从而被回收。
工作队列
创立线程池的时候,通过ThreadPoolExutor构造方法指定工作队列,能够反对任何BlockingQueue。通过Executors工具创立ThreadPoolExutor的话,反对SynchronousQueue、LinkedBlockingQueue和 ArrayBlockingQueue三种阻塞队列。
工作队列是ThreadPoolExutor的重要参数,与corePoolSize和MaxPoolSize配合应用会创立出体现齐全不同的线程池:有界还是无界队列会影响到线程池接管工作的能力或体现,FIFO还是LIFO会影响到工作执行程序,等等。
创立线程池
ThreadPoolExecutor提供了4个构造方法,然而如果你不打算替换默认的ThreadFactory和RejectedExecutionHandler的话,最罕用的构造方法其实只有一个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
调用到另外一个构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
从构造方法中咱们能够看到,ThreadPoolExecutor创立后除了设置重要属性之外啥也没干,外围线程也并没有启动!
提交工作
默认状况下,ThreadPoolExecutor提交工作的过程也同时是创立线程的过程,因为缺省状况下ThreadPoolExecutor创立的时候并不创立线程。
ThreadPoolExecutor实现了Executor接口,Executor通过其惟一办法execute来提交工作。
ThreadPoolExecutor的execute办法接管一个Runable参数作为工作,依照如下逻辑实现工作的提交:
- 如果线程池的线程数量小于corePoolSize,则通过addWorker(command,true)创立并启动一个新线程来执行工作
- 否则,尝试将工作退出队列,如果胜利,再次查看线程池状态,如果线程池曾经进行运行则工作出队,回绝工作。再次查看如果以后线程数为0则调用addWorker(null,false)创立新线程
- 否则,超出外围线程数且队列满,如果尚未超出最大线程数则通过addWorker(command,false)创立新线程,否则超出最大线程数,回绝工作
源码比较简单,须要留神的是退出队列局部:
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
调用阻塞队列的非阻塞办法offer入队,想想为啥不必阻塞办法?
线程启动机会
默认状况下ThreadPoolExecutor创立之后不会启动任何线程,包含外围线程。所有线程都是在工作提交后启动。
能够通过调用prestartCoreThread()或prestartAllCoreThreads()随时启动一个或所有外围线程。
启动线程#addWorker
线程通过addWorker(Runnable firstTask, boolean core)办法启动。firstTask是该线程的第一个工作,firstTask=null示意只启动线程、无工作。core示意要启动的是外围线程、还是一般线程,用来判断线程数是否已达下限。
addWorker办法首先做必要的合法性判断:以后线程池状态,线程数是否已达下限等,满足启动条件则更新以后线程数WorkerCount。
而后创立线程对象Worker,取得锁,加锁操作:Worker退出workers缓存(worker存储以后线程池的所有尚未执行工作的线程)……操作实现之后,解锁,并启动线程。
Worker对象
Worker是实现了Runnable接口的外部类,次要属性:
- thread:线程池的线程本尊
- firstTask: 线程创立时绑定的工作,该线程如果是工作提交的时候创立的,firstTask就是被提交的工作,如果线程创立胜利,则firstTask具备优先执行权
- completedTasks:以后线程实现执行的工作数
初始化办法通过线程工厂创立一个线程:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
this作为Runnable参数传递给Thread的构造方法,线程启动的时候就回调this的run办法,所以Worker的run办法就是线程池中的线程执行工作的入口办法。
执行工作Worker.run
run办法调用了runWorker办法:
public void run() {
runWorker(this);
}
持续跟踪runWorker办法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//无限执行firstTask,firstTask为空的话通过getTask()从队列中获取task
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//调用工作执行前钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行工作
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//调用工作执行后钩子函数
afterExecute(task, thrown);
}
} finally {
//开释工作
task = null;
//以后线程执行工作数更新 +1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程池相干数据更新
processWorkerExit(w, completedAbruptly);
}
}
从源码能够看到,Worker的firstTask会失去以后线程的优先执行,因为代码中获取并执行工作的循环条件中的task的初始值就是Worker的firstTask:
while (task != null || (task = getTask()) != null) {
firstTask执行实现之后,开释工作(task=null)。线程持续运行,下次循环时会通过getTask办法从队列获取工作。这个动作相当于:线程执行完一个工作之后并没有完结或销毁,而是交还给线程池,通过getTask持续从队列领工作,领到工作后继续执行。
getTask办法
getTask办法从队列获取排队期待执行的工作。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果以后线程池已进行,或者处于SHUTDOWN状态且队列为空则返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
//是否须要限定工夫
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果超过最大线程数且队列空,或期待超时且(线程数>1或者队列空),返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果须要限时,则通过限时参数从阻塞队列获取工作,否则不须要限时的话,阻塞直到获取到工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//从队列拿到工作就返回该工作
if (r != null)
return r;
//否则没有拿到工作,设置超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
咱们发现线程池的keepAliveTime参数就是在这个getTask办法中失效的。
如果线程数小于外围线程数,并且allowCoreThreadTimeOut设置为false的话,线程不受期待工作时长的限度,则采纳阻塞队列的take办法、无限期期待直到能够从队列中获取工作。
如果线程数大于外围线程数,或者参数设置外围线程也须要受到超时管制,就会设置获取工作的限时时长为keepAliveTime,如果在keepAliveTime工夫范畴内依然没有从阻塞队列中拿到工作,则返回null。
超过keepAliveTime时长没拿到工作将导致在runWorker办法的while循环满足完结条件而退出循环:
while (task != null || (task = getTask()) != null) {
//获取工作...
} finally {
processWorkerExit(w, completedAbruptly);
}
退出循环后,调用processWorkerExit办法完结线程、退出线程池。
keepAliveTime=0L的状况
如果设置keepAliveTime=0L,并且线程数超出外围线程数,会是什么状况?
上面这一行代码交代的清清楚楚了:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
调用poll办法、等待时间为0去阻塞队列获取工作,能够去看一下阻塞队列的限定期待时长的poll办法的源码,期待时长为0则成果等同于非阻塞办法:获取不到数据立即返回null。
所以keepAliveTime=0L示意:超出外围线程数后,在执行完工作之后容许闲暇工夫为0!即:如果没有新的工作提交上来的话,只保留corePoolSize个线程持续留在线程池期待工作,其余线程立刻销毁、退出线程池。
回绝工作
采纳有界阻塞队列的线程池,在队列已满、且超出最大线程数后提交上来的工作会被回绝,回绝后的解决形式由ThreadPoolExecutor的回绝策略RejectedExecutionHandler决定。回绝策略在ThreadPoolExecutor创立时指定。次要包含:
- AbortPolicy:间接抛异样RejectedExecutionException
- CallerRunsPolicy:调用方解决,即交给提交execute办法的线程本人执行工作
- DiscardPolicy:间接扔掉该工作
- DiscardOldestPolicy:扔掉队列中等待时间最久的工作,执行当前任务
- 自定义:实现RejectedExecutionHandler接口,自定义回绝策略
钩子函数
工作执行前和工作执行后别离调用beforeExecute/afterExecute办法,这两个办法在ThreadPoolExecutor中默认都是哑实现,什么都没做。如果你既想要采纳ThreadPoolExecutor作为线程池、又想在工作执行前后做额定的动作,能够继承ThreadPoolExecutor并笼罩他的beforeExecute/afterExecute办法。
小结
线程池ThreadPoolExecutor源码剖析实现,其实从利用层面来讲,绝大部分线程池需要都能够通过ThreadPoolExecutor失去满足,而且能够利用Executors工具类创立满足各种不同场景的线程池。
Thanks a lot!
上一篇 BlockingQueue – ArrayBlockingQueue
发表回复