乐趣区

关于java:Java线程池学习笔记

线程池

池化技术的益处

  1. 升高资源耗费:能够反复利用已创立的线程升高线程创立和销毁造成的耗费。
  2. 进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。
  3. 进步线程的可管理性:线程是稀缺资源,如果无限度地创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行统一分配、调优和监控。

线程池的利用场景

  1. 服务器承受到大量申请时, 应用线程池技术时十分适合的, 它能够大大减少线程的创立和销毁次数, 进步服务器的工作效率
  2. 实际上, 在开发中, 如果须要创立 5 个以上的线程, 那么就能够应用线程池来治理

线程池的类关系图

线程池的结构器参数

参数名 类型 含意
corePoolSize int 外围线程数
maxPoolSize int 最大线程数
keepAliveTime long 放弃存活工夫
workQueue BlockingQueue 工作存储队列
threadFactory ThreadFactory 当线程池须要新的线程的时候, 会应用 threadFactory 来生成新的线程
Handler RejectedExecutionHandler 因为线程池无奈承受所提交的工作的回绝策略

corePoolSize 和 maxPoolSize

  • corePoolSize指的是外围线程数: 线程池在实现初始化后, 默认状况下, 还没有创立任何线程, 线程池会期待有工作到来时, 再创立新线程去执行工作, 直到达到外围线程数, 之后外围线程会始终放弃这个数量; 当工作数量超过外围线程数, 将工作放在阻塞队列 workQueue 中, 期待外围线程闲暇后处理
  • 如果外围线程全副在工作中, 而且队列也满了, 线程池就会在外围线程的根底上, 额定减少一些线程, 这些新减少的线程数最大下限就是maxPoolSize

线程创立规定

  1. 如果线程数小于 corePoolSize, 即便其余线程处于闲暇状态, 也会创立一个新线程 (外围线程) 来运行新工作
  2. 如果线程数等于(或大于)corePoolSize 但少于 maxPoolSize, 则将工作放入队列
  3. 如果队列已满, 并且线程数小于 maxPoolSize, 则创立一个新线程来运行工作
  4. 如果队列已满, 并且线程数大于或等于 maxPoolSize 则回绝该工作

增减线程的特点

  1. 通过设置 corePoolSize 和 maxPoolSize 为雷同数量, 就能够创立固定大小的线程池, 即便队列满了也不会在拓展线程
  2. 线程池心愿放弃较少的线程数, 并且只有在负载变得很大时才减少它, 这就是队列的用意
  3. 通过设置 maxPoolSize 为很高的只, 例如 Integer.MAX_VALUE, 能够容许线程池包容任意数量的并发工作
  4. 是只有在队列填满时才创立多于 corePoolSize 的线程, 所以如果应用无界队列(例如 LinkedBlockingQueue), 那么线程数就不会超过 corePoolSize

keepAliveTime

闲暇的非核心线程的存活工夫, 用于回收线程

  • 如果线程池以后的线程数多于 corePoolSize, 那么如果多余的线程闲暇工夫超过 keepAliveTime, 它们就会被终止

ThreadFactory

线程工厂, 用于创立线程

  • 新的线程是由 ThreadFactory 创立的, 默认应用的线程工厂是 Executors.defaultThreadFactory(), 创立进去的线程都在同一个线程组, 领有同样的NORM_PRIORITY 优先级并且都不是守护线程; 如果本人定义 ThreadFactory, 那么就能够扭转线程名, 线程组, 优先级, 是否是守护线程等
  • 通常应用默认的就能够, 源码如下:

workQueue

有三种最常见的队列类型:

  1. 间接交接: SynchronousQueue 无容量
  2. 无界队列: LinkedBlockingQueue 有限容量, 有内存溢出的危险
  3. 有界队列: ArrayBlockingQueue 可设置容量

ThreadPoolExecutor 的启动

/**
*  - 通过 new 创立线程池时, 除非调用 prestartAllCoreThreads / prestartCoreThread 办法启动外围线程,
*  - 否则即便工作队列中存在工作, 同样也不会执行.
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 20, 3L, TimeUnit.SECONDS, linkedBlockingDeque);
        
/**
 * Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*    - 启动所有外围线程,让它们无闲暇的期待工作。这将笼罩仅在执行新工作时启动外围线程的默认策略。* - 手动启动线程池.
* @return the number of threads started
*/
threadPoolExecutor.prestartAllCoreThreads();

