关于java:面试官给我讲讲线程池上

4次阅读

共计 5513 个字符,预计需要花费 14 分钟才能阅读完成。

场景回顾

明天回到寝室小叶唱着:快把我删了吧~,吓得我赶快理解状况,我还认为和女朋友闹变扭呢,后果说明天面试又被面试官虐了,要把面试官删了。

面试官:我看简历上写了精通并发编程,那么线程池在平时的工作中必定须要用到吧,你个别是用在什么场景下呢?

小叶:嗯嗯,线程池平时用过的,我个别用在爬虫的场景,能够将多个网络申请通过线程池的形式去并行处理,这样子能够进步零碎的吞吐量。

面试官:嗯嗯,线程池用在爬虫的场景是很常见的一种,那么你是怎么创立线程池的呢?

小叶:我个别用的是 JDK 自带一个工厂办法外面提供的间接创立线程池的办法,具体名字我想不大起来了。

面试官:你说的是 Executors 类吧,个别咱们会用到外面 单线程的线程池、固定线程的线程池等,那么它底层是如何去通过参数去创立不同的线程池呢?

小叶:emm… 您说的是 ThreadPoolExecutor 吧,它如同由 7 个参数组成,如同有外围线程数、最大线程数、闲暇工夫长度、闲暇工夫单位、工作队列、线程工厂、回绝策略这些参数。

面试官:嗯嗯,看来你对线程池还是颇有一些理解,如果我当初设置 外围线程数 10 个 最大线程数 20 个 工作队列为无界队列,当初我一次性来了30 个工作,那么以后线程池中线程数为多少,工作是否会被回绝呢?

小叶:这个简略,最大线程数为 20 且一次性来了 30 个工作,当然线程数就是 20,因为工作队列为无界的,所以工作不会被回绝。

面试官:嗯嗯,那如果我把工作队列改为大小为 20 的队列,那么当初最多能够接管多少申请?

小叶:emm… 应该是外围线程数 + 最大线程数 + 工作队列大小 =50 吧。

面试官:明天的面试就先到这里了。

咱们通过小叶的面试惨状,不难发现在面试中线程池是一块常常会被面试官问到的问题,线程池几个简略的参数可能并不是咱们料想的后果。以及咱们在平时编码的过程中可能为了图不便间接调用 Executors 类提供的简便办法,然而其中可能有一些问题须要咱们去思考,在 Executors 类中提供的线程池办法中所有工作队列都是无界的,这可能会导致内存溢出。还有我如果咱们想要采纳其余的工作回绝策略,咱们要怎么实现呢?带着这些问题,上面小张带你由浅入深的解析源码。

整体构造

从上图能够看出 ThreadPoolExecutor 的整体继承关系,这里能够看出设计线程池作者,依据职责拆散设计了两个接口,并通过模板办法增加钩子函数。

Executor 接口

public interface Executor {
    // 将工作执行 具体怎么执行须要看子类如何实现
    void execute(Runnable command);
}

ExecutorService 接口

public interface ExecutorService extends Executor {
        // 进行线程池, 直到池中所有工作执行结束
    void shutdown();
       // 立刻进行线程池, 并返回未执行的工作
    List<Runnable> shutdownNow();
        // 是否进行状态
    boolean isShutdown();
    // 是否终结
    boolean isTerminated();
    // 期待线程池进行
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
        // 提交带返回后果的工作, 返回一个后果的代理对象
    <T> Future<T> submit(Callable<T> task);
       // 提交无返回的工作, 工作执行胜利返回 result
    <T> Future<T> submit(Runnable task, T result);
    // 提交无返回的工作
    Future<?> submit(Runnable task);
        // 执行结束 tasks 汇合中所有的工作, 并返回后果
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    // 在上一个办法的根底上增加超时工夫
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    // tasks 中任一工作执行结束就返回
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
        // 在上一步增加了超时工夫
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService 形容了一个线程池应该具备敞开线程池、提交工作、工作执行策略等性能。

接下来咱们通过 AbstractExecutorService 抽象类来看线程池是怎么通过模版办法来缩小代码的。

通过类的结构图能够看出抽象类实现了 submit、invokeAll、invokeAny 的办法。

submit

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
          // newTaskFor 该办法这里先不赘述, 这里能够了解为将 task 进行了包装
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 调用 Executor 类中的办法
        execute(ftask);
        return ftask;
}

