关于线程池:90的人以为会用ThreadPoolExecutor了看了这10张图再说吧

3次阅读

共计 10915 个字符,预计需要花费 28 分钟才能阅读完成。

在阿里巴巴手册中有一条倡议:

【强制】线程池不容许应用 Executors 去创立,而是通过 ThreadPoolExecutor 的形式,这样的解决形式让写的同学更加明确线程池的运行规定,躲避资源耗尽的危险。

如果常常基于 Executors 提供的工厂办法创立线程池,很容易疏忽线程池外部的实现。特地是回绝策略,因应用 Executors 创立线程池时不会传入这个参数,间接采纳默认值,所以经常被疏忽。

上面咱们就来理解一下线程池相干的实现原理、API 以及实例。

线程池的作用

在实际利用中创立线程池次要是为了:

  • 缩小资源开销:缩小每次创立、销毁线程的开销;
  • 进步响应速度:申请到来时,线程已创立好,可间接执行,进步响应速度;
  • 进步线程的可管理性:线程是稀缺资源,需依据状况加以限度,确保零碎稳固运行;

ThreadPoolExecutor

ThreadPoolExecutor 能够实现线程池的创立。ThreadPoolExecutor 相干类图如下:

从类图能够看出,ThreadPoolExecutor 最终实现了 Executor 接口,是线程池创立的真正实现者。

Executor 两级调度模型

在 HotSpot 虚拟机中,Java 中的线程将会被一一映射为操作系统的线程。在 Java 虚拟机层面,用户将多个工作提交给 Executor 框架,Executor 负责调配线程执行它们;在操作系统层面,操作系统再将这些线程调配给处理器执行。

ThreadPoolExecutor 的三个角色

工作

ThreadPoolExecutor 承受两种类型的工作:Callable 和 Runnable。

  • Callable:该类工作有返回后果,能够抛出异样。通过 submit 办法提交,返回 Future 对象。通过 get 获取执行后果。
  • Runnable:该类工作只执行,无奈获取返回后果,在执行过程中无奈抛异样。通过 execute 或 submit 办法提交。

工作执行器

Executor 框架最外围的接口是 Executor,它示意工作的执行器。

通过下面类图能够看出,Executor 的子接口为 ExecutorService。再往底层有两大实现类:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor(集成自 ThreadPoolExecutor)。

执行后果

Future 接口示意异步的执行后果,它的实现类为 FutureTask。

三个角色之间的解决逻辑图如下:

线程池解决流程

一个线程从被提交(submit)到执行共经验以下流程:

  • 线程池判断外围线程池里是的线程是否都在执行工作,如果不是,则创立一个新的工作线程来执行工作。如果外围线程池里的线程都在执行工作,则进入下一个流程;
  • 线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的工作贮存在这个工作队列里。如果工作队列满了,则进入下一个流程;
  • 线程池判断其外部线程是否都处于工作状态。如果没有,则创立一个新的工作线程来执行工作。如果已满了,则交给饱和策略来解决这个工作。

线程池在执行 execute 办法时,次要有以下四种状况:

  • 如果以后运行的线程少于 corePoolSize,则创立新线程来执行工作(须要取得全局锁);
  • 如果运行的线程等于或多于 corePoolSize,则将工作退出 BlockingQueue;
  • 如果无奈将工作退出 BlockingQueue(队列已满),则创立新的线程来解决工作(须要取得全局锁);
  • 如果创立新线程将使以后运行的线程超出 maxiumPoolSize,工作将被回绝,并调用 RejectedExecutionHandler.rejectedExecution()办法。

线程池采取上述的流程进行设计是为了缩小获取全局锁的次数。在线程池实现预热(以后运行的线程数大于或等于 corePoolSize)之后,简直所有的 excute 办法调用都执行步骤二。

线程的状态流转

顺便再回顾一下线程的状态的转换,在 JDK 中 Thread 类中提供了一个枚举类,例举了线程的各个状态:

    public enum State {

        NEW,

        RUNNABLE,

        BLOCKED,

        WAITING,

        TIMED_WAITING,

        TERMINATED;
    }

