关于高并发:高并发通过源码深度解析ThreadPoolExecutor类是如何保证线程池正确运行的

39次阅读

共计 5894 个字符,预计需要花费 15 分钟才能阅读完成。

大家好,我是冰河~~

对于线程池的外围类 ThreadPoolExecutor 来说,有哪些重要的属性和外部类为线程池的正确运行提供重要的保障呢?

ThreadPoolExecutor 类中的重要属性

在 ThreadPoolExecutor 类中,存在几个十分重要的属性和办法,接下来,咱们就介绍下这些重要的属性和办法。

ctl 相干的属性

AtomicInteger 类型的常量 ctl 是贯通线程池整个生命周期的重要属性,它是一个原子类对象,次要用来保留线程的数量和线程池的状态,咱们看下与这个属性相干的代码如下所示。

// 次要用来保留线程数量和线程池的状态,高 3 位保留线程状态,低 29 位保留线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池中线程的数量的位数(32-3)private static final int COUNT_BITS = Integer.SIZE - 3;
// 示意线程池中的最大线程数量
// 将数字 1 的二进制值向右移 29 位,再减去 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 线程池的运行状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// 获取线程状态
private static int runStateOf(int c)     {return c & ~CAPACITY;}
// 获取线程数量
private static int workerCountOf(int c)  {return c & CAPACITY;}
private static int ctlOf(int rs, int wc) {return rs | wc;}
private static boolean runStateLessThan(int c, int s) {return c < s;}
private static boolean runStateAtLeast(int c, int s) {return c >= s;}
private static boolean isRunning(int c) {return c < SHUTDOWN;}
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

对于线程池的各状态阐明如下所示。

  • RUNNING: 运行状态,能接管新提交的工作,并且也能解决阻塞队列中的工作
  • SHUTDOWN: 敞开状态,不能再接管新提交的工作,然而能够解决阻塞队列中曾经保留的工作,当线程池处于 RUNNING 状态时,调用 shutdown() 办法会使线程池进入该状态
  • STOP: 不能接管新工作,也不能解决阻塞队列中曾经保留的工作,会中断正在解决工作的线程,如果线程池处于 RUNNING 或 SHUTDOWN 状态,调用 shutdownNow() 办法,会使线程池进入该状态
  • TIDYING: 如果所有的工作都曾经终止,无效线程数为 0(阻塞队列为空,线程池中的工作线程数量为 0),线程池就会进入该状态。
  • TERMINATED: 处于 TIDYING 状态的线程池调用 terminated () 办法,会应用线程池进入该状态

也能够依照 ThreadPoolExecutor 类的正文,将线程池的各状态之间的转化总结成如下图所示。

  • RUNNING -> SHUTDOWN:显式调用 shutdown() 办法, 或者隐式调用了 finalize() 办法
  • (RUNNING or SHUTDOWN) -> STOP:显式调用 shutdownNow() 办法
  • SHUTDOWN -> TIDYING:当线程池和工作队列都为空的时候
  • STOP -> TIDYING:当线程池为空的时候
  • TIDYING -> TERMINATED:当 terminated() hook 办法执行实现时候

其余重要属性

除了 ctl 相干的属性外,ThreadPoolExecutor 类中其余一些重要的属性如下所示。

// 用于寄存工作的阻塞队列  
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 寄存线程池中线程的汇合,拜访这个汇合时,必须取得 mainLock 锁
private final HashSet<Worker> workers = new HashSet<Worker>();
// 在锁外部阻塞期待条件实现
private final Condition termination = mainLock.newCondition();
// 线程工厂,以此来创立新线程
private volatile ThreadFactory threadFactory;
// 回绝策略
private volatile RejectedExecutionHandler handler;
// 默认的回绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

ThreadPoolExecutor 类中的重要外部类

在 ThreadPoolExecutor 类中存在对于线程池的执行至关重要的外部类,Worker 外部类和回绝策略外部类。接下来,咱们别离看这些外部类。

