关于java:线程池-ThreadPoolExecutor源码分析

43次阅读

共计 8668 个字符,预计需要花费 22 分钟才能阅读完成。

之所以存在线程池是基于以下两个起因:

  1. 线程的创立和销毁是须要有资源耗费的,多线程环境下频繁创立、销毁线程会影响零碎性能
  2. 对于一个须要频繁创立工作、线程的利用来说,创立的工作数、线程数须要受到管制或治理

有了线程池,尤其是相似 ThreadPoolExecutor 这种能够通过参数调整其行为的线程池,能够近乎完满的解决上述两个问题。

线程池工作原理

简略来说线程池的工作原理就是:提前或者在执行工作的时候创立线程,执行完工作之后不销毁线程而是将线程偿还到线程池中,后续有工作提交上来之后就能够不再创立线程、而是由线程池中闲暇的线程执行工作。

这样一来就能够防止频繁创立和销毁线程,并且也能够控制线程池中线程的数量,同时如果提交工作的速度太快、线程池中的线程来不及执行工作的话,能够将工作放在队列中期待,等后面的工作执行实现、线程偿还到线程池中之后,再从队列中获取工作继续执行。

其实以上就是 ThreadPoolExecutor 性能的简略形容。

当然 ThreadPoolExecutor 的性能要比这个形容弱小的多也简单的多。咱们就从以下几个方面来详细分析一下 ThreadPoolExecutor 的性能和底层原理:

  1. 根本属性
  2. 工作队列
  3. 创立线程池
  4. 提交工作
  5. 执行工作
  6. 回绝工作
  7. 钩子函数

根本属性

corePoolSize&maximumPoolSize:ThreadPoolExecutor 有外围线程数(corePoolSize)和最大线程数(maximumPoolSize)的概念,新工作提交后,如果以后线程数小于 corePoolSize,即便线程池中有闲暇线程,ThreadPoolExecutor 也会立刻创立一个线程去执行工作。如果以后线程数大于 corePoolSize 且小于 maximumPoolSize,则只有队列满的状况下才会创立线程、否则工作入队列排队。

ctl:绑定了状态 runState 和线程数 workerCount 两个属性的 AtomicInteger 变量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

了解 ctl 的工作原理是读懂 ThreadPoolExecutor 源码的必要前提。

先看两个辅助变量:

 private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

COUNT_BITS 是 Integer.size-3=31-3=29,CAPACITY 是 1 向左位移 29 位后减 1,用二进制示意就是:

0001 1111 1111 1111 1111 1111 1111 1111

~CAPACITY 用二进制示意就是:

1110 0000 0000 0000 0000 0000 0000 0000

// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

 // Packing and unpacking ctl
    private static int runStateOf(int c)     {return c & ~CAPACITY;}
    private static int workerCountOf(int c)  {return c & CAPACITY;}
    private static int ctlOf(int rs, int wc) {return rs | wc;}

runState 有 RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED 等 5 个状态,压入 ctl 示意的时候全副要左移 29 位。意思是:ctl 是依照 2 进制位来表白含意的,高位的 3 位用来示意状态 runstate,低位的 29 位用来示意线程数 workerCount。

ctl 通过 ctlOf 函数(runState 的理论状态值左移 29 位,挪动到高 3 位后,和 workerCount 按位或操作)失去。

runStateOf 函数:ctl 和~CAPACITY 进行按位与,~CAPACITY 的二级制示意为:

1110 0000 0000 0000 0000 0000 0000 0000

按位与操作失去的就是 ctl 的高 3 位,对应的就是 runState。

workerCountOf 函数:ctl 和 CAPACITY 进行按位与,CAPACITY 的二进制示意为:

0001 1111 1111 1111 1111 1111 1111 1111

按位与失去的就是低 29 位,对应的就是 workerCount。

Keep-alive:如果以后线程数超过了 corePoolSize,那么超出的线程如果闲暇工夫超过了 keep-alive 会被回收(terminate)。外围线程是没有超时概念也不会被回收的,然而能够通过设置 allowCoreThreadTimeOut 为 true,使得外围线程也受到参数 Keep-alive 管制从而被回收。

工作队列

