乐趣区

关于java:ThreadPoolExecutor是怎么去执行一个任务的看这篇就够了

前言

后面一遍文章 咱们看了下 FutureTask 的源码,晓得了怎么样去获取一个工作的返回值,明天咱们看下 ThreadPoolExecutor。

ThreadPoolExecutor 看名词 咱们就能够 看做是 ThreadPool 和 Executor 的联合,大略意思咱们也能晓得就是线程池执行器,哈哈这翻译 真棒。这篇博文 会从源码的角度去剖析下 一个线程工作 退出的线程池当前 是怎么被执行的~

线程池

下面 说线程的时候 咱们也说过 线程是零碎中极其宝贵的资源,那咱们要正当的应用他,所以有了线程池的呈现,那线程池能带来哪些益处呢

  • 升高资源的耗费:通过反复利用曾经创立的线程来升高线程创立和销毁带来的耗费
  • 提供响应速度:当咱们创立人物达到的时候,工作能够不须要期待线程的创立就能立刻执行
  • 进步线程可管理性:线程是稀缺资源,不能有限创立,所以要应用线程池对线程进行同一的治理和调配,调优和监控等等。

源码剖析

继承构造

首先 咱们看下 ThreadPoolExecutor 的继承关系

public class ThreadPoolExecutor extends AbstractExecutorService{}

public abstract class AbstractExecutorService implements ExecutorService{}

public interface ExecutorService extends Executor {
    <!-- 进行线程池,状态设置为 SHUTDOWN,并且不在承受新的工作,曾经提交的工作会继续执行 -->
    void shutdown();
    <!-- 进行线程池,状态设置为 STOP,不在承受先工作,尝试中断正在执行的工作,返回还未执行的工作 -->
    List<Runnable> shutdownNow();
    <!-- 是否是 SHUTDOWN 状态 -->
    boolean isShutdown();
    <!-- 是否所有工作都曾经终止 -->
    boolean isTerminated();
    <!-- 超时工夫内,去期待工作执行工作 -->
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;   
    <!--Callable 去提交工作 -->
    <T> Future<T> submit(Callable<T> task);
    <!--Runnable 去提交工作 -->
    <T> Future<T> submit(Runnable task, T result);
    <!--Runnable 去提交工作 -->
    Future<?> submit(Runnable task);
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

public interface Executor {void execute(Runnable command);
}

咱们先从最上面的接口 Executor 来看,这个接口就是一个实现,就是执行 execute 办法,这个接口就是线程执行的入口

ExecutorService 接口继承了 Executor 接口,外面的的办法比拟多,咱们常见的 shutdownNow,shutdown 就是在这个接口外面的,还有就是咱们常见往线程池外面提交工作的时候 submit 办法。ExecutorService 丰盛了对工作执行和治理的性能

AbstractExecutorService 是一个抽象类,实现了 ExecutorService 接口,这边顺带说下,为什么 java 源码外面存在大量 抽象类实现接口,而后类再继承抽象类,为什么类不间接实现接口呢?还要套一层呢,之前我也不明确,起初我才分明,抽象类去实现接口,就是去实现一些公共的接口办法,这样类再次去实现接口的时候,只有关怀我不同的实现就好了,因为 咱们晓得接口的实现类不止一个,抽象类就是把这些要实现接口的类的公共的实现再次抽取进去,防止了大量的反复实现,尤其 List,Set 接口 你看下 简直都有响应的抽象类实现!

次要的变量

    <!--ctl 存储了线程池状态和线程的数量 -->
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 2 的 29 次方 -1

    // runState is stored in the high-order bits
    <!-- 示意线程池正在运行,能够接受任务 解决线程池中工作 -->
    private static final int RUNNING    = -1 << COUNT_BITS;
    <!-- 不承受新的工作,然而任然会解决队列中的工作 -->
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    <!-- 不承受新的工作,不会解决队里中工作,对正在执行的工作进行中断 -->
    private static final int STOP       =  1 << COUNT_BITS;
    <!-- 工作被中断,正在解决整顿状态 -->
    private static final int TIDYING    =  2 << COUNT_BITS;
    <!-- 示意终结状态 -->
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    <!-- 获取以后线程池的运行状态 -->
    private static int runStateOf(int c)     {return c & ~CAPACITY;}
    <!-- 获取以后线程池中工作线程的数量 ->
    private static int workerCountOf(int c)  {return c & CAPACITY;}
     <!-- 获取 ctl 的值 ->
    private static int ctlOf(int rs, int wc) {return rs | wc;}

对于 Ctl 是怎么解决线程状态和线程数的数量的,能够看下我的另外一篇博文:https://blog.csdn.net/zxlp520…

构造函数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

这个构造函数 是所有构造函数最终调用的办法,那咱们就说下 这些具体的参数

