关于后端:详解-ThreadPoolExecutor-的参数含义及源码执行流程

2次阅读

共计 8933 个字符,预计需要花费 23 分钟才能阅读完成。

线程池

线程池是为了防止线程频繁的创立和销毁带来的性能耗费,而建设的一种池化技术,它是把已创立的线程放入“池”中,当有工作来长期就能够重用已有的线程,无需期待创立的过程,这样就能够无效进步程序的响应速度。但如果要说线程池的话肯定离不开 ThreadPoolExecutor,在阿里巴巴的《Java 开发手册》中是这样规定线程池的:

 线程池不容许应用 Executors 去创立,而是通过 ThreadPoolExecutor 的形式,这样的解决形式让写的读者更加明确线程池的运行规定,躲避资源耗尽的危险。

阐明:Executors 返回的线程池对象的弊病如下:

FixedThreadPool 和 SingleThreadPool:容许的申请队列长度为 Integer.MAX_VALUE,可能会沉积大量的申请,从而导致 OOM。CachedThreadPool 和 ScheduledThreadPool:容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。

其实当咱们去看 Executors 的源码会发现,Executors.newFixedThreadPool()Executors.newSingleThreadExecutor()Executors.newCachedThreadPool() 等办法的底层都是通过 ThreadPoolExecutor 实现的,所以文咱们就重点来理解一下 ThreadPoolExecutor 的相干常识,比方它有哪些外围的参数?它是如何工作的?

线程池的外围参数

ThreadPoolExecutor 的外围参数指的是它在构建时须要传递的参数,其构造方法如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize 必须大于 0,且必须大于 corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • 第 1 个参数:corePoolSize 示意线程池的常驻外围线程数。如果设置为 0,则示意在没有任何工作时,销毁线程池;如果大于 0,即便没有工作时也会保障线程池的线程数量等于此值。但须要留神,此值如果设置的比拟小,则会频繁的创立和销毁线程(创立和销毁的起因会在本课时的下半局部讲到);如果设置的比拟大,则会节约系统资源,所以开发者须要依据本人的理论业务来调整此值。
  • 第 2 个参数:maximumPoolSize 示意线程池在工作最多时,最大能够创立的线程数。官网规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在工作比拟多,且不能寄存在工作队列时,才会用到。
  • 第 3 个参数:keepAliveTime 示意线程的存活工夫,当线程池闲暇时并且超过了此工夫,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在闲暇的时候也不会销毁任何线程。
  • 第 4 个参数:unit 示意存活工夫的单位,它是配合 keepAliveTime 参数独特应用的。
  • 第 5 个参数:workQueue 示意线程池执行的工作队列,当线程池的所有线程都在解决工作时,如果来了新工作就会缓存到此工作队列中排队期待执行。
  • 第 6 个参数:threadFactory 示意线程的创立工厂,此参数个别用的比拟少,咱们通常在创立线程池时不指定此参数,它会应用默认的线程创立工厂的办法来创立线程,源代码如下:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {// Executors.defaultThreadFactory() 为默认的线程创立工厂
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();
}
// 默认的线程创立工厂,须要实现 ThreadFactory 接口
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    // 创立线程
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon()) 
            t.setDaemon(false); // 创立一个非守护线程
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值
        return t;
    }
}

咱们也能够自定义一个线程工厂,通过实现 ThreadFactory 接口来实现,这样就能够自定义线程的名称或线程执行的优先级了。

  • 第 7 个参数:RejectedExecutionHandler 示意指定线程池的回绝策略,当线程池的工作曾经在缓存队列 workQueue 中存储满了之后,并且不能创立新的线程来执行此工作时,就会用到此回绝策略,它属于一种限流爱护的机制。

线程池的工作流程要从它的执行办法 execute() 说起,源码如下:

public void execute(Runnable command) {if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 当前工作的线程数小于外围线程数
    if (workerCountOf(c) < corePoolSize) {
        // 创立新的线程执行此工作
        if (addWorker(command, true))
            return;
        c = ctl.get();}
    // 查看线程池是否处于运行状态,如果是则把工作增加到队列
    if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
        // 再次查看线程池是否处于运行状态,避免在第一次校验通过后线程池敞开
        // 如果是非运行状态,则将刚退出队列的工作移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池的线程数为 0 时(当 corePoolSize 设置为 0 时会产生)else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 新建线程执行工作
    }
    // 外围线程都在忙且队列都已爆满,尝试新启动一个线程执行失败
    else if (!addWorker(command, false)) 
        // 执行回绝策略
        reject(command);
}

其中 addWorker(Runnable firstTask, boolean core) 办法的参数阐明如下:

firstTask,线程应首先运行的工作,如果没有则能够设置为 null;core,判断是否能够创立线程的阀值(最大值),如果等于 true 则示意应用 corePoolSize 作为阀值,false 则示意应用 maximumPoolSize 作为阀值。

考点剖析

这道面试题考查的是你对于线程池和 ThreadPoolExecutor 的把握水平,也属于 Java 的基础知识,简直所有的面试都会被问到,其中线程池工作执行的次要流程,能够参考以下流程图:

与 ThreadPoolExecutor 相干的面试题还有以下几个:

ThreadPoolExecutor 的执行办法有几种?它们有什么区别?什么是线程的回绝策略?回绝策略的分类有哪些?如何自定义回绝策略?ThreadPoolExecutor 能不能实现扩大?如何实现扩大?

常识扩大

execute() VS submit()

execute()submit() 都是用来执行线程池工作的,它们最次要的区别是,submit() 办法能够接管线程池执行的返回值,而 execute() 不能接管返回值。

