线程池

池化技术的益处

  1. 升高资源耗费:能够反复利用已创立的线程升高线程创立和销毁造成的耗费。
  2. 进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。
  3. 进步线程的可管理性:线程是稀缺资源,如果无限度地创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行统一分配、调优和监控。

线程池的利用场景

  1. 服务器承受到大量申请时,应用线程池技术时十分适合的,它能够大大减少线程的创立和销毁次数,进步服务器的工作效率
  2. 实际上,在开发中,如果须要创立5个以上的线程,那么就能够应用线程池来治理

线程池的类关系图

线程池的结构器参数

参数名类型含意
corePoolSizeint外围线程数
maxPoolSizeint最大线程数
keepAliveTimelong放弃存活工夫
workQueueBlockingQueue工作存储队列
threadFactoryThreadFactory当线程池须要新的线程的时候,会应用threadFactory来生成新的线程
HandlerRejectedExecutionHandler因为线程池无奈承受所提交的工作的回绝策略

corePoolSize和maxPoolSize

  • corePoolSize指的是外围线程数:线程池在实现初始化后,默认状况下,还没有创立任何线程,线程池会期待有工作到来时,再创立新线程去执行工作,直到达到外围线程数,之后外围线程会始终放弃这个数量;当工作数量超过外围线程数,将工作放在阻塞队列workQueue中,期待外围线程闲暇后处理
  • 如果外围线程全副在工作中,而且队列也满了,线程池就会在外围线程的根底上,额定减少一些线程,这些新减少的线程数最大下限就是maxPoolSize

线程创立规定

  1. 如果线程数小于corePoolSize, 即便其余线程处于闲暇状态,也会创立一个新线程(外围线程)来运行新工作
  2. 如果线程数等于(或大于)corePoolSize但少于maxPoolSize,则将工作放入队列
  3. 如果队列已满,并且线程数小于maxPoolSize,则创立一个新线程来运行工作
  4. 如果队列已满,并且线程数大于或等于maxPoolSize则回绝该工作

增减线程的特点

  1. 通过设置corePoolSize和maxPoolSize为雷同数量,就能够创立固定大小的线程池,即便队列满了也不会在拓展线程
  2. 线程池心愿放弃较少的线程数,并且只有在负载变得很大时才减少它,这就是队列的用意
  3. 通过设置maxPoolSize为很高的只,例如Integer.MAX_VALUE,能够容许线程池包容任意数量的并发工作
  4. 是只有在队列填满时才创立多于corePoolSize的线程,所以如果应用无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize

keepAliveTime

闲暇的非核心线程的存活工夫,用于回收线程

  • 如果线程池以后的线程数多于corePoolSize,那么如果多余的线程闲暇工夫超过keepAliveTime,它们就会被终止

ThreadFactory

线程工厂,用于创立线程

  • 新的线程是由ThreadFactory创立的,默认应用的线程工厂是Executors.defaultThreadFactory(),创立进去的线程都在同一个线程组,领有同样的NORM_PRIORITY优先级并且都不是守护线程;如果本人定义ThreadFactory,那么就能够扭转线程名,线程组,优先级,是否是守护线程等
  • 通常应用默认的就能够,源码如下:

workQueue

有三种最常见的队列类型:

  1. 间接交接: SynchronousQueue 无容量
  2. 无界队列: LinkedBlockingQueue 有限容量,有内存溢出的危险
  3. 有界队列: ArrayBlockingQueue 可设置容量

ThreadPoolExecutor的启动

/***  -通过 new 创立线程池时, 除非调用 prestartAllCoreThreads / prestartCoreThread 办法启动外围线程,*  -否则即便工作队列中存在工作,同样也不会执行.*/ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 20, 3L, TimeUnit.SECONDS, linkedBlockingDeque);        /** * Starts all core threads, causing them to idly wait for work. This* overrides the default policy of starting core threads only when* new tasks are executed.*    -启动所有外围线程,让它们无闲暇的期待工作。 这将笼罩仅在执行新工作时启动外围线程的默认策略。* -手动启动线程池.* @return the number of threads started*/threadPoolExecutor.prestartAllCoreThreads();

JDK内置线程池

线程池应该手动创立还是主动创立

手动创立,能够让咱们更加明确线程池的容许规定,防止资源耗尽的危险