  1. int corePoolSize 外围的线程数量
  2. int maximumPoolSize 最大的线程数量
  3. long keepAliveTime 线程存活的最大工夫设置
  4. TimeUnit unit 设置工夫的单位 和 keepAliveTime 是对应的
  5. BlockingQueue<Runnable> workQueue 阻塞队里,存储要执行的工作
  6. ThreadFactory threadFactory 创立执行线程的工厂 默认值:Executors.defaultThreadFactory()
  7. RejectedExecutionHandler handler 工作的回绝 Hander 办法

    • 默认的是 AbortPolicy 就是抛出异样,
    • 还有三种策略是 DiscardPolicy 抛弃策略,DiscardOldestPolicy 抛弃队列中等待时间最长的工作策略,CallerRunsPolicy 这个是让调用的线程去解决的策略

Worker

为什么要先讲 worker 呢?因为咱们提交的工作 Runnabale 是以 Worker 这个对象去包装后运行的,这个前面我我讲 addWorker 办法的时候在细聊

先看下 Worker 的代码:

 /** Worker 继承了 AQS 和实现了 Runnable 接口   */
 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** worker 运行的主体线程 就是在哪个线程外面运行工作的 */
        final Thread thread;
        /** 须要运行的工作 */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);// 这边的 this 就是以后的 Worker 对象 
        }

        /**  运行 以后的工作 runWorker 是 ThreadPoolExecutor 外面的办法 */
        public void run() {runWorker(this);
        }

        // Lock methods
        // 0 示意 没有锁住状态
        // 1 示意 锁住状态
        protected boolean isHeldExclusively() {return getState() != 0;
        }
        
        <!-- 这个办法咱们应该很相熟 我在将 AQS 的时候聊过这个办法,这边做的就是尝试批改 state 的状态,这样就是示意加锁的意思,示意这个 worker 是锁住状态,别的线程不能执行,-->
        protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {//CAS 去批改 State 的值,1 示意 曾经被上锁
                setExclusiveOwnerThread(Thread.currentThread()); 设置以后锁的占用者线程是以后线程
                return true;
            }
            return false;
        }
        <!-- 开释锁,也就是批改 State 的值 为 0 unused 这个字段命名也挺有意思,意思是说 没用的意思 -->
        protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);// 设置以后锁的占用者线程是 null
            setState(0);
            return true;
        }
        <!-- 给以后的 Worker 加锁,如果获取不到 就退出期待队里中,阻塞以后执行线程 -->
        public void lock()        { acquire(1); }
        <!-- 这边相当于一个非偏心锁的实现  去尝试下加锁 -->
        public boolean tryLock()  { return tryAcquire(1); }
        <!-- 开释锁 -->
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        
        <!-- 尝试去中断运行的线程工作,就是咱们调用 shutdownNow 的时候 -->
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {t.interrupt();
                } catch (SecurityException ignore) {}}
        }
    }

首先看下 这个 Worker 的继承构造,首先是实现了 Runnable, 又了这样的关系,Worker 就能够被 Thread 去执行了,另外一个还有一个继承了一个抽象类 AbstractQueuedSynchronizer,简称 AQS, 这个类 哈哈 真的是很久不见了,我之前花了 5 篇文章解释了这个 AQS, 可想而知其重要性,JUC 中很多实现都是 基于这个去做的,还是不分明的小伙伴能够去到我的博客外面去找下。

这边又一行代码 咱们须要注意下,挺有意思的,this.thread = getThreadFactory().newThread(this); 这边 的 this 就是咱们构建的 Worker,thread 就是用 ThreadFactory 去创立的一个线程并且执行的工作就是 Worker, 也就是调用 thread.start() 就能够执行 Worker 了

execute