创立线程池的时候,通过 ThreadPoolExutor 构造方法指定工作队列,能够反对任何 BlockingQueue。通过 Executors 工具创立 ThreadPoolExutor 的话,反对 SynchronousQueue、LinkedBlockingQueue 和 ArrayBlockingQueue 三种阻塞队列。

工作队列是 ThreadPoolExutor 的重要参数,与 corePoolSize 和 MaxPoolSize 配合应用会创立出体现齐全不同的线程池:有界还是无界队列会影响到线程池接管工作的能力或体现,FIFO 还是 LIFO 会影响到工作执行程序,等等。

创立线程池

ThreadPoolExecutor 提供了 4 个构造方法,然而如果你不打算替换默认的 ThreadFactory 和 RejectedExecutionHandler 的话,最罕用的构造方法其实只有一个:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

调用到另外一个构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

从构造方法中咱们能够看到,ThreadPoolExecutor 创立后除了设置重要属性之外啥也没干,外围线程也并没有启动!

提交工作

默认状况下,ThreadPoolExecutor 提交工作的过程也同时是创立线程的过程,因为缺省状况下 ThreadPoolExecutor 创立的时候并不创立线程。

ThreadPoolExecutor 实现了 Executor 接口,Executor 通过其惟一办法 execute 来提交工作。

ThreadPoolExecutor 的 execute 办法接管一个 Runable 参数作为工作,依照如下逻辑实现工作的提交:

  1. 如果线程池的线程数量小于 corePoolSize,则通过 addWorker(command,true)创立并启动一个新线程来执行工作
  2. 否则,尝试将工作退出队列,如果胜利,再次查看线程池状态,如果线程池曾经进行运行则工作出队,回绝工作。再次查看如果以后线程数为 0 则调用 addWorker(null,false)创立新线程
  3. 否则,超出外围线程数且队列满,如果尚未超出最大线程数则通过 addWorker(command,false)创立新线程,否则超出最大线程数,回绝工作

源码比较简单,须要留神的是退出队列局部:

if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

调用阻塞队列的非阻塞办法 offer 入队,想想为啥不必阻塞办法?

线程启动机会

默认状况下 ThreadPoolExecutor 创立之后不会启动任何线程,包含外围线程。所有线程都是在工作提交后启动。

能够通过调用 prestartCoreThread()或 prestartAllCoreThreads()随时启动一个或所有外围线程。

启动线程 #addWorker

线程通过 addWorker(Runnable firstTask, boolean core)办法启动。firstTask 是该线程的第一个工作,firstTask=null 示意只启动线程、无工作。core 示意要启动的是外围线程、还是一般线程,用来判断线程数是否已达下限。

addWorker 办法首先做必要的合法性判断:以后线程池状态,线程数是否已达下限等,满足启动条件则更新以后线程数 WorkerCount。

而后创立线程对象 Worker,取得锁,加锁操作:Worker 退出 workers 缓存(worker 存储以后线程池的所有尚未执行工作的线程)…… 操作实现之后,解锁, 并 启动线程

Worker 对象

Worker 是实现了 Runnable 接口的外部类,次要属性:

  1. thread:线程池的线程本尊
  2. firstTask:线程创立时绑定的工作,该线程如果是工作提交的时候创立的,firstTask 就是被提交的工作,如果线程创立胜利,则 firstTask 具备优先执行权
  3. completedTasks:以后线程实现执行的工作数

初始化办法通过线程工厂创立一个线程:

 Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

this 作为 Runnable 参数传递给 Thread 的构造方法,线程启动的时候就回调 this 的 run 办法,所以 Worker 的 run 办法就是线程池中的线程执行工作的入口办法。

执行工作 Worker.run

run 办法调用了 runWorker 办法:

  public void run() {runWorker(this);
        }

