线程池调用execute提交工作—>创立Worker(设置属性thead、firstTask)—>worker.thread.start()—>实际上调用的是worker.run()—>线程池的runWorker(worker)—>worker.firstTask.run();
runWork办法 外围(线程复用)
final void runWorker(Worker w) { //获取以后线程 Thread wt = Thread.currentThread(); //获取w的firstTask Runnable task = w.firstTask; //设置w的firstTask为null w.firstTask = null; // 开释锁,设置AQS的state为0,容许中断 w.unlock(); //用于标识线程是否异样终止,finally中processWorkerExit()办法会有不同逻辑 boolean completedAbruptly = true; try { //循环调用getTask()获取工作,一直从工作缓存队列获取工作并执行 while (task != null || (task = getTask()) != null) { //进入循环外部,代表曾经获取到可执行的工作,则对worker对象加锁,保障线程在执行工作过程中不会被中断 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || //若线程池状态大于等于STOP,那么意味着该线程要中断 (Thread.interrupted() && //线程被中断 runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池外部状态变动而被中断 !wt.isInterrupted()) //确保该线程未被中断 //收回中断请求 wt.interrupt(); try { //开始执行工作前的Hook办法 beforeExecute(wt, task); Throwable thrown = null; try { //到这里正式开始执行工作 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行工作后的Hook办法 afterExecute(task, thrown); } } finally { //置空task,筹备通过getTask()获取下一个工作 task = null; //completedTasks递增 w.completedTasks++; //开释掉worker持有的独占锁 w.unlock(); } } completedAbruptly = false; } finally { //到这里,线程执行完结,须要执行完结线程的一些清理工作 //线程执行完结可能有两种状况: //1.getTask()返回null,也就是说,这个worker的使命完结了,线程执行完结 //2.工作执行过程中产生了异样 //第一种状况,getTask()返回null,那么getTask()中会将workerCount递加 //第二种状况,workerCount没有进行解决,这个递加操作会在processWorkerExit()中解决 processWorkerExit(w, completedAbruptly); }}
循环取工作 geTask()
private Runnable getTask() { //标识以后线程是否超时未能获取到task对象 boolean timedOut = false; for (;;) { //获取线程池的管制状态 int c = ctl.get(); //获取线程池的运行状态 int rs = runStateOf(c); //如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递加,办法返回null,回收线程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //获取worker数量 int wc = workerCountOf(c); //标识以后线程在闲暇时,是否应该超时回收 // 如果allowCoreThreadTimeOut为ture,或以后线程数大于外围池大小,则须要超时回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize(),导致worker数量大于maximumPoolSize) if ((wc > maximumPoolSize || (timed && timedOut)) //或者获取工作超时 && (wc > 1 || workQueue.isEmpty())) { //workerCount大于1或者阻塞队列为空(在阻塞队列不为空时,须要保障至多有一个工作线程) if (compareAndDecrementWorkerCount(c)) //线程池工作线程数量递加,办法返回null,回收线程 return null; //线程池工作线程数量递加失败,跳过残余局部,持续循环 continue; } try { //如果容许超时回收,则调用阻塞队列的poll(),只在keepAliveTime工夫内期待获取工作,一旦超过则返回null //否则调用take(),如果队列为空,线程进入阻塞状态,有限时期待工作,直到队列中有可取工作或者响应中断信号退出 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //若task不为null,则返回胜利获取的task对象 if (r != null) return r; // 若返回task为null,示意线程闲暇工夫超时,则设置timeOut为true timedOut = true; } catch (InterruptedException retry) { //如果此worker产生了中断,采取的计划是重试,没有超时 //在哪些状况下会产生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow() timedOut = false; } }}
ps:线程池工作线程执行工作产生异样如何解决?
processWorkerExit办法
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 失常的话再runWorker的getTask办法workerCount曾经被减一了 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 累加线程的completedTasks completedTaskCount += w.completedTasks; // 从线程池中移除超时或者出现异常的线程 workers.remove(w); } finally { mainLock.unlock(); } // 尝试进行线程池 tryTerminate(); int c = ctl.get(); // runState为RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { // 线程不是异样完结 if (!completedAbruptly) { // 线程池最小闲暇数,容许core thread超时就是0,否则就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0然而队列不为空要保障有1个线程来执行队列中的工作 if (min == 0 && !workQueue.isEmpty()) min = 1; // 线程池还不为空那就不必放心了 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.线程异样退出 // 2.线程池为空,然而队列中还有工作没执行,看addWoker办法对这种状况的解决 addWorker(null, false); }}