一共定义了 6 个枚举值,其实代表的是 5 种类型的线程状态:

  • NEW:新建;
  • RUNNABLE:运行状态;
  • BLOCKED:阻塞状态;
  • WAITING:期待状态,WAITING 和 TIMED_WAITING 能够归为一类,都属于期待状态,只是后者能够设置等待时间,即期待多久;
  • TERMINATED:终止状态;

线程关系转换图:

当 new Thread()阐明这个线程处于 NEW(新建状态);调用 Thread.start()办法示意这个线程处于 RUNNABLE(运行状态);

然而 RUNNABLE 状态中又蕴含了两种状态:READY(就绪状态)和 RUNNING(运行中)。调用 start()办法,线程不肯定取得了 CPU 工夫片,这时就处于 READY,期待 CPU 工夫片,当取得了 CPU 工夫片,就处于 RUNNING 状态。

在运行中调用 synchronized 同步的代码块,没有获取到锁,这时会处于 BLOCKED(阻塞状态),当从新获取到锁时,又会变为 RUNNING 状态。在代码执行的过程中可能会碰到 Object.wait()等一些期待办法,线程的状态又会转变为 WAITING(期待状态),期待被唤醒,当调用了 Object.notifyAll()唤醒了之后线程执行完就会变为 TERMINATED(终止状态)。

线程池的状态

线程池中状态通过 2 个二进制位(bit)来示意线程池的 5 个状态:RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

  • RUNNING:线程池失常工作的状态,在 RUNNING 状态下线程池承受新的工作并解决工作队列中的工作;
  • SHUTDOWN:调用 shutdown() 办法会进入 SHUTDOWN 状态。在 SHUTDOWN 状态下,线程池不承受新的工作,然而会继续执行工作队列中已有的工作;
  • STOP:调用 shutdownNow() 会进入 STOP 状态。在 STOP 状态下线程池既不承受新的工作,也不解决曾经在队列中的工作。对于还在执行工作的工作线程,线程池会发动中断请求来中断正在执行的工作,同时会清空工作队列中还未被执行的工作;
  • TIDYING:当线程池中的所有执行工作的工作线程都曾经终止,并且工作线程汇合为空的时候,进入 TIDYING 状态;
  • TERMINATED:当线程池执行完 terminated() 钩子办法当前,线程池进入终态 TERMINATED

ThreadPoolExecutor API

ThreadPoolExecutor 创立线程池 API:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

参数解释:

  • corePoolSize:线程池常驻外围线程数。创立线程池时,线程池中并没有任何线程,当有工作来时才去创立线程,执行工作。提交一个工作,创立一个线程,直到须要执行的工作数大于线程池根本大小,则不再创立。当创立的线程数等于 corePoolSize 时,会退出设置的阻塞队列。
  • maximumPoolSize:线程池容许创立的最大线程数。当队列满时,会创立线程执行工作直到线程池中的数量等于 maximumPoolSize。
  • keepAliveTime:当线程数大于外围时,此为终止前多余的闲暇线程期待新工作的最长工夫。
  • unit:keepAliveTime 的工夫单位,可选项:天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、奥妙(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一奥妙)。
  • workQueue:用来贮存期待执行工作的队列。
  • threadFactory:线程工厂,用来生产一组雷同工作的线程。次要用于设置生成的线程名词前缀、是否为守护线程以及优先级等。设置有意义的名称前缀有利于在进行虚拟机剖析时,晓得线程是由哪个线程工厂创立的。
  • handler:执行回绝策略对象。当达到工作缓存下限时(即超过 workQueue 参数能存储的工作数),执行拒接策略。也就是当工作解决不过去的时候,线程池开始执行回绝策略。JDK 1.5 提供了四种饱和策略:

    • AbortPolicy:默认,间接抛异样;
    • 只用调用者所在的线程执行工作,重试增加以后的工作,它会主动反复调用 execute()办法;
    • DiscardOldestPolicy:抛弃工作队列中最久的工作;
    • DiscardPolicy:抛弃当前任务;

适当的阻塞队列

当创立的线程数等于 corePoolSize,会将工作退出阻塞队列(BlockingQueue),保护着期待执行的 Runnable 对象。

