关于后端:12分钟从Executor自顶向下彻底搞懂线程池

前言

上篇文章 13分钟聊聊并发包中罕用同步组件并手写一个自定义同步组件 聊到并发包中罕用的同步组件,并且还手把手实现了自定义的同步组件

本篇文章来聊聊并发包下的另一个外围-线程池

浏览本文大略12分钟

通读本篇文章前先来看看几个问题,看看你是否以及了解线程池

  1. 什么是池化技术?它有什么特点,哪些场景应用?
  2. Executor是什么?它的设计思维是什么样的?
  3. 工作工作有几种?有什么特点?如何适配而后交给Executor的?
  4. 线程池是如何实现的?有哪些外围参数,该如何配置?工作流程是怎么的?
  5. 线程池如何优雅的解决异样?如何敞开线程池?
  6. 解决定时的线程池是如何实现的?

池化技术

线程的创立、销毁都会带来肯定的开销

如果当咱们须要应用到多线程时再去创立,应用完又去销毁,这样去应用不仅会拉长业务流程,还会减少创立、销毁线程的开销

于是有了池化技术的思维,将线程提前创立进去,放在一个池子(容器)中进行治理

当须要应用时,从池子里拿取一个线程来执行工作,执行结束后再放回池子

不仅是线程有池化的思维,连贯也有池化的思维,也就是连接池

池化技术不仅能复用资源、进步响应,还方便管理

Executor框架

Executor框架是什么?

能够临时把Executor看成线程池的形象,它定义如何去执行工作

  public interface Executor {
      void execute(Runnable command);
  }

Executor将工作工作与线程池进行拆散解耦

工作工作被分为两种:无返回后果的Runnable和有返回后果的Callable

在线程池中容许执行这两种工作,其中它们都是函数式接口,能够应用lambda表达式来实现

有的同学可能会有疑难,上文Executor框架定义的执行办法不是只容许传入Runnable工作吗?

Callable工作调用哪个办法来执行呢?

Future接口用来定义获取异步工作的后果,它的实现类常是FutureTask

FutureTask实现Runnable的同时,还用字段存储Callable,在其实现Runnable时实际上会去执行Callable工作

线程池在执行Callable工作时,会将应用FutureTask将其封装成Runnable执行(具体源码咱们前面再聊),因而Executor的执行办法入参只有Runnable

FutureTask相当于适配器,将Callable转换为Runnable再进行执行

Executor 定义线程池,而它的重要实现是ThreadPoolExecutor

ThreadPoolExecutor的根底上,还有个做定时的线程池ScheduledThreadPoolExecutor

ThreadPoolExecutor

外围参数

ThreadPoolExecutor次要有七个重要的参数

  public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler)
  1. corePoolSize 线程池外围线程数量
  2. maximumPoolSize 线程池容许创立的最大线程数
  3. keepAliveTime 超时工夫,TimeUnit工夫单位:非核心线程闲暇后存活的工夫
  4. workQueue 寄存期待执行工作的阻塞队列
  5. threadFactory线程工厂:规定如何创立线程,能够依据业务不同规定 不同的线程组名称
  6. RejectedExecutionHandler 回绝策略:当线程不够用,并且阻塞队列爆满时如何回绝工作的策略
回绝策略 作用
AbortPolicy 默认 抛出异样
CallerRunsPolicy 调用线程来执行工作
DiscardPolicy 不解决,抛弃
DiscardOldestPolicy 抛弃队列中最近一个工作,并立刻执行当前任务

线程池中除了结构时的外围参数外,还应用外部类Worker来封装线程和工作,并应用HashSet容器workes工作队列存储工作线程worker

实现原理

流程图

为了清晰的了解线程池实现原理,咱们先用流程图和总结概述原理,最初来看源码实现

  1. 如果工作线程数量小于外围线程数量,创立线程、退出工作队列、执行工作
  2. 如果工作线程数量大于等于外围线程数量并且线程池还在运行则尝试将工作退出阻塞队列
  3. 如果工作退出阻塞队列失败(阐明阻塞队列已满),并且工作线程小于最大线程数,则创立线程执行
  4. 如果阻塞队列已满、并且工作线程数量达到最大线程数量则执行回绝策略
