场景回顾
明天回到寝室小叶唱着:快把我删了吧~,吓得我赶快理解状况,我还认为和女朋友闹变扭呢,后果说明天面试又被面试官虐了,要把面试官删了。
面试官:我看简历上写了精通并发编程,那么线程池在平时的工作中必定须要用到吧,你个别是用在什么场景下呢?
小叶:嗯嗯,线程池平时用过的,我个别用在爬虫的场景,能够将多个网络申请通过线程池的形式去并行处理,这样子能够进步零碎的吞吐量。
面试官:嗯嗯,线程池用在爬虫的场景是很常见的一种,那么你是怎么创立线程池的呢?
小叶:我个别用的是JDK自带一个工厂办法外面提供的间接创立线程池的办法,具体名字我想不大起来了。
面试官:你说的是Executors类吧,个别咱们会用到外面单线程的线程池、固定线程的线程池等,那么它底层是如何去通过参数去创立不同的线程池呢?
小叶:emm…您说的是ThreadPoolExecutor吧,它如同由7个参数组成,如同有外围线程数、最大线程数、闲暇工夫长度、闲暇工夫单位、工作队列、线程工厂、回绝策略这些参数。
面试官:嗯嗯,看来你对线程池还是颇有一些理解,如果我当初设置外围线程数10个,最大线程数20个,工作队列为无界队列,当初我一次性来了30个工作,那么以后线程池中线程数为多少,工作是否会被回绝呢?
小叶:这个简略,最大线程数为20且一次性来了30个工作,当然线程数就是20,因为工作队列为无界的,所以工作不会被回绝。
面试官:嗯嗯,那如果我把工作队列改为大小为20的队列,那么当初最多能够接管多少申请?
小叶:emm…应该是外围线程数+最大线程数+工作队列大小=50吧。
面试官:明天的面试就先到这里了。
咱们通过小叶的面试惨状,不难发现在面试中线程池是一块常常会被面试官问到的问题,线程池几个简略的参数可能并不是咱们料想的后果。以及咱们在平时编码的过程中可能为了图不便间接调用Executors类提供的简便办法,然而其中可能有一些问题须要咱们去思考,在Executors类中提供的线程池办法中所有工作队列都是无界的,这可能会导致内存溢出。还有我如果咱们想要采纳其余的工作回绝策略,咱们要怎么实现呢?带着这些问题,上面小张带你由浅入深的解析源码。
整体构造
从上图能够看出ThreadPoolExecutor的整体继承关系,这里能够看出设计线程池作者,依据职责拆散设计了两个接口,并通过模板办法增加钩子函数。
Executor接口
public interface Executor {
// 将工作执行 具体怎么执行须要看子类如何实现
void execute(Runnable command);
}
ExecutorService接口
public interface ExecutorService extends Executor {
// 进行线程池,直到池中所有工作执行结束
void shutdown();
// 立刻进行线程池,并返回未执行的工作
List<Runnable> shutdownNow();
// 是否进行状态
boolean isShutdown();
// 是否终结
boolean isTerminated();
// 期待线程池进行
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交带返回后果的工作,返回一个后果的代理对象
<T> Future<T> submit(Callable<T> task);
// 提交无返回的工作,工作执行胜利返回result
<T> Future<T> submit(Runnable task, T result);
// 提交无返回的工作
Future<?> submit(Runnable task);
// 执行结束tasks汇合中所有的工作,并返回后果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 在上一个办法的根底上增加超时工夫
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// tasks中任一工作执行结束就返回
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 在上一步增加了超时工夫
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService形容了一个线程池应该具备敞开线程池、提交工作、工作执行策略等性能。
接下来咱们通过AbstractExecutorService抽象类来看线程池是怎么通过模版办法来缩小代码的。
通过类的结构图能够看出抽象类实现了submit、invokeAll、invokeAny的办法。
submit
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// newTaskFor该办法这里先不赘述,这里能够了解为将task进行了包装
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 调用Executor类中的办法
execute(ftask);
return ftask;
}
通过源码能够看出该办法将工作包装之后间接进行调用execute办法。
invokeAll
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 计算出超时工夫
long nanos = unit.toNanos(timeout);
// 用于寄存task的汇合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
// 计算工作执行截止工夫
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// 循环的执行汇合中的task
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
// 判断是否达到截止工夫
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
// 代码运行到这里,阐明所有工作在规定工夫内都曾经被执行,然而处理结果实现还是未知
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
// 获取工作的返回值代理,判断是否实现
if (!f.isDone()) {
// 如已达到截止工夫返回future汇合
if (nanos <= 0L)
return futures;
try {
// 阻塞期待future返回
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
// 如果超时返回future汇合
return futures;
}
// 更新剩余时间
nanos = deadline - System.nanoTime();
}
}
// 代码运行到这里阐明所有工作都已被执行实现,并设置实现标记
done = true;
return futures;
} finally {
/** 如果done为false,将所有正在执行的工作勾销
只有提交工作过程中超时、获取某次执行后果超时done才会为false
**/
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
invokeAny
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
// 空查看
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
/**
该类非本文重点具体源码解析到JUC包中浏览,这里将会概述该类提供了什么性能。
这里将线程池实例传递给该类,目标是它能够调用线程池的execute办法,不须要本人再去保护池。
该类中实现了submit提交工作并返回furure,并且在外部保护了一个阻塞队列,当工作执行胜利会将future对象放入该阻塞队列
take办法是队列阻塞的获取元素
poll办法是队列非阻塞的获取元素
**/
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 记录异样
ExecutionException ee = null;
// 如果开启超时,计算超时工夫
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 将第一个工作
futures.add(ecs.submit(it.next()));
// 工作数-1
--ntasks;
int active = 1;
for (;;) {
// 非阻塞的获取元素
Future<T> f = ecs.poll();
if (f == null) {
// 如果此时还有工作可执行,持续将工作提交,并将沉闷数+1
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 沉闷数为0退出循环,沉闷数为0的状况为从future.get()获取抛出异样才会执行到这里
else if (active == 0)
break;
else if (timed) {
// 如果超时开启,则在响应队列中期待获取元素,超时则报错
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
// 未开启超时,则进入有限期待的阻塞获取响应
f = ecs.take();
}
// 如果响应队列非空,阐明此时有工作曾经实现
if (f != null) {
// 沉闷数-1
--active;
/**
通过future获取响应后果,如果胜利间接返回
如果获取后果失败记录异样,持续遍历从响应队列中拿下一个执行胜利的后果进行此过程
**/
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
// 执行到这里阐明所有的工作都执行结束,并且所有的future.get都失败了,将会把异样抛出去
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 最初将所有工作执行一遍撤销
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
最初结语
可能读到这里的读者,置信你曾经把握了线程池中基类中的十分重要的办法,你至多明确了如何通过设计基类减少代码复用,以及invokeAny逻辑执行十分值得回味无穷,浏览源码是干燥的过程,但在这个过程中你的思维会跟着大佬一步一步进化,这也是为什么互联网大厂都要求浏览过源码,在下一篇我将会正式进入ThreadPoolExecutor类中,解开它神秘的面纱,下面面试官的发问也会迎刃而解。
本文由博客一文多发平台 OpenWrite 公布!
发表回复