关于后端:12分钟从Executor自顶向下彻底搞懂线程池

30次阅读

共计 13662 个字符,预计需要花费 35 分钟才能阅读完成。

前言

上篇文章 13 分钟聊聊并发包中罕用同步组件并手写一个自定义同步组件 聊到并发包中罕用的同步组件,并且还手把手实现了自定义的同步组件

本篇文章来聊聊并发包下的另一个外围 - 线程池

浏览本文大略 12 分钟

通读本篇文章前先来看看几个问题,看看你是否以及了解线程池

  1. 什么是池化技术?它有什么特点,哪些场景应用?
  2. Executor 是什么?它的设计思维是什么样的?
  3. 工作工作有几种?有什么特点?如何适配而后交给 Executor 的?
  4. 线程池是如何实现的?有哪些外围参数,该如何配置?工作流程是怎么的?
  5. 线程池如何优雅的解决异样?如何敞开线程池?
  6. 解决定时的线程池是如何实现的?

池化技术

线程的创立、销毁都会带来肯定的开销

如果当咱们须要应用到多线程时再去创立,应用完又去销毁,这样去应用不仅会拉长业务流程,还会减少创立、销毁线程的开销

于是有了池化技术的思维,将线程提前创立进去,放在一个池子(容器)中进行治理

当须要应用时,从池子里拿取一个线程来执行工作,执行结束后再放回池子

不仅是线程有池化的思维,连贯也有池化的思维,也就是连接池

池化技术不仅能复用资源、进步响应,还方便管理

Executor 框架

Executor 框架是什么?

能够临时把 Executor 看成线程池的形象,它定义如何去执行工作

  public interface Executor {void execute(Runnable command);
  }

Executor将工作工作与线程池进行拆散 解耦

工作工作被分为两种:无返回后果的 Runnable 和有返回后果的Callable

在线程池中容许执行这两种工作,其中它们都是函数式接口,能够应用 lambda 表达式来实现

有的同学可能会有疑难,上文 Executor 框架定义的执行办法不是只容许传入 Runnable 工作吗?

Callable 工作调用哪个办法来执行呢?

Future接口用来定义获取异步工作的后果,它的实现类常是FutureTask

FutureTask实现 Runnable 的同时,还用字段存储 Callable,在其实现Runnable 时实际上会去执行 Callable 工作

线程池在执行 Callable 工作时,会将应用 FutureTask 将其封装成 Runnable 执行 (具体源码咱们前面再聊),因而Executor 的执行办法入参只有Runnable

FutureTask相当于适配器,将 Callable 转换为 Runnable 再进行执行

Executor 定义线程池,而它的重要实现是ThreadPoolExecutor

ThreadPoolExecutor 的根底上,还有个做定时的线程池ScheduledThreadPoolExecutor

ThreadPoolExecutor

外围参数

ThreadPoolExecutor次要有七个重要的参数

  public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler)
  1. corePoolSize 线程池外围线程数量
  2. maximumPoolSize 线程池容许创立的最大线程数
  3. keepAliveTime 超时工夫,TimeUnit 工夫单位:非核心线程闲暇后存活的工夫
  4. workQueue 寄存期待执行工作的阻塞队列
  5. threadFactory 线程工厂:规定如何创立线程,能够依据业务不同规定 不同的线程组名称
  6. RejectedExecutionHandler 回绝策略:当线程不够用,并且阻塞队列爆满时如何回绝工作的策略
回绝策略 作用
AbortPolicy 默认 抛出异样
CallerRunsPolicy 调用线程来执行工作
DiscardPolicy 不解决,抛弃
DiscardOldestPolicy 抛弃队列中最近一个工作,并立刻执行当前任务

线程池中除了结构时的外围参数外,还 应用外部类 Worker 来封装线程和工作,并应用 HashSet 容器 workes 工作队列存储工作线程 worker

实现原理

流程图