JDK 内置线程池

线程池应该手动创立还是主动创立

手动创立, 能够让咱们更加明确线程池的容许规定, 防止资源耗尽的危险

主动创立, 也就是间接调用 JDK 封装号的构造函数, 可能会带来一些问题:

Executors.newFixedThreadPool(int nThreads)数量固定的线程池

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

corePoolSize 和 maxPoolSize 被设置为雷同的 nThreads 参数, 并应用了无界队列 LinkedBlockingQueue, 不会拓展线程所以也没有存活工夫

当工作在队列中沉积过多, 可能就会造成 OOM

Executors.newSingleThreadExecutor()只有一个线程的线程池

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

Executors.newCachedThreadPool() 可缓存线程

无界限程池, 具备主动回收多余线程的性能

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

最大的线程数被设置为 Integer.MAX_VALUE, 线程闲暇 60 秒后回收, 不应用队列(SynchronousQueue)

Executors.newScheduledTreadPool()

反对定时及周期性工作执行的线程池, 应用提早队列(DelayedWorkQueue)

public static void main(String[] args) {
        ScheduledExecutorService threadPool =
                Executors.newScheduledThreadPool(10);

        // 提早 5 秒执行工作
        threadPool.schedule(new EveryTaskOneThread.Task(),5, TimeUnit.SECONDS);
        // 1 秒之后每个 3 秒执行一次工作
        threadPool.scheduleAtFixedRate(new EveryTaskOneThread.Task(),
                1, 3,TimeUnit.SECONDS);
    }

所以, 还是更具业务的并发量手动创立线程池吧

JDK1.8 后退出workStealingPool

  • 子工作
  • 窃取

线程数量怎么设定?

  • CPU 密集型(加密, 即便 hash 等) : 最佳线程数为 CPU 外围数的 1 - 2 倍左右
  • 耗时 I / O 型(读写数据库, 文件, 网络传输等): 最佳线程数个别会大于 CPU 外围数很多倍, 以 JVM 线程监控显示忙碌状况为根据, 保障线程闲暇能够连接上, 参考 Brain Goetz 举荐的计算方法:

    == 线程数 =CPU 外围数 * (1+ 均匀等待时间 / 均匀工作工夫))==

  • 实际上最靠谱的还是通过压力测试得出适合的线程数

进行线程池的正确形式

  • shutdown 执行该办法后, 线程池会将以后队列中的工作执行结束, 并且在次期间回绝新工作进入, 执行完后进行线程池
    public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++){executorService.execute(new ShutDownTask());
        }
        System.out.println(executorService.isShutdown());
        Thread.sleep(1500);
        executorService.shutdown();
        // 是否进入进行状态
        System.out.println(executorService.isShutdown());
        // 回绝新工作
        executorService.execute(new ShutDownTask());
        // 是否真正意义上的敞开
        System.out.println(executorService.isTerminated());
    }

    static class ShutDownTask implements Runnable {

        @Override
        public void run() {
            try {Thread.sleep(500);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }

awaitTermination(timeout): 在一段时间内所有工作是否被执行结束

  • shutdownNow 将所有线程中断, 并且队列中还未执行的工作作为一个列表返回
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++){executorService.execute(new ShutDownTask());
        }
        System.out.println(executorService.isShutdown());
        Thread.sleep(1500);
        // 发送中断信号, 并返回 runnableList
        List<Runnable> runnableList =
                executorService.shutdownNow();}

    static class ShutDownTask implements Runnable {

        @Override
        public void run() {
            try {Thread.sleep(500);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "被中断了");
            }
        }
    }

工作回绝策略

  • 回绝机会

    1. 当 Executor 敞开 (shutdown) 时, 提交新工作会被回绝
    2. 当 Executor 对最大线程和队列容量应用有限度并且曾经饱和时

4 种回绝策略

  • AbortPolicy: 默认, 间接抛出 RejectedExecutionException 回绝异样
  • DiscardPolicy: 默默的把被回绝的工作抛弃
  • DiscardOldestPolicy: 当有新工作时, 会抛弃工作队列中存在最久的老工作, 以腾出地位给新工作
  • CallerRunsPolicy: 将被线程池回绝的工作交给调用者 (caller) 主线程去执行