execute

线程池有两种提交形式execute和submit,其中submit会封装成RunnableFuture最终都来执行execute

      public <T> Future<T> submit(Callable<T> task) {
          if (task == null) throw new NullPointerException();
          RunnableFuture<T> ftask = newTaskFor(task);
          execute(ftask);
          return ftask;
      }

execute中实现线程池的整个运行流程

  public void execute(Runnable command) {
      //工作为空间接抛出空指针异样
      if (command == null)
          throw new NullPointerException();
      //ctl是一个整型原子状态,蕴含workerCount工作线程数量 和 runState是否运行两个状态
      int c = ctl.get();
      //1.如果工作线程数 小于 外围线程数 addWorker创立工作线程
      if (workerCountOf(c) < corePoolSize) {
          if (addWorker(command, true))
              return;
          c = ctl.get();
      }
      
      // 2.工作线程数 大于等于 外围线程数时
      // 如果 正在运行 尝试将 工作退出队列
      if (isRunning(c) && workQueue.offer(command)) {
          //工作退出队列胜利 查看是否运行
          int recheck = ctl.get();
          //不在运行 并且 删除工作胜利 执行回绝策略 否则查看工作线程为0就创立线程
          if (! isRunning(recheck) && remove(command))
              reject(command);
          else if (workerCountOf(recheck) == 0)
              addWorker(null, false);
      }
      // 3.工作退出队列失败,尝试去创立非核心线程,胜利则完结
      else if (!addWorker(command, false))
          // 4.失败则执行回绝策略
          reject(command);
  }
addWorker

addWorker用于创立线程退出工作队列并执行工作

第二个参数用来判断是不是创立外围线程,当创立外围线程时为true,创立非核心线程时为false

  private boolean addWorker(Runnable firstTask, boolean core) {
          //不便跳出双层循环
          retry:
          for (;;) {
              int c = ctl.get();
              int rs = runStateOf(c);
  
              // Check if queue empty only if necessary.
              // 查看状态
              if (rs >= SHUTDOWN &&
                  ! (rs == SHUTDOWN &&
                     firstTask == null &&
                     ! workQueue.isEmpty()))
                  return false;
  
              for (;;) {
                  int wc = workerCountOf(c);
                  //工作线程数已满 返回false 
                  if (wc >= CAPACITY ||
                      wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  //CAS自增工作线程数量 胜利跳出双重循环
                  if (compareAndIncrementWorkerCount(c))
                      break retry;
                  //CAS失败 从新读取状态 内循环
                  c = ctl.get();  // Re-read ctl
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
              }
          }
  
          //来到这里阐明曾经自增工作线程数量 筹备创立线程
          boolean workerStarted = false;
          boolean workerAdded = false;
          Worker w = null;
          try {
              //创立worker 通过线程工厂创立线程
              w = new Worker(firstTask);
              final Thread t = w.thread;
              if (t != null) {
                  //全局锁
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      // Recheck while holding lock.
                      // Back out on ThreadFactory failure or if
                      // shut down before lock acquired.
                      int rs = runStateOf(ctl.get());
  
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {
                          if (t.isAlive()) // precheck that t is startable
                              throw new IllegalThreadStateException();
                          //增加线程
                          workers.add(w);
                          int s = workers.size();
                          if (s > largestPoolSize)
                              largestPoolSize = s;
                          //标记线程增加完
                          workerAdded = true;
                      }
                  } finally {
                      mainLock.unlock();
                  }
                  //执行线程
                  if (workerAdded) {
                      t.start();
                      workerStarted = true;
                  }
              }
          } finally {
              if (! workerStarted)
                  addWorkerFailed(w);
          }
          return workerStarted;
      }

addWorker中会CAS自增工作线程数量,创立线程再加锁,将线程退出工作队列workes(hashset),解锁后开启该线程去执行工作

runWorker