为了清晰的了解线程池实现原理,咱们先用流程图和总结概述原理,最初来看源码实现

  1. 如果工作线程数量小于外围线程数量,创立线程、退出工作队列、执行工作
  2. 如果工作线程数量大于等于外围线程数量并且线程池还在运行则尝试将工作退出阻塞队列
  3. 如果工作退出阻塞队列失败(阐明阻塞队列已满),并且工作线程小于最大线程数,则创立线程执行
  4. 如果阻塞队列已满、并且工作线程数量达到最大线程数量则执行回绝策略
execute

线程池有两种提交形式 execute 和 submit,其中 submit 会封装成 RunnableFuture 最终都来执行execute

      public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
          RunnableFuture<T> ftask = newTaskFor(task);
          execute(ftask);
          return ftask;
      }

execute中实现线程池的整个运行流程

  public void execute(Runnable command) {
      // 工作为空间接抛出空指针异样
      if (command == null)
          throw new NullPointerException();
      //ctl 是一个整型原子状态,蕴含 workerCount 工作线程数量 和 runState 是否运行两个状态
      int c = ctl.get();
      //1. 如果工作线程数 小于 外围线程数 addWorker 创立工作线程
      if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
              return;
          c = ctl.get();}
      
      // 2. 工作线程数 大于等于 外围线程数时
      // 如果 正在运行 尝试将 工作退出队列
      if (isRunning(c) && workQueue.offer(command)) {
          // 工作退出队列胜利 查看是否运行
          int recheck = ctl.get();
          // 不在运行 并且 删除工作胜利 执行回绝策略 否则查看工作线程为 0 就创立线程
          if (! isRunning(recheck) && remove(command))
              reject(command);
          else if (workerCountOf(recheck) == 0)
              addWorker(null, false);
      }
      // 3. 工作退出队列失败,尝试去创立非核心线程,胜利则完结
      else if (!addWorker(command, false))
          // 4. 失败则执行回绝策略
          reject(command);
  }
addWorker

addWorker用于创立线程退出工作队列并执行工作

第二个参数用来判断是不是创立外围线程,当创立外围线程时为 true,创立非核心线程时为 false

  private boolean addWorker(Runnable firstTask, boolean core) {
          // 不便跳出双层循环
          retry:
          for (;;) {int c = ctl.get();
              int rs = runStateOf(c);
  
              // Check if queue empty only if necessary.
              // 查看状态
              if (rs >= SHUTDOWN &&
                  ! (rs == SHUTDOWN &&
                     firstTask == null &&
                     ! workQueue.isEmpty()))
                  return false;
  
              for (;;) {int wc = workerCountOf(c);
                  // 工作线程数已满 返回 false 
                  if (wc >= CAPACITY ||
                      wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  //CAS 自增工作线程数量 胜利跳出双重循环
                  if (compareAndIncrementWorkerCount(c))
                      break retry;
                  //CAS 失败 从新读取状态 内循环
                  c = ctl.get();  // Re-read ctl
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
              }
          }
  
          // 来到这里阐明曾经自增工作线程数量 筹备创立线程
          boolean workerStarted = false;
          boolean workerAdded = false;
          Worker w = null;
          try {
              // 创立 worker 通过线程工厂创立线程
              w = new Worker(firstTask);
              final Thread t = w.thread;
              if (t != null) {
                  // 全局锁
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      // Recheck while holding lock.
                      // Back out on ThreadFactory failure or if
                      // shut down before lock acquired.
                      int rs = runStateOf(ctl.get());
  
                      if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                              throw new IllegalThreadStateException();
                          // 增加线程
                          workers.add(w);
                          int s = workers.size();
                          if (s > largestPoolSize)
                              largestPoolSize = s;
                          // 标记线程增加完
                          workerAdded = true;
                      }
                  } finally {mainLock.unlock();
                  }
                  // 执行线程
                  if (workerAdded) {t.start();
                      workerStarted = true;
                  }
              }
          } finally {if (! workerStarted)
                  addWorkerFailed(w);
          }
          return workerStarted;
      }

addWorker中会 CAS 自增工作线程数量,创立线程再加锁,将线程退出工作队列 workes(hashset),解锁后开启该线程去执行工作

runWorker

