线程池
池化技术的益处
- 升高资源耗费:能够反复利用已创立的线程升高线程创立和销毁造成的耗费。
- 进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。
- 进步线程的可管理性:线程是稀缺资源,如果无限度地创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行统一分配、调优和监控。
线程池的利用场景
- 服务器承受到大量申请时,应用线程池技术时十分适合的,它能够大大减少线程的创立和销毁次数,进步服务器的工作效率
- 实际上,在开发中,如果须要创立5个以上的线程,那么就能够应用线程池来治理
线程池的类关系图
线程池的结构器参数
参数名 | 类型 | 含意 |
---|---|---|
corePoolSize | int | 外围线程数 |
maxPoolSize | int | 最大线程数 |
keepAliveTime | long | 放弃存活工夫 |
workQueue | BlockingQueue | 工作存储队列 |
threadFactory | ThreadFactory | 当线程池须要新的线程的时候,会应用threadFactory来生成新的线程 |
Handler | RejectedExecutionHandler | 因为线程池无奈承受所提交的工作的回绝策略 |
corePoolSize和maxPoolSize
corePoolSize
指的是外围线程数:线程池在实现初始化后,默认状况下,还没有创立任何线程,线程池会期待有工作到来时,再创立新线程去执行工作,直到达到外围线程数,之后外围线程会始终放弃这个数量;当工作数量超过外围线程数,将工作放在阻塞队列workQueue
中,期待外围线程闲暇后处理- 如果外围线程全副在工作中,而且队列也满了,线程池就会在外围线程的根底上,额定减少一些线程,这些新减少的线程数最大下限就是
maxPoolSize
线程创立规定
- 如果线程数小于corePoolSize, 即便其余线程处于闲暇状态,也会创立一个新线程(外围线程)来运行新工作
- 如果线程数等于(或大于)corePoolSize但少于maxPoolSize,则将工作放入队列
- 如果队列已满,并且线程数小于maxPoolSize,则创立一个新线程来运行工作
- 如果队列已满,并且线程数大于或等于maxPoolSize则回绝该工作
增减线程的特点
- 通过设置corePoolSize和maxPoolSize为雷同数量,就能够创立固定大小的线程池,即便队列满了也不会在拓展线程
- 线程池心愿放弃较少的线程数,并且只有在负载变得很大时才减少它,这就是队列的用意
- 通过设置maxPoolSize为很高的只,例如Integer.MAX_VALUE,能够容许线程池包容任意数量的并发工作
- 是只有在队列填满时才创立多于corePoolSize的线程,所以如果应用无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize
keepAliveTime
闲暇的非核心线程的存活工夫,用于回收线程
- 如果线程池以后的线程数多于corePoolSize,那么如果多余的线程闲暇工夫超过keepAliveTime,它们就会被终止
ThreadFactory
线程工厂,用于创立线程
- 新的线程是由ThreadFactory创立的,默认应用的线程工厂是
Executors.defaultThreadFactory()
,创立进去的线程都在同一个线程组,领有同样的NORM_PRIORITY
优先级并且都不是守护线程;如果本人定义ThreadFactory,那么就能够扭转线程名,线程组,优先级,是否是守护线程等 - 通常应用默认的就能够,源码如下:
workQueue
有三种最常见的队列类型:
- 间接交接: SynchronousQueue 无容量
- 无界队列: LinkedBlockingQueue 有限容量,有内存溢出的危险
- 有界队列: ArrayBlockingQueue 可设置容量
ThreadPoolExecutor的启动
/*** -通过 new 创立线程池时, 除非调用 prestartAllCoreThreads / prestartCoreThread 办法启动外围线程,* -否则即便工作队列中存在工作,同样也不会执行.*/ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 20, 3L, TimeUnit.SECONDS, linkedBlockingDeque); /** * Starts all core threads, causing them to idly wait for work. This* overrides the default policy of starting core threads only when* new tasks are executed.* -启动所有外围线程,让它们无闲暇的期待工作。 这将笼罩仅在执行新工作时启动外围线程的默认策略。* -手动启动线程池.* @return the number of threads started*/threadPoolExecutor.prestartAllCoreThreads();
JDK内置线程池
线程池应该手动创立还是主动创立
手动创立,能够让咱们更加明确线程池的容许规定,防止资源耗尽的危险
主动创立,也就是间接调用JDK封装号的构造函数,可能会带来一些问题:
Executors.newFixedThreadPool(int nThreads)
数量固定的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
corePoolSize和maxPoolSize被设置为雷同的nThreads参数,并应用了无界队列LinkedBlockingQueue,不会拓展线程所以也没有存活工夫
当工作在队列中沉积过多,可能就会造成OOM
Executors.newSingleThreadExecutor()
只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Executors.newCachedThreadPool()
可缓存线程
无界限程池,具备主动回收多余线程的性能
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
最大的线程数被设置为Integer.MAX_VALUE,线程闲暇60秒后回收,不应用队列(SynchronousQueue)
Executors.newScheduledTreadPool()
反对定时及周期性工作执行的线程池,应用提早队列(DelayedWorkQueue)
public static void main(String[] args) { ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10); //提早5秒执行工作 threadPool.schedule(new EveryTaskOneThread.Task(),5, TimeUnit.SECONDS); //1秒之后每个3秒执行一次工作 threadPool.scheduleAtFixedRate(new EveryTaskOneThread.Task(), 1, 3,TimeUnit.SECONDS); }
所以,还是更具业务的并发量手动创立线程池吧
JDK1.8后退出workStealingPool
- 子工作
- 窃取
线程数量怎么设定?
- CPU密集型(加密,即便hash等) : 最佳线程数为CPU外围数的1-2倍左右
- 耗时I/O型(读写数据库,文件,网络传输等): 最佳线程数个别会大于CPU外围数很多倍,以JVM线程监控显示忙碌状况为根据,保障线程闲暇能够连接上,参考Brain Goetz举荐的计算方法:
==线程数=CPU外围数 * (1+均匀等待时间/均匀工作工夫))==
- 实际上最靠谱的还是通过压力测试得出适合的线程数
进行线程池的正确形式
- shutdown 执行该办法后,线程池会将以后队列中的工作执行结束,并且在次期间回绝新工作进入,执行完后进行线程池
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++){ executorService.execute(new ShutDownTask()); } System.out.println(executorService.isShutdown()); Thread.sleep(1500); executorService.shutdown(); //是否进入进行状态 System.out.println(executorService.isShutdown()); //回绝新工作 executorService.execute(new ShutDownTask()); //是否真正意义上的敞开 System.out.println(executorService.isTerminated()); } static class ShutDownTask implements Runnable { @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }
awaitTermination(timeout)
:在一段时间内所有工作是否被执行结束
- shutdownNow 将所有线程中断,并且队列中还未执行的工作作为一个列表返回
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++){ executorService.execute(new ShutDownTask()); } System.out.println(executorService.isShutdown()); Thread.sleep(1500); //发送中断信号,并返回runnableList List<Runnable> runnableList = executorService.shutdownNow(); } static class ShutDownTask implements Runnable { @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "被中断了"); } } }
工作回绝策略
回绝机会
- 当Executor敞开(shutdown)时,提交新工作会被回绝
- 当Executor对最大线程和队列容量应用有限度并且曾经饱和时
4种回绝策略
- AbortPolicy: 默认,间接抛出RejectedExecutionException回绝异样
- DiscardPolicy: 默默的把被回绝的工作抛弃
- DiscardOldestPolicy: 当有新工作时,会抛弃工作队列中存在最久的老工作,以腾出地位给新工作
- CallerRunsPolicy: 将被线程池回绝的工作交给调用者(caller)主线程去执行
钩子办法
每个工作执行前后能够减少解决(日志,统计)
/** * 演示每个工作执行前后都能够放钩子函数 */public class PauseableTreadPool extends ThreadPoolExecutor { private final ReentrantLock lock = new ReentrantLock(); private Condition unpaused = lock.newCondition(); //标记线程是否处于暂停状态 private boolean isPaused; public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } //重写办法 before钩子 @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); lock.lock(); try { while(isPaused) { unpaused.await(); } } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } //暂停办法 private void pause() { lock.lock(); try { isPaused = true; } finally { lock.unlock(); } } //复原办法 private void resume() { lock.lock(); try{ isPaused = false; //唤醒全副 unpaused.signalAll(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { PauseableTreadPool pauseableTreadPool = new PauseableTreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); Runnable runnable = new Runnable() { //线程体 @Override public void run() { System.out.println("被执行"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < 10000; i++){ pauseableTreadPool.execute(runnable); } Thread.sleep(1500); pauseableTreadPool.pause(); System.out.println("线程池被暂停"); Thread.sleep(1500); pauseableTreadPool.resume(); System.out.println("线程池已复原"); }}
原理&源码剖析
次要剖析ThreadPoolExecutor
线程池的组成部分
- 线程池管理器 ExecutorService控制线程池的启动和进行
- 工作线程 ThreadPoolExecutor中的外部类Worker
- 工作队列 线程平安的
BlockingQueue<Runnable> workQueue;
工作接口(Task)
/** * Creates with given first task and thread from ThreadFactory.
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);
}
**线程池事项工作复用的原理**- 用雷同的线程执行不同的工作**ThreadPoolExecutor中的execute办法**
/** * 在未来的某个工夫执行给定的工作,工作能够在新线程或池中现有的线程中执行 * 如果无奈将工作提交执行,起因之一是执行器已敞开或因为其容量已满,该工作由以后的 RejectedExecutionHandler 解决 * @param command 要执行的工作 * */public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. 如果工作线程数量少于corePoolSize,尝试调用addWorker以给定的command启动一个新线程 * * 2.如果一个工作能够胜利排队,那么咱们依然须要 * 仔细检查咱们是否应该增加线程 * (因为现有的自上次查看后死亡)或 * 自从进入此办法以来,该池已敞开。 所以咱们 * 从新查看状态,并在必要时回退排队 * 进行,如果没有,则启动一个新线程。 * * 3.如果咱们无奈将工作排队,那么咱们尝试增加一个新的 * 线程。 如果失败,咱们晓得咱们曾经敞开或饱和 * 并因而回绝工作。 */ int c = ctl.get(); //ctl记录了线程池状态和线程数 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //循环获取工作执行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}
## 线程池的状态- RUNNING: 承受型工作并解决排队工作- SHUTDOWN: 不接受任务,但解决排队工作- STOP: 不承受新工作,也不解决排队工作,并中断正在进行的工作- TIDYING(整洁): 所有工作都已终止, workerCount为0时,线程会转换到TIDYING状态, 并将运行 terminate() 钩子办法- TERMINATED: terminate() 运行实现
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
## 应用线程池的留神点- 防止工作的沉积 FixedThreadPool SingleThreadExecutor 工作队列长度过大, 可能会沉积大量的申请, 从而导致OOM.- 防止线程数适度减少 CachedThreadPool ScheduledThreadPool 容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。 - 排查线程透露 线程曾经执行结束,却没有正确的被回收,往往是工作的逻辑问题