乐趣区

关于高并发:高并发从源码角度分析创建线程池究竟有哪些方式

大家好,我是冰河~~

在 Java 的高并发畛域,线程池始终是一个绕不开的话题。有些童鞋始终在应用线程池,然而,对于如何创立线程池仅仅停留在应用 Executors 工具类的形式,那么,创立线程池到底存在哪几种形式呢?就让咱们一起从创立线程池的源码来深入分析到底有哪些形式能够创立线程池。

应用 Executors 工具类创立线程池

在创立线程池时,初学者用的最多的就是 Executors 这个工具类,而应用这个工具类创立线程池时非常简单的,不须要关注太多的线程池细节,只须要传入必要的参数即可。Executors 工具类提供了几种创立线程池的办法,如下所示。

  • Executors.newCachedThreadPool:创立一个可缓存的线程池,如果线程池的大小超过了须要,能够灵便回收闲暇线程,如果没有可回收线程,则新建线程
  • Executors.newFixedThreadPool:创立一个定长的线程池,能够控制线程的最大并发数,超出的线程会在队列中期待
  • Executors.newScheduledThreadPool:创立一个定长的线程池,反对定时、周期性的工作执行
  • Executors.newSingleThreadExecutor: 创立一个单线程化的线程池,应用一个惟一的工作线程执行工作,保障所有工作依照指定程序(先入先出或者优先级)执行
  • Executors.newSingleThreadScheduledExecutor: 创立一个单线程化的线程池,反对定时、周期性的工作执行
  • Executors.newWorkStealingPool:创立一个具备并行级别的 work-stealing 线程池

其中,Executors.newWorkStealingPool 办法是 Java 8 中新增的创立线程池的办法,它可能为线程池设置并行级别,具备更高的并发度和性能。除了此办法外,其余创立线程池的办法实质上调用的是 ThreadPoolExecutor 类的构造方法。

例如,咱们能够应用如下代码创立线程池。

Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);

应用 ThreadPoolExecutor 类创立线程池

从代码构造上看 ThreadPoolExecutor 类继承自 AbstractExecutorService,也就是说,ThreadPoolExecutor 类具备 AbstractExecutorService 类的全副性能。

既然 Executors 工具类中创立线程池大部分调用的都是 ThreadPoolExecutor 类的构造方法,所以,咱们也能够间接调用 ThreadPoolExecutor 类的构造方法来创立线程池,而不再应用 Executors 工具类。接下来,咱们一起看下 ThreadPoolExecutor 类的构造方法。

ThreadPoolExecutor 类中的所有构造方法如下所示。

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                 BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                    ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                    TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                    BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

由 ThreadPoolExecutor 类的构造方法的源代码可知,创立线程池最终调用的构造方法如下。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
              long keepAliveTime, TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

对于此构造方法中各参数的含意和作用,各位能够移步《高并发之——不得不说的线程池与 ThreadPoolExecutor 类浅析》进行查阅。

大家能够自行调用 ThreadPoolExecutor 类的构造方法来创立线程池。例如,咱们能够应用如下模式创立线程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

应用 ForkJoinPool 类创立线程池

在 Java8 的 Executors 工具类中,新增了如下创立线程池的形式。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

从源代码能够能够,实质上调用的是 ForkJoinPool 类的构造方法类创立线程池,而从代码构造上来看 ForkJoinPool 类继承自 AbstractExecutorService 抽象类。接下来,咱们看下 ForkJoinPool 类的构造方法。

public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
 public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism,
                ForkJoinWorkerThreadFactory factory,
                UncaughtExceptionHandler handler,
                boolean asyncMode) {this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();}

private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

通过查看源代码得悉,ForkJoinPool 的构造方法,最终调用的是如下公有构造方法。

private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

其中,各参数的含意如下所示。

  • parallelism:并发级别。
  • factory:创立线程的工厂类对象。
  • handler:当线程池中的线程抛出未捕捉的异样时,对立应用 UncaughtExceptionHandler 对象解决。
  • mode:取值为 FIFO_QUEUE 或者 LIFO_QUEUE。
  • workerNamePrefix:执行工作的线程名称的前缀。

当然,公有构造方法尽管是参数最多的一个办法,然而其不会间接对外办法,咱们能够应用如下形式创立线程池。

new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

应用 ScheduledThreadPoolExecutor 类创立线程池

在 Executors 工具类中存在如下办法类创立线程池。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

从源码来看,这几个办法实质上调用的都是 ScheduledThreadPoolExecutor 类的构造方法,ScheduledThreadPoolExecutor 中存在的构造方法如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

而从代码构造上看,ScheduledThreadPoolExecutor 类继承自 ThreadPoolExecutor 类,实质上还是调用 ThreadPoolExecutor 类的构造方法,只不过此时传递的队列为 DelayedWorkQueue。咱们能够间接调用 ScheduledThreadPoolExecutor 类的构造方法来创立线程池,例如以如下模式创立线程池。

new ScheduledThreadPoolExecutor(3)

好了,明天就到这儿吧,我是冰河,咱们下期见~~

退出移动版