主动创立,也就是间接调用JDK封装号的构造函数,可能会带来一些问题:

Executors.newFixedThreadPool(int nThreads)数量固定的线程池

 public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    }

corePoolSize和maxPoolSize被设置为雷同的nThreads参数,并应用了无界队列LinkedBlockingQueue,不会拓展线程所以也没有存活工夫

当工作在队列中沉积过多,可能就会造成OOM

Executors.newSingleThreadExecutor()只有一个线程的线程池

public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>()));    }

Executors.newCachedThreadPool() 可缓存线程

无界限程池,具备主动回收多余线程的性能

public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());    }

最大的线程数被设置为Integer.MAX_VALUE,线程闲暇60秒后回收,不应用队列(SynchronousQueue)

Executors.newScheduledTreadPool()

反对定时及周期性工作执行的线程池,应用提早队列(DelayedWorkQueue)

public static void main(String[] args) {        ScheduledExecutorService threadPool =                Executors.newScheduledThreadPool(10);        //提早5秒执行工作        threadPool.schedule(new EveryTaskOneThread.Task(),5, TimeUnit.SECONDS);        //1秒之后每个3秒执行一次工作        threadPool.scheduleAtFixedRate(new EveryTaskOneThread.Task(),                1, 3,TimeUnit.SECONDS);    }

所以,还是更具业务的并发量手动创立线程池吧

JDK1.8后退出workStealingPool

  • 子工作
  • 窃取

线程数量怎么设定?

  • CPU密集型(加密,即便hash等) : 最佳线程数为CPU外围数的1-2倍左右
  • 耗时I/O型(读写数据库,文件,网络传输等): 最佳线程数个别会大于CPU外围数很多倍,以JVM线程监控显示忙碌状况为根据,保障线程闲暇能够连接上,参考Brain Goetz举荐的计算方法:

    ==线程数=CPU外围数 * (1+均匀等待时间/均匀工作工夫))==

  • 实际上最靠谱的还是通过压力测试得出适合的线程数

进行线程池的正确形式

  • shutdown 执行该办法后,线程池会将以后队列中的工作执行结束,并且在次期间回绝新工作进入,执行完后进行线程池
    public static void main(String[] args) throws InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        for (int i = 0; i < 1000; i++){            executorService.execute(new ShutDownTask());        }        System.out.println(executorService.isShutdown());        Thread.sleep(1500);        executorService.shutdown();        //是否进入进行状态        System.out.println(executorService.isShutdown());        //回绝新工作        executorService.execute(new ShutDownTask());        //是否真正意义上的敞开        System.out.println(executorService.isTerminated());    }    static class ShutDownTask implements Runnable {        @Override        public void run() {            try {                Thread.sleep(500);                System.out.println(Thread.currentThread().getName());            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }

awaitTermination(timeout):在一段时间内所有工作是否被执行结束

  • shutdownNow 将所有线程中断,并且队列中还未执行的工作作为一个列表返回
public static void main(String[] args) throws InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        for (int i = 0; i < 1000; i++){            executorService.execute(new ShutDownTask());        }        System.out.println(executorService.isShutdown());        Thread.sleep(1500);        //发送中断信号,并返回runnableList        List<Runnable> runnableList =                executorService.shutdownNow();    }    static class ShutDownTask implements Runnable {        @Override        public void run() {            try {                Thread.sleep(500);                System.out.println(Thread.currentThread().getName());            } catch (InterruptedException e) {                System.out.println(Thread.currentThread().getName() + "被中断了");            }        }    }

工作回绝策略

  • 回绝机会

    1. 当Executor敞开(shutdown)时,提交新工作会被回绝
    2. 当Executor对最大线程和队列容量应用有限度并且曾经饱和时

4种回绝策略

  • AbortPolicy: 默认,间接抛出RejectedExecutionException回绝异样
  • DiscardPolicy: 默默的把被回绝的工作抛弃
  • DiscardOldestPolicy: 当有新工作时,会抛弃工作队列中存在最久的老工作,以腾出地位给新工作
  • CallerRunsPolicy: 将被线程池回绝的工作交给调用者(caller)主线程去执行

钩子办法

每个工作执行前后能够减少解决(日志,统计)

