关于java:面试官给我讲讲线程池中

前景回顾

在上一篇中咱们通过线程池的继承关系,具体分析了线程池的形象父类AbstractExecutorService中的submit、invokeAll、invokeAny办法。在本篇中,咱们将会把眼帘放在ThreadPoolExecutor具体实现当中,通过源码剖析咱们将会明确7个参数是如何在源码中运行的。

应用场景

咱们先回顾一下在理论场景下的业务代码,上面模仿了10个线程并行处理工作,而后进行线程池承受,最初期待线程池敞开。

public static void main(String[] args) throws InterruptedException {
        // 开启线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // 开启10个工作并行处理
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                // 模仿业务代码
                try {
                    Thread.sleep(1000);
                       System.out.println("工作完结");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        // 暂停线程池工作接管
        executor.shutdown();
        // 期待线程池完结
        executor.awaitTermination(1,TimeUnit.MINUTES);
    }

构造函数

总共重载了4个构造函数,设置了默认的参数,这种设计思路大家能够借鉴,上面只展现了其中两个重要的构造函数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             // 设置默认工厂,工厂中返回线程优先级为一般并且为非守护的线程
             Executors.defaultThreadFactory(), 
             // 默认回绝策略为回绝产生时间接抛出异样
             defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
              // 判断参数边界
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
              // 设置平安管理器,不在本篇思考范畴内
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
                 // 设置外围线程数
        this.corePoolSize = corePoolSize;
              // 设置最大线程数
        this.maximumPoolSize = maximumPoolSize;
              // 设置工作队列
        this.workQueue = workQueue;
              // 设置线程闲暇工夫
        this.keepAliveTime = unit.toNanos(keepAliveTime);
              // 设置线程工厂
        this.threadFactory = threadFactory;
              // 设置回绝策略
        this.handler = handler;
}

execute办法

public void execute(Runnable command) {
              // 边界判断
        if (command == null)
            throw new NullPointerException();
              /**
                  判断当前工作线程数是否小于外围线程数
                  这里的ctl能够先认为它保留了线程池的工作线程数量和线程池状态
                  为什么一个变量能够示意两种状态前面会解释到
              **/
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
              // 增加工作线程,返回是否胜利创立,胜利创立则返回
            if (addWorker(command, true))
                return;
              /**
                  如果创立没胜利,则从新获取线程池状态,对于线程池具体状态会在下文形容
                  从新获取的起因在于execute是线程平安的办法
                  那么就会存在多线程调用,在此期间线程池状态可能会发生变化,敞开或有新工作增加
                  所以从新获取线程池状态放弃最新的状态
              **/
            c = ctl.get();
        }
              /**
                  运行到这,阐明当前工作线程大于外围线程数或者创立工作线程不胜利(线程池非Running)
                  判断以后线程是否运行并且工作队列是否胜利增加工作
              **/
        if (isRunning(c) && workQueue.offer(command)) {
              // 从新查看线程池状态
            int recheck = ctl.get();
              // 如非运行状态且可能删除删除,则回绝工作
            if (! isRunning(recheck) && remove(command))
                reject(command);
              // 如工作线程数为0,则增加工作线程,此种状况产生在工作线程在闲暇工夫销毁时
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
              /**
                  运行到此阐明,线程池状态非running或增加工作队列不胜利
                  则尝试增加工作线程,如果增加不胜利,则回绝工作
              **/
        else if (!addWorker(command, false))
            reject(command);
}

总结:浏览完execute办法后,咱们能够总结线程池会先在小于外围线程数的时增加外围工作线程,在工作队列无奈增加工作时增加非核心工作线程,在线程池非running状态工作队列满且工作线程满时回绝工作。

回顾咱们上篇提出的问题:当咱们创立外围线程数10个,最大线程数20个,工作队列为无界队列的线程池,并同时来了30个工作。

问题一:请问线程池中的线程数为多少?

问题二:那如果我把工作队列改为大小为20的队列,那么当初最多能够接管多少申请?

通过源码的浏览咱们当初能够很简略的答复这两个问题。

  • 问题一:通过源码可知前10个工作间接去创立外围工作线程,因为工作队列是无界的因而后20个工作间接退出了工作队列期待外围工作线程生产。
  • 问题二:如把工作队列改为容量为20的队列,那么现可承受最大(最大线程数+队列容量)=40个申请。

在浏览execute办法时,咱们把ctl属性、addWorker当做了黑盒,只是通过作者正文和办法命名去判断办法大抵做了什么操作,并且咱们都晓得execute是一个线程平安的办法,它能够由不同的线程去调用,然而在源码中咱们也没有发现加锁的局部,小伙伴们必定十分好奇这些底层办法是如何做到这些的。

