关于java:java线程池源码一窥

57次阅读

共计 7633 个字符,预计需要花费 20 分钟才能阅读完成。

什么是线程池:

线程池是一种线程的应用模式, 事后将线程创立进去采纳池化的形式存储起来, 应用的时候间接应用即可, 防止频繁的线程的创立与销毁带来的性能开销, 也能够对线程的数量进行限度治理, 防止创立过多的线程导致 oom 异样。

java 中应用多线程的形式

1, 继承 Thread 类, 实现 Runable/Callable 接口, 调用 start/call 办法便能够开启一个线程。

2, 就是咱们要说的线程池的形式了。

1 的形式不用说频繁创立销毁线程带来性能开销, 以及线程的数量得不到限度治理理论我的项目中必定不能用该形式应用多线程。

java 中的线程池 Executor 框架

他是 java 对线程池的实现, 实现了将线程的执行单元和离开单元离开这种机制. 该框架包含三大部分

1, 工作单元即实现 Runable/Callable 接口的工作类(run/call 办法中写的是你真正想干的事)。

2, 执行单元, 缓存在线程池中的线程来执行你的工作.

3, 异步计算的后果, 当工作是实现 callable 接口时候, 执行办法会将返回后果封装倒 Future 对象中返回

前置思考:

有一个很经典的面试题问道: 线程能够调用屡次 start 办法么? 答案是不能, 为什么咱们能够关上源码