通过源码能够看出该办法将工作包装之后间接进行调用 execute 办法。

invokeAll

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {if (tasks == null)
            throw new NullPointerException();
        // 计算出超时工夫
        long nanos = unit.toNanos(timeout);
        // 用于寄存 task 的汇合
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            // 计算工作执行截止工夫
            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();
            // 循环的执行汇合中的 task
            for (int i = 0; i < size; i++) {execute((Runnable)futures.get(i));
                // 判断是否达到截止工夫
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }
            // 代码运行到这里, 阐明所有工作在规定工夫内都曾经被执行, 然而处理结果实现还是未知
            for (int i = 0; i < size; i++) {Future<T> f = futures.get(i);
                // 获取工作的返回值代理, 判断是否实现
                if (!f.isDone()) {
                    // 如已达到截止工夫返回 future 汇合
                    if (nanos <= 0L)
                        return futures;
                    try {
                        // 阻塞期待 future 返回
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {} catch (ExecutionException ignore) {} catch (TimeoutException toe) {
                        // 如果超时返回 future 汇合
                        return futures;
                    }
                    // 更新剩余时间
                    nanos = deadline - System.nanoTime();}
            }
            // 代码运行到这里阐明所有工作都已被执行实现, 并设置实现标记
            done = true;
            return futures;
        } finally {
            /** 如果 done 为 false, 将所有正在执行的工作勾销
                只有提交工作过程中超时、获取某次执行后果超时 done 才会为 false
            **/
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

invokeAny

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 空查看
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        
        /**
            该类非本文重点具体源码解析到 JUC 包中浏览, 这里将会概述该类提供了什么性能。这里将线程池实例传递给该类, 目标是它能够调用线程池的 execute 办法, 不须要本人再去保护池。该类中实现了 submit 提交工作并返回 furure, 并且在外部保护了一个阻塞队列, 当工作执行胜利会将 future 对象放入该阻塞队列
            take 办法是队列阻塞的获取元素
            poll 办法是队列非阻塞的获取元素
        **/
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            // 记录异样
            ExecutionException ee = null;
            // 如果开启超时, 计算超时工夫
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 将第一个工作
            futures.add(ecs.submit(it.next()));
            // 工作数 -1
            --ntasks;
            int active = 1;

            for (;;) {
                // 非阻塞的获取元素
                Future<T> f = ecs.poll();
                if (f == null) {
                    // 如果此时还有工作可执行, 持续将工作提交, 并将沉闷数 +1
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 沉闷数为 0 退出循环, 沉闷数为 0 的状况为从 future.get()获取抛出异样才会执行到这里
                    else if (active == 0)
                        break;
                    else if (timed) {
                        // 如果超时开启, 则在响应队列中期待获取元素, 超时则报错
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();}
                    else
                        // 未开启超时, 则进入有限期待的阻塞获取响应
                        f = ecs.take();}
                // 如果响应队列非空, 阐明此时有工作曾经实现
                if (f != null) {
                    // 沉闷数 -1
                    --active;
                    /**
                        通过 future 获取响应后果, 如果胜利间接返回
                        如果获取后果失败记录异样, 持续遍历从响应队列中拿下一个执行胜利的后果进行此过程
                    **/
                    try {return f.get();
                    } catch (ExecutionException eex) {ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex);
                    }
                }
            }
            
            // 执行到这里阐明所有的工作都执行结束, 并且所有的 future.get 都失败了, 将会把异样抛出去
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // 最初将所有工作执行一遍撤销
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

最初结语

可能读到这里的读者,置信你曾经把握了线程池中基类中的十分重要的办法,你至多明确了如何通过设计基类减少代码复用,以及 invokeAny 逻辑执行十分值得回味无穷,浏览源码是干燥的过程,但在这个过程中你的思维会跟着大佬一步一步进化,这也是为什么互联网大厂都要求浏览过源码,在下一篇我将会正式进入 ThreadPoolExecutor 类中,解开它神秘的面纱,下面面试官的发问也会迎刃而解。

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0