Worker 外部类

Worker 类从源代码上来看,实现了 Runnable 接口,阐明其本质上是一个用来执行工作的线程,接下来,咱们看下 Worker 类的源代码,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    private static final long serialVersionUID = 6138294804551838833L;
    // 真正执行工作的线程
    final Thread thread;
    // 第一个 Runnable 工作,如果在创立线程时指定了须要执行的第一个工作
    // 则第一个工作会寄存在此变量中,此变量也能够为 null
    // 如果为 null,则线程启动后,通过 getTask 办法到 BlockingQueue 队列中获取工作
    Runnable firstTask;
    // 用于寄存此线程齐全的工作数,留神:应用了 volatile 关键字
    volatile long completedTasks;
    
    //Worker 类惟一的结构放大,传递的 firstTask 能够为 null
    Worker(Runnable firstTask) {
        // 避免在调用 runWorker 之前被中断
        setState(-1);
        this.firstTask = firstTask;
        // 应用 ThreadFactory 来创立一个新的执行工作的线程
        this.thread = getThreadFactory().newThread(this);
    }
    // 调用内部 ThreadPoolExecutor 类的 runWorker 办法执行工作
    public void run() {runWorker(this);
    }

    // 是否获取到锁 
    //state= 0 示意锁未被获取
    //state= 1 示意锁被获取
    protected boolean isHeldExclusively() {return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {t.interrupt();
            } catch (SecurityException ignore) {}}
    }
}

在 Worker 类的构造方法中,能够看出,首先将同步状态 state 设置为 -1,设置为 - 1 是为了避免 runWorker 办法运行之前被中断。这是因为如果其余线程调用线程池的 shutdownNow() 办法时,如果 Worker 类中的 state 状态的值大于 0,则会中断线程,如果 state 状态的值为 -1,则不会中断线程。

Worker 类实现了 Runnable 接口,须要重写 run 办法,而 Worker 的 run 办法实质上调用的是 ThreadPoolExecutor 类的 runWorker 办法,在 runWorker 办法中,会首先调用 unlock 办法,该办法会将 state 置为 0,所以这个时候调用 shutDownNow 办法就会中断以后线程,而这个时候曾经进入了 runWork 办法,就不会在还没有执行 runWorker 办法的时候就中断线程。

留神:大家须要重点了解 Worker 类的实现。

回绝策略外部类

在线程池中,如果 workQueue 阻塞队列满了,并且没有闲暇的线程池,此时,持续提交工作,须要采取一种策略来解决这个工作。而线程池总共提供了四种策略,如下所示。

  • 间接抛出异样,这也是默认的策略。实现类为 AbortPolicy。
  • 用调用者所在的线程来执行工作。实现类为 CallerRunsPolicy。
  • 抛弃队列中最靠前的工作并执行当前任务。实现类为 DiscardOldestPolicy。
  • 间接抛弃当前任务。实现类为 DiscardPolicy。

在 ThreadPoolExecutor 类中提供了 4 个外部类来默认实现对应的策略,如下所示。

public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();
        }
    }
}

public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task" + r.toString() + "rejected from" + e.toString());
    }
}

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }


    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();
            e.execute(r);
        }
    }
}

咱们也能够通过实现 RejectedExecutionHandler 接口,并重写 RejectedExecutionHandler 接口的 rejectedExecution 办法来自定义回绝策略,在创立线程池时,调用 ThreadPoolExecutor 的构造方法,传入咱们本人写的回绝策略。

例如,自定义的回绝策略如下所示。

public class CustomPolicy implements RejectedExecutionHandler {public CustomPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {System.out.println("应用调用者所在的线程来执行工作")
            r.run();}
    }
}

应用自定义回绝策略创立线程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
               new CustomPolicy());

明天就到这儿吧,我是冰河,咱们下期见~~

正文完
 0