execute 是实现 Executor 接口的办法,就是执行的工作的入口办法,咱们看下一个工作的提交进来是怎么做的

 public void execute(Runnable command) {if (command == null)
           throw new NullPointerException();
       /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task.  The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread.  If it fails, we know we are shut down or saturated
        * and so reject the task.
        */
       int c = ctl.get();// 获取以后的 ctl 值
       /*
        * workerCountOf 办法我下面也讲过,就是获取以后的工作线程数
        * 如果以后的工作线程数小于设置的外围线程数量,就调用 addWorker 去新增一个工作线程,ture 是示意要增加外围工作线程
        * addWorker 如果增加胜利就间接返回,如果增加失败就持续后去下 ctl, 这边重写获取是为了 避免在 addWorker 过程中 ctl 产生了扭转 
        */
       if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
               return;
           c = ctl.get();}
       /*
        * 走到这步 阐明以后的工作线程数大于外围线程数或者是 addWorker 产生了失败
        * 首先去判断了下 以后的线程状态是否是 Running 而后把当前任务退出到阻塞队列 workQueue 中
        * 如果都胜利了 那就再次获取下 ctl, 因为咱们在 offer Runnable 的时候可能 ctl 也会发生变化
        * 这边的多重验证 思考到高并发的状况,代码逻辑十分的谨严
        * 持续走上来的逻辑是  再次判断下线程池状态 如果是非 Running,那就移除以后的工作, 最初执行 reject 办法 依据不同的回绝策略,做不同的行为
        * 最初走到 判断以后线程数量如果是 0,还是回去调用 addWorker 办法,传入一个空的 Runnalbe,false 是示意创立一个非核心的工作线程
        */
       if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
           if (! isRunning(recheck) && remove(command))
               reject(command);
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
       /*
        * 走到这个判断 阐明以后线程池状态是非 Running 或者入队工作失败,队列可能是满了
        * 这边是去创立非核心线程去解决工作,如果创立失败 就执行回绝策略
        */
       else if (!addWorker(command, false))
           reject(command);
   }

这边的英文正文 我没舍得删除,读者能够去本人翻译下 形容的可能比我精确,我置信 大家能看的懂,而后再比照下 我上面的中文正文,我置信能分明 一个工作新增进来 是怎么个解决流程!

看完本人再回忆下,什么时候去创立外围线程?什么时候去创立非核心线程?什么时候工作会退出的阻塞队列中?最初执行回绝策略 有那几种状况?晓得这些答案 那么 execute 办法你应该了然于心了!

addWorker

上面咱们看下一个重点的办法,这个办法 调用的频次很高,咱们进入去看下

 private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       // 这个是一个自旋 套了一个自旋  其目标就是 CAS 新增线程池的数量
       for (;;) {int c = ctl.get();// 获取 ctl 的值
           int rs = runStateOf(c);// 获取以后的线程状态

           // 这边这个条件看上去很绕头,然而认真看看就能晓得
           // 第一个条件 rs >= SHUTDOWN 阐明线程池状态不失常
           // 前面有一个非的判断 其实就是括号外面的条件有一个不成立 整个条件就是 false
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               return false;

           for (;;) {
               // 上面是获取线程外面的工作线程 如果大于最大值或者设置的阈值,就返回间接返回 false 办法完结 
               int wc = workerCountOf(c);
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize))
                   return false;
               // 这个的意思是 如果 CAS 批改 workerCount 胜利 整个最外层的自旋就完结
               if (compareAndIncrementWorkerCount(c))
                   break retry;
               // 这边为什么要用 2 个自旋 次要是这边又判断了下 以后这个自旋 CAS 批改 WorkerCount 失败后,ctl 会发生变化
               // 如果和外层的不相等,就要返回外层的自旋 去重写做
               这边就是为什么用的是   continue retry 
               c = ctl.get();  // Re-read ctl
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
           }
       }

       boolean workerStarted = false;//worker 是否开始执行了
       boolean workerAdded = false;//worker 是否增加胜利
       Worker w = null;
       try {w = new Worker(firstTask);// 将 Runnable 传入到 worker 的构造函数中,下面也讲过,其实就是用 firstTask 去结构了先的 Thread
           final Thread t = w.thread;// 以后的 t 就是执行 Runnable 的线程,在 worker 中创立
           if (t != null) {
               final ReentrantLock mainLock = this.mainLock;// 重入锁
               mainLock.lock();// 保障增加 workder 时候的线程平安
               try {int rs = runStateOf(ctl.get());
                   if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       workers.add(w);// 增加 worker 到一个工作 worker 汇合中 HashSet 存储的
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true;
                   }
               } finally {mainLock.unlock();// 开释锁
               }
               if (workerAdded) {// 如果增加胜利
                   t.start();// 这个是真正执行 Worker 的中央 就是这儿
                   workerStarted = true;
               }
           }
       } finally {if (! workerStarted)
               addWorkerFailed(w);// 如果最终 Worker 没有运行,那就清理掉他 批改对应的 WorkerCount 
       }
       return workerStarted;
   }

办法最开始的中央 用了 2 个自旋去解决并发状况下的 CAS 批改 workerCount 失败的状况,这边每个细节,每种状况都思考的很到位,状态判断的特地的谨严,真正看明确,感觉多线程状况下的编程是如许的麻烦,辛亏帮咱们做了封装!