阻塞队列通常有如下类型:

  • ArrayBlockingQueue:一个由数组构造组成的有界阻塞队列。能够限定队列的长度,接管到工作时,如果没有达到 corePoolSize 的值,则新建线程 (外围线程) 执行工作,如果达到了,则入队等待,如果队列已满,则新建线程 (非核心线程) 执行工作,又如果总线程数到了 maximumPoolSize,并且队列也满了,则产生谬误。
  • LinkedBlockingQueue:一个由链表构造组成的有界阻塞队列。这个队列在接管到工作时,如果以后线程数小于外围线程数,则新建线程 (外围线程) 解决工作;如果以后线程数等于外围线程数,则进入队列期待。因为这个队列没有最大值限度,即所有超过外围线程数的工作都将被增加到队列中,这也就导致了 maximumPoolSize 的设定生效,因为总线程数永远不会超过 corePoolSize。
  • PriorityBlockingQueue:一个反对优先级排序的无界阻塞队列。
  • DelayQueue:一个应用优先级队列实现的无界阻塞队列。队列内元素必须实现 Delayed 接口,这就意味着传入的工作必须先实现 Delayed 接口。这个队列在接管到工作时,首先先入队,只有达到了指定的延时工夫,才会执行工作。
  • SynchronousQueue:一个不存储元素的阻塞队列。这个队列在接管到工作时,会间接提交给线程解决,而不保留它,如果所有线程都在工作就新建一个线程来解决这个工作。所以为了保障不呈现【线程数达到了 maximumPoolSize 而不能新建线程】的谬误,应用这个类型队列时,maximumPoolSize 个别指定成 Integer.MAX_VALUE,即无限大。
  • LinkedTransferQueue:一个由链表构造组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表构造组成的双向阻塞队列。

明确的回绝策略

当工作解决不过来时,线程池开始执行回绝策略。

反对的回绝策略:

  • ThreadPoolExecutor.AbortPolicy: 抛弃工作并抛出 RejectedExecutionException 异样。(默认)
  • ThreadPoolExecutor.DiscardPolicy:也是抛弃工作,然而不抛出异样。
  • ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最后面的工作,而后从新尝试执行工作。(反复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程解决该工作。

线程池敞开

  • shutdown:将线程池状态置为 SHUTDOWN,并不会立刻进行。进行接管内部 submit 的工作,外部正在跑的工作和队列里期待的工作,会执行完后,才真正进行。
  • shutdownNow:将线程池状态置为 STOP。希图立刻进行,事实上不肯定,跟 shutdown()一样,先进行接管内部提交的工作,疏忽队列里期待的工作,尝试将正在跑的工作 interrupt 中断(如果线程未处于 sleep、wait、condition、定时锁状态,interrupt 无奈中断以后线程),返回未执行的工作列表。
  • awaitTermination(long timeOut, TimeUnit unit)以后线程 阻塞,直到等所有已提交的工作(包含正在跑的和队列中期待的)执行完或者等超时工夫到或者线程被中断,抛出 InterruptedException,而后返回 true(shutdown 申请后所有工作执行结束)或 false(已超时)。

Executors

Executors 是一个帮忙类,提供了创立几种预配置线程池实例的办法:newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool 等。

如果查看源码就会发现,Executors 实质上就是实现了几类默认的 ThreadPoolExecutor。而阿里巴巴开发手册,不倡议采纳 Executors 默认的,让使用者间接通过 ThreadPoolExecutor 来创立。

Executors.newSingleThreadExecutor()

创立一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有工作。如果这个惟一的线程因为异样完结,那么会有一个新的线程来代替它。此线程池保障所有工作的执行程序依照工作的提交程序执行。

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

该类型线程池的结构图:

该线程池的特点:

  • 只会创立一条工作线程解决工作;
  • 采纳的阻塞队列为 LinkedBlockingQueue;

Executors.newFixedThreadPool()

创立固定大小的线程池。每次提交一个工作就创立一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会放弃不变,如果某个线程因为执行异样而完结,那么线程池会补充一个新线程。

new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

该类型线程池的结构图:

该线程池的特点:

  • 固定大小;
  • corePoolSize 和 maximunPoolSize 都为用户设定的线程数量 nThreads;
  • keepAliveTime 为 0,意味着一旦有多余的闲暇线程,就会被立刻进行掉;但这里 keepAliveTime 有效;
  • 阻塞队列采纳了 LinkedBlockingQueue,一个无界队列;
  • 因为阻塞队列是一个无界队列,因而永远不可能回绝工作;
  • 因为采纳了无界队列,理论线程数量将永远维持在 nThreads,因而 maximumPoolSize 和 keepAliveTime 将有效。

Executors.newCachedThreadPool()

创立一个可缓存的线程池。如果线程池的大小超过了解决工作所须要的线程,那么就会回收局部闲暇(60 秒不执行工作)的线程,当工作数减少时,此线程池又能够智能的增加新线程来解决工作。此线程池不会对线程池大小做限度,线程池大小齐全依赖于操作系统(或者说 JVM)可能创立的最大线程大小。

new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

该类型线程池的结构图:

该线程池的特点:

  • 能够有限扩充;
  • 比拟适宜解决执行工夫比拟小的工作;
  • corePoolSize 为 0,maximumPoolSize 为无限大,意味着线程数量能够无限大;
  • keepAliveTime 为 60S,意味着线程闲暇工夫超过 60s 就会被杀死;
  • 采纳 SynchronousQueue 装期待的工作,这个阻塞队列没有存储空间,这意味着只有有申请到来,就必须要找到一条工作线程解决它,如果以后没有闲暇的线程,那么就会再创立一条新的线程。

Executors.newScheduledThreadPool()

创立一个定长线程池,反对定时及周期性工作执行。

new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());