worker 中实现 Runnable 的是 runWorker 办法,在启动线程后会不停的执行工作,工作执行完就去获取工作执行

  final void runWorker(Worker w) {Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      w.unlock(); // allow interrupts
      boolean completedAbruptly = true;
      try {
          // 循环执行工作 getTask 获取工作
          while (task != null || (task = getTask 获取工作()) != null) {w.lock();
              // If pool is stopping, ensure thread is interrupted;
              // if not, ensure thread is not interrupted.  This
              // requires a recheck in second case to deal with
              // shutdownNow race while clearing interrupt
              if ((runStateAtLeast(ctl.get(), STOP) ||
                   (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                  wt.interrupt();
              try {
                  // 执行前 钩子办法
                  beforeExecute(wt, task);
                  Throwable thrown = null;
                  try {
                      // 执行
                      task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                  } finally {
                      // 执行后钩子办法
                      afterExecute(task, thrown);
                  }
              } finally {
                  task = null;
                  w.completedTasks++;
                  w.unlock();}
          }
          completedAbruptly = false;
      } finally {processWorkerExit(w, completedAbruptly);
      }
  }

在执行前后预留两个钩子空办法,留给子类来扩大,后文解决线程池异样也会用到

配置参数

线程池中是不是越多线程就越好呢?

首先,咱们要明确创立线程是有开销的,程序计数器、虚拟机栈、本地办法栈都是线程公有的空间

并且线程在申请空间时,是通过 CAS 申请年老代的 Eden 区中一块内存(因为可能存在多线程同时申请所以要 CAS)

线程太多可能导致 Eden 空间被应用太多导致 young gc,并且线程上下文切换也须要开销

因而,线程池中线程不是越多越好,行业内分为两种大略计划

针对 CPU 密集型,线程池设置最大线程数量为 CPU 外围数量 +1,防止上下文切换,进步吞吐量,多留一个线程兜底

针对 IO 密集型,线程池设置最大线程数量为 2 倍 CPU 外围数量,因为 IO 须要期待,为了防止 CPU 闲暇就多一些线程

具体业务场景须要具体分析,而后加上大量测试能力失去最正当的配置

Executor 框架通过动态工厂办法提供几种线程池,比方:Executors.newSingleThreadExecutor()Executors.newFixedThreadPool()Executors.newCachedThreadPool()

但因为业务场景的不同,最好还是自定义线程池;当了解线程池参数和实现原理后,查看它们的源码并不难,咱们不过多叙述

解决异样

线程池中如果出现异常会怎么样?

Runnable

当咱们应用 Runnable 工作时,出现异常会间接抛出

         threadPool.execute(() -> {
             int i = 1;
             int j = 0;
             System.out.println(i / j);
         });

面对这种状况,咱们能够在 Runnable 工作中应用 try-catch 进行捕捉

         threadPool.execute(() -> {
             try {
                 int i = 1;
                 int j = 0;
                 System.out.println(i / j);
             } catch (Exception e) {System.out.println(e);
             }
         });

实际操作的话用日志记录哈,不要打印到控制台

Callable

当咱们应用 Callable 工作时,应用 submit 办法会获取Future

         Future<Integer> future = threadPool.submit(() -> {
             int i = 1;
             int j = 0;
             return i / j;
         });

如果不应用 Future.get() 去获取返回值,那么异样就不会抛出,这是比拟危险的

为什么会呈现这样的状况呢?

前文说过执行 submit 时会将 Callable 封装成 FutureTask 执行

在其实现 Runnable 中,在执行 Callable 工作时,如果出现异常会封装在 FutureTask 中

     public void run() {
         //... 其余略
         try {
             // 执行 call 工作
             result = c.call();
             ran = true;
         } catch (Throwable ex) {
             // 出现异常 封装到 FutureTask
             result = null;
             ran = false;
             setException(ex);
         }
         //..
     }

等到执行 get 时,先阻塞、直到实现工作再来判断状态,如果状态不失常则抛出封装的异样

     private V report(int s) throws ExecutionException {
         Object x = outcome;
         if (s == NORMAL)
             return (V)x;
         if (s >= CANCELLED)
             throw new CancellationException();
         throw new ExecutionException((Throwable)x);
     }

因而在解决 Callable 工作时,能够对工作进行捕捉也能够对 get 进行捕捉

         // 捕捉工作
         Future<?> f = threadPool.submit(() -> {
             try {
                 int i = 1;
                 int j = 0;
                 return i / j;
             } catch (Exception e) {System.out.println(e);
             } finally {return null;}
         });
 ​
         // 捕捉 get
         Future<Integer> future = threadPool.submit(() -> {
             int i = 1;
             int j = 0;
             return i / j;
         });
 ​
         try {Integer integer = future.get();
         } catch (Exception e) {System.out.println(e);
         }
afterExecutor

还记得线程池的 runWorker 吗?

它在循环中不停的获取阻塞队列中的工作执行,在执行前后预留钩子办法

继承 ThreadPoolExecutor 来重写执行后的钩子办法,记录执行完是否产生异样,如果有异样则进行日志记录,作一层兜底计划

 public class MyThreadPool extends ThreadPoolExecutor {  
     //...
     
     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         //Throwable 为空 可能是 submit 提交 如果 runnable 为 future 则捕捉 get
         if (Objects.isNull(t) && r instanceof Future<?>) {
             try {Object res = ((Future<?>) r).get();} catch (InterruptedException e) {Thread.currentThread().interrupt();} catch (ExecutionException e) {t = e;}
         }
 ​
         if (Objects.nonNull(t)) {System.out.println(Thread.currentThread().getName() + ":" + t.toString());
         }
     }
 }

这样即便应用 submit,遗记应用 get 时,异样也不会“隐没”

setUncaughtException

创立线程时,能够设置未捕捉异样 uncaughtException 办法,当线程出现异常未捕捉时调用,也能够打印日志作兜底

咱们定义咱们本人的线程工厂,以业务组 group 为单位,创立线程(不便出错排查)并设置 uncaughtException 办法

 public class MyThreadPoolFactory implements ThreadFactory {
 ​
     private AtomicInteger threadNumber = new AtomicInteger(1);
     
     private ThreadGroup group;
 ​
     private String namePrefix = "";
 ​
     public MyThreadPoolFactory(String group) {this.group = new ThreadGroup(group);
         namePrefix = group + "-thread-pool-";
     }
 ​
 ​
     @Override
     public Thread newThread(Runnable r) {
         Thread t = new Thread(group, r,
                 namePrefix + threadNumber.getAndIncrement(),
                 0);
         t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {System.out.println(t.getName() + ":" + e);
             }
         });
 ​
         if (t.isDaemon()) {t.setDaemon(false);
         }
         if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);
         }
         return t;
     }
 ​
 }

