共计 15344 个字符,预计需要花费 39 分钟才能阅读完成。
前言
线程池是 Java
中应用较多的并发框架,正当应用线程池,能够:升高资源耗费
, 进步响应速度
, 进步线程的可管理性
。本篇文章为《Java 并发编程的艺术》
第 9 章的学习笔记,依据原文作者的编写思路,顺次对 线程池的原理
, 线程池的创立
, 线程池执行工作
和敞开线程池
进行了学习和总结。最初会给出一个线程池的实现案例,该案例为之前在某微信公众号上看到的线程池实现实战,曾经无奈讲究其出处,原案例对于线程池的敞开存在缺点,我对其进行了一些修改和阐明,分享进去一起学习。
注释
一. 线程池的原理
当一个工作提交到线程池 ThreadPoolExecutor
时,该工作的执行如下图所示。
- 如果以后运行的线程数小于 corePoolSzie(外围线程数),则创立新线程来执行工作(须要获取全局锁);
- 如果以后运行的线程数等于或大于 corePoolSzie,则将工作退出
BlockingQueue
(工作阻塞队列); - 如果
BlockingQueue
已满,则创立新的线程来执行工作(须要获取全局锁); - 如果创立新线程会使以后线程数大于 maximumPoolSize(最大线程数),则回绝工作并调用
RejectedExecutionHandler
的rejectedExecution()
办法。
因为 ThreadPoolExecutor
存储工作线程应用的汇合是 HashSet
,因而执行上述步骤 1 和步骤 3 时须要获取全局锁来保障线程平安,而获取全局锁会导致线程池性能瓶颈,因而通常状况下,线程池实现预热后(以后线程数大于等于 corePoolSize),线程池的execute()
办法都是执行步骤 2。
二. 线程池的创立
通过 ThreadPoolExecutor
可能创立一个线程池,ThreadPoolExecutor
的构造函数签名如下。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
通过 ThreadPoolExecutor
创立线程池时,须要指定线程池的 外围线程数 , 最大线程数 , 线程保活工夫 , 线程保活工夫单位 和工作阻塞队列 ,并按需指定 线程工厂 和饱和回绝策略 ,如果不指定 线程工厂 和饱和回绝策略 ,则ThreadPoolExecutor
会应用默认的 线程工厂 和饱和回绝策略。上面别离介绍这些参数的含意。
参数 | 含意 |
---|---|
corePoolSize | 外围线程数,即线程池的根本大小。当一个工作被提交到线程池时,如果线程池的线程数小于 corePoolSize,那么无论其余线程是否闲暇,也需创立一个新线程来执行工作。 |
maximumPoolSize | 最大线程数。当线程池中线程数大于等于 corePoolSize 时,新提交的工作会退出工作阻塞队列,然而如果工作阻塞队列已满且线程数小于 maximumPoolSize,此时会持续创立新的线程来执行工作。该参数规定了线程池容许创立的最大线程数 |
keepAliveTime | 线程保活工夫。当线程池的线程数大于外围线程数时,多余的闲暇线程会最大存活 keepAliveTime 的工夫,如果超过这个工夫且闲暇线程还没有获取到工作来执行,则该闲暇线程会被回收掉。 |
unit | 线程保活工夫单位。通过 TimeUnit 指定线程保活工夫的工夫单位,可选单位有 DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和 NANOSECONDS(纳秒),但无论指定什么工夫单位,ThreadPoolExecutor 对立会将其转换为 NANOSECONDS。 |
workQueue | 工作阻塞队列。线程池的线程数大于等于 corePoolSize 时,新提交的工作会增加到 workQueue 中,所有线程执行完上一个工作后,会循环从 workQueue 中获取工作来执行。 |
threadFactory | 创立线程的工厂。能够通过线程工厂给每个创立进去的线程设置更有意义的名字。 |
handler | 饱和回绝策略。如果工作阻塞队列已满且线程池中的线程数等于 maximumPoolSize,阐明线程池此时处于饱和状态,应该执行一种回绝策略来解决新提交的工作。 |
三. 线程池执行工作
线程池应用两个办法执行工作,别离为 execute()
和submit()
。execute()
办法用于执行不须要返回值的工作,submit()
办法用于执行须要返回值的工作。execute()
是接口 Executor
定义的办法,submit()
是接口 ExecutorService
定义的办法,相干类图如下所示。
ThreadPoolExecutor
对 execute()
的实现如下。
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
AbstractExecutorService
对 submit()
实现如下。
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
在 execute()
办法中会依据以后线程数决定是新建线程来解决工作还是增加工作到工作阻塞队列中,而在 submit()
办法中是将工作封装成 RunnableFuture
而后再调用 execute()
办法。
四. 敞开线程池
能够通过调用线程池的 shutdown()
或者 shutdownNow()
办法来敞开线程池。
shutdown()
办法会将线程池状态置为SHUTDOWN
,此时线程池不会再接管新提交的工作,闲暇的线程会被中断,当正在被执行的工作和工作阻塞队列中的工作执行完后线程池才会平安的敞开掉。
shutdownNow()
办法会将线程池状态置为STOP
,此时线程池不会再接管新提交的工作,所有线程会被中断,工作阻塞队列中的工作不再执行(这些工作会以列表模式返回),正在执行中的工作也会被尝试进行。
补充:上述中的闲暇线程能够了解为正在从工作阻塞队列中获取工作的线程,即没有在执行工作的线程。
五. 线程池实现实战
在 ThreadPoolExecutor
中,存储 Worker
(工作线程)的汇合为HashSet
,因而每次对Worker
汇合做操作时须要获取全局锁。在本线程池实现的实战中,将基于 ConcurrentHashMap
实现一个 ConcurrentHashSet
并作为存储 Worker
的汇合。实现如下。
public class ConcurrentHashSet<T> extends AbstractSet<T> {private final ConcurrentHashMap<T, Object> MAP = new ConcurrentHashMap<>();
// 无实际意义,用于和 Worker 组成键值对存入 ConcurrentHashMap
private final Object PRESENT = new Object();
// 用于统计以后汇合中的 Worker 数量
private final AtomicInteger COUNT = new AtomicInteger();
@Override
public boolean add(T t) {COUNT.incrementAndGet();
return MAP.put(t, PRESENT) == null;
}
@Override
public boolean remove(Object o) {COUNT.decrementAndGet();
return MAP.remove(o) == PRESENT;
}
@Override
public Iterator<T> iterator() {return MAP.keySet().iterator();}
@Override
public int size() {return COUNT.get();
}
}
再看一下 ThreadPool
的字段。
public class ThreadPool {
// 全局锁
private final ReentrantLock lock = new ReentrantLock();
// 外围线程数量
private final int coreSize;
// 最大线程数量
private final int maxSize;
// 线程存活放弃工夫
private final long keepAliveTime;
// 线程存活放弃工夫单位
private final TimeUnit unit;
// 工作阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 回绝策略
private volatile RejectedExecutionHandler handler;
// 寄存线程池中的线程的汇合
private final Set<Worker> workers = new ConcurrentHashSet<>();
// 线程池是否平安敞开标记
private final AtomicBoolean shutDown = new AtomicBoolean(false);
// 线程池是否强制敞开标记
private final AtomicBoolean shutDownNow = new AtomicBoolean(false);
// 提交到线程池中的工作总数
private final AtomicInteger taskNum = new AtomicInteger();
......
/**
* 获取线程工厂
*/
public ThreadFactory getThreadFactory() {return threadFactory;}
/**
* 更新线程池的回绝策略
* @param handler 回绝策略
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {if (handler == null) {throw new NullPointerException();
}
this.handler = handler;
}
/**
* 更新线程池的线程工厂
* @param threadFactory 线程池
*/
public void setThreadFactory(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException();
}
this.threadFactory = threadFactory;
}
......
}
因为本实战仅须要实现线程池的根本简略性能,因而外围线程数量,最大线程数量,线程保活工夫和线程保活工夫单位一经指定,便无奈再被批改。再看一下构造函数。
public class ThreadPool {
......
/**
* 创立线程池,并应用默认的回绝策略和线程工厂
* @param coreSize 外围线程数
* @param maxSize 最大线程数
* @param keepAliveTime 线程保活工夫
* @param unit 线程保活工夫单位
* @param workQueue 工作阻塞队列
*/
public ThreadPool(int coreSize, int maxSize, int keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this.coreSize = coreSize;
this.maxSize = maxSize;
this.keepAliveTime = keepAliveTime;
this.unit = unit;
this.workQueue = workQueue;
threadFactory = Executors.defaultThreadFactory();
handler = new AbortPolicy();}
/**
* 创立线程池,并应用默认的回绝策略
* @param coreSize 外围线程数
* @param maxSize 最大线程数
* @param keepAliveTime 线程保活工夫
* @param unit 线程保活工夫单位
* @param workQueue 工作阻塞队列
* @param threadFactory 线程工厂
*/
public ThreadPool(int coreSize, int maxSize, int keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this.coreSize = coreSize;
this.maxSize = maxSize;
this.keepAliveTime = keepAliveTime;
this.unit = unit;
this.workQueue = workQueue;
this.threadFactory = threadFactory;
handler = new AbortPolicy();}
/**
* 创立线程池
* @param coreSize 外围线程数
* @param maxSize 最大线程数
* @param keepAliveTime 线程保活工夫
* @param unit 线程保活工夫单位
* @param workQueue 工作阻塞队列
* @param threadFactory 线程工厂
* @param handler 回绝策略
*/
public ThreadPool(int coreSize, int maxSize, int keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
this.coreSize = coreSize;
this.maxSize = maxSize;
this.keepAliveTime = keepAliveTime;
this.unit = unit;
this.workQueue = workQueue;
this.threadFactory = threadFactory;
this.handler = handler;
}
......
}
提供了三个构造函数,和 ThreadPoolExecutor
相似,外围线程数量,最大线程数量,线程保活工夫,线程保活工夫单位和工作阻塞队列须要用户指定。上面看一下 submit()
办法和 execute()
办法。
public class ThreadPool {
......
/**
* 执行有返回值的工作
* @param callable 须要执行的工作,有返回值
*/
public <T> Future<T> submit(Callable<T> callable) {if (callable == null) {throw new NullPointerException();
}
FutureTask<T> futureTask = new FutureTask<>(callable);
execute(futureTask);
return futureTask;
}
/**
* 执行无返回值的工作
* @param runnable 须要执行的工作,无返回值
*/
public void execute(Runnable runnable) {if (runnable == null) {throw new NullPointerException();
}
// 调用了 shutDown()或者 shutDownNow()办法后,此时线程池不承受新工作
if (shutDown.get() || shutDownNow.get()) {return;}
// 工作总数加一
taskNum.incrementAndGet();
// 如果以后线程数小于外围线程数,创立 Worker(线程)并执行工作
if (workers.size() < coreSize) {addWorker(runnable);
return;
}
// 如果以后线程数大于等于外围线程数,则将工作退出工作阻塞队列
// 如果工作阻塞队列已满,则 offer()办法返回 false 示意增加失败
boolean offer = workQueue.offer(runnable);
if (!offer) {
// 如果以后线程数小于最大线程数,则创立 Worker 来执行工作
if (workers.size() < maxSize) {addWorker(runnable);
} else {
// 如果以后线程数大于等于最大线程数,则执行回绝策略,并且工作总数减一
taskNum.decrementAndGet();
handler.rejectedExecution(runnable, this);
}
}
}
private void addWorker(Runnable runnable) {Worker worker = new Worker(runnable);
workers.add(worker);
worker.getThread().start();
}
......
}
ThreadPool
的 execute()
办法遵循第一大节中总结的线程池接管到一个新工作时的判断策略。addWorker()
办法会创立一个 Worker
对象,而后将其增加到 Worker
汇合中,因为应用了实现的线程平安的 ConcurrentHashSet
作为 Worker
汇合,因而该步骤不须要获取全局锁。Worker
类是 ThreadPool
的外部类,其实现如下。
public class ThreadPool {
......
private final class Worker implements Runnable {
// 要运行的初始工作(创立 Worker 时传入的工作)private final Runnable firstTask;
// 线程(通过线程工厂创立)private final Thread thread;
public Worker(Runnable task) {
firstTask = task;
thread = getThreadFactory().newThread(this);
}
public Thread getThread() {return thread;}
public void close() {thread.interrupt();
}
@Override
public void run() {
// 创立 Worker 时会先执行初始工作
Runnable task = firstTask;
try {
// 如果 task 为空,则从工作阻塞队列中获取工作
while (task != null || (task = getTask()) != null) {
try {
// 执行工作
task.run();} finally {
// 每次工作执行结束,会将 task 置为 null,而后循环从工作阻塞队列中获取工作
task = null;
taskNum.decrementAndGet();}
}
} finally {
// 从工作阻塞队列中获取工作为 null 时会跳出 while 循环并执行这里的开释线程逻辑
if (workers.remove(this)) {// 用于调用 shutDown()办法后平安的敞开线程池
tryClose();}
}
}
}
......
}
Worker
有两个字段:firstTask 和 thread。其中 firstTask 会被赋值为创立 Worker
时传入的工作,示意 Worker
的初始工作,因而一个 Worker
总是会先执行初始工作,而后再去从工作阻塞队列中获取工作。thread 会在创立 Worker
时通过线程工厂创立,并且 thread 执行的工作为 Worker
本身(因为 Worker
实现了 Runnable
接口,Worker
本身也是一个工作)。在创立 Worker
后,Worker
中的线程 thread 便会启动,从而会执行 Worker
的run()
办法,run()
办法的实现思路为先执行初始工作,而后循环调用 ThreadPool
的getTask()
办法从工作阻塞队列中获取工作,如果获取工作失败(超过了线程保活工夫 / 线程被中断)则退出循环并执行开释 Worker
的逻辑。
上面看一下 getTask()
办法的实现。
public class ThreadPool {
......
private Runnable getTask() {
Runnable task;
// 如果强制敞开标记为 true,则不再从工作阻塞队列中获取工作
// 如果平安敞开标记为 true,且工作全副执行完,则不再从工作阻塞队列中获取工作
if (shutDownNow.get() || (shutDown.get() && taskNum.get() == 0)) {return null;}
try {
// 同一时间只能有一个线程从工作阻塞队列中获取工作,其余线程进入阻塞状态
lock.lockInterruptibly();
if (workers.size() > coreSize) {
// 如果以后线程数大于外围线程数,则线程从工作阻塞队列中获取工作时最多期待线程保活的工夫,超时则返回 null
task = workQueue.poll(keepAliveTime, unit);
if (task == null) {System.out.println(Thread.currentThread().getName() + "time out");
}
} else {
// 如果以后线程数小于等于外围线程数,则线程从工作阻塞队列中获取工作时会始终阻塞直到获取到工作
task = workQueue.take();}
} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "interrupted");
task = null;
} finally {lock.unlock();
}
return task;
}
......
}
要了解 getTask()
办法,须要联合 ThreadPool
的shutDown()
办法和 shutDownNow()
办法一起剖析。其实现如下。
public class ThreadPool {
......
/**
* 平安的敞开线程池,会期待以后正在执行的工作以及工作阻塞队列中的工作执行结束后再敞开线程池
*/
public void shutDown() {shutDown.set(true);
tryClose();}
/**
* 强制的敞开线程池,立刻中断所有线程,包含正在执行工作的线程
*/
public void shutDownNow() {shutDownNow.set(true);
doClose();}
private void tryClose() {if (shutDown.get() && taskNum.get() == 0) {
// 如果平安敞开标记为 true 且工作全副执行完,此时立刻中断所有线程
for (Worker worker : workers) {worker.close();
}
}
}
private void doClose() {if (shutDownNow.get()) {
// 如果强制敞开标记为 true,此时立刻中断所有线程
for (Worker worker : workers) {worker.close();
}
}
}
}
Worker
通过 getTask()
办法从工作阻塞队列获取工作分三种状况探讨。
第一种状况 :shutDown()
办法和 shutDownNow()
办法没有被调用,此时 shutDown 和 shutDownNow 均为 false。该种状况下 Worker
能够失常从工作阻塞队列中获取工作,因为应用了全局锁,所以在同一时间,只能有一个 Worker
可能从工作阻塞队列中获取工作,其余 Worker
进入阻塞状态。如果线程池的以后线程数大于外围线程数,那么获取到全局锁的 Worker
会阻塞在工作阻塞队列的 poll()
办法上,超过保活工夫还没获取到工作则间接返回 null,从而会开释这个 Worker
;如果线程池的以后线程数小于等于外围线程数,那么获取到全局锁的Worker
会始终阻塞在工作阻塞队列的 take()
办法上直到获取到工作。
第二种状况 :shutDown()
办法被调用,此时 shutDown 为 true。如果还有未执行完的工作,那么 Worker
能够失常从工作阻塞队列获取工作,同状况一。如果工作阻塞队列没有工作且没有正在执行的工作,那么 shutDown()
办法调用时便会中断所有线程,在 getTask()
办法中阻塞的 Worker
会被全副唤醒并被开释掉,如果 shutDown()
办法调用时工作阻塞队列没有工作然而存在正在执行的工作,那么期待最初一个工作执行完后会中断所有线程,在 getTask()
办法中阻塞的 Worker
会被全副唤醒并被开释掉。
第三种状况 :shutDownNow()
办法被调用,此时 shutDownNow 为 true。无论工作阻塞队列是否有工作以及无论是否有正在执行的工作,在 shutDownNow()
办法调用时便会中断所有线程,如果线程是阻塞在工作阻塞队列中,那么线程会被唤醒并被开释掉,如果线程正在执行工作,那么也会尝试中断线程,执行完工作或者响应中断从工作中退出的线程再次从 getTask()
办法获取工作时会间接获取到 null(因为 shutDownNow 为 true),从而被开释掉。特地留神,如果某个线程执行的工作是一个死循环并且无奈响应中断,那么这个线程永远不会被开释。
最初 ThreadPool
提供一个 size()
办法用于获取以后的 Worker
数量,该办法仅用于测试。
public int size() {return workers.size();
}
上面编写测试程序来测试ThreadPool
。
测试用例一 :测试ThreadPool
的shutDown()
办法,创立一个 ThreadPool
,外围线程数为 2,最大线程数为 4,工作阻塞队列大小为 20,创立一个固定工作为打印数字 0 -2,每打印一次数字睡眠 1 秒。首先向ThreadPool
提交三个固定工作,期待 4 秒后执行 shutDown()
办法,预期的后果应该为提交的三个工作都会执行完,并且工作执行完后阻塞在工作阻塞队列中的工作会被唤醒并被开释。测试代码如下。
public class ThreadPoolTest {
@Test
void testThreadPool_1() {CountDownLatch countDownLatch = new CountDownLatch(3);
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(20);
ThreadPool threadPool = new ThreadPool(2,
4, 1000, TimeUnit.MILLISECONDS, workQueue);
Runnable task = () -> {
try {for (int i = 0; i < 3; i++) {System.out.println(Thread.currentThread().getName() + ":" + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "interrupted from sleep");
}
countDownLatch.countDown();};
for (int i = 0; i < 3; i++) {threadPool.execute(task);
}
try {Thread.sleep(4000);
threadPool.shutDown();
Thread.sleep(3000);
countDownLatch.await();} catch (InterruptedException e) {System.out.println(e.getMessage());
}
System.out.println("Worker size:" + threadPool.size());
System.out.println("Task num:" + workQueue.size());
}
}
测试后果如下所示。
结果表明,一开始 ThreadPool
别离创立了线程 1 和线程 2 来解决 task1 和 task2,而后 task3 被增加到了工作阻塞队列,当线程 1 和线程 2 执行完工作后,线程 2 获取到了全局锁,从工作阻塞队列中获取到了 task3,而线程 1 进入阻塞状态,此时调用 shutDown()
办法,期待线程 2 执行完工作后,中断所有线程,此时线程 1 被唤醒而后被开释掉,最初 ThreadPool
中线程数量和工作数量全副为 0。
测试用例二 :测试ThreadPool
的shutDownNow()
办法,创立一个 ThreadPool
,外围线程数为 2,最大线程数为 4,工作阻塞队列大小为 20,创立一个固定工作为打印数字 0 -2,每打印一次数字睡眠 1 秒。首先向ThreadPool
提交三个固定工作,期待 2 秒后执行 shutDownNow()
办法,预期的后果应该为提交的三个工作都不会执行完,所有线程在 shutDownNow()
办法被调用后会被中断并开释掉。测试代码如下。
public class ThreadPoolTest {
@Test
void testThreadPool_2() {CountDownLatch countDownLatch = new CountDownLatch(2);
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(20);
ThreadPool threadPool = new ThreadPool(2,
4, 1000, TimeUnit.MILLISECONDS, workQueue);
Runnable task = () -> {
try {for (int i = 0; i < 3; i++) {System.out.println(Thread.currentThread().getName() + ":" + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "interrupted from sleep");
}
countDownLatch.countDown();};
for (int i = 0; i < 3; i++) {threadPool.execute(task);
}
try {Thread.sleep(2000);
threadPool.shutDownNow();
Thread.sleep(1000);
countDownLatch.await();} catch (InterruptedException e) {System.out.println(e.getMessage());
}
System.out.println("Worker size:" + threadPool.size());
System.out.println("Task num:" + workQueue.size());
}
}
测试后果如下所示。
结果表明,一开始 ThreadPool
别离创立了线程 1 和线程 2 来解决 task1 和 task2,而后 task3 被增加到了工作阻塞队列,而后在线程 1 和线程 2 执行工作的过程中,调用了 shutDownNow()
办法,因为 task1 和 task2 可能响应中断并退出工作,所以线程 1 和线程 2 会从工作中退出而后被开释掉,工作阻塞队列中的 task3 也不会被执行,最初 ThreadPool
中线程数量为 0,工作数量为 1。
测试用例三 :创立一个ThreadPool
,外围线程数为 2,最大线程数为 4,工作阻塞队列大小为 1,线程保活工夫为 1 秒,创立一个固定工作为打印数字 0 -2,每打印一次数字睡眠 1 秒。首先向ThreadPool
提交五个固定工作,而后期待 ThreadPool
执行这些工作,预期的后果应该为在所有工作执行完后,ThreadPool
中的线程数应该为 2(外围线程数),工作数应该为 0。测试代码如下。
public class ThreadPoolTest {
@Test
void testThreadPool_3() {CountDownLatch countDownLatch = new CountDownLatch(5);
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1);
ThreadPool threadPool = new ThreadPool(2,
4, 1000, TimeUnit.MILLISECONDS, workQueue);
Runnable task = () -> {
try {for (int i = 0; i < 3; i++) {System.out.println(Thread.currentThread().getName() + ":" + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "interrupted from sleep");
}
countDownLatch.countDown();};
for (int i = 0; i < 5; i++) {threadPool.execute(task);
}
try {countDownLatch.await();
} catch (InterruptedException e) {System.out.println(e.getMessage());
}
System.out.println("Worker size:" + threadPool.size());
System.out.println("Task num:" + workQueue.size());
}
}
测试后果如下所示。
结果表明,一开始 ThreadPool
创立了线程 1 和线程 2 来解决 task1 和 task2,而后 task3 被增加到了工作阻塞队列,而后创立了线程 3 和线程 4 来解决 task4 和 task5,在工作执行完后,线程 4 获取到了全局锁,从工作阻塞队列中获取到了 task3 继续执行,此时线程 1 和线程 2 因为在线程保活工夫内没有获取到工作来执行,被开释掉,最初 ThreadPool
中线程数量为 2,工作数量为 0。
总结
线程池的应用次要聚焦于线程池的参数的配置,而要正当的配置线程池的参数,则须要对线程池的原理有肯定的理解,如果可能本人写一个线程池并实现基本功能,对了解线程池的原理大有益处。