咱们看下 t.Start() 这边办法,咱们晓得 t 就是 Worker 外面创立线程主体,是以本人为工作传入到 Thread 中的,咱们晓得 start 是开始运行线程,最终是会调用到 run 办法的,那么就是说会调到 Worker 外面的 run 办法,咱们在回看下 Worker 外面的 run 办法

public void run() {runWorker(this);//ThreadPoolExecutor 外面的办法
}

runWorker

下面我也说了 线程 start 后会调用 run 办法,那么也就是调用 runWorker 办法,咱们在看下这个外面写的时候什么

 final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;// 获取 Worker 外面的工作
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 始终 while 循环
            while (task != null || (task = getTask()) != null) {w.lock();// 锁住 Worker
                // 判断如果以后的线程池状态是 stop 并且检测以后线程的中断状态如果是 false 就帮忙以后线程执行中断调用 interrupt()
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);// 执行工作的前置 Action
                    Throwable thrown = null;
                    try {task.run();// 执行最终的 Runnable 工作 
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);/// 执行工作的后置 Action
                    }
                } finally {
                    task = null;
                    w.completedTasks++;//Worker 实现的工作 +1
                    w.unlock();// 开释锁}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);//Worker 执行完结后退出
        }
    }

RunWorker 办法是整个线程池运行工作的外围办法,线程会应用 While 循环 一直的从阻塞队里外面去获取工作,而后去执行工作,如果阻塞队列外面没有工作,这个时候
getTask() 办法就会阻塞线程,直至新工作的到来,所以咱们在做单元测试的时候,用到线程池,如果你不调用 Shutdown 办法,你的 debug 小红点就始终在运行,就是这个起因!

getTask

这个办法就是从阻塞队列中取获取工作

 private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);
            // 判断线程池的状态如果是 SHUTDOWN 并且队列为空 或者间接状态就是 null 就不会从阻塞队列中 取出工作 间接返回 null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            
            //timed 就是用来管制 获取阻塞队列中的工作 是否有等待时间,咱们设置的 keepAliveTime 值就会在这边用到,如果一个工作线程在期待工作超过了设置的值就会退出期待,回收线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))// 工作线程数减 1
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();// 获取工作
                if (r != null)
                    return r;
                timedOut = true;// 设置期待超时标记 应该在自旋中,下次判断会用到此值
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

咱们都晓得 当咱们调用 shutdown 的时候 线程池状态是 ShUTDOWN, 调用 shutdownnow 的时候线程状态是 Stop,那么这 2 种状态是怎么解决阻塞队列外面的工作的呢,看了上文咱们应该能找到答案,当状态是 stop 的时候,咱们获取队列中的工作是间接返回的 null 的也就是说队列中的工作不会在执行了,然而当状态是 shutdown 的时候 只有 队列为空的时候 才会返回 null,也就是队列不空 还是能够获取队列中的工作的,这种问题 在面试题中经常出现,如果要正在晓得答案,还是要通过从源码中去真正了解,光是被答案我置信你很快还是会遗记的!

submit

把握了 execute 办法 在看 submit 办法 其实就很简略了,submit 个别是用于增加 带返回值的工作,咱们看下 代码

 public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);// 将 Runnable 包装成 FutureTask 工作 去让线程执行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);// 将 Runnable 包装成 FutureTask 工作 去让线程执行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
    }

看到这边的代码,应该有点儿相熟的滋味,应该上篇文章聊 FutureTask 的时候 很多曾经将过了,包含 Runnable 和 Callable 怎么转换的,Future 是怎么获取返回值的?
不分明的小伙伴 能够去看下我之前的文章!https://blog.csdn.net/zxlp520…

下面三个构造函数,就是对应着 FutureTask 的构造函数,说白了就是咱们应用 execute 的时候都是用 FutureTask 去传入的,因为 FutureTask 也是实现了 Runable 接口的

执行流程图

最初 用一张流程图,来形容下一个工作从增加到运行完结,经验了哪些办法!

总结

ThreadPoolExecutor 尽管外面执行办法很多,然而你如果把握了常见的逻辑运算符,AQS,线程,FutureTask 等相干常识的根底前提下 去看源码,也不会那么的累。最初我画的流程图,就是一个工作在新增到线程池中执行的整个流程!

最初分享下最近看到的一段话:
什么是危机?

真正的危机,来源于在正确的工夫做不正确的事。没有在正确的工夫,为下一步做出积攒,这才是危机的本源。

如果你正在这条成长路上的敌人,晚醒不如早醒,这就是我想说的。千万别等到中年才发现自己没有建设好本人的护城河,这个时候才晓得致力。在本人致力的阶段,不仅不致力反了抉择了放纵本人,这才是危机的本源。

心愿大家会有所播种,不负时光,不负卿!

退出移动版