乐趣区

Java-并发编程系列ThreadPoolExecutor-的那些事儿

线程池基础知识

ThreadPoolExecutor : 一个线程池
Executors : 线程池工厂,通过该类可以取得一个拥有特定功能的线程池
ThreadPoolExecutor 类实现了 Executor 接口,因此通过这个接口,任何的 Runnable 对象都可以被 ThreadPoolExecutor 线程池调度。
常见的线程池类型

  • public static ExecutorService newFixedThreadPool(int nThreads)

    返回一个固定线程数量的线程池。线程数量始终不变。当有空闲线程时,立即执行;若没有,新线程暂存在一个任务队列中,待线程空闲,便开始处理任务队列中的任务。

  • public static ExecutorService newSingleThreadExecutor()

    返回一个只有一个线程的线程池。新线程过来存放在任务队列中,线程空闲时,处理任务队列中的任务。

  • public static ExecutorService newCachedThreadPool()

    返回一个可以根据实际情况调整线程数量的线程池。线程的进程数不确定,但是如果有空闲线程可以复用,则会优先使用该线程。如果所有线程都在工作,又有新的任务提交,那么就会创建新的线程执行任务。所有线程在当前任务完成之后,将返回线程池进行服用。

  • public static ScheduledExecutorService newSingleThreadScheduldExecutor()

    返回一个 ScheduledExecutorService 对象,线程池大小为 1。在某个固定的延时之后执行,或者周期性执行某个任务

  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    和上面一样,但是可以指定线程数。


核心线程池的内部实现

对于核心的几个线程池,其内部实现都是使用了 ThreadPoolExecutor 实现。

  • public static ExecutorService newFixedThreaPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads,nThreads,
                                     0L,TimeUnit.SECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • public static ExecutorService newSingleThreadExecutor() {
        return new ThreadPoolExecutor(1,1,
                                     0L,TimeUnit.SECONDS,
                                     new LinkedBlockingQueue<Runnable>());
    }
  • public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                                     60L,TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
    }

看一下 ThreadPoolExecutor 最重要的构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long KeepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

函数的参数含义如下:

  • corePoolSize:线程池中的线程数量。
  • maximumPoolSize:线程池中的最大线程数量。
  • keepAliveTime:当线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间。
  • unitkeepAliveTime的单位。
  • workQueue:任务队列,被提交但是尚未被执行的任务。
  • threadFactory:线程工厂,用于创建线程,一般用默认的就行。
  • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

在上面的参数中,其他的都好理解,主要是 workQueuehandler需要重点提及一下。


workQueue 任务队列讲解

workQueue 指被提交但是尚未被执行的任务队列,它是一个 BlockingQueue 接口的对象,仅用于存在 Runnable 对象。根据队列功能分类,在 ThreadPoolExecutor 的构造函数中可以使用以下几种BlockingQueue

  • 直接提交的队列 SynchronousQueueSychronousQueue 是一个特殊的 BlockingQueueSychronousQueue 没有容量,每一个插入操作都要等待一个相应的删除操作,反之,一个删除操作都要等待对应的插入操作 。如果使用SychronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲线程,那么尝试创建新的线程,如果进程数量已经达到最大值,则执行拒绝策略。因此,使用SychronousQueue 通常要设置很大的maximumcorePoolSize,否则很容易执行拒绝策略。
  • 有界的任务队列 ArrayBlockingQueue:特点: 若有新的任务需要执行,如果线程池的实际线程小于 corePoolSize,则会优先创建新的线程;若大于corePoolSize,则会将新任务加入到等待队列。如果等待队列已满,无法加入,则在总线成熟不大于maximumPoolSize 的前提下,创建新的线程执行任务。若大于,则执行拒绝策略。
  • 无界的任务队列 LinkedBlockingQueue:特点: 若有新的任务需要执行,如果线程池的实际线程小于corePoolSize,则会优先创建新的线程;若大于corePoolSize,进入等待队列。
  • 优先任务队列PriorityBlockingQueue: 可以根据任务自身的优先级顺序先后执行。

handler拒绝策略讲解

JKD 内置的拒绝策略都是通过实现 RejectedExecutionHandler 接口实现的,具体如下:

  • AbortPolicy策略:直接抛出异常,阻止系统正常工作;
  • CallerRunsPolicy策略:只要线程未关闭,直接在调用者线程中,运行当前被丢弃的任务。warning: 有可能性能会急剧下降
  • DiscardOledstPolicy策略:丢弃一个最老的请求,并尝试再次提交当前的任务。
  • DiscardPolicy策略:丢弃无法处理的请求,不予任何处理,不做出任何的提示。
退出移动版