关于java:线程池原理源码

线程池调用execute提交工作—>创立Worker(设置属性thead、firstTask)—>worker.thread.start()—>实际上调用的是worker.run()—>线程池的runWorker(worker)—>worker.firstTask.run();

runWork办法 外围(线程复用)

final void runWorker(Worker w) {
    //获取以后线程
    Thread wt = Thread.currentThread();
    //获取w的firstTask
    Runnable task = w.firstTask;
    //设置w的firstTask为null
    w.firstTask = null;
    // 开释锁,设置AQS的state为0,容许中断
    w.unlock();
    //用于标识线程是否异样终止,finally中processWorkerExit()办法会有不同逻辑
    boolean completedAbruptly = true;
    try {
        //循环调用getTask()获取工作,一直从工作缓存队列获取工作并执行
        while (task != null || (task = getTask()) != null) {
            //进入循环外部,代表曾经获取到可执行的工作,则对worker对象加锁,保障线程在执行工作过程中不会被中断
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||  //若线程池状态大于等于STOP,那么意味着该线程要中断
                    (Thread.interrupted() &&      //线程被中断
                            runStateAtLeast(ctl.get(), STOP))) &&  //且是因为线程池外部状态变动而被中断
                    !wt.isInterrupted())           //确保该线程未被中断
                //收回中断请求
                wt.interrupt();
            try {
                //开始执行工作前的Hook办法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //到这里正式开始执行工作
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行工作后的Hook办法
                    afterExecute(task, thrown);
                }
            } finally {
                //置空task,筹备通过getTask()获取下一个工作
                task = null;
                //completedTasks递增
                w.completedTasks++;
                //开释掉worker持有的独占锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //到这里,线程执行完结,须要执行完结线程的一些清理工作
        //线程执行完结可能有两种状况:
        //1.getTask()返回null,也就是说,这个worker的使命完结了,线程执行完结
        //2.工作执行过程中产生了异样
        //第一种状况,getTask()返回null,那么getTask()中会将workerCount递加
        //第二种状况,workerCount没有进行解决,这个递加操作会在processWorkerExit()中解决
        processWorkerExit(w, completedAbruptly);
    }
}

循环取工作 geTask()

private Runnable getTask() {
    //标识以后线程是否超时未能获取到task对象
    boolean timedOut = false;


    for (;;) {
        //获取线程池的管制状态
        int c = ctl.get();
        //获取线程池的运行状态
        int rs = runStateOf(c);


        //如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递加,办法返回null,回收线程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }


        //获取worker数量
        int wc = workerCountOf(c);


        //标识以后线程在闲暇时,是否应该超时回收
        // 如果allowCoreThreadTimeOut为ture,或以后线程数大于外围池大小,则须要超时回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


        //如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize(),导致worker数量大于maximumPoolSize)
        if ((wc > maximumPoolSize || (timed && timedOut))  //或者获取工作超时
                && (wc > 1 || workQueue.isEmpty())) {  //workerCount大于1或者阻塞队列为空(在阻塞队列不为空时,须要保障至多有一个工作线程)
            if (compareAndDecrementWorkerCount(c))
                //线程池工作线程数量递加,办法返回null,回收线程
                return null;
            //线程池工作线程数量递加失败,跳过残余局部,持续循环
            continue;
        }


        try {
            //如果容许超时回收,则调用阻塞队列的poll(),只在keepAliveTime工夫内期待获取工作,一旦超过则返回null
            //否则调用take(),如果队列为空,线程进入阻塞状态,有限时期待工作,直到队列中有可取工作或者响应中断信号退出
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            //若task不为null,则返回胜利获取的task对象
            if (r != null)
                return r;
            // 若返回task为null,示意线程闲暇工夫超时,则设置timeOut为true
            timedOut = true;
        } catch (InterruptedException retry) {
            //如果此worker产生了中断,采取的计划是重试,没有超时
            //在哪些状况下会产生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow()
            timedOut = false;
        }
    }
}

ps:线程池工作线程执行工作产生异样如何解决?

processWorkerExit办法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 失常的话再runWorker的getTask办法workerCount曾经被减一了
    if (completedAbruptly)
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 累加线程的completedTasks
        completedTaskCount += w.completedTasks;
        // 从线程池中移除超时或者出现异常的线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 尝试进行线程池
    tryTerminate();
    int c = ctl.get();
    // runState为RUNNING或SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // 线程不是异样完结
        if (!completedAbruptly) {
            // 线程池最小闲暇数,容许core thread超时就是0,否则就是corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min == 0然而队列不为空要保障有1个线程来执行队列中的工作
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // 线程池还不为空那就不必放心了
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 1.线程异样退出
        // 2.线程池为空,然而队列中还有工作没执行,看addWoker办法对这种状况的解决
        addWorker(null, false);
    }
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理