来看两个办法的具体应用:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,
        TimeUnit.SECONDS, new LinkedBlockingQueue(20));
// execute 应用
executor.execute(new Runnable() {
    @Override
    public void run() {System.out.println("Hello, execute.");
    }
});
// submit 应用
Future<String> future = executor.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {System.out.println("Hello, submit.");
        return "Success";
    }
});
System.out.println(future.get());

以上程序执行后果如下:

Hello, submit.
Hello, execute.
Success

从以上后果能够看出 submit() 办法能够配合 Futrue 来接管线程执行的返回值。它们的另一个区别是 execute() 办法属于 Executor 接口的办法,而 submit() 办法则是属于 ExecutorService 接口的办法,它们的继承关系如下图所示:

线程池的回绝策略

当线程池中的工作队列曾经被存满,再有工作增加时会先判断以后线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的回绝策略。

Java 自带的回绝策略有 4 种:

AbortPolicy,终止策略,线程池会抛出异样并终止执行,它是默认的回绝策略;CallerRunsPolicy,把工作交给以后线程来执行;DiscardPolicy,疏忽此工作(最新的工作);DiscardOldestPolicy,疏忽最早的工作(最先退出队列的工作)。

例如,咱们来演示一个 AbortPolicy 的回绝策略,代码如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.AbortPolicy()); // 增加 AbortPolicy 回绝策略
for (int i = 0; i < 6; i++) {executor.execute(() -> {System.out.println(Thread.currentThread().getName());
    });
}

以上程序的执行后果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lagou.interview.ThreadPoolExample$$Lambda$1/1096979270@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
 at com.lagou.interview.ThreadPoolExample.rejected(ThreadPoolExample.java:35)
 at com.lagou.interview.ThreadPoolExample.main(ThreadPoolExample.java:26)

能够看出当第 6 个工作来的时候,线程池则执行了 AbortPolicy 回绝策略,抛出了异样。因为队列最多存储 2 个工作,最大能够创立 3 个线程来执行工作(2+3=5),所以当第 6 个工作来的时候,此线程池就“忙”不过去了。

自定义回绝策略

自定义回绝策略只须要新建一个 RejectedExecutionHandler 对象,而后重写它的 rejectedExecution() 办法即可,如下代码所示:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new RejectedExecutionHandler() {  // 增加自定义回绝策略
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 业务解决办法
                System.out.println("执行自定义回绝策略");
            }
        });
for (int i = 0; i < 6; i++) {executor.execute(() -> {System.out.println(Thread.currentThread().getName());
    });
}

以上代码执行的后果如下:

 执行自定义回绝策略
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2

能够看出线程池执行了自定义的回绝策略,咱们能够在 rejectedExecution 中增加本人业务解决的代码。

ThreadPoolExecutor 扩大

ThreadPoolExecutor 的扩大次要是通过重写它的 beforeExecute() 和 afterExecute() 办法实现的,

咱们能够在扩大办法中增加日志或者实现数据统计,比方统计线程的执行工夫,如下代码所示:

public class ThreadPoolExtend {public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 线程池扩大调用
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2, 4, 10,
                TimeUnit.SECONDS, new LinkedBlockingQueue());
        for (int i = 0; i < 3; i++) {executor.execute(() -> {Thread.currentThread().getName();});
        }
    }
   /**
     * 线程池扩大
     */
    static class MyThreadPoolExecutor extends ThreadPoolExecutor {
        // 保留线程执行开始工夫
        private final ThreadLocal<Long> localTime = new ThreadLocal<>();
        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
        /**
         * 开始执行之前
         * @param t 线程
         * @param r 工作
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {Long sTime = System.nanoTime(); // 开始工夫 (单位:纳秒)
            localTime.set(sTime);
            System.out.println(String.format("%s | before | time=%s",
                    t.getName(), sTime));
            super.beforeExecute(t, r);
        }
        /**
         * 执行实现之后
         * @param r 工作
         * @param t 抛出的异样
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {Long eTime = System.nanoTime(); // 完结工夫 (单位:纳秒)
            Long totalTime = eTime - localTime.get(); // 执行总工夫
            System.out.println(String.format("%s | after | time=%s | 耗时:%s 毫秒",
                    Thread.currentThread().getName(), eTime, (totalTime / 1000000.0)));
            super.afterExecute(r, t);
        }
    }
}

以上程序的执行后果如下所示:

pool-1-thread-1 | before | time=4570298843700
pool-1-thread-2 | before | time=4570298840000
pool-1-thread-1 | after | time=4570327059500 | 耗时:28.2158 毫秒
pool-1-thread-2 | after | time=4570327138100 | 耗时:28.2981 毫秒
pool-1-thread-1 | before | time=4570328467800
pool-1-thread-1 | after | time=4570328636800 | 耗时:0.169 毫秒 

总结

最初咱们总结一下:线程池的应用必须要通过 ThreadPoolExecutor 的形式来创立,这样才能够更加明确线程池的运行规定,躲避资源耗尽的危险。同时,也介绍了 ThreadPoolExecutor 的七大外围参数,包含外围线程数和最大线程数之间的区别,当线程池的工作队列没有可用空间且线程池的线程数量曾经达到了最大线程数时,则会执行回绝策略,Java 主动的回绝策略有 4 种,用户也能够通过重写 rejectedExecution() 来自定义回绝策略,咱们还能够通过重写 beforeExecute()afterExecute() 来实现 ThreadPoolExecutor 的扩大性能。

本文由 mdnice 多平台公布

正文完
 0