之所以存在线程池是基于以下两个起因:

  1. 线程的创立和销毁是须要有资源耗费的,多线程环境下频繁创立、销毁线程会影响零碎性能
  2. 对于一个须要频繁创立工作、线程的利用来说,创立的工作数、线程数须要受到管制或治理

有了线程池,尤其是相似ThreadPoolExecutor这种能够通过参数调整其行为的线程池,能够近乎完满的解决上述两个问题。

线程池工作原理

简略来说线程池的工作原理就是:提前或者在执行工作的时候创立线程,执行完工作之后不销毁线程而是将线程偿还到线程池中,后续有工作提交上来之后就能够不再创立线程、而是由线程池中闲暇的线程执行工作。

这样一来就能够防止频繁创立和销毁线程,并且也能够控制线程池中线程的数量,同时如果提交工作的速度太快、线程池中的线程来不及执行工作的话,能够将工作放在队列中期待,等后面的工作执行实现、线程偿还到线程池中之后,再从队列中获取工作继续执行。

其实以上就是ThreadPoolExecutor性能的简略形容。

当然ThreadPoolExecutor的性能要比这个形容弱小的多也简单的多。咱们就从以下几个方面来详细分析一下ThreadPoolExecutor的性能和底层原理:

  1. 根本属性
  2. 工作队列
  3. 创立线程池
  4. 提交工作
  5. 执行工作
  6. 回绝工作
  7. 钩子函数

根本属性

corePoolSize&maximumPoolSize:ThreadPoolExecutor有外围线程数(corePoolSize)和最大线程数(maximumPoolSize)的概念,新工作提交后,如果以后线程数小于corePoolSize,即便线程池中有闲暇线程,ThreadPoolExecutor也会立刻创立一个线程去执行工作。如果以后线程数大于corePoolSize且小于maximumPoolSize,则只有队列满的状况下才会创立线程、否则工作入队列排队。

ctl:绑定了状态runState和线程数workerCount两个属性的AtomicInteger变量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

了解ctl的工作原理是读懂ThreadPoolExecutor源码的必要前提。

先看两个辅助变量:

 private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

COUNT_BITS是Integer.size-3=31-3=29,CAPACITY是1向左位移29位后减1,用二进制示意就是:

0001 1111 1111 1111 1111 1111 1111 1111

~CAPACITY用二进制示意就是:

1110 0000 0000 0000 0000 0000 0000 0000

// runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS; // Packing and unpacking ctl    private static int runStateOf(int c)     { return c & ~CAPACITY; }    private static int workerCountOf(int c)  { return c & CAPACITY; }    private static int ctlOf(int rs, int wc) { return rs | wc; }

runState有RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED等5个状态,压入ctl示意的时候全副要左移29位。意思是:ctl是依照2进制位来表白含意的,高位的3位用来示意状态runstate,低位的29位用来示意线程数workerCount。

ctl通过ctlOf函数(runState的理论状态值左移29位,挪动到高3位后,和workerCount按位或操作)失去。

runStateOf函数:ctl和~CAPACITY进行按位与,~CAPACITY的二级制示意为:

1110 0000 0000 0000 0000 0000 0000 0000

按位与操作失去的就是ctl的高3位,对应的就是runState。

workerCountOf函数:ctl和CAPACITY进行按位与,CAPACITY的二进制示意为:

0001 1111 1111 1111 1111 1111 1111 1111

按位与失去的就是低29位,对应的就是workerCount。

Keep-alive:如果以后线程数超过了corePoolSize,那么超出的线程如果闲暇工夫超过了keep-alive会被回收(terminate)。外围线程是没有超时概念也不会被回收的,然而能够通过设置allowCoreThreadTimeOut为true,使得外围线程也受到参数Keep-alive管制从而被回收。

工作队列

创立线程池的时候,通过ThreadPoolExutor构造方法指定工作队列,能够反对任何BlockingQueue。通过Executors工具创立ThreadPoolExutor的话,反对SynchronousQueue、LinkedBlockingQueue和 ArrayBlockingQueue三种阻塞队列。

