线程池
池化技术的益处
- 升高资源耗费:能够反复利用已创立的线程升高线程创立和销毁造成的耗费。
- 进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。
- 进步线程的可管理性:线程是稀缺资源,如果无限度地创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行统一分配、调优和监控。
线程池的利用场景
- 服务器承受到大量申请时,应用线程池技术时十分适合的,它能够大大减少线程的创立和销毁次数,进步服务器的工作效率
- 实际上,在开发中,如果须要创立5个以上的线程,那么就能够应用线程池来治理
线程池的类关系图
线程池的结构器参数
参数名 | 类型 | 含意 |
---|---|---|
corePoolSize | int | 外围线程数 |
maxPoolSize | int | 最大线程数 |
keepAliveTime | long | 放弃存活工夫 |
workQueue | BlockingQueue | 工作存储队列 |
threadFactory | ThreadFactory | 当线程池须要新的线程的时候,会应用threadFactory来生成新的线程 |
Handler | RejectedExecutionHandler | 因为线程池无奈承受所提交的工作的回绝策略 |
corePoolSize和maxPoolSize
corePoolSize
指的是外围线程数:线程池在实现初始化后,默认状况下,还没有创立任何线程,线程池会期待有工作到来时,再创立新线程去执行工作,直到达到外围线程数,之后外围线程会始终放弃这个数量;当工作数量超过外围线程数,将工作放在阻塞队列workQueue
中,期待外围线程闲暇后处理- 如果外围线程全副在工作中,而且队列也满了,线程池就会在外围线程的根底上,额定减少一些线程,这些新减少的线程数最大下限就是
maxPoolSize
线程创立规定
- 如果线程数小于corePoolSize, 即便其余线程处于闲暇状态,也会创立一个新线程(外围线程)来运行新工作
- 如果线程数等于(或大于)corePoolSize但少于maxPoolSize,则将工作放入队列
- 如果队列已满,并且线程数小于maxPoolSize,则创立一个新线程来运行工作
- 如果队列已满,并且线程数大于或等于maxPoolSize则回绝该工作
增减线程的特点
- 通过设置corePoolSize和maxPoolSize为雷同数量,就能够创立固定大小的线程池,即便队列满了也不会在拓展线程
- 线程池心愿放弃较少的线程数,并且只有在负载变得很大时才减少它,这就是队列的用意
- 通过设置maxPoolSize为很高的只,例如Integer.MAX_VALUE,能够容许线程池包容任意数量的并发工作
- 是只有在队列填满时才创立多于corePoolSize的线程,所以如果应用无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize
keepAliveTime
闲暇的非核心线程的存活工夫,用于回收线程
- 如果线程池以后的线程数多于corePoolSize,那么如果多余的线程闲暇工夫超过keepAliveTime,它们就会被终止
ThreadFactory
线程工厂,用于创立线程
- 新的线程是由ThreadFactory创立的,默认应用的线程工厂是
Executors.defaultThreadFactory()
,创立进去的线程都在同一个线程组,领有同样的NORM_PRIORITY
优先级并且都不是守护线程;如果本人定义ThreadFactory,那么就能够扭转线程名,线程组,优先级,是否是守护线程等 - 通常应用默认的就能够,源码如下:
workQueue
有三种最常见的队列类型:
- 间接交接: SynchronousQueue 无容量
- 无界队列: LinkedBlockingQueue 有限容量,有内存溢出的危险
- 有界队列: ArrayBlockingQueue 可设置容量
ThreadPoolExecutor的启动
/**
* -通过 new 创立线程池时, 除非调用 prestartAllCoreThreads / prestartCoreThread 办法启动外围线程,
* -否则即便工作队列中存在工作,同样也不会执行.
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 20, 3L, TimeUnit.SECONDS, linkedBlockingDeque);
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
* -启动所有外围线程,让它们无闲暇的期待工作。 这将笼罩仅在执行新工作时启动外围线程的默认策略。
* -手动启动线程池.
* @return the number of threads started
*/
threadPoolExecutor.prestartAllCoreThreads();
JDK内置线程池
线程池应该手动创立还是主动创立
手动创立,能够让咱们更加明确线程池的容许规定,防止资源耗尽的危险
主动创立,也就是间接调用JDK封装号的构造函数,可能会带来一些问题:
Executors.newFixedThreadPool(int nThreads)
数量固定的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
corePoolSize和maxPoolSize被设置为雷同的nThreads参数,并应用了无界队列LinkedBlockingQueue,不会拓展线程所以也没有存活工夫
当工作在队列中沉积过多,可能就会造成OOM
Executors.newSingleThreadExecutor()
只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Executors.newCachedThreadPool()
可缓存线程
无界限程池,具备主动回收多余线程的性能
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最大的线程数被设置为Integer.MAX_VALUE,线程闲暇60秒后回收,不应用队列(SynchronousQueue)
Executors.newScheduledTreadPool()
反对定时及周期性工作执行的线程池,应用提早队列(DelayedWorkQueue)
public static void main(String[] args) {
ScheduledExecutorService threadPool =
Executors.newScheduledThreadPool(10);
//提早5秒执行工作
threadPool.schedule(new EveryTaskOneThread.Task(),5, TimeUnit.SECONDS);
//1秒之后每个3秒执行一次工作
threadPool.scheduleAtFixedRate(new EveryTaskOneThread.Task(),
1, 3,TimeUnit.SECONDS);
}
所以,还是更具业务的并发量手动创立线程池吧
JDK1.8后退出workStealingPool
- 子工作
- 窃取
线程数量怎么设定?
- CPU密集型(加密,即便hash等) : 最佳线程数为CPU外围数的1-2倍左右
- 耗时I/O型(读写数据库,文件,网络传输等): 最佳线程数个别会大于CPU外围数很多倍,以JVM线程监控显示忙碌状况为根据,保障线程闲暇能够连接上,参考Brain Goetz举荐的计算方法:
==线程数=CPU外围数 * (1+均匀等待时间/均匀工作工夫))==
- 实际上最靠谱的还是通过压力测试得出适合的线程数
进行线程池的正确形式
- shutdown 执行该办法后,线程池会将以后队列中的工作执行结束,并且在次期间回绝新工作进入,执行完后进行线程池
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++){
executorService.execute(new ShutDownTask());
}
System.out.println(executorService.isShutdown());
Thread.sleep(1500);
executorService.shutdown();
//是否进入进行状态
System.out.println(executorService.isShutdown());
//回绝新工作
executorService.execute(new ShutDownTask());
//是否真正意义上的敞开
System.out.println(executorService.isTerminated());
}
static class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
awaitTermination(timeout)
:在一段时间内所有工作是否被执行结束
- shutdownNow 将所有线程中断,并且队列中还未执行的工作作为一个列表返回
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++){
executorService.execute(new ShutDownTask());
}
System.out.println(executorService.isShutdown());
Thread.sleep(1500);
//发送中断信号,并返回runnableList
List<Runnable> runnableList =
executorService.shutdownNow();
}
static class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
工作回绝策略
-
回绝机会
- 当Executor敞开(shutdown)时,提交新工作会被回绝
- 当Executor对最大线程和队列容量应用有限度并且曾经饱和时
4种回绝策略
- AbortPolicy: 默认,间接抛出RejectedExecutionException回绝异样
- DiscardPolicy: 默默的把被回绝的工作抛弃
- DiscardOldestPolicy: 当有新工作时,会抛弃工作队列中存在最久的老工作,以腾出地位给新工作
- CallerRunsPolicy: 将被线程池回绝的工作交给调用者(caller)主线程去执行
钩子办法
每个工作执行前后能够减少解决(日志,统计)
/**
* 演示每个工作执行前后都能够放钩子函数
*/
public class PauseableTreadPool extends ThreadPoolExecutor {
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
//标记线程是否处于暂停状态
private boolean isPaused;
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
//重写办法 before钩子
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while(isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
//暂停办法
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
//复原办法
private void resume() {
lock.lock();
try{
isPaused = false;
//唤醒全副
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableTreadPool pauseableTreadPool =
new PauseableTreadPool(10, 20,
10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Runnable runnable = new Runnable() { //线程体
@Override
public void run() {
System.out.println("被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++){
pauseableTreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableTreadPool.pause();
System.out.println("线程池被暂停");
Thread.sleep(1500);
pauseableTreadPool.resume();
System.out.println("线程池已复原");
}
}
原理&源码剖析
次要剖析ThreadPoolExecutor
线程池的组成部分
- 线程池管理器 ExecutorService控制线程池的启动和进行
- 工作线程 ThreadPoolExecutor中的外部类Worker
- 工作队列 线程平安的
BlockingQueue<Runnable> workQueue;
-
工作接口(Task)
/** * Creates with given first task and thread from ThreadFactory.
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
**线程池事项工作复用的原理**
- 用雷同的线程执行不同的工作
**ThreadPoolExecutor中的execute办法**
/**
* 在未来的某个工夫执行给定的工作,工作能够在新线程或池中现有的线程中执行
* 如果无奈将工作提交执行,起因之一是执行器已敞开或因为其容量已满,该工作由以后的 RejectedExecutionHandler 解决
* @param command 要执行的工作
*
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. 如果工作线程数量少于corePoolSize,尝试调用addWorker以给定的command启动一个新线程
*
* 2.如果一个工作能够胜利排队,那么咱们依然须要
* 仔细检查咱们是否应该增加线程
* (因为现有的自上次查看后死亡)或
* 自从进入此办法以来,该池已敞开。 所以咱们
* 从新查看状态,并在必要时回退排队
* 进行,如果没有,则启动一个新线程。
*
* 3.如果咱们无奈将工作排队,那么咱们尝试增加一个新的
* 线程。 如果失败,咱们晓得咱们曾经敞开或饱和
* 并因而回绝工作。
*/
int c = ctl.get(); //ctl记录了线程池状态和线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try { //循环获取工作执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
## 线程池的状态
- RUNNING: 承受型工作并解决排队工作
- SHUTDOWN: 不接受任务,但解决排队工作
- STOP: 不承受新工作,也不解决排队工作,并中断正在进行的工作
- TIDYING(整洁): 所有工作都已终止, workerCount为0时,线程会转换到TIDYING状态, 并将运行 terminate() 钩子办法
- TERMINATED: terminate() 运行实现
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
## 应用线程池的留神点
- 防止工作的沉积
FixedThreadPool SingleThreadExecutor
工作队列长度过大, 可能会沉积大量的申请, 从而导致OOM.
- 防止线程数适度减少
CachedThreadPool ScheduledThreadPool
容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。
- 排查线程透露
线程曾经执行结束,却没有正确的被回收,往往是工作的逻辑问题
发表回复