worker中实现Runnable的是runWorker办法,在启动线程后会不停的执行工作,工作执行完就去获取工作执行

  final void runWorker(Worker w) {
      Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      w.unlock(); // allow interrupts
      boolean completedAbruptly = true;
      try {
          //循环执行工作 getTask获取工作
          while (task != null || (task = getTask获取工作()) != null) {
              w.lock();
              // If pool is stopping, ensure thread is interrupted;
              // if not, ensure thread is not interrupted.  This
              // requires a recheck in second case to deal with
              // shutdownNow race while clearing interrupt
              if ((runStateAtLeast(ctl.get(), STOP) ||
                   (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                  wt.interrupt();
              try {
                  //执行前 钩子办法
                  beforeExecute(wt, task);
                  Throwable thrown = null;
                  try {
                      //执行
                      task.run();
                  } catch (RuntimeException x) {
                      thrown = x; throw x;
                  } catch (Error x) {
                      thrown = x; throw x;
                  } catch (Throwable x) {
                      thrown = x; throw new Error(x);
                  } finally {
                      //执行后钩子办法
                      afterExecute(task, thrown);
                  }
              } finally {
                  task = null;
                  w.completedTasks++;
                  w.unlock();
              }
          }
          completedAbruptly = false;
      } finally {
          processWorkerExit(w, completedAbruptly);
      }
  }

在执行前后预留两个钩子空办法,留给子类来扩大,后文解决线程池异样也会用到

配置参数

线程池中是不是越多线程就越好呢?

首先,咱们要明确创立线程是有开销的,程序计数器、虚拟机栈、本地办法栈都是线程公有的空间

并且线程在申请空间时,是通过CAS申请年老代的Eden区中一块内存(因为可能存在多线程同时申请所以要CAS)

线程太多可能导致Eden空间被应用太多导致young gc,并且线程上下文切换也须要开销

因而,线程池中线程不是越多越好,行业内分为两种大略计划

针对CPU密集型,线程池设置最大线程数量为CPU外围数量+1,防止上下文切换,进步吞吐量,多留一个线程兜底

针对IO密集型,线程池设置最大线程数量为2倍CPU外围数量,因为IO须要期待,为了防止CPU闲暇就多一些线程

具体业务场景须要具体分析,而后加上大量测试能力失去最正当的配置

Executor框架通过动态工厂办法提供几种线程池,比方:Executors.newSingleThreadExecutor()Executors.newFixedThreadPool()Executors.newCachedThreadPool()

但因为业务场景的不同,最好还是自定义线程池;当了解线程池参数和实现原理后,查看它们的源码并不难,咱们不过多叙述

解决异样

线程池中如果出现异常会怎么样?

Runnable

当咱们应用Runnable工作时,出现异常会间接抛出

         threadPool.execute(() -> {
             int i = 1;
             int j = 0;
             System.out.println(i / j);
         });

面对这种状况,咱们能够在Runnable工作中应用try-catch进行捕捉

         threadPool.execute(() -> {
             try {
                 int i = 1;
                 int j = 0;
                 System.out.println(i / j);
             } catch (Exception e) {
                 System.out.println(e);
             }
         });

实际操作的话用日志记录哈,不要打印到控制台

Callable

当咱们应用Callable工作时,应用submit办法会获取Future

         Future<Integer> future = threadPool.submit(() -> {
             int i = 1;
             int j = 0;
             return i / j;
         });

如果不应用Future.get()去获取返回值,那么异样就不会抛出,这是比拟危险的

为什么会呈现这样的状况呢?

前文说过执行submit时会将Callable封装成FutureTask执行

在其实现Runnable中,在执行Callable工作时,如果出现异常会封装在FutureTask中

     public void run() {
         //...其余略
         try {
             //执行call工作
             result = c.call();
             ran = true;
         } catch (Throwable ex) {
             //出现异常 封装到FutureTask
             result = null;
             ran = false;
             setException(ex);
         }
         //..
     }

等到执行get时,先阻塞、直到实现工作再来判断状态,如果状态不失常则抛出封装的异样

     private V report(int s) throws ExecutionException {
         Object x = outcome;
         if (s == NORMAL)
             return (V)x;
         if (s >= CANCELLED)
             throw new CancellationException();
         throw new ExecutionException((Throwable)x);
     }

因而在解决Callable工作时,能够对工作进行捕捉也能够对get进行捕捉

         //捕捉工作
         Future<?> f = threadPool.submit(() -> {
             try {
                 int i = 1;
                 int j = 0;
                 return i / j;
             } catch (Exception e) {
                 System.out.println(e);
             } finally {
                 return null;
             }
         });
 ​
         //捕捉get
         Future<Integer> future = threadPool.submit(() -> {
             int i = 1;
             int j = 0;
             return i / j;
         });
 ​
         try {
             Integer integer = future.get();
         } catch (Exception e) {
             System.out.println(e);
         }
afterExecutor

还记得线程池的runWorker吗?

它在循环中不停的获取阻塞队列中的工作执行,在执行前后预留钩子办法

继承ThreadPoolExecutor来重写执行后的钩子办法,记录执行完是否产生异样,如果有异样则进行日志记录,作一层兜底计划

 public class MyThreadPool extends ThreadPoolExecutor {  
     //...
     
     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         //Throwable为空 可能是submit提交 如果runnable为future 则捕捉get
         if (Objects.isNull(t) && r instanceof Future<?>) {
             try {
                 Object res = ((Future<?>) r).get();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             } catch (ExecutionException e) {
                 t = e;
             }
         }
 ​
         if (Objects.nonNull(t)) {
             System.out.println(Thread.currentThread().getName() + ": " + t.toString());
         }
     }
 }

这样即便应用submit,遗记应用get时,异样也不会“隐没”

setUncaughtException

创立线程时,能够设置未捕捉异样uncaughtException办法,当线程出现异常未捕捉时调用,也能够打印日志作兜底

咱们定义咱们本人的线程工厂,以业务组group为单位,创立线程(不便出错排查)并设置uncaughtException办法

 public class MyThreadPoolFactory implements ThreadFactory {
 ​
     private AtomicInteger threadNumber = new AtomicInteger(1);
     
     private ThreadGroup group;
 ​
     private String namePrefix = "";
 ​
     public MyThreadPoolFactory(String group) {
         this.group = new ThreadGroup(group);
         namePrefix = group + "-thread-pool-";
     }
 ​
 ​
     @Override
     public Thread newThread(Runnable r) {
         Thread t = new Thread(group, r,
                 namePrefix + threadNumber.getAndIncrement(),
                 0);
         t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 System.out.println(t.getName() + ":" + e);
             }
         });
 ​
         if (t.isDaemon()) {
             t.setDaemon(false);
         }
         if (t.getPriority() != Thread.NORM_PRIORITY) {
             t.setPriority(Thread.NORM_PRIORITY);
         }
         return t;
     }
 ​
 }