/** * 演示每个工作执行前后都能够放钩子函数 */public class PauseableTreadPool extends ThreadPoolExecutor {    private final ReentrantLock lock = new ReentrantLock();    private Condition unpaused = lock.newCondition();    //标记线程是否处于暂停状态    private boolean isPaused;    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);    }    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);    }    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);    }    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);    }    //重写办法 before钩子    @Override    protected void beforeExecute(Thread t, Runnable r) {        super.beforeExecute(t, r);        lock.lock();        try {            while(isPaused) {                unpaused.await();            }        } catch (InterruptedException e) {            e.printStackTrace();        }finally {            lock.unlock();        }    }    //暂停办法    private void pause() {        lock.lock();        try {            isPaused = true;        } finally {            lock.unlock();        }    }    //复原办法    private void resume() {        lock.lock();        try{            isPaused = false;            //唤醒全副            unpaused.signalAll();        } finally {            lock.unlock();        }    }    public static void main(String[] args) throws InterruptedException {        PauseableTreadPool pauseableTreadPool =                new PauseableTreadPool(10, 20,                10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());        Runnable runnable = new Runnable() { //线程体            @Override            public void run() {                System.out.println("被执行");                try {                    Thread.sleep(10);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };        for (int i = 0; i < 10000; i++){            pauseableTreadPool.execute(runnable);        }        Thread.sleep(1500);        pauseableTreadPool.pause();        System.out.println("线程池被暂停");        Thread.sleep(1500);        pauseableTreadPool.resume();        System.out.println("线程池已复原");    }}

原理&源码剖析

次要剖析ThreadPoolExecutor

线程池的组成部分

  • 线程池管理器 ExecutorService控制线程池的启动和进行
  • 工作线程 ThreadPoolExecutor中的外部类Worker
  • 工作队列 线程平安的BlockingQueue<Runnable> workQueue;
  • 工作接口(Task)

    /** * Creates with given first task and thread from ThreadFactory.

*/
Worker(Runnable firstTask) {

  setState(-1); // inhibit interrupts until runWorker  this.firstTask = firstTask;  this.thread = getThreadFactory().newThread(this);

}

**线程池事项工作复用的原理**- 用雷同的线程执行不同的工作**ThreadPoolExecutor中的execute办法**
/** * 在未来的某个工夫执行给定的工作,工作能够在新线程或池中现有的线程中执行 * 如果无奈将工作提交执行,起因之一是执行器已敞开或因为其容量已满,该工作由以后的 RejectedExecutionHandler 解决 * @param command 要执行的工作 *  */public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    /*     * Proceed in 3 steps:     *     * 1. 如果工作线程数量少于corePoolSize,尝试调用addWorker以给定的command启动一个新线程     *     * 2.如果一个工作能够胜利排队,那么咱们依然须要     * 仔细检查咱们是否应该增加线程     * (因为现有的自上次查看后死亡)或     * 自从进入此办法以来,该池已敞开。 所以咱们     * 从新查看状态,并在必要时回退排队     * 进行,如果没有,则启动一个新线程。     *     * 3.如果咱们无奈将工作排队,那么咱们尝试增加一个新的     * 线程。 如果失败,咱们晓得咱们曾经敞开或饱和     * 并因而回绝工作。     */    int c = ctl.get(); //ctl记录了线程池状态和线程数    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try { //循环获取工作执行        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                beforeExecute(wt, task);                Throwable thrown = null;                try {                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}
## 线程池的状态- RUNNING: 承受型工作并解决排队工作- SHUTDOWN: 不接受任务,但解决排队工作- STOP: 不承受新工作,也不解决排队工作,并中断正在进行的工作- TIDYING(整洁): 所有工作都已终止, workerCount为0时,线程会转换到TIDYING状态, 并将运行 terminate() 钩子办法- TERMINATED: terminate() 运行实现

// runState is stored in the high-order bits

private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;
## 应用线程池的留神点- 防止工作的沉积   FixedThreadPool  SingleThreadExecutor   工作队列长度过大, 可能会沉积大量的申请, 从而导致OOM.- 防止线程数适度减少  CachedThreadPool ScheduledThreadPool  容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。 - 排查线程透露  线程曾经执行结束,却没有正确的被回收,往往是工作的逻辑问题