CTL

    // 类型为原子整数类,增删改查都是原子操作
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 代表共有32-3=29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 代表最大容量为2^29-1 
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // 将线程池状态贮存在整数字节的高位中,代表高3位代表线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 因高3位代表线程池状态,此办法将低29位变为0就能够失去高3位状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 与上雷同,将高3位变为0,失去工作线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 将rs和wc的进行或运算
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 判断c是否小于s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 判断c是否大于等于s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // 因为只有Running小于shutdown通过此办法来判断
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    // 尝试应用CAS的形式给ctl+1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 尝试应用CAS的形式给ctl-1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

通过上方对于ctl的源码,咱们能够看出作者将一个整数变量分为了两个局部,一部分用来示意线程池状态,另一部分来示意当前工作线程数,将高3位来示意线程池以后状态,后29位示意线程池大小。通过这里骚的面试官又能够出题了,问最大线程数最大能够设置为多少,又要杀倒一片。

或者会有小伙伴不懂位运算看不懂该段逻辑,又是左移又是右移的各种位运算,但其实先把办法大抵的性能理解了并不影响前面源码的浏览。

因为该篇篇幅无限,举荐想要刨根问底的小伙伴查问一下问运算的材料。

线程池状态

在ctl属性的局部,咱们会发现有如下几个枚举状态,那么都代表什么意思呢?

  • RUNNING :容许接管新工作并解决在工作队列中的工作。
  • SHUTDOWN:不接管新工作但解决在工作队列中的工作。
  • STOP:不接管新工作、不解决工作队列中工作、中断在解决中的工作
  • TIDYING:所有工作已完结、工作线程数为0、并会调用terminated()钩子办法
  • TERMINATED:terminated()钩子办法胜利执行

在线程池中状态是这样子流转的:

  • RUNNING -> SHUTDOWN:调用线程池的shutdown()办法。
  • (RUNNING/SHUTDOWN) -> STOP:调用线程池的shutdownNow()办法。
  • SHUTDOWN -> TIDYING:当工作队列和工作线程都为空时。
  • STOP -> TIDYING:当工作线程为空时。
  • TIDYING -> TERMINATED:当terminated()钩子办法胜利执行。

addWorker

该办法将会创立工作线程,并将创立数量管制在外围线程数或最大线程数,其中的firstTask为工作线程创立胜利后执行的第一个工作,第二个参数代表是否为外围工作线程,最终返回线程是否创立胜利。

private final HashSet<Worker> workers = new HashSet<Worker>();

private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
        // 给最外层循环设置标记,且该循环为死循环
        retry:
        for (;;) {
            // 获取ctl值
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);
            if (
                // 判断是否为SHUTDOWN、STOP、TIDYING、TERMINATED其中之一
                rs >= SHUTDOWN 
                &&
                /**
                    只有在SHUTDOWN且无工作须要执行且工作队列非空的时候该段逻辑返回true
                    代表须要持续增加工作队列执行工作队列中工作
                **/
                ! (
                     // 状态为SHUTDOWN
                   rs == SHUTDOWN &&
                   // 无第一个工作须要执行
                   firstTask == null &&
                   // 工作队列非空
                   ! workQueue.isEmpty())
                  )
                return false;
            // 死循环
            for (;;) {
                // 获取当前工作线程数
                int wc = workerCountOf(c);
                if (
                    // 如以后数量超过最大容量间接返回
                    wc >= CAPACITY 
                    ||
                    // 如创立为外围工作线程则与最大外围线程大小比拟,否则与最大线程数大小比拟
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS减少工作线程数,增加超过完结最外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS执行没胜利,值产生扭转,须要从新读取CTL的值
                c = ctl.get();
                // 如线程池状态产生扭转,从新执行最外层循环
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创立工作线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 因该段代码将会对HashSet进行操作,所以应用重入锁加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 从新获取状态
                    int rs = runStateOf(ctl.get());

                    if (
                        // rs为RUNNING状态
                        rs < SHUTDOWN 
                        ||
                        // 这种状况为池中工作线程达到空时工夫被销毁但工作队列还有工作时
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 预查看线程是否能够启动
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                           // 将工作线程增加进workers汇合中
                        workers.add(w);
                        // 记录工作线程数量最大达到的数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 工作线程增加示意设置为胜利
                        workerAdded = true;
                    }
                } finally {
                    // 可重入锁解锁
                    mainLock.unlock();
                }
                // 如工作线程增加胜利,将工作线程启动
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如工作线程启动失败,将工作线程从汇合中去除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回工作线程是否增加胜利
        return workerStarted;
    }