敞开线程池

敞开线程池的2种办法: shutdown(),shutdownNow()

它们的原理都是: 遍历工作队列wokers中的线程,一一中断(调用线程的interrupt办法) 无奈响应中断的工作可能永远无奈终止

shutdown 工作会被执行完

  1. 将线程池状态设置为SHUTDOWN
  2. 中断所有未正在执行工作的线程

shutdownNow 工作不肯定会执行完

  1. 将线程池状态设置为STOP
  2. 尝试进行所有正在执行或暂停工作的线程
  3. 返回期待执行工作列表

通常应用shutdown,如果工作不肯定要执行完能够应用shutdownNow

SecheduledThreadPoolExecutor

ScheduledThreadPoolExecutorThreadPoolExecutor的根底上提供定时执行的性能

它有两个定时的办法

scheduleAtFixedRate 以工作开始为周期终点,比如说一个工作执行要0.5s,每隔1s执行,相当于执行完工作过0.5s又开始执行工作

scheduledWithFixedDelay 以工作完结为周期终点,比如说一个工作执行要0.5s,每隔1s执行,相当于执行完工作过1s才开始执行工作

         ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
         //scheduleAtFixedRate 固定频率执行工作 周期终点为工作开始
         scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
             try {
                 TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("scheduleAtFixedRate 周期终点为工作开始");
             //初始提早:1s  周期:1s
         },1,1, TimeUnit.SECONDS);
 ​
         //scheduledWithFixedDelay 固定提早执行工作,周期终点为工作完结
         scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
             try {
                 TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("scheduledWithFixedDelay 周期终点为工作完结 ");
             //初始提早:1s  周期:1s
         },1,1, TimeUnit.SECONDS);

