线程池是什么
线程池(Thread Pool)是一种基于池化思维治理线程的工具。
线程池的作用
升高资源耗费 :通过池化技术反复利用已创立的线程,升高线程创立和销毁造成的损耗。
进步响应速度 :工作达到时,无需期待线程创立即可立刻执行。
进步线程的可管理性:线程是稀缺资源,如果无限度创立,不仅会耗费系统资源,还会因为线程的不合理散布导致资源调度失衡,升高零碎的稳定性。应用线程池能够进行对立的调配、调优和监控。
总体设计
ThreadPoolExecutor 实现的顶层接口是 Executor,顶层接口 Executor 提供了一种思维:将工作提交和工作执行进行解耦。用户无需关注如何创立线程,如何调度线程来执行工作,用户只需提供 Runnable 对象,将工作的运行逻辑提交到执行器 (Executor) 中,由 Executor 框架实现线程的调配和工作的执行局部。ExecutorService 接口减少了一些能力:(1)裁减执行工作的能力,补充能够为一个或一批异步工作生成 Future 的办法;(2)提供了管控线程池的办法,比方进行线程池的运行。AbstractExecutorService 则是下层的抽象类,将执行工作的流程串联了起来,保障上层的实现只需关注一个执行工作的办法即可。最上层的实现类 ThreadPoolExecutor 实现最简单的运行局部,ThreadPoolExecutor 将会一方面保护本身的生命周期,另一方面同时治理线程和工作,使两者良好的联合从而执行并行任务。
ThreadPoolExecutor
ThreadPoolExecutor 继承了类 AbstractExecutorService。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
- corePoolSize 外围线程数
- maximumPoolSize 最大线程数
- keepAliveTime 线程池中非核心线程闲暇的存活工夫大小
- unit keepAliveTime 的工夫单位
- workQueue 线程工作缓冲队列
- threadFactory 线程工厂用来创立线程
- handle 当回绝解决工作时的策略
线程池的运行状态
ThreadPoolExecutor 的运行状态有 5 种,别离为:
其生命周期转换如下入所示:
工作执行
- 如果 workerCount < corePoolSize,则创立并启动一个线程来执行新提交的工作。
- 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将工作增加到该阻塞队列中。
- 如果 corePoolSize <= workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创立并启动一个线程来执行新提交的工作。
- 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则依据回绝策略来解决该工作, 默认的解决形式是间接抛异样
工作队列
LinkedBlockingQueue 一个由链表构造组成的有界队列,此队列依照先进先出 (FIFO) 的准则对元素进行排序。此队列的默认长度为 Integer.MAX_VALUE,如果大量新工作在队列中沉积可能最终导致 OOM。
ArrayBlockingQueue 一个数组实现的有界队列,此队列依照先进先出 (FIFO) 的准则对元素进行排序。反对偏心锁和非偏心锁。
SynchronousQueue 一个不存储元素的阻塞队列,每一个 put 操作必须期待 take 操作,否则不能增加元素。反对偏心锁和非偏心锁。
当工作队列是 LinkedBlockingQueue,队列的默认长度为 Integer.MAX_VALUE,corePoolSize=2,maximumPoolSize=6,此时启动 10 个线程工作,前 2 个线程工作立即被执行,后续 8 个工作被退出工作队列中。
public class ThreadPoolTest {public static void main(String[] args) {BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
for (int i = 0; i < 10; i++) {executor.execute(new Runnable() {
@Override
public void run() {
try {System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
});
int threadSize = queue.size();
System.out.println("线程队列大小为 -->"+threadSize);
}
executor.shutdown();}
}
pool-1-thread-1
线程队列大小为 -->0
pool-1-thread-2
线程队列大小为 -->0
线程队列大小为 -->1
线程队列大小为 -->2
线程队列大小为 -->3
线程队列大小为 -->4
线程队列大小为 -->5
线程队列大小为 -->6
线程队列大小为 -->7
线程队列大小为 -->8
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
当工作队列是 ArrayBlockingQueue,队列的长度为 4,corePoolSize=2,maximumPoolSize=6,此时启动 10 个线程工作,前 2 个线程工作立即被执行,后续 4 个工作被退出工作队列中,剩下的 4 个线程工作(2+4=6, 没有超过 maximumPoolSize),启动新线程执行工作
public class ThreadPoolTest {public static void main(String[] args) {BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4);
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
for (int i = 0; i < 10; i++) {executor.execute(new Runnable() {
@Override
public void run() {
try {System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
});
int threadSize = queue.size();
System.out.println("线程队列大小为 -->"+threadSize);
}
executor.shutdown();}
}
线程队列大小为 -->0
pool-1-thread-1
线程队列大小为 -->0
pool-1-thread-2
线程队列大小为 -->1
线程队列大小为 -->2
线程队列大小为 -->3
线程队列大小为 -->4
pool-1-thread-3
线程队列大小为 -->4
线程队列大小为 -->4
pool-1-thread-4
线程队列大小为 -->4
pool-1-thread-5
pool-1-thread-6
线程队列大小为 -->4
pool-1-thread-1
pool-1-thread-3
pool-1-thread-4
pool-1-thread-2
当工作队列是 ArrayBlockingQueue,队列的长度为 4,corePoolSize=2,maximumPoolSize=6,此时启动 11 个线程工作,前 2 个线程工作立即被执行,后续 4 个工作被退出工作队列中,剩下的 5 个线程工作(2+5>6, 超过 maximumPoolSize),最初一个工作被拒绝执行
public class ThreadPoolTest {public static void main(String[] args) {BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4);
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
for (int i = 0; i < 11; i++) {executor.execute(new Runnable() {
@Override
public void run() {
try {System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
});
int threadSize = queue.size();
System.out.println("线程队列大小为 -->"+threadSize);
}
executor.shutdown();}
}
pool-1-thread-1
线程队列大小为 -->0
pool-1-thread-2
线程队列大小为 -->0
线程队列大小为 -->1
线程队列大小为 -->2
线程队列大小为 -->3
线程队列大小为 -->4
线程队列大小为 -->4
pool-1-thread-3
pool-1-thread-4
线程队列大小为 -->4
pool-1-thread-5
线程队列大小为 -->4
pool-1-thread-6
线程队列大小为 -->4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolTest$1@2b193f2d rejected from java.util.concurrent.ThreadPoolExecutor@355da254[Running, pool size = 6, active threads = 6, queued tasks = 4, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at ThreadPoolTest.main(ThreadPoolTest.java:9)
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
当工作队列是 SynchronousQueue,corePoolSize=2,maximumPoolSize=6,此时启动 10 个线程工作,前 6 个线程工作立刻执行,剩下的 4 个线程工作(从 7 开始 >6, 超过 maximumPoolSize),被拒绝执行
public class ThreadPoolTest {public static void main(String[] args) {BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
for (int i = 0; i < 10; i++) {executor.execute(new Runnable() {
@Override
public void run() {
try {System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
});
int threadSize = queue.size();
System.out.println("线程队列大小为 -->"+threadSize);
}
executor.shutdown();}
}
线程队列大小为 -->0
pool-1-thread-1
线程队列大小为 -->0
pool-1-thread-2
线程队列大小为 -->0
pool-1-thread-3
线程队列大小为 -->0
pool-1-thread-4
线程队列大小为 -->0
pool-1-thread-5
线程队列大小为 -->0
pool-1-thread-6
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolTest$1@355da254 rejected from java.util.concurrent.ThreadPoolExecutor@4dc63996[Running, pool size = 6, active threads = 6, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at ThreadPoolTest.main(ThreadPoolTest.java:9)
回绝策略
工作回绝模块是线程池的爱护局部,线程池有一个最大的容量,当线程池的工作缓存队列已满,并且线程池中的线程数目达到 maximumPoolSize 时,就须要回绝掉该工作,采取工作回绝策略,爱护线程池。
源码解析
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
一个 ctl 变量能够蕴含两局部信息: 线程池的运行状态 (runState) 和线程池内无效线程的数量 (workerCount). 因为 int 型的变量是由 32 位二进制的数形成, 所以用 ctl 的高 3 位来示意线程池的运行状态, 用低 29 位来示意线程池内无效线程的数量.
示意线程池运行状态的变量通常命名为 rs, 示意线程池中无效线程数量的变量通常命名为 wc, 另外, ctl 也通常会简写作 c
因为 ctl 变量是由线程池的运行状态 (runState) 和线程池内无效线程的数量 (workerCount)这两个信息组合而成, 所以, 如果晓得了这两局部信息各自的数值, 就能够调用上面的 ctlOf() 办法来计算出 ctl 的数值
private static int ctlOf(int rs, int wc) {return rs | wc;}
反过来, 如果晓得了 ctl 的值, 那么也能够通过如下的 runStateOf() 和 workerCountOf() 两个办法来别离获取线程池的运行状态和线程池内无效线程的数量.
CAPACITY 示意线程池内无效线程的数量下限就是 29 个二进制 1 所示意的数值 (约为 5 亿)
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int runStateOf(int c) {return c & ~CAPACITY;}
private static int workerCountOf(int c) {return c & CAPACITY;}
线程池五种状态的具体数值
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
咱们要向线程池提交一个工作, 能够通过调用 execute() 或 submit()办法来实现, 而二者的区别是, execute()办法只能进行工作的提交而不能获取该工作执行的后果, 但 submit()办法则既能进行工作的提交, 又能获取该工作执行的后果. 所以, 如果你须要获取一个工作执行的后果或者须要对一个工作执行的后果进行某种操作, 那么就须要应用 submit()办法来提交工作. 其实 submit()办法就是对 execute()办法的一种封装, 它外部也是调用 execute()办法来实现工作提交的, 只是因为 submit()办法的返回值是一个 Future 对象, 通过返回的 Future 对象就能获取该工作最终执行的后果,从线程的提交开始剖析, 因为 submit()办法外部也是调用 execute()办法, 所以咱们就间接剖析 execute()办法, 其源码如下:
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 依据 ctl 的值, 获取线程池中的无效线程数 workerCount, 如果 workerCount 小于外围线程数 corePoolSize,调用 addWorker()办法创立线程执行工作
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
// 如果线程池内的阻塞队列 workQueue 未满, 则将提交的工作 command 增加到阻塞队列 workQueue 中,被增加到阻塞队列中的该工作将会在将来的某个时刻被执行.
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 如果线程池内的线程池内的阻塞队列已满,并且无效线程数小于最大线程数 maximumPoolSize,调用 addWorker()办法创立线程执行工作
// 如果线程池内的线程池内的阻塞队列已满,并且无效线程数达到了最大线程数 maximumPoolSize, 那么就让 RejectedExecutionHandler 依据它的回绝策略来解决该工作, 默认的解决形式是间接抛异样.
} else if (!addWorker(command, false))
reject(command);
}
四种线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
Alibaba 标准的解释:
【强制】线程池不容许应用 Executors 去创立,而是通过 ThreadPoolExecutor 的形式,这样 的解决形式让写的同学更加明确线程池的运行规定,躲避资源耗尽的危险。阐明:Executors 返回的线程池对象的弊病如下:
1)FixedThreadPool 和 SingleThreadPool : 容许的申请队列长度为 Integer.MAX_VALUE,可能会沉积大量的申请,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool : 容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。
参考:
Java 线程池实现原理及其在美团业务中的实际
Java 线程池 ThreadPoolExecutor 源码剖析