共计 5513 个字符,预计需要花费 14 分钟才能阅读完成。
场景回顾
明天回到寝室小叶唱着:快把我删了吧~,吓得我赶快理解状况,我还认为和女朋友闹变扭呢,后果说明天面试又被面试官虐了,要把面试官删了。
面试官:我看简历上写了精通并发编程,那么线程池在平时的工作中必定须要用到吧,你个别是用在什么场景下呢?
小叶:嗯嗯,线程池平时用过的,我个别用在爬虫的场景,能够将多个网络申请通过线程池的形式去并行处理,这样子能够进步零碎的吞吐量。
面试官:嗯嗯,线程池用在爬虫的场景是很常见的一种,那么你是怎么创立线程池的呢?
小叶:我个别用的是 JDK 自带一个工厂办法外面提供的间接创立线程池的办法,具体名字我想不大起来了。
面试官:你说的是 Executors 类吧,个别咱们会用到外面 单线程的线程池、固定线程的线程池等,那么它底层是如何去通过参数去创立不同的线程池呢?
小叶:emm… 您说的是 ThreadPoolExecutor 吧,它如同由 7 个参数组成,如同有外围线程数、最大线程数、闲暇工夫长度、闲暇工夫单位、工作队列、线程工厂、回绝策略这些参数。
面试官:嗯嗯,看来你对线程池还是颇有一些理解,如果我当初设置 外围线程数 10 个 , 最大线程数 20 个 , 工作队列为无界队列,当初我一次性来了30 个工作,那么以后线程池中线程数为多少,工作是否会被回绝呢?
小叶:这个简略,最大线程数为 20 且一次性来了 30 个工作,当然线程数就是 20,因为工作队列为无界的,所以工作不会被回绝。
面试官:嗯嗯,那如果我把工作队列改为大小为 20 的队列,那么当初最多能够接管多少申请?
小叶:emm… 应该是外围线程数 + 最大线程数 + 工作队列大小 =50 吧。
面试官:明天的面试就先到这里了。
咱们通过小叶的面试惨状,不难发现在面试中线程池是一块常常会被面试官问到的问题,线程池几个简略的参数可能并不是咱们料想的后果。以及咱们在平时编码的过程中可能为了图不便间接调用 Executors 类提供的简便办法,然而其中可能有一些问题须要咱们去思考,在 Executors 类中提供的线程池办法中所有工作队列都是无界的,这可能会导致内存溢出。还有我如果咱们想要采纳其余的工作回绝策略,咱们要怎么实现呢?带着这些问题,上面小张带你由浅入深的解析源码。
整体构造
从上图能够看出 ThreadPoolExecutor 的整体继承关系,这里能够看出设计线程池作者,依据职责拆散设计了两个接口,并通过模板办法增加钩子函数。
Executor 接口
public interface Executor {
// 将工作执行 具体怎么执行须要看子类如何实现
void execute(Runnable command);
}
ExecutorService 接口
public interface ExecutorService extends Executor {
// 进行线程池, 直到池中所有工作执行结束
void shutdown();
// 立刻进行线程池, 并返回未执行的工作
List<Runnable> shutdownNow();
// 是否进行状态
boolean isShutdown();
// 是否终结
boolean isTerminated();
// 期待线程池进行
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交带返回后果的工作, 返回一个后果的代理对象
<T> Future<T> submit(Callable<T> task);
// 提交无返回的工作, 工作执行胜利返回 result
<T> Future<T> submit(Runnable task, T result);
// 提交无返回的工作
Future<?> submit(Runnable task);
// 执行结束 tasks 汇合中所有的工作, 并返回后果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 在上一个办法的根底上增加超时工夫
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// tasks 中任一工作执行结束就返回
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 在上一步增加了超时工夫
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService 形容了一个线程池应该具备敞开线程池、提交工作、工作执行策略等性能。
接下来咱们通过 AbstractExecutorService 抽象类来看线程池是怎么通过模版办法来缩小代码的。
通过类的结构图能够看出抽象类实现了 submit、invokeAll、invokeAny 的办法。
submit
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
// newTaskFor 该办法这里先不赘述, 这里能够了解为将 task 进行了包装
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 调用 Executor 类中的办法
execute(ftask);
return ftask;
}
通过源码能够看出该办法将工作包装之后间接进行调用 execute 办法。
invokeAll
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {if (tasks == null)
throw new NullPointerException();
// 计算出超时工夫
long nanos = unit.toNanos(timeout);
// 用于寄存 task 的汇合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
// 计算工作执行截止工夫
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// 循环的执行汇合中的 task
for (int i = 0; i < size; i++) {execute((Runnable)futures.get(i));
// 判断是否达到截止工夫
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
// 代码运行到这里, 阐明所有工作在规定工夫内都曾经被执行, 然而处理结果实现还是未知
for (int i = 0; i < size; i++) {Future<T> f = futures.get(i);
// 获取工作的返回值代理, 判断是否实现
if (!f.isDone()) {
// 如已达到截止工夫返回 future 汇合
if (nanos <= 0L)
return futures;
try {
// 阻塞期待 future 返回
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {} catch (ExecutionException ignore) {} catch (TimeoutException toe) {
// 如果超时返回 future 汇合
return futures;
}
// 更新剩余时间
nanos = deadline - System.nanoTime();}
}
// 代码运行到这里阐明所有工作都已被执行实现, 并设置实现标记
done = true;
return futures;
} finally {
/** 如果 done 为 false, 将所有正在执行的工作勾销
只有提交工作过程中超时、获取某次执行后果超时 done 才会为 false
**/
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
invokeAny
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
// 空查看
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
/**
该类非本文重点具体源码解析到 JUC 包中浏览, 这里将会概述该类提供了什么性能。这里将线程池实例传递给该类, 目标是它能够调用线程池的 execute 办法, 不须要本人再去保护池。该类中实现了 submit 提交工作并返回 furure, 并且在外部保护了一个阻塞队列, 当工作执行胜利会将 future 对象放入该阻塞队列
take 办法是队列阻塞的获取元素
poll 办法是队列非阻塞的获取元素
**/
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 记录异样
ExecutionException ee = null;
// 如果开启超时, 计算超时工夫
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 将第一个工作
futures.add(ecs.submit(it.next()));
// 工作数 -1
--ntasks;
int active = 1;
for (;;) {
// 非阻塞的获取元素
Future<T> f = ecs.poll();
if (f == null) {
// 如果此时还有工作可执行, 持续将工作提交, 并将沉闷数 +1
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 沉闷数为 0 退出循环, 沉闷数为 0 的状况为从 future.get()获取抛出异样才会执行到这里
else if (active == 0)
break;
else if (timed) {
// 如果超时开启, 则在响应队列中期待获取元素, 超时则报错
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();}
else
// 未开启超时, 则进入有限期待的阻塞获取响应
f = ecs.take();}
// 如果响应队列非空, 阐明此时有工作曾经实现
if (f != null) {
// 沉闷数 -1
--active;
/**
通过 future 获取响应后果, 如果胜利间接返回
如果获取后果失败记录异样, 持续遍历从响应队列中拿下一个执行胜利的后果进行此过程
**/
try {return f.get();
} catch (ExecutionException eex) {ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex);
}
}
}
// 执行到这里阐明所有的工作都执行结束, 并且所有的 future.get 都失败了, 将会把异样抛出去
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 最初将所有工作执行一遍撤销
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
最初结语
可能读到这里的读者,置信你曾经把握了线程池中基类中的十分重要的办法,你至多明确了如何通过设计基类减少代码复用,以及 invokeAny 逻辑执行十分值得回味无穷,浏览源码是干燥的过程,但在这个过程中你的思维会跟着大佬一步一步进化,这也是为什么互联网大厂都要求浏览过源码,在下一篇我将会正式进入 ThreadPoolExecutor 类中,解开它神秘的面纱,下面面试官的发问也会迎刃而解。
本文由博客一文多发平台 OpenWrite 公布!