工作队列是ThreadPoolExutor的重要参数,与corePoolSize和MaxPoolSize配合应用会创立出体现齐全不同的线程池:有界还是无界队列会影响到线程池接管工作的能力或体现,FIFO还是LIFO会影响到工作执行程序,等等。

创立线程池

ThreadPoolExecutor提供了4个构造方法,然而如果你不打算替换默认的ThreadFactory和RejectedExecutionHandler的话,最罕用的构造方法其实只有一个:

 public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), defaultHandler);    }

调用到另外一个构造方法:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.acc = System.getSecurityManager() == null ?                null :                AccessController.getContext();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }

从构造方法中咱们能够看到,ThreadPoolExecutor创立后除了设置重要属性之外啥也没干,外围线程也并没有启动!

提交工作

默认状况下,ThreadPoolExecutor提交工作的过程也同时是创立线程的过程,因为缺省状况下ThreadPoolExecutor创立的时候并不创立线程。

ThreadPoolExecutor实现了Executor接口,Executor通过其惟一办法execute来提交工作。

ThreadPoolExecutor的execute办法接管一个Runable参数作为工作,依照如下逻辑实现工作的提交:

  1. 如果线程池的线程数量小于corePoolSize,则通过addWorker(command,true)创立并启动一个新线程来执行工作
  2. 否则,尝试将工作退出队列,如果胜利,再次查看线程池状态,如果线程池曾经进行运行则工作出队,回绝工作。再次查看如果以后线程数为0则调用addWorker(null,false)创立新线程
  3. 否则,超出外围线程数且队列满,如果尚未超出最大线程数则通过addWorker(command,false)创立新线程,否则超出最大线程数,回绝工作

源码比较简单,须要留神的是退出队列局部:

if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }

调用阻塞队列的非阻塞办法offer入队,想想为啥不必阻塞办法?

线程启动机会

默认状况下ThreadPoolExecutor创立之后不会启动任何线程,包含外围线程。所有线程都是在工作提交后启动。

能够通过调用prestartCoreThread()或prestartAllCoreThreads()随时启动一个或所有外围线程。

启动线程#addWorker

线程通过addWorker(Runnable firstTask, boolean core)办法启动。firstTask是该线程的第一个工作,firstTask=null示意只启动线程、无工作。core示意要启动的是外围线程、还是一般线程,用来判断线程数是否已达下限。

addWorker办法首先做必要的合法性判断:以后线程池状态,线程数是否已达下限等,满足启动条件则更新以后线程数WorkerCount。

而后创立线程对象Worker,取得锁,加锁操作:Worker退出workers缓存(worker存储以后线程池的所有尚未执行工作的线程)......操作实现之后,解锁,并启动线程

Worker对象

Worker是实现了Runnable接口的外部类,次要属性:

  1. thread:线程池的线程本尊
  2. firstTask: 线程创立时绑定的工作,该线程如果是工作提交的时候创立的,firstTask就是被提交的工作,如果线程创立胜利,则firstTask具备优先执行权
  3. completedTasks:以后线程实现执行的工作数

初始化办法通过线程工厂创立一个线程:

 Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }

this作为Runnable参数传递给Thread的构造方法,线程启动的时候就回调this的run办法,所以Worker的run办法就是线程池中的线程执行工作的入口办法。

执行工作Worker.run

run办法调用了runWorker办法:

  public void run() {            runWorker(this);        }

持续跟踪runWorker办法:

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            //无限执行firstTask,firstTask为空的话通过getTask()从队列中获取task            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    //调用工作执行前钩子函数                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        //执行工作                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        //调用工作执行后钩子函数                        afterExecute(task, thrown);                    }                } finally {                    //开释工作                    task = null;                    //以后线程执行工作数更新 +1                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            //线程池相干数据更新            processWorkerExit(w, completedAbruptly);        }    }