该线程池类图:

该线程池的特点:

  • 接管 SchduledFutureTask 类型的工作,有两种提交工作的形式:scheduledAtFixedRate 和 scheduledWithFixedDelay。SchduledFutureTask 接管的参数:

    • time:工作开始的工夫
    • sequenceNumber:工作的序号
    • period:工作执行的工夫距离
  • 采纳 DelayQueue 存储期待的工作;
  • DelayQueue 外部封装了一个 PriorityQueue,它会依据 time 的先后工夫排序,若 time 雷同则依据 sequenceNumber 排序;
  • DelayQueue 也是一个无界队列;
  • 工作线程执行时,工作线程会从 DelayQueue 取曾经到期的工作去执行;执行完结后从新设置工作的到期工夫,再次放回 DelayQueue;

Executors.newWorkStealingPool()

JDK8 引入,创立持有足够线程的线程池反对给定的并行度,并通过应用多个队列缩小竞争。

public static ExecutorService newWorkStealingPool() {return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null, true);
}

Executors 办法的弊病

1)newFixedThreadPool 和 newSingleThreadExecutor:容许的 申请队列长度 为 Integer.MAX_VALUE,可能会沉积大量的申请,从而导致 OOM。
2)newCachedThreadPool 和 newScheduledThreadPool:容许的 创立线程数量 为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。

合理配置线程池大小

合理配置线程池,须要先剖析工作个性,能够从以下角度来进行剖析:

  • 工作的性质:CPU 密集型工作,IO 密集型工作和混合型工作。
  • 工作的优先级:高,中和低。
  • 工作的执行工夫:长,中和短。
  • 工作的依赖性:是否依赖其余系统资源,如数据库连贯。

另外,还须要查看零碎的内核数:

Runtime.getRuntime().availableProcessors());

依据工作所须要的 CPU 和 IO 资源能够分为:

  • CPU 密集型工作: 次要是执行计算工作,响应工夫很快,CPU 始终在运行。个别公式:线程数 = CPU 核数 + 1。只有在真正的多核 CPU 上能力失去减速,长处是不存在线程切换开销,进步了 CPU 的利用率并缩小了线程切换的效力损耗。
  • IO 密集型工作:次要是进行 IO 操作,CPU 并不是始终在执行工作,IO 操作(CPU 闲暇状态)的工夫较长,应配置尽可能多的线程,其中的线程在 IO 操作时,其余线程能够持续利用 CPU,从而进步 CPU 的利用率。个别公式:线程数 = CPU 核数 * 2。

应用实例

工作实现类:

/**
 * 工作实现线程
 * @author sec
 * @version 1.0
 * @date 2021/10/30
 **/
public class MyThread implements Runnable{

   private final Integer number;

   public MyThread(int number){this.number = number;}

   public Integer getNumber() {return number;}

   @Override
   public void run() {
      try {
         // 业务解决
         TimeUnit.SECONDS.sleep(1);
         System.out.println("Hello! ThreadPoolExecutor -" + getNumber());
      } catch (InterruptedException e) {e.printStackTrace();
      }
   }
}