敞开线程池

敞开线程池的 2 种办法: shutdown(),shutdownNow()

它们的原理都是: 遍历工作队列 wokers 中的线程,一一中断 (调用线程的interrupt 办法) 无奈响应中断的工作可能永远无奈终止

shutdown 工作会被执行完

  1. 将线程池状态设置为 SHUTDOWN
  2. 中断所有未正在执行工作的线程

shutdownNow 工作不肯定会执行完

  1. 将线程池状态设置为 STOP
  2. 尝试进行所有正在执行或暂停工作的线程
  3. 返回期待执行工作列表

通常应用 shutdown,如果工作不肯定要执行完能够应用 shutdownNow

SecheduledThreadPoolExecutor

ScheduledThreadPoolExecutorThreadPoolExecutor 的根底上提供定时执行的性能

它有两个定时的办法

scheduleAtFixedRate 以工作开始为周期终点,比如说一个工作执行要 0.5s,每隔 1s 执行,相当于执行完工作过 0.5s 又开始执行工作

scheduledWithFixedDelay 以工作完结为周期终点,比如说一个工作执行要 0.5s,每隔 1s 执行,相当于执行完工作过 1s 才开始执行工作

         ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
         //scheduleAtFixedRate 固定频率执行工作 周期终点为工作开始
         scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
             try {TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {e.printStackTrace();
             }
             System.out.println("scheduleAtFixedRate 周期终点为工作开始");
             // 初始提早:1s  周期:1s
         },1,1, TimeUnit.SECONDS);
 ​
         //scheduledWithFixedDelay 固定提早执行工作, 周期终点为工作完结
         scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
             try {TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {e.printStackTrace();
             }
             System.out.println("scheduledWithFixedDelay 周期终点为工作完结");
             // 初始提早:1s  周期:1s
         },1,1, TimeUnit.SECONDS);