通过该段源码,它在适当的机会把咱们的task工作传递给了工作线程并创立,并将创立胜利的工作线程退出汇合中。其中CAS死循环的模式,是咱们开发中能够借鉴学习的模式。

worker

通过源码,将task传递到worker中,并调用了start()办法,那么阐明worker中必定是一个线程并且有它本人的run办法,那么咱们就很有必要探寻其中是如何进行编码的。

上图是Worker类的继承关系图,能够看出Worker继承了AQS、实现了Runnable办法,那么咱们就能够大胆的猜想他实现了某种锁的机制、并且能够被线程执行。

worker结构器
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

    Worker(Runnable firstTask) {
            // 这里先看做设置标记
            setState(-1); 
            // 设置第一个将会执行的工作
            this.firstTask = firstTask;
            /**
                通过最开始通过线程池结构器传入的线程池工厂创立线程
                因为worker实现Runnable接口,那么它就能够通过传入新线程中
                能够推断出调用了thread.start()就会执行worker的run()办法
            **/
            this.thread = getThreadFactory().newThread(this);
}

run办法

public void run() {
    runWorker(this);
}

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 设置标记可被中断
        w.unlock();
        boolean completedAbruptly = true;
        try {
            while (
                // firstTask不为空或能够从工作队列中获取到工作
                task != null || (task = getTask()) != null) {
                w.lock();
                // 中断判断,在下篇介绍
               if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 钩子办法,子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    // 调用task的run办法,并抓住所有异样,由钩子办法解决
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子办法,子类实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将task置空
                    task = null;
                    // 实现的工作数+1
                    w.completedTasks++;
                    // 标记worker可用
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 执行工作线程退出
            processWorkerExit(w, completedAbruptly);
        }
    }

通过该段代码,可用剖析出该段代码通过while循环始终从getTask()中获取工作,那么上面剖析getTask办法。

getTask

private Runnable getTask() {
        boolean timedOut = false; 
        
        // 死循环
        for (;;) {
            // 获取以后状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (
                // SHUTDOWN、STOP、TIDYING、TERMINATED
                rs >= SHUTDOWN && 
                // STOP、TIDYING、TERMINATED或工作队列为空
                (rs >= STOP || workQueue.isEmpty())) {
                // 工作线程数量-1
                decrementWorkerCount();
                // return null之后下层runWorker将会退出while循环执行工作线程退出
                return null;
            }
            // 获取当前工作线程数量
            int wc = workerCountOf(c);

            // 判断是否容许超时:当容许外围线程超时为true或以后数量超过外围线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if (
                // 当前工作线程数量超过最大数量或容许超时并且曾经超时
                (wc > maximumPoolSize || (timed && timedOut))
                && 
                // 工作线程大于1或者工作队列为空
                (wc > 1 || workQueue.isEmpty())) {
                // 尝试CAS工作线程数量-1
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // CAS不超过,持续下一次循环
                continue;
            }

            try {
                // 如果容许超时则调用poll办法期待设置定的超时工夫,否则调用take办法始终阻塞期待工作获取
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 获取到工作间接返回
                if (r != null)
                    return r;
                // 执行到这阐明获取工作超时,设置超时标记位
                timedOut = true;
            } catch (InterruptedException retry) {
                // 线程被中断,设置超时标记为false,从新下一次循环
                timedOut = false;
            }
        }
    }

通过getTask办法咱们能够看出,在设置容许外围线程超时或以后线程数大于外围线程数则示意超时开启,由此开关来判断调用阻塞队列中的阻塞办法还是非阻塞办法,一旦超时则返回null那么worker的run办法就会退出循环进入worker销毁过程,由此实现线程池线程数量的动静批改。

总结

本文通过通过execute办法作为切入点,带大家意识了CAS模式、锁模式以及是如何解决线程池状态。

在浏览源码的过程中,很多人喜爱刨根问底,但其实浏览源码就是一个不求甚解的过程,在理论浏览源码过程中调用栈可能会达到5-6层甚至可能更多层,这样子浏览源码其实是十分低效的,在始终往下深挖的过程中你会发现你的工夫和精力在一直的被耗费,最初只明确了源码中的一部分的逻辑分支,和咱们浏览源码的初衷齐全不同。

所以我举荐浏览源码先浏览调用栈的1-2层,再往深了就不要去深究了 ,等到整体逻辑都看明确了能够再回过头来去学习哪些具体细节。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理