共计 10043 个字符,预计需要花费 26 分钟才能阅读完成。
线程池
池化技术的益处
- 升高资源耗费:能够反复利用已创立的线程升高线程创立和销毁造成的耗费。
- 进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。
- 进步线程的可管理性:线程是稀缺资源,如果无限度地创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行统一分配、调优和监控。
线程池的利用场景
- 服务器承受到大量申请时, 应用线程池技术时十分适合的, 它能够大大减少线程的创立和销毁次数, 进步服务器的工作效率
- 实际上, 在开发中, 如果须要创立 5 个以上的线程, 那么就能够应用线程池来治理
线程池的类关系图
线程池的结构器参数
参数名 | 类型 | 含意 |
---|---|---|
corePoolSize | int | 外围线程数 |
maxPoolSize | int | 最大线程数 |
keepAliveTime | long | 放弃存活工夫 |
workQueue | BlockingQueue | 工作存储队列 |
threadFactory | ThreadFactory | 当线程池须要新的线程的时候, 会应用 threadFactory 来生成新的线程 |
Handler | RejectedExecutionHandler | 因为线程池无奈承受所提交的工作的回绝策略 |
corePoolSize 和 maxPoolSize
corePoolSize
指的是外围线程数: 线程池在实现初始化后, 默认状况下, 还没有创立任何线程, 线程池会期待有工作到来时, 再创立新线程去执行工作, 直到达到外围线程数, 之后外围线程会始终放弃这个数量; 当工作数量超过外围线程数, 将工作放在阻塞队列workQueue
中, 期待外围线程闲暇后处理- 如果外围线程全副在工作中, 而且队列也满了, 线程池就会在外围线程的根底上, 额定减少一些线程, 这些新减少的线程数最大下限就是
maxPoolSize
线程创立规定
- 如果线程数小于 corePoolSize, 即便其余线程处于闲暇状态, 也会创立一个新线程 (外围线程) 来运行新工作
- 如果线程数等于(或大于)corePoolSize 但少于 maxPoolSize, 则将工作放入队列
- 如果队列已满, 并且线程数小于 maxPoolSize, 则创立一个新线程来运行工作
- 如果队列已满, 并且线程数大于或等于 maxPoolSize 则回绝该工作
增减线程的特点
- 通过设置 corePoolSize 和 maxPoolSize 为雷同数量, 就能够创立固定大小的线程池, 即便队列满了也不会在拓展线程
- 线程池心愿放弃较少的线程数, 并且只有在负载变得很大时才减少它, 这就是队列的用意
- 通过设置 maxPoolSize 为很高的只, 例如 Integer.MAX_VALUE, 能够容许线程池包容任意数量的并发工作
- 是只有在队列填满时才创立多于 corePoolSize 的线程, 所以如果应用无界队列(例如 LinkedBlockingQueue), 那么线程数就不会超过 corePoolSize
keepAliveTime
闲暇的非核心线程的存活工夫, 用于回收线程
- 如果线程池以后的线程数多于 corePoolSize, 那么如果多余的线程闲暇工夫超过 keepAliveTime, 它们就会被终止
ThreadFactory
线程工厂, 用于创立线程
- 新的线程是由 ThreadFactory 创立的, 默认应用的线程工厂是
Executors.defaultThreadFactory()
, 创立进去的线程都在同一个线程组, 领有同样的NORM_PRIORITY
优先级并且都不是守护线程; 如果本人定义 ThreadFactory, 那么就能够扭转线程名, 线程组, 优先级, 是否是守护线程等 - 通常应用默认的就能够, 源码如下:
workQueue
有三种最常见的队列类型:
- 间接交接: SynchronousQueue 无容量
- 无界队列: LinkedBlockingQueue 有限容量, 有内存溢出的危险
- 有界队列: ArrayBlockingQueue 可设置容量
ThreadPoolExecutor 的启动
/**
* - 通过 new 创立线程池时, 除非调用 prestartAllCoreThreads / prestartCoreThread 办法启动外围线程,
* - 否则即便工作队列中存在工作, 同样也不会执行.
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 20, 3L, TimeUnit.SECONDS, linkedBlockingDeque);
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
* - 启动所有外围线程,让它们无闲暇的期待工作。这将笼罩仅在执行新工作时启动外围线程的默认策略。* - 手动启动线程池.
* @return the number of threads started
*/
threadPoolExecutor.prestartAllCoreThreads();
JDK 内置线程池
线程池应该手动创立还是主动创立
手动创立, 能够让咱们更加明确线程池的容许规定, 防止资源耗尽的危险
主动创立, 也就是间接调用 JDK 封装号的构造函数, 可能会带来一些问题:
Executors.newFixedThreadPool(int nThreads)
数量固定的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
corePoolSize 和 maxPoolSize 被设置为雷同的 nThreads 参数, 并应用了无界队列 LinkedBlockingQueue, 不会拓展线程所以也没有存活工夫
当工作在队列中沉积过多, 可能就会造成 OOM
Executors.newSingleThreadExecutor()
只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Executors.newCachedThreadPool()
可缓存线程
无界限程池, 具备主动回收多余线程的性能
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最大的线程数被设置为 Integer.MAX_VALUE, 线程闲暇 60 秒后回收, 不应用队列(SynchronousQueue)
Executors.newScheduledTreadPool()
反对定时及周期性工作执行的线程池, 应用提早队列(DelayedWorkQueue)
public static void main(String[] args) {
ScheduledExecutorService threadPool =
Executors.newScheduledThreadPool(10);
// 提早 5 秒执行工作
threadPool.schedule(new EveryTaskOneThread.Task(),5, TimeUnit.SECONDS);
// 1 秒之后每个 3 秒执行一次工作
threadPool.scheduleAtFixedRate(new EveryTaskOneThread.Task(),
1, 3,TimeUnit.SECONDS);
}
所以, 还是更具业务的并发量手动创立线程池吧
JDK1.8 后退出workStealingPool
- 子工作
- 窃取
线程数量怎么设定?
- CPU 密集型(加密, 即便 hash 等) : 最佳线程数为 CPU 外围数的 1 - 2 倍左右
- 耗时 I / O 型(读写数据库, 文件, 网络传输等): 最佳线程数个别会大于 CPU 外围数很多倍, 以 JVM 线程监控显示忙碌状况为根据, 保障线程闲暇能够连接上, 参考 Brain Goetz 举荐的计算方法:
== 线程数 =CPU 外围数 * (1+ 均匀等待时间 / 均匀工作工夫))==
- 实际上最靠谱的还是通过压力测试得出适合的线程数
进行线程池的正确形式
- shutdown 执行该办法后, 线程池会将以后队列中的工作执行结束, 并且在次期间回绝新工作进入, 执行完后进行线程池
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++){executorService.execute(new ShutDownTask());
}
System.out.println(executorService.isShutdown());
Thread.sleep(1500);
executorService.shutdown();
// 是否进入进行状态
System.out.println(executorService.isShutdown());
// 回绝新工作
executorService.execute(new ShutDownTask());
// 是否真正意义上的敞开
System.out.println(executorService.isTerminated());
}
static class ShutDownTask implements Runnable {
@Override
public void run() {
try {Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
awaitTermination(timeout)
: 在一段时间内所有工作是否被执行结束
- shutdownNow 将所有线程中断, 并且队列中还未执行的工作作为一个列表返回
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++){executorService.execute(new ShutDownTask());
}
System.out.println(executorService.isShutdown());
Thread.sleep(1500);
// 发送中断信号, 并返回 runnableList
List<Runnable> runnableList =
executorService.shutdownNow();}
static class ShutDownTask implements Runnable {
@Override
public void run() {
try {Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
工作回绝策略
-
回绝机会
- 当 Executor 敞开 (shutdown) 时, 提交新工作会被回绝
- 当 Executor 对最大线程和队列容量应用有限度并且曾经饱和时
4 种回绝策略
- AbortPolicy: 默认, 间接抛出 RejectedExecutionException 回绝异样
- DiscardPolicy: 默默的把被回绝的工作抛弃
- DiscardOldestPolicy: 当有新工作时, 会抛弃工作队列中存在最久的老工作, 以腾出地位给新工作
- CallerRunsPolicy: 将被线程池回绝的工作交给调用者 (caller) 主线程去执行
钩子办法
每个工作执行前后能够减少解决(日志, 统计)
/**
* 演示每个工作执行前后都能够放钩子函数
*/
public class PauseableTreadPool extends ThreadPoolExecutor {private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
// 标记线程是否处于暂停状态
private boolean isPaused;
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
// 重写办法 before 钩子
@Override
protected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);
lock.lock();
try {while(isPaused) {unpaused.await();
}
} catch (InterruptedException e) {e.printStackTrace();
}finally {lock.unlock();
}
}
// 暂停办法
private void pause() {lock.lock();
try {isPaused = true;} finally {lock.unlock();
}
}
// 复原办法
private void resume() {lock.lock();
try{
isPaused = false;
// 唤醒全副
unpaused.signalAll();} finally {lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableTreadPool pauseableTreadPool =
new PauseableTreadPool(10, 20,
10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Runnable runnable = new Runnable() { // 线程体
@Override
public void run() {System.out.println("被执行");
try {Thread.sleep(10);
} catch (InterruptedException e) {e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++){pauseableTreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableTreadPool.pause();
System.out.println("线程池被暂停");
Thread.sleep(1500);
pauseableTreadPool.resume();
System.out.println("线程池已复原");
}
}
原理 & 源码剖析
次要剖析 ThreadPoolExecutor
线程池的组成部分
- 线程池管理器 ExecutorService 控制线程池的启动和进行
- 工作线程 ThreadPoolExecutor 中的外部类 Worker
- 工作队列 线程平安的
BlockingQueue<Runnable> workQueue;
-
工作接口(Task)
/** * Creates with given first task and thread from ThreadFactory.
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
** 线程池事项工作复用的原理 **
- 用雷同的线程执行不同的工作
**ThreadPoolExecutor 中的 execute 办法 **
/**
* 在未来的某个工夫执行给定的工作, 工作能够在新线程或池中现有的线程中执行
* 如果无奈将工作提交执行, 起因之一是执行器已敞开或因为其容量已满, 该工作由以后的 RejectedExecutionHandler 解决
* @param command 要执行的工作
*
*/
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. 如果工作线程数量少于 corePoolSize, 尝试调用 addWorker 以给定的 command 启动一个新线程
*
* 2. 如果一个工作能够胜利排队,那么咱们依然须要
* 仔细检查咱们是否应该增加线程
*(因为现有的自上次查看后死亡)或
* 自从进入此办法以来,该池已敞开。所以咱们
* 从新查看状态,并在必要时回退排队
* 进行,如果没有,则启动一个新线程。*
* 3. 如果咱们无奈将工作排队,那么咱们尝试增加一个新的
* 线程。如果失败,咱们晓得咱们曾经敞开或饱和
* 并因而回绝工作。*/
int c = ctl.get(); //ctl 记录了线程池状态和线程数
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try { // 循环获取工作执行
while (task != null || (task = getTask()) != null) {w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {beforeExecute(wt, task);
Throwable thrown = null;
try {task.run();
} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {processWorkerExit(w, completedAbruptly);
}
}
## 线程池的状态
- RUNNING: 承受型工作并解决排队工作
- SHUTDOWN: 不接受任务, 但解决排队工作
- STOP: 不承受新工作, 也不解决排队工作, 并中断正在进行的工作
- TIDYING(整洁): 所有工作都已终止, workerCount 为 0 时, 线程会转换到 TIDYING 状态, 并将运行 terminate() 钩子办法
- TERMINATED: terminate() 运行实现
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
## 应用线程池的留神点
- 防止工作的沉积
FixedThreadPool SingleThreadExecutor
工作队列长度过大, 可能会沉积大量的申请, 从而导致 OOM.
- 防止线程数适度减少
CachedThreadPool ScheduledThreadPool
容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。- 排查线程透露
线程曾经执行结束, 却没有正确的被回收, 往往是工作的逻辑问题