钩子办法

每个工作执行前后能够减少解决(日志, 统计)

/**
 * 演示每个工作执行前后都能够放钩子函数
 */
public class PauseableTreadPool extends ThreadPoolExecutor {private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();

    // 标记线程是否处于暂停状态
    private boolean isPaused;

    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    // 重写办法 before 钩子
    @Override
    protected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);
        lock.lock();
        try {while(isPaused) {unpaused.await();
            }
        } catch (InterruptedException e) {e.printStackTrace();
        }finally {lock.unlock();
        }
    }

    // 暂停办法
    private void pause() {lock.lock();
        try {isPaused = true;} finally {lock.unlock();
        }
    }
    // 复原办法
    private void resume() {lock.lock();
        try{
            isPaused = false;
            // 唤醒全副
            unpaused.signalAll();} finally {lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableTreadPool pauseableTreadPool =
                new PauseableTreadPool(10, 20,
                10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());

        Runnable runnable = new Runnable() { // 线程体
            @Override
            public void run() {System.out.println("被执行");
                try {Thread.sleep(10);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++){pauseableTreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableTreadPool.pause();
        System.out.println("线程池被暂停");
        Thread.sleep(1500);
        pauseableTreadPool.resume();
        System.out.println("线程池已复原");
    }
}

原理 & 源码剖析

次要剖析 ThreadPoolExecutor

线程池的组成部分

  • 线程池管理器 ExecutorService 控制线程池的启动和进行
  • 工作线程 ThreadPoolExecutor 中的外部类 Worker
  • 工作队列 线程平安的BlockingQueue<Runnable> workQueue;
  • 工作接口(Task)

    /**
     * Creates with given first task and thread from ThreadFactory.

*/
Worker(Runnable firstTask) {

  setState(-1); // inhibit interrupts until runWorker
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this);

}


** 线程池事项工作复用的原理 **

- 用雷同的线程执行不同的工作

**ThreadPoolExecutor 中的 execute 办法 **
/**
 * 在未来的某个工夫执行给定的工作, 工作能够在新线程或池中现有的线程中执行
 * 如果无奈将工作提交执行, 起因之一是执行器已敞开或因为其容量已满, 该工作由以后的 RejectedExecutionHandler 解决
 * @param command 要执行的工作
 * 
 */
public void execute(Runnable command) {if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. 如果工作线程数量少于 corePoolSize, 尝试调用 addWorker 以给定的 command 启动一个新线程
     *
     * 2. 如果一个工作能够胜利排队,那么咱们依然须要
     * 仔细检查咱们是否应该增加线程
     *(因为现有的自上次查看后死亡)或
     * 自从进入此办法以来,该池已敞开。所以咱们
     * 从新查看状态,并在必要时回退排队
     * 进行,如果没有,则启动一个新线程。*
     * 3. 如果咱们无奈将工作排队,那么咱们尝试增加一个新的
     * 线程。如果失败,咱们晓得咱们曾经敞开或饱和
     * 并因而回绝工作。*/
    int c = ctl.get(); //ctl 记录了线程池状态和线程数
    if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
            return;
        c = ctl.get();}
    if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try { // 循环获取工作执行
        while (task != null || (task = getTask()) != null) {w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {beforeExecute(wt, task);
                Throwable thrown = null;
                try {task.run();
                } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();}
        }
        completedAbruptly = false;
    } finally {processWorkerExit(w, completedAbruptly);
    }
}

## 线程池的状态

- RUNNING: 承受型工作并解决排队工作
- SHUTDOWN: 不接受任务, 但解决排队工作
- STOP: 不承受新工作, 也不解决排队工作, 并中断正在进行的工作
- TIDYING(整洁): 所有工作都已终止, workerCount 为 0 时, 线程会转换到 TIDYING 状态, 并将运行 terminate() 钩子办法
- TERMINATED: terminate() 运行实现

// runState is stored in the high-order bits

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

## 应用线程池的留神点

- 防止工作的沉积

   FixedThreadPool  SingleThreadExecutor
   工作队列长度过大, 可能会沉积大量的申请, 从而导致 OOM.

- 防止线程数适度减少

  CachedThreadPool ScheduledThreadPool
  容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。- 排查线程透露

  线程曾经执行结束, 却没有正确的被回收, 往往是工作的逻辑问题
退出移动版