定时线程池应用提早队列充当阻塞队列实现的

提早队列是一个优先级队列,它排序存储定时工作,工夫越小越先执行

线程获取工作时,会从提早队列中获取定时工作,如果工夫已到就执行

     public RunnableScheduledFuture<?> take() throws InterruptedException {
             final ReentrantLock lock = this.lock;
             lock.lockInterruptibly();
             try {
                 for (;;) {
                     RunnableScheduledFuture<?> first = queue[0];
                     //没有定时工作 期待
                     if (first == null)
                         available.await();
                     else {
                         //获取延迟时间
                         long delay = first.getDelay(NANOSECONDS);
                         //小于等于0 阐明超时,拿进去执行
                         if (delay <= 0)
                             return finishPoll(first);
                         first = null; // don't retain ref while waiting
                         //以后线程是leader则期待对应的延迟时间,再进入循环取出工作执行
                         //不是leader则始终期待,直到被唤醒
                         if (leader != null)
                             available.await();
                         else {
                             Thread thisThread = Thread.currentThread();
                             leader = thisThread;
                             try {
                                 available.awaitNanos(delay);
                             } finally {
                                 if (leader == thisThread)
                                     leader = null;
                             }
                         }
                     }
                 }
             } finally {
                 if (leader == null && queue[0] != null)
                     available.signal();
                 lock.unlock();
             }
         }

这两个定时办法一个以工作开始为周期终点、另一个以工作完结为周期终点

获取定时工作的流程是雷同的,只是它们构建的定时工作中提早的工夫不同

定时工作应用period 区别,为负数周期终点为工作开始,为正数时周期终点为工作完结

总结

本篇文章围绕线程池,深入浅出的解说池化技术,Executor,线程池的参数、配置、实现原理、解决异样、敞开等

应用池化技术可能节俭频繁创立、敞开的开销,晋升响应速度,方便管理,常利用于线程池、连接池等

Executor框架将工作工作与执行(线程池)解耦拆散,工作工作分为无返回值的Runnable和有返回值的Callable

Executor理论只解决Runnable工作,会将Callable工作封装成FutureTask适配Runnable执行

线程池应用工作队列来治理线程,线程执行完工作会从阻塞队列取工作执行,当非核心线程闲暇肯定工夫后会被敞开

线程池执行时,如果工作队列线程数量小于外围线程数,则创立线程来执行(相当预热)

如果工作队列线程数量大于外围线程数量,并且阻塞队列未满则放入阻塞队列

如果阻塞队列已满,还未达到最大线程数量则创立非核心线程执行工作

如果已达到最大线程数量则应用回绝策略

配置参数CPU密集型为CPU核数+1;IO密集型为2倍CPU核数;具体配置须要测试

解决异样能够间接捕捉工作,Callable能够捕捉get,也能够继承线程池实现afterExecutor记录异样,还能够在创立线程时就设置解决未捕捉异样办法

解决定时工作的线程池由提早队列实现,工夫越短的定时工作越先执行,线程会从提早队列中获取定时工作(工夫已到的状况),工夫未到就期待

最初(不要白嫖,一键三连求求拉~)

本篇文章被支出专栏 由点到线,由线到面,深入浅出构建Java并发编程常识体系,感兴趣的同学能够继续关注喔

本篇文章笔记以及案例被支出 gitee-StudyJava、 github-StudyJava 感兴趣的同学能够stat下继续关注喔~

案例地址:

Gitee-JavaConcurrentProgramming/src/main/java/D_ThreadPool

Github-JavaConcurrentProgramming/src/main/java/D_ThreadPool

有什么问题能够在评论区交换,如果感觉菜菜写的不错,能够点赞、关注、珍藏反对一下~

关注菜菜,分享更多干货,公众号:菜菜的后端私房菜

本文由博客一文多发平台 OpenWrite 公布!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理