定时线程池应用提早队列充当阻塞队列实现的

提早队列是一个优先级队列,它排序存储定时工作,工夫越小越先执行

线程获取工作时,会从提早队列中获取定时工作,如果工夫已到就执行

     public RunnableScheduledFuture<?> take() throws InterruptedException {
             final ReentrantLock lock = this.lock;
             lock.lockInterruptibly();
             try {for (;;) {RunnableScheduledFuture<?> first = queue[0];
                     // 没有定时工作 期待
                     if (first == null)
                         available.await();
                     else {
                         // 获取延迟时间
                         long delay = first.getDelay(NANOSECONDS);
                         // 小于等于 0 阐明超时,拿进去执行
                         if (delay <= 0)
                             return finishPoll(first);
                         first = null; // don't retain ref while waiting
                         // 以后线程是 leader 则期待对应的延迟时间,再进入循环取出工作执行
                         // 不是 leader 则始终期待,直到被唤醒
                         if (leader != null)
                             available.await();
                         else {Thread thisThread = Thread.currentThread();
                             leader = thisThread;
                             try {available.awaitNanos(delay);
                             } finally {if (leader == thisThread)
                                     leader = null;
                             }
                         }
                     }
                 }
             } finally {if (leader == null && queue[0] != null)
                     available.signal();
                 lock.unlock();}
         }

这两个定时办法一个以工作开始为周期终点、另一个以工作完结为周期终点

获取定时工作的流程是雷同的,只是它们构建的定时工作中提早的工夫不同

定时工作应用period 区别,为负数周期终点为工作开始,为正数时周期终点为工作完结

总结

本篇文章围绕线程池,深入浅出的解说池化技术,Executor,线程池的参数、配置、实现原理、解决异样、敞开等

应用池化技术可能节俭频繁创立、敞开的开销,晋升响应速度,方便管理,常利用于线程池、连接池等

Executor 框架将工作工作与执行(线程池)解耦拆散,工作工作分为无返回值的 Runnable 和有返回值的Callable

Executor 理论只解决 Runnable 工作,会将 Callable 工作封装成 FutureTask 适配 Runnable 执行

线程池应用工作队列来治理线程,线程执行完工作会从阻塞队列取工作执行,当非核心线程闲暇肯定工夫后会被敞开

线程池执行时,如果工作队列线程数量小于外围线程数,则创立线程来执行(相当预热)

如果工作队列线程数量大于外围线程数量,并且阻塞队列未满则放入阻塞队列

如果阻塞队列已满,还未达到最大线程数量则创立非核心线程执行工作

如果已达到最大线程数量则应用回绝策略

配置参数 CPU 密集型为 CPU 核数 +1;IO 密集型为 2 倍 CPU 核数;具体配置须要测试

解决异样能够间接捕捉工作,Callable能够捕捉 get,也能够继承线程池实现 afterExecutor 记录异样,还能够在创立线程时就设置解决未捕捉异样办法

解决定时工作的线程池由提早队列实现,工夫越短的定时工作越先执行,线程会从提早队列中获取定时工作(工夫已到的状况),工夫未到就期待

最初(不要白嫖,一键三连求求拉~)

本篇文章被支出专栏 由点到线,由线到面,深入浅出构建 Java 并发编程常识体系,感兴趣的同学能够继续关注喔

本篇文章笔记以及案例被支出 gitee-StudyJava、github-StudyJava 感兴趣的同学能够 stat 下继续关注喔~

案例地址:

Gitee-JavaConcurrentProgramming/src/main/java/D_ThreadPool

Github-JavaConcurrentProgramming/src/main/java/D_ThreadPool

有什么问题能够在评论区交换,如果感觉菜菜写的不错,能够点赞、关注、珍藏反对一下~

关注菜菜,分享更多干货,公众号:菜菜的后端私房菜

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0