关于java:线程池原理源码

5次阅读

共计 3484 个字符,预计需要花费 9 分钟才能阅读完成。

线程池调用 execute 提交工作—> 创立 Worker(设置属性 thead、firstTask)—>worker.thread.start()—> 实际上调用的是 worker.run()—> 线程池的 runWorker(worker)—>worker.firstTask.run();

runWork 办法 外围 (线程复用)

final void runWorker(Worker w) {
    // 获取以后线程
    Thread wt = Thread.currentThread();
    // 获取 w 的 firstTask
    Runnable task = w.firstTask;
    // 设置 w 的 firstTask 为 null
    w.firstTask = null;
    // 开释锁,设置 AQS 的 state 为 0,容许中断
    w.unlock();
    // 用于标识线程是否异样终止,finally 中 processWorkerExit() 办法会有不同逻辑
    boolean completedAbruptly = true;
    try {// 循环调用 getTask() 获取工作, 一直从工作缓存队列获取工作并执行
        while (task != null || (task = getTask()) != null) {
            // 进入循环外部,代表曾经获取到可执行的工作,则对 worker 对象加锁,保障线程在执行工作过程中不会被中断
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||  // 若线程池状态大于等于 STOP,那么意味着该线程要中断
                    (Thread.interrupted() &&      // 线程被中断
                            runStateAtLeast(ctl.get(), STOP))) &&  // 且是因为线程池外部状态变动而被中断
                    !wt.isInterrupted())           // 确保该线程未被中断
                // 收回中断请求
                wt.interrupt();
            try {
                // 开始执行工作前的 Hook 办法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 到这里正式开始执行工作
                    task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {
                    // 执行工作后的 Hook 办法
                    afterExecute(task, thrown);
                }
            } finally {// 置空 task,筹备通过 getTask() 获取下一个工作
                task = null;
                //completedTasks 递增
                w.completedTasks++;
                // 开释掉 worker 持有的独占锁
                w.unlock();}
        }
        completedAbruptly = false;
    } finally {
        // 到这里,线程执行完结,须要执行完结线程的一些清理工作
        // 线程执行完结可能有两种状况://1.getTask() 返回 null,也就是说,这个 worker 的使命完结了,线程执行完结
        //2. 工作执行过程中产生了异样
        // 第一种状况,getTask() 返回 null,那么 getTask() 中会将 workerCount 递加
        // 第二种状况,workerCount 没有进行解决,这个递加操作会在 processWorkerExit() 中解决
        processWorkerExit(w, completedAbruptly);
    }
}

循环取工作 geTask()

private Runnable getTask() {
    // 标识以后线程是否超时未能获取到 task 对象
    boolean timedOut = false;


    for (;;) {
        // 获取线程池的管制状态
        int c = ctl.get();
        // 获取线程池的运行状态
        int rs = runStateOf(c);


        // 如果线程池状态大于等于 STOP,或者处于 SHUTDOWN 状态,并且阻塞队列为空,线程池工作线程数量递加,办法返回 null,回收线程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
            return null;
        }


        // 获取 worker 数量
        int wc = workerCountOf(c);


        // 标识以后线程在闲暇时,是否应该超时回收
        // 如果 allowCoreThreadTimeOut 为 ture,或以后线程数大于外围池大小,则须要超时回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


        // 如果 worker 数量大于 maximumPoolSize(有可能调用了 setMaximumPoolSize(), 导致 worker 数量大于 maximumPoolSize)
        if ((wc > maximumPoolSize || (timed && timedOut))  // 或者获取工作超时
                && (wc > 1 || workQueue.isEmpty())) {  //workerCount 大于 1 或者阻塞队列为空(在阻塞队列不为空时,须要保障至多有一个工作线程)if (compareAndDecrementWorkerCount(c))
                // 线程池工作线程数量递加,办法返回 null,回收线程
                return null;
            // 线程池工作线程数量递加失败,跳过残余局部,持续循环
            continue;
        }


        try {// 如果容许超时回收,则调用阻塞队列的 poll(),只在 keepAliveTime 工夫内期待获取工作,一旦超过则返回 null
            // 否则调用 take(),如果队列为空,线程进入阻塞状态,有限时期待工作,直到队列中有可取工作或者响应中断信号退出
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            // 若 task 不为 null,则返回胜利获取的 task 对象
            if (r != null)
                return r;
            // 若返回 task 为 null,示意线程闲暇工夫超时,则设置 timeOut 为 true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果此 worker 产生了中断,采取的计划是重试,没有超时
            // 在哪些状况下会产生中断?调用 setMaximumPoolSize(),shutDown(),shutDownNow()
            timedOut = false;
        }
    }
}

ps: 线程池工作线程执行工作产生异样如何解决?

processWorkerExit 办法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 失常的话再 runWorker 的 getTask 办法 workerCount 曾经被减一了
    if (completedAbruptly)
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 累加线程的 completedTasks
        completedTaskCount += w.completedTasks;
        // 从线程池中移除超时或者出现异常的线程
        workers.remove(w);
    } finally {mainLock.unlock();
    }
    // 尝试进行线程池
    tryTerminate();
    int c = ctl.get();
    // runState 为 RUNNING 或 SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // 线程不是异样完结
        if (!completedAbruptly) {
            // 线程池最小闲暇数,容许 core thread 超时就是 0,否则就是 corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果 min == 0 然而队列不为空要保障有 1 个线程来执行队列中的工作
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // 线程池还不为空那就不必放心了
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 1. 线程异样退出
        // 2. 线程池为空,然而队列中还有工作没执行,看 addWoker 办法对这种状况的解决
        addWorker(null, false);
    }
}
正文完
 0