public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        /**
          他判断了线程的状态状态是 0 也就是 NEW 的状态能力执行该办法,
          很显然线程调用过一次 start 办法后状态必定不会为 NEW 而是 TERMINATED,
          所以屡次调用会抛出异样。至于线程的那几个状态在外部枚举类 State 外面有列举这里不再赘述
        */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {start0();
            started = true;
        } finally {
            try {if (!started) {group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

好那么咱们不禁要思考线程池可能实现对线程的复用, 那么他是如何做到让线程执行完工作之后在那里不完结等着下一个工作来了持续去执行的呢?要咱们来做的话怎么做呢? 咱们能够大胆的构想一下是不是像生产者消费者那样的模型就能够呢?。

应用线程池:

线程池的应用并不简单, 有两种形式去应用他:

1,Excutetors 工具类 (线程池工厂类?) 创立一些特定的线程池, 这外面是帮你屏蔽了一些参数的设置, 间接用他获取线程池对象会有如下问题: 他设置的默认参数对你的业务需要来说不见得正当, 弄不好就 OOM, 对你理解线程池的原理也不是好事儿. 所以我的项目中咱们用下一种形式. 具体外面有几种线程池读者有趣味能够自行去钻研, 我的项目开发中最好不要应用该种形式创立线程池.

2, ThreadPoolExecutor 创立线程池.

/**corePoolSize: 外围线程数量
   maximumPoolSize: 最大线程数量
   keepAliveTime: 线程多长时间没活干之后销毁(默认只针对非核心线程, 然而能够通过 allowCoreThreadTimeOut 设置外围线程超时销毁)
   unit: 工夫单位
   workQueue: 缓存工作的阻塞队列(当工作过去时候发现外围线程都在忙着就会先缓存进去该队列)
   threadFactory: 线程工厂
   handler: 回绝策略(当工作无奈被执行且不能被缓存时候执行)
*/
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
线程池大抵逻辑(摘自百度图片)

线程池中的线程是什么时候创立的:

咱们应用线程池是间接调用 execute 办法执行工作的.

 public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
             // 这里线程池的设计者奇妙的用 1 个 int 类型的变量示意了线程池的状态和以后线程的数量
            // 二进制的前 3 位示意线程池状态后 29 位示意线程的数量
        if (workerCountOf(c) < corePoolSize) {
            // 如果以后的线程数量小于外围线程数量, 尝试增加一个外围线程去执行以后任           
            if (addWorker(command, true))
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) {
            // 如果线程池当初是 runing 的状态, 且入队胜利
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //double check 线程池状态, 如果此时线程池状态不是 running 移除增加的工作并执行回绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 如果此时工作中的线程数量为 0 增加一个非核心线程
                addWorker(null, false);
        }
       // 入队也没吃胜利增加非核心线程
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker 办法:

Worker 是线程池的外部类, 外面封装了一个线程对象, 他自身又实现了 runable 接口, 封装的这个线程就是工作的执行单元. 这个线程在实例化的时候传的又是以后的 Worker 对象. 有点绕多品品。

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {w = new Worker(firstTask);
            // 真正执行工作的线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 因为线程池采纳的是 hashset 为保障线程平安, 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将 worker 增加到线程池中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // 滚动更新池中的最大线程数
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {mainLock.unlock();
                }
                if (workerAdded) {
                    // 这里就是执行工作的逻辑了, 下面提到这个 t 是 worker 外面的一个成员, 他的实例化传了 worker 对象, 所以实际上这里执行的逻辑应该是 worker 实现 runable 接口后复写的 run 办法而 worker 类的 run 办法又是调用的线程池中的 RunWorker 办法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        // 拿到 worker 外面封装的 task
        Runnable task = w.firstTask;
        // 将 worker 外面的 task 清空, 这里变量名也取的很好, 第一个工作, 意思是 worker 第一个执行的工作必定是当初实例化他传进去的 runable 类型的参数, 当然这个工作也可能为空, 比方线程池创立之后执行事后创立线程的办法 prestartAllCoreThreads
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {while (task != null || (task = getTask()) != null) {
                // 如果 task 不为空或者 getTask 不为空就去执行 task
                // 我想大家曾经猜到了, 这个 getTask 多半就是从阻塞队列中获取工作了
                // 阻塞队列有什么特点?没有工作他就会阻塞在这里, 所以这便能够答复前文的问题,
                // 他是靠阻塞队列的 take 办法的阻塞个性让线程挂起从而实现线程的复用的
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 真正执行业务办法的中央
                        task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {
            // 这个办法是销毁 worker 的办法, 外面有将 worker 从池 hashset 中移除的逻辑, 那么他什么时候会走到呢?
            //1, 上述逻辑产生了异样即 completedAbruptly=true.
            //2, 上述逻辑失常退出, 咦, 刚不是说上述循环条件会卡在 take 办法阻塞住么, 怎么会失常退出呢?咱们看看 getTask 办法
            processWorkerExit(w, completedAbruptly);
        }
    }
private Runnable getTask() {
       // 记得初始化线程池有一个参数叫线程多长时间没活干就销毁他么
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            // 死循环
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 这里意思是说查看超时的必要条件要么是外围线程也容许超时要么是以后有非核心线程在运行着
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 疏忽第一个极其条件超过最大线程数量不看, 第一次进来是必定不会进去这个分支的因为 timeout 为 false.
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {//cas 缩小一个执行单元的数量, 并没有销毁线程池中的线程对象, 销毁动作在 processWorkerExit 办法中行将线程 (worker) 从池即 hashset 中移除
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 这里就是阻塞获取队列外面的工作
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    // 获取到了工作
                    return r;
                // 超时没有获取到 timeout 就是 true 了, 再次循环就会到下面缩小运行线程数量的分支去了
                timedOut = true;
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

最初的问题:

当初咱们大抵晓得了线程池的原理, 可是还有一个很辣手的问题, 就是那几个参数应该如何设置才正当? 在答复这个问题之前无妨先考虑一下为什么要用多线程, 答案置信大家应该都晓得为了进步 cpu 的利用率, 咱们又晓得单核处理器在同一时间只会解决一个工作的, 之所以咱们能够边看知乎边听歌, 得益于 cpu 的工夫片机制在各个 (过程) 线程之间一直的来回切换运行让咱们产生了同时运行的错觉。切线程的上下文切换的开销也不小, 因为零碎有用户态到内核态的切换。

咱们次要看外围线程数量的参数设置

所以既然上下文切换有开销, 所以线程并不是越多越好, 在抉择多线程时候要对具体的工作具体的剖析:

CPU 密集型工作: 即该工作原本就是须要 CPU 大量参加的计算型工作,CPU 的利用率曾经很充沛了, 这个时候你的线程你再弄多的线程也只会减少线程上下文带来的额定开销罢了。所以此时应该实践上设置的线程数量和零碎 cpu 数量雷同, 然而为了避免一些意外的产生个别设置为 cpu 数量 + 1 个线程。

IO 密集型工作: 工作耗时次要体现在线程期待 io 操作的返回比方网络调用, 文件读写之类的, 这个时候 cpu 的利用率并没有失去充沛的应用, 所以实践上某种程度来说线程数应该是越多越好比方 cpu 数量的两倍.

对于参数的设置, 经验性较强, 因为你的服务器上不可能就运行你一个利用, 我认为只需明确什么样的工作类型怎么样去设置, 在这个大的思维下本人灵便变通就能够。

总结:

线程池的思维能够用大白话来解释就是公司项目组有 5 个程序员 (外围线程数) 平时的工作就是解决测试所提交的 bug, 测试会在一个 bug 管控平台 (阻塞队列) 上提交 bug 报告, 闲的时候 bug 没有很多程序员做在电脑前带薪摸摸鱼期待测试提交新的 bug(阻塞获取新的工作), 忙的时候五个程序员 996 都忙不过来啦,bug 管控平台都快因为提交 bug 的测试太多, 登录都登录不了啦, 这个时候老板说这样吧, 我去招几个外包过去, 外包过去了也帮着项目组在解决 bug, 然而这个时候还是有很多测试有 bug 要提交提交不下来, 项目组老大怒了, 说你们会不会用老子写的性能, 你这提的 (ie8 不能失常显示) 算什么 bug,滚蛋 (回绝策略) 我的项目开始闲下来了,bug 也没有那么多了外包也就卷铺盖走人了(线程超时销毁).

正文完
 0