本文的副标题为由CountDownLatch到AQS
前言
我日常用到CountDownLatch的场景还是比拟多,定时工作应用多线程去采集数据,而后采集完数据之后,做进一步的解决。个别我在程序外面会如下写:
public class CountDownLatchDemo { // 防止指令重排序 private volatile ThreadPoolExecutor threadPoolExecutor; public void countDownLatchDemo(){ // 一共五十个工作 CountDownLatch countDownLatch = new CountDownLatch(50); for (int i = 0; i < 50; i++) { getThreadPoolExecutor().execute(()->{ // 线程做完对应的工作,工作数减一 countDownLatch.countDown(); }); } try { // main 线程走到这里,如果50个工作没实现,就会被阻塞在这这里。 // hello world 不会输入 countDownLatch.await(); System.out.println("hello world"); } catch (InterruptedException e) { e.printStackTrace(); } // 接着做对应的业务解决 } private ThreadPoolExecutor getThreadPoolExecutor() { // 获取以后零碎的逻辑外围数,我的是八核十六线程。 int cpuNumbers = Runtime.getRuntime().availableProcessors(); if (Objects.isNull(threadPoolExecutor)) { synchronized (CountDownLatchDemo.class) { // 为线程池起名,便于定位问题 // CustomizableThreadFactory 来自于Spring ThreadFactory threadFactory = new CustomizableThreadFactory("fetch-data-pool-"); threadPoolExecutor = new ThreadPoolExecutor(cpuNumbers, cpuNumbers, 1L, TimeUnit.HOURS, new LinkedBlockingDeque<>(), threadFactory); return threadPoolExecutor; } } return threadPoolExecutor; } public static void main(String[] args) { CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo(); countDownLatchDemo.countDownLatchDemo(); }}
所以不禁好奇CountDownLatch是如何实现的, 这也就是本篇的由来,我次要关怀的问题次要有两个:
- 调用await办法的线程在工作实现之后,如何唤醒。
- 到底是如何管制工作实现的,也就是countDown办法做了些什么
本篇的源码基于JDK8(高版本的JDK对AbstractQueuedSynchronizer有些调整,前面也会针对JDK 17的AbstractQueuedSynchronizer出一篇源码相干的文章,重点比照高版本JDK的优化). 咱们首先关上CountDownLatch进行查看:
咱们看到调用CountDownLatch的构造函数初始化的时候,事实上间接的对CountDownLatch的动态外部类Sync进行了初始化。咱们调CountDownLatch的countDown办法也是间接的调用Sync的releaseShared()办法. 这两个办法在Sync外面没有,那阐明继承自AbstractQueuedSynchronizer里。这个类的作者依然是Doug Lea,正文比拟多,我还是喜爱正文多的源码。之前的看源码,我先看下面的正文,再看类的根本构造。这次咱们调整下程序,先大抵看一下这个类的根本构造再看类下面的正文。
关上AbstractQueuedSynchronizer往下翻,咱们首先看到的一个一眼能看懂的数据结构恐怕就是Node类了,Node是双向链表的节点, 仿佛每一个节点次要数据是线程。
剩下的如同都围绕这个链表进行操作,本节咱们关怀的问题在于CountDownLatch的实现原理,而后才是AbstractQueuedSynchronizer。咱们重点关注CountDownLatch中countDown和await的实现。
countDown办法的调用如下:
// 这个sync事实上是CountDownLatch中动态外部类Sync的实例public void countDown() { //releaseShared调用的是父类AbstractQueuedSynchronizer的releaseShared办法 sync.releaseShared(1); }// tryReleaseShared为CountDownLatch中动态外部类Sync重写父类的办法public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 缩小count for (;;) { // 咱们看getState和setState办法,getState和setState是父类的办法 // 获取state int c = getState(); // 如果等于0 返回false if (c == 0) return false; // 否则就将state 变量 减一 int nextc = c - 1; // compareAndSetState调用的是AbstractQueuedSynchronizer的办法 // 默认调用的是CAS,给AQS的state变量设置值。 if (compareAndSetState(c, nextc)) return nextc == 0; } } //上面是AbstractQueuedSynchronizer的compareAndSetState// 这里的stateOffset用能够了解为字段state的偏移地址,通过this+stateOffset能够定位到state变量的内存地址 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
捋一下countDown办法到底做了什么
咱们应用CountDownLatch的时候是首先调用其构造函数进行初始化, 这会调用CountDownLatch的动态外部类Sync进行初始化, 传进来的工作数最终给了继承来自AbstractQueuedSynchronizerd的setState办法。
当线程调用countDown办法实现工作的时候,调用链如下:
留神tryReleaseShared尽管来自于AbstractQueued,但这个办法是空办法,留给子类重写,所以咱们又须要回到CountDownLatch的Sync中。
tryReleaseShared代表以后实现工作,须要将总的工作数减一,如果工作数曾经清0, 接着调用doReleaseShared办法。doReleaseShared办法须要配合CountDownLatch的await办法来看。到当初咱们曾经失去了第一个问题的答案: 划分的工作数通过CAS更新工作数,事实上在AQS外面咱们能够称之为同步状态。
await办法浅析
咱们调用CountDownLatch办法的await办法事实上调用的是CountDownLatch外部类的Sync的acquireSharedInterruptibly, 这个acquireSharedInterruptibly来自AbstractQueuedSynchronizer,tryAcquireShared在AbstractQueuedSynchronizer是一个空办法,留给子类重写,在CountDownLatch的实现是,如果state == 0,返回 1,咱们重点看工作未实现的时候的doAcquireSharedInterruptibly办法,doAcquireSharedInterruptibly中又首先调用了addWaiter,所以咱们接着看addWaiter办法,addWaiter,从办法名上来推上,这是增加一个期待者。
/*** 为以后线程创立一个排队结点,并且给一个模式* Creates and enqueues node for current thread and given mode.* mode 有两种类型, 一种是独占模式,一种是共享模式。有没有想到独占锁和共享锁呢* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) { // 以后结点的下一个结点指向传入的结点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 这个操作相当于设置尾结点 // 首次增加为尾结点为null if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 走到这里阐明 以后队列是空队列,间接增加结点 enq(node); return node;}/* * 将以后结点插入到队列中,执行初始化。下面有图。忽然感觉这里挺有意思的。* 上面用图片演示一下这个过程* Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor // 返回插入结点的前结点*/private Node enq(final Node node) { for (;;) { // 取得tail的利用 Node t = tail; // 如果tail 为空 if (t == null) { // Must initialize // 设置头结点,此时头结点又是尾结点 if (compareAndSetHead(new Node())) tail = head; } else { // 如果尾结点非空代表此时队列非空,尾结点成为该结点的前驱结点 node.prev = t; // 将以后结点设置尾结点 if (compareAndSetTail(t, node)) { // 将尾结点的后继结点指向插入结点 t.next = node; return t; } } } }
独占锁如果你感到生疏的话,咱们换一个译名排他锁, 是指一个锁一次只能被一个线程所持有。如果线程A对数据A加上排他锁的话,其余线程不能再对A加任何类型的锁,取得排他锁的线程既能读数据又能批改数据,JDK中的synchronized和Lock的实现类就是排他锁, 所以ReentrantLock也借助AbstractQueuedSynronizer来实现。共享锁是指该锁能够被多个线程所持有,如果线程T对线程数据A加上共享锁后,其余线程只能对A再加共享锁,不能加排它锁。取得共享锁的线程只能读数据,不能批改数据。
ReentranLock有一个外部类叫Sync,Sync有两个子类:FairSync,NonfairSync. Fair是偏心的意思,如果你对ReentranLock比拟相熟的话, 会联想到ReetranLock的偏心锁与非偏心锁。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 创立一个结点,并返回该结点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取前驱结点 final Node p = node.predecessor(); // 如果是头结点 if (p == head) { // 获取state状态 // 大于零工作实现 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
setHeadAndPropagate、shouldParkAfterFailedAcquire、parkAndCheckInterrupt这三个办法互有分割咱们独自拎进去说,依据办法见名知义准则,咱们来认单词, Propagate有扩散的意思也就是,咱们讲CountDownLatch是线程合作利器,咱们在下面讲的场景上某个线程须要启用多线程并发解决数据,后续线程外面的解决逻辑须要并发解决数据结束之后,才进行下一阶段的解决,那这里咱们能够揣测调用CountDownLatch办法的await办法时因为工作没实现而陷入阻塞的线程,就是由这个办法进行唤醒的,目前队列中只有一个线程,属于比较简单的情况。当初这种状况时一个线程等多个线程,对外体现的模式是只有一个线程调用await办法。CountDownLatch的另一种应用场景是多等一,即多个线程调用await办法,一个线程调用countDown实现工作,要将处于期待状态的线程唤醒。
留神下面是一个有限循环,也就是线程调用CountDownLatch的await办法的时候并没有来自Object的wait、Condition队列来让以后线程陷入期待,而是一直的去获取state变量。Park有停车的意思,所以shouldParkAfterFailedAcquire是否是在获取状态失败之后,该当先劳动一下? shouldParkAfterFailedAcquire有波及到结点状态的利用, 咱们先来介绍状态,再来看shouldParkAfterFailedAcquire的源码.
waitStatus是一个整型变量,一共有以下几个候选值:
- 0 Node被初始化的时候默认值
- CANCELLED = 1,
参考资料中的美团技术团队的文章《从ReentrantLock的实现看AQS的原理及利用》将该状态解释为:
示意线程获取锁的申请曾经勾销了。 我看了感觉有点了解不动, 这篇文章是从ReentrantLock动手的,然而我并不了解为什么获取锁的申请会被勾销。
This node is cancelled due to timeout or interrupt. Nodes never leave this state. In particular, a thread with cancelled node never again blocks.
因为超时或者该线程被打断,这个结点被勾销,这个状态将会被固定,处于勾销状态的线程结点不会再被阻塞。
- SIGNAL = -1
美团技术团队《从ReentrantLock的实现看AQS的原理及利用》将该状态解释为: 示意以后线程曾经筹备好了, 就等资源开释了。
《java并发编程系列:牛逼的AQS(上)》(文末放的有参考链接):示意后边的节点对应的线程处于期待状态。
但两种解释是互补的,联合起来了解就是: 示意以后线程曾经筹备好了,就等资源开释了。同时后边的节点对应的线程处于期待状态。
这里将对于这个结点的相干正文列以下:
waitStatus value to indicate successor's thread needs unparking
后继结点线程须要解除期待状态。(Parking 对应的中文译为:泊车, 这里是将线程当作一辆车? 队列是个停车场? )
The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then,on failure, block.
以后结点的后继结点处于阻塞或者将要被阻塞(凭借park,难以翻译,只好中英混)。所以以后结点开释或者被勾销肯定要解除后继结点的阻塞状态. 为了防止竞争,acquire的相干办法肯定是先从signal判断,再是原子重试获取,而后失败、阻塞。
- CONDITION = -2
示意节点在期待队列中,节点线程期待唤醒。
- PROPAGATE = -3
以后线程处在SHARED状况下,该字段才会应用,下一次的同步状态将会被无条件的流传上来。(这句话咱们前面会讲,以后咱们的指标是从CountDownLatch到AQS,来取得对AQS有一个宏观的了解)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱结点的状态 int ws = pred.waitStatus; // 前继结点处于唤醒,以后结点能够被平安的被阻塞 if (ws == Node.SIGNAL) return true; // 通过看正文能够看明确 ws > 0,阐明以后结点pred处于勾销状态。 // 将该结点pred从队列外面移除,接着向前遍历 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将pred,也就是以后结点的前一个结点设置为-1. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { // 用的是UnSafe,所以这里咱们临时不必关怀它的实现原理,只须要晓得线程调用这个办法 // 相当于申请操作系统阻塞线程 LockSupport.park(this); // 如果以后线程曾经被中断,且满足shouldParkAfterFailedAcquire // 抛出异样。中断了就没有被阻塞这个概念。 return Thread.interrupted();}
这里咱们曾经大抵明确了,线程在调用CountDownLatch办法的await办法,事实上会进入到AQS的队列中,如果以后线程的前驱结点处于唤醒状态,那么就先将该线程阻塞一段时间。既然这里线程陷入了阻塞,那么工作实现的时候是如何告诉这里的线程的呢? 咱们接着看doReleaseShared办法:
private void doReleaseShared() { // 有限循环 for (;;) { // 获取头结点 Node h = head; if (h != null && h != tail) { // 获取头结点的状态 int ws = h.waitStatus; // 如果以后结点处于唤醒状态 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒其后继结点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
CountDownLatch的思路
当初咱们能够捋一捋CountDownLatch的思路了,CountDownLatch借助于AbstractQueuedSynchronizer来实现, AbstractQueuedSynchronizer外面保护了一个state变量,调用countDown办法的时候调用的是CountDownLatch外部类的tryReleaseShared办法来对state进行递加。调用CountDownLatch事实是间接调用AQS的acquireSharedInterruptibly办法,acquireSharedInterruptibly办法会结构出一个队列,如果工作没实现,会调用LockSupport来让调用await办法的线程陷入阻塞。工作实现的时候,最终通过doReleaseShared,唤醒陷入阻塞的线程。
由CountDownLatch到AQS
下面咱们曾经根本引出了AQS,然而并没有很深刻,有些办法咱们没有细讲,只是当作一个黑盒,在探索CountDownLatch的过程中,咱们发现目前JDK工具外面的大多数线程同步工具类,像ReentrantLock、ReentrantReadWriteLock、Semaphore、TheadPoolExecutor外面都有AbstractQueuedSynchronizer的呈现这是一个弱小的线程同步框架,本篇的思路是由CountDownLatch到AQS,从了解CountDownLatch的设计原理,而后再简略的引出AQS,为前面整体介绍AbstractQueuedSynchronizer做铺垫,重点领会思维。
参考资料
- Java魔法类:Unsafe利用解析 https://tech.meituan.com/2019...
- CountDownLatch源码解析 https://qingtianblog.com/arch...
- 大白话聊聊Java并发面试问题之谈谈你对AQS的了解?【石杉的架构笔记】 https://juejin.cn/post/684490...
- 不可不说的Java“锁”事 https://tech.meituan.com/2018...
- java并发编程系列:牛逼的AQS(上) https://juejin.cn/post/684490...
- 从ReentrantLock的实现看AQS的原理及利用 https://tech.meituan.com/2019... 美团技术团队