自定义阻塞提交的 ThreadLocalExcutor:

/**
 * 自定义阻塞提交的 ThreadPoolExecutor
 * @author sec
 * @version 1.0
 * @date 2021/10/30
 **/
public class CustomBlockThreadPoolExecutor {

   private ThreadPoolExecutor pool = null;

   /**
    * 线程池初始化办法
    */
   public void init() {
      // 外围线程池大小
      int poolSize = 2;
      // 最大线程池大小
      int maxPoolSize = 4;
      // 线程池中超过 corePoolSize 数目的闲暇线程最大存活工夫:30+ 单位 TimeUnit
      long keepAliveTime = 30L;
      // ArrayBlockingQueue<Runnable> 阻塞队列容量 30
      int arrayBlockingQueueSize = 30;
      pool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(arrayBlockingQueueSize), new CustomThreadFactory(),
            new CustomRejectedExecutionHandler());
   }

   /**
    * 敞开线程池办法
    */
   public void destroy() {if (pool != null) {pool.shutdownNow();
      }
   }

   public ExecutorService getCustomThreadPoolExecutor() {return this.pool;}

   /**
    * 自定义线程工厂类,* 生成的线程名词前缀、是否为守护线程以及优先级等
    */
   private static class CustomThreadFactory implements ThreadFactory {private final AtomicInteger count = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r) {Thread t = new Thread(r);
         String threadName = CustomBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
         t.setName(threadName);
         return t;
      }
   }


   /**
    * 自定义回绝策略对象
    */
   private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
         // 外围革新点,将 blockingqueue 的 offer 改成 put 阻塞提交
         try {executor.getQueue().put(r);
         } catch (InterruptedException e) {e.printStackTrace();
         }
      }
   }

   /**
    * 当提交工作被回绝时,进入回绝机制,实现回绝办法,把工作从新用阻塞提交办法 put 提交,实现阻塞提交工作性能,避免队列过大,OOM
    */
   public static void main(String[] args) {CustomBlockThreadPoolExecutor executor = new CustomBlockThreadPoolExecutor();

      // 初始化
      executor.init();
      ExecutorService pool = executor.getCustomThreadPoolExecutor();
      for (int i = 1; i < 51; i++) {MyThread myThread = new MyThread(i);
         System.out.println("提交第" + i + "个工作");
         pool.execute(myThread);
      }

      pool.shutdown();
      try {
         // 阻塞,超时工夫到或者线程被中断
         if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            // 立刻敞开
            executor.destroy();}
      } catch (InterruptedException e) {executor.destroy();
      }
   }
}

小结

看似简略的线程池创立,其中却蕴含着各类常识,交融贯通,依据具体场景采纳具体的参数进行设置才可能达到最优的成果。

总结一下就是:

  • 用 ThreadPoolExecutor 自定义线程池,要看线程的用处。如果任务量不大,能够用无界队列,如果任务量十分大,要用有界队列,避免 OOM;
  • 如果任务量很大,且要求每个工作都解决胜利,要对提交的工作进行阻塞提交,重写回绝机制,改为阻塞提交。保障不摈弃一个工作;
  • 最大线程数个别设为 2N+ 1 最好,N 是 CPU 核数;
  • 外围线程数,要依据工作是 CPU 密集型,还是 IO 密集型。同时,如果工作是一天跑一次,设置为 0 适合,因为跑完就停掉了;
  • 如果要获取工作执行后果,用 CompletionService,然而留神,获取工作的后果要从新开一个线程获取,如果在主线程获取,就要等工作都提交后才获取,就会阻塞大量工作后果,队列过大 OOM,所以最好异步开个线程获取后果。

博主简介:《SpringBoot 技术底细》技术图书作者,热爱钻研技术,写技术干货文章。

公众号:「程序新视界」,博主的公众号,欢送关注~

技术交换:请分割博主微信号:zhuan2quan

参考文章:

[1]https://www.jianshu.com/p/948…

[2]https://blog.csdn.net/jek1234…

[3]https://blog.csdn.net/z_s_z20…

[4]https://zhuanlan.zhihu.com/p/…

[5]https://www.cnblogs.com/semi-…

正文完
 0