关于java:为什么阿里不允许用Executors创建线程池而是通过ThreadPoolExecutor的方式

18次阅读

共计 5920 个字符,预计需要花费 15 分钟才能阅读完成。

1. 通过 Executors 创立线程池的弊病

在创立线程池的时候,大部分人还是会抉择应用 Executors 去创立。

上面是创立定长线程池(FixedThreadPool)的一个例子,严格来说,当应用如下代码创立线程池时,是不合乎编程标准的。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); 

起因在于:(摘自阿里编码规约)

线程池不容许应用 Executors 去创立,而是通过 ThreadPoolExecutor 的形式,这样的解决形式让写的同学更加明确线程池的运行规定,躲避资源耗尽的危险。

阐明:Executors 各个办法的弊病:

1)newFixedThreadPool 和 newSingleThreadExecutor:

次要问题是沉积的申请解决队列可能会消耗十分大的内存,甚至 OOM。

2)newCachedThreadPool 和 newScheduledThreadPool:

次要问题是线程数最大数是 Integer.MAX_VALUE,可能会创立数量十分多的线程,甚至 OOM。

2. 通过 ThreadPoolExecutor 创立线程池

所以,针对下面的不标准代码,重构为通过 ThreadPoolExecutor 创立线程池的形式。

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    } 

ThreadPoolExecutor 是线程池的外围实现。线程的创立和终止须要很大的开销,线程池中事后提供了指定数量的可重用线程,所以应用线程池会节俭系统资源,并且每个线程池都保护了一些根底的数据统计,不便线程的治理和监控。

3.ThreadPoolExecutor 参数解释

上面是对其参数的解释,在创立线程池时需依据本人的状况来正当设置线程池。

corePoolSize & maximumPoolSize
外围线程数(corePoolSize)和最大线程数(maximumPoolSize)是线程池中十分重要的两个概念,心愿同学们可能把握。

当一个新工作被提交到池中,如果以后运行线程小于外围线程数(corePoolSize),即便以后有闲暇线程,也会新建一个线程来解决新提交的工作;如果以后运行线程数大于外围线程数(corePoolSize)并小于最大线程数(maximumPoolSize),只有当期待队列已满的状况下才会新建线程。

keepAliveTime & unit
keepAliveTime 为超过 corePoolSize 线程数量的线程最大闲暇工夫,unit 为工夫单位。

期待队列
任何阻塞队列(BlockingQueue)都能够用来转移或保留提交的工作,线程池大小和阻塞队列互相束缚线程池:

1. 如果运行线程数小于 corePoolSize,提交新工作时就会新建一个线程来运行;

2. 如果运行线程数大于或等于 corePoolSize,新提交的工作就会入列期待;如果队列已满,并且运行线程数小于 maximumPoolSize,也将会新建一个线程来运行;

3. 如果线程数大于 maximumPoolSize,新提交的工作将会依据回绝策略来解决。

上面来看一下三种通用的入队策略:

1. 间接传递:
通过 SynchronousQueue 间接把工作传递给线程。如果以后没可用线程,尝试入队操作会失败,而后再创立一个新的线程。当解决可能具备外部依赖性的申请时,该策略会防止申请被锁定。间接传递通常须要无界的最大线程数(maximumPoolSize),防止回绝新提交的工作。当工作继续达到的平均速度超过可解决的速度时,可能导致线程的有限增长。

2. 无界队列:
应用无界队列(如 LinkedBlockingQueue)作为期待队列,当所有的外围线程都在解决工作时,新提交的工作都会进入队列期待。因而,不会有大于 corePoolSize 的线程会被创立(maximumPoolSize 也将失去作用)。这种策略适宜每个工作都齐全独立于其余工作的状况;例如网站服务器。这种类型的期待队列能够使霎时暴发的高频申请变得平滑。当工作继续达到的平均速度超过可处理速度时,可能导致期待队列有限增长。

3. 有界队列:
当应用无限的最大线程数时,有界队列(如 ArrayBlockingQueue)能够避免资源耗尽,然而难以调整和管制。队列大小和线程池大小能够相互作用:应用大的队列和小的线程数能够缩小 CPU 使用率、系统资源和上下文切换的开销,然而会导致吞吐量变低,如果工作频繁地阻塞(例如被 I / O 限度),零碎就能为更多的线程调度执行工夫。应用小的队列通常须要更多的线程数,这样能够最大化 CPU 使用率,但可能会须要更大的调度开销,从而升高吞吐量。

回绝策略

当线程池曾经敞开或达到饱和(最大线程和队列都已满)状态时,新提交的工作将会被回绝。ThreadPoolExecutor 定义了四种回绝策略:

1.AbortPolicy:
默认策略,在须要回绝工作时抛出 RejectedExecutionException;

2.CallerRunsPolicy:
间接在 execute 办法的调用线程中运行被回绝的工作,如果线程池曾经敞开,工作将被抛弃;