持续跟踪 runWorker 办法:

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {// 无限执行 firstTask,firstTask 为空的话通过 getTask()从队列中获取 task
            while (task != null || (task = getTask()) != null) {w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 调用工作执行前钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行工作
                        task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {
                        // 调用工作执行后钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 开释工作
                    task = null;
                    // 以后线程执行工作数更新 +1
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {
            // 线程池相干数据更新
            processWorkerExit(w, completedAbruptly);
        }
    }

从源码能够看到,Worker 的 firstTask 会失去以后线程的优先执行,因为代码中获取并执行工作的循环条件中的 task 的初始值就是 Worker 的 firstTask:

while (task != null || (task = getTask()) != null) {

firstTask 执行实现之后,开释工作(task=null)。线程持续运行,下次循环时会通过 getTask 办法从队列获取工作。这个动作相当于:线程执行完一个工作之后并没有完结或销毁,而是交还给线程池,通过 getTask 持续从队列领工作,领到工作后继续执行。

getTask 办法

getTask 办法从队列获取排队期待执行的工作。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?

        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果以后线程池已进行,或者处于 SHUTDOWN 状态且队列为空则返回 null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                return null;
            }
            // 线程数
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // 是否须要限定工夫
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果超过最大线程数且队列空,或期待超时且(线程数 >1 或者队列空),返回 null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 如果须要限时,则通过限时参数从阻塞队列获取工作,否则不须要限时的话,阻塞直到获取到工作
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 从队列拿到工作就返回该工作
                if (r != null)
                    return r;
                // 否则没有拿到工作,设置超时
                timedOut = true;
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

咱们发现线程池的 keepAliveTime 参数就是在这个 getTask 办法中失效的。

如果线程数小于外围线程数,并且 allowCoreThreadTimeOut 设置为 false 的话,线程不受期待工作时长的限度,则采纳阻塞队列的 take 办法、无限期期待直到能够从队列中获取工作。

如果线程数大于外围线程数,或者参数设置外围线程也须要受到超时管制,就会设置获取工作的限时时长为 keepAliveTime,如果在 keepAliveTime 工夫范畴内依然没有从阻塞队列中拿到工作,则返回 null。

超过 keepAliveTime 时长没拿到工作将导致在 runWorker 办法的 while 循环满足完结条件而退出循环:

while (task != null || (task = getTask()) != null) {// 获取工作...} finally {processWorkerExit(w, completedAbruptly);
        }

退出循环后,调用 processWorkerExit 办法完结线程、退出线程池。

keepAliveTime=0L 的状况

如果设置 keepAliveTime=0L,并且线程数超出外围线程数,会是什么状况?

上面这一行代码交代的清清楚楚了:

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

调用 poll 办法、等待时间为 0 去阻塞队列获取工作,能够去看一下阻塞队列的限定期待时长的 poll 办法的源码,期待时长为 0 则成果等同于非阻塞办法:获取不到数据立即返回 null。

所以 keepAliveTime=0L 示意:超出外围线程数后,在执行完工作之后容许闲暇工夫为 0!即:如果没有新的工作提交上来的话,只保留 corePoolSize 个线程持续留在线程池期待工作,其余线程立刻销毁、退出线程池。

回绝工作

采纳有界阻塞队列的线程池,在队列已满、且超出最大线程数后提交上来的工作会被回绝,回绝后的解决形式由 ThreadPoolExecutor 的回绝策略 RejectedExecutionHandler 决定。回绝策略在 ThreadPoolExecutor 创立时指定。次要包含:

  1. AbortPolicy:间接抛异样 RejectedExecutionException
  2. CallerRunsPolicy:调用方解决,即交给提交 execute 办法的线程本人执行工作
  3. DiscardPolicy:间接扔掉该工作
  4. DiscardOldestPolicy:扔掉队列中等待时间最久的工作,执行当前任务
  5. 自定义:实现 RejectedExecutionHandler 接口,自定义回绝策略

钩子函数

工作执行前和工作执行后别离调用 beforeExecute/afterExecute 办法,这两个办法在 ThreadPoolExecutor 中默认都是哑实现,什么都没做。如果你既想要采纳 ThreadPoolExecutor 作为线程池、又想在工作执行前后做额定的动作,能够继承 ThreadPoolExecutor 并笼罩他的 beforeExecute/afterExecute 办法。

小结

线程池 ThreadPoolExecutor 源码剖析实现,其实从利用层面来讲,绝大部分线程池需要都能够通过 ThreadPoolExecutor 失去满足,而且能够利用 Executors 工具类创立满足各种不同场景的线程池。

Thanks a lot!

上一篇 BlockingQueue – ArrayBlockingQueue

正文完
 0