从源码能够看到,Worker的firstTask会失去以后线程的优先执行,因为代码中获取并执行工作的循环条件中的task的初始值就是Worker的firstTask:

while (task != null || (task = getTask()) != null) {

firstTask执行实现之后,开释工作(task=null)。线程持续运行,下次循环时会通过getTask办法从队列获取工作。这个动作相当于:线程执行完一个工作之后并没有完结或销毁,而是交还给线程池,通过getTask持续从队列领工作,领到工作后继续执行。

getTask办法

getTask办法从队列获取排队期待执行的工作。

private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            //如果以后线程池已进行,或者处于SHUTDOWN状态且队列为空则返回null            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            //线程数            int wc = workerCountOf(c);            // Are workers subject to culling?            //是否须要限定工夫            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            //如果超过最大线程数且队列空,或期待超时且(线程数>1或者队列空),返回null            if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;            }            try {                //如果须要限时,则通过限时参数从阻塞队列获取工作,否则不须要限时的话,阻塞直到获取到工作                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                //从队列拿到工作就返回该工作                if (r != null)                    return r;                //否则没有拿到工作,设置超时                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

咱们发现线程池的keepAliveTime参数就是在这个getTask办法中失效的。

如果线程数小于外围线程数,并且allowCoreThreadTimeOut设置为false的话,线程不受期待工作时长的限度,则采纳阻塞队列的take办法、无限期期待直到能够从队列中获取工作。

如果线程数大于外围线程数,或者参数设置外围线程也须要受到超时管制,就会设置获取工作的限时时长为keepAliveTime,如果在keepAliveTime工夫范畴内依然没有从阻塞队列中拿到工作,则返回null。

超过keepAliveTime时长没拿到工作将导致在runWorker办法的while循环满足完结条件而退出循环:

while (task != null || (task = getTask()) != null) {     //获取工作...    } finally {            processWorkerExit(w, completedAbruptly);        }

退出循环后,调用processWorkerExit办法完结线程、退出线程池。

keepAliveTime=0L的状况

如果设置keepAliveTime=0L,并且线程数超出外围线程数,会是什么状况?

上面这一行代码交代的清清楚楚了:

Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();

调用poll办法、等待时间为0去阻塞队列获取工作,能够去看一下阻塞队列的限定期待时长的poll办法的源码,期待时长为0则成果等同于非阻塞办法:获取不到数据立即返回null。

所以keepAliveTime=0L示意:超出外围线程数后,在执行完工作之后容许闲暇工夫为0!即:如果没有新的工作提交上来的话,只保留corePoolSize个线程持续留在线程池期待工作,其余线程立刻销毁、退出线程池。

回绝工作

采纳有界阻塞队列的线程池,在队列已满、且超出最大线程数后提交上来的工作会被回绝,回绝后的解决形式由ThreadPoolExecutor的回绝策略RejectedExecutionHandler决定。回绝策略在ThreadPoolExecutor创立时指定。次要包含:

  1. AbortPolicy:间接抛异样RejectedExecutionException
  2. CallerRunsPolicy:调用方解决,即交给提交execute办法的线程本人执行工作
  3. DiscardPolicy:间接扔掉该工作
  4. DiscardOldestPolicy:扔掉队列中等待时间最久的工作,执行当前任务
  5. 自定义:实现RejectedExecutionHandler接口,自定义回绝策略

钩子函数

工作执行前和工作执行后别离调用beforeExecute/afterExecute办法,这两个办法在ThreadPoolExecutor中默认都是哑实现,什么都没做。如果你既想要采纳ThreadPoolExecutor作为线程池、又想在工作执行前后做额定的动作,能够继承ThreadPoolExecutor并笼罩他的beforeExecute/afterExecute办法。

小结

线程池ThreadPoolExecutor源码剖析实现,其实从利用层面来讲,绝大部分线程池需要都能够通过ThreadPoolExecutor失去满足,而且能够利用Executors工具类创立满足各种不同场景的线程池。

Thanks a lot!

上一篇 BlockingQueue - ArrayBlockingQueue