3.DiscardPolicy:
间接抛弃工作;

4.DiscardOldestPolicy:
抛弃队列中等待时间最长的工作,并执行以后提交的工作,如果线程池曾经敞开,工作将被抛弃。

咱们也能够自定义回绝策略,只须要实现 RejectedExecutionHandler;须要留神的是,回绝策略的运行须要指定线程池和队列的容量。

4.ThreadPoolExecutor 创立线程形式

通过上面的 demo 来理解 ThreadPoolExecutor 创立线程的过程。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 测试 ThreadPoolExecutor 对线程的执行程序
 **/
public class ThreadPoolSerialTest {public static void main(String[] args) {
        // 外围线程数
        int corePoolSize = 3;
        // 最大线程数
        int maximumPoolSize = 6;
        // 超过 corePoolSize 线程数量的线程最大闲暇工夫
        long keepAliveTime = 2;
        // 以秒为工夫单位
        TimeUnit unit = TimeUnit.SECONDS;
        // 创立工作队列,用于寄存提交的期待执行工作
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
        ThreadPoolExecutor threadPoolExecutor = null;
        try {
            // 创立线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());

            // 循环提交工作
            for (int i = 0; i < 8; i++) {
                // 提交工作的索引
                final int index = (i + 1);
                threadPoolExecutor.submit(() -> {
                    // 线程打印输出
                    System.out.println("大家好,我是线程:" + index);
                    try {
                        // 模仿线程执行工夫,10s
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                });
                // 每个工作提交后休眠 500ms 再提交下一个工作,用于保障提交程序
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {threadPoolExecutor.shutdown();
        }
    }
} 

执行后果:

这里形容一下执行的流程:

  • 首先通过 ThreadPoolExecutor 构造函数创立线程池;
  • 执行 for 循环,提交 8 个工作(恰好等于 maximumPoolSize[最大线程数] + capacity[队列大小]);
  • 通过 threadPoolExecutor.submit 提交 Runnable 接口实现的执行工作;
  • 提交第 1 个工作时,因为以后线程池中正在执行的工作为 0,小于 3(corePoolSize 指定),所以会创立一个线程用来执行提交的工作 1;
  • 提交第 2,3 个工作的时候,因为以后线程池中正在执行的工作数量小于等于 3(corePoolSize 指定),所以会为每一个提交的工作创立一个线程来执行工作;
  • 当提交第 4 个工作的时候,因为以后正在执行的工作数量为 3(因为每个线程工作执行工夫为 10s,所以提交第 4 个工作的时候,后面 3 个线程都还在执行中),此时会将第 4 个工作寄存到 workQueue 队列中期待执行;
  • 因为 workQueue 队列的大小为 2,所以该队列中也就只能保留 2 个期待执行的工作,所以第 5 个工作也会保留到工作队列中;
  • 当提交第 6 个工作的时候,因为以后线程池正在执行的工作数量为 3,workQueue 队列中存储的工作数量也满了,这时会判断以后线程池中正在执行的工作的数量是否小于 6(maximumPoolSize 指定);
  • 如果小于 6,那么就会新创建一个线程来执行提交的工作 6;
  • 执行第 7,8 个工作的时候,也要判断以后线程池中正在执行的工作数是否小于 6(maximumPoolSize 指定),如果小于 6,那么也会立刻新建线程来执行这些提交的工作;
  • 此时,6 个工作都曾经提交结束,那 workQueue 队列中的期待 工作 4 和 工作 5 什么时候执行呢?
  • 当工作 1 执行结束后(10s 后),执行工作 1 的线程并没有被销毁掉,而是获取 workQueue 中的工作 4 来执行;
  • 当工作 2 执行结束后,执行工作 2 的线程也没有被销毁,而是获取 workQueue 中的工作 5 来执行;

通过下面流程的剖析,也就晓得了之前案例的输入后果的起因。其实,线程池中会线程执行结束后,并不会被立即销毁,线程池中会保留 corePoolSize 数量的线程,当 workQueue 队列中存在工作或者有新提交工作时,那么会通过线程池中已有的线程来执行工作,防止了频繁的线程创立与销毁,而大于 corePoolSize 小于等于 maximumPoolSize 创立的线程,则会在闲暇指定工夫(keepAliveTime)后进行回收。

5.ThreadPoolExecutor 回绝策略

在下面的测试中,我设置的执行线程总数恰好等于 maximumPoolSize[最大线程数] + capacity[队列大小],因而没有呈现须要执行回绝策略的状况,因而在这里,我再减少一个线程,提交 9 个工作,来演示不同的回绝策略。

AbortPolicy

CallerRunsPolicy

DiscardPolicy

DiscardOldestPolicy

参考
https://www.jianshu.com/p/7be…
https://www.jianshu.com/p/6f8…

作者:雪山上的蒲公英
起源:cnblogs.com/zjfjava/p/11227456.html

正文完
 0