前言

在之前咱们曾经对局部JDK源码做了介绍:

啃碎JDK源码(一):String
啃碎JDK源码(二):Integer
啃碎JDK源码(三):ArrayList
啃碎JDK源码(四):HashMap
啃碎JDK源码(五):ConcurrentHashMap
啃碎JDK源码(六):LinkedList

明天咱们正式开始介绍juc包上面的类,也就是和多线程打交道的中央,和锁打交道的类用的比拟的的无非就是 ReentrantLockReentrantReadWriteLock 等,然而咱们明天要介绍的是 AbstractQueuedSynchronizer 这个抽象类,也就是面试中常常被问到的 AQS,因为不论是ReentrantLock 还是ReentrantReadWriteLock 以及其它的一些都是基于它实现的,所以很有必要先来理解一下。

注释

AQS的全称为(AbstractQueuedSynchronizer),咱们能够把它看成一个帮忙咱们实现锁的同步器,它基于FIFO(先进先出)的队列实现的,并且外部保护了一个状态变量 state,通过原子更新这个状态变量即能够实现加锁解锁操作。

来看下 AbstractQueuedSynchronizer 的继承构造

能看到 ReentrantLock等并不是间接继承 AbstractQueuedSynchronizer,而是其内部类 Sync

接着来看看一些重要的属性:

// 队列的头节点private transient volatile Node head;    // 队列的尾节点private transient volatile Node tail;// 管制加锁解锁的状态变量private volatile int state;

定义了一个状态变量和一个队列,状态变量用来管制加锁解锁,队列用来搁置期待的线程

这个 state变量很重要,用来做状态标识。比方说在 ReentrantLock 外面它示意获取锁的线程数,如果等于0示意还没有线程获取锁,1示意有线程获取了锁。大于1示意重入锁的数量

留神,这几个变量都要应用 volatile 关键字来润饰,因为是在多线程环境下操作,要保障它们的值批改之后对其它线程立刻可见。

还有咱们的 Node外部类

static final class Node {        // 标识一个节点是共享模式        static final Node SHARED = new Node();        // 标识一个节点是互斥模式        static final Node EXCLUSIVE = null;        // 标识线程已勾销        static final int CANCELLED =  1;        // 标识后继节点须要唤醒        static final int SIGNAL    = -1;               // 标识线程期待在一个条件上        static final int CONDITION = -2;        // 标识前面的共享锁须要无条件的流传        static final int PROPAGATE = -3;        // 以后节点保留的线程对应的期待状态        volatile int waitStatus;        // 上一个结点        volatile Node prev;        // 下一个结点        volatile Node next;        // 以后结点保留的线程        volatile Thread thread;        // 下一个期待在条件上的节点(Condition锁时应用)        Node nextWaiter;        // 是否是共享模式        final boolean isShared() {            return nextWaiter == SHARED;        }        // 获取前一个节点        final Node predecessor() throws NullPointerException {            Node p = prev;            if (p == null)                throw new NullPointerException();            else                return p;        }        Node() {            }        Node(Thread thread, Node mode) {            // 把共享模式还是互斥模式存储到nextWaiter这个字段外面了            this.nextWaiter = mode;            this.thread = thread;        }        Node(Thread thread, int waitStatus) {            // 期待的状态,在Condition中应用            this.waitStatus = waitStatus;            this.thread = thread;        }    }

下面是一个规范的双向链表构造,保留着以后线程、前一个节点、后一个节点以及线程的状态等信息。属性比拟多,看不懂没关系,前面用到会从新讲一下。

那么在源码外面是如何批改这些变量的呢?其实就是通过咱们之前说的 CAS 来批改,如果不理解的话请参考 一文看懂CAS

比方说 state 状态变量的批改

// 获取Unsafe类的实例private static final Unsafe unsafe = Unsafe.getUnsafe();// 状态变量state的偏移量private static final long stateOffset;static {    try {        stateOffset = unsafe.objectFieldOffset                  (AbstractQueuedSynchronizer.class.getDeclaredField("state"));   .......   } catch (Exception ex) { throw new Error(ex); }}

外围是 compareAndSetState 办法:

protected final boolean compareAndSetState(int expect, int update) {    // 如果以后值等于except,那么更新成update return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}

既然 AbstractQueuedSynchronizer 是一个抽象类,那么子类要实现哪些接口呢?

比如说用来加锁的 tryAcquire 办法

// 互斥模式下尝试获取锁protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}

能够看到只是抛出了异样,并且值得注意的是该办法并没有定义成形象办法,因为只有实现一部分办法就能够本人手动编写一个锁了,定义成 protect 也是不便子类去实现,除此之外还有

// 互斥模式下尝试开释锁protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException();}// 共享模式下尝试获取锁protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException();}// 共享模式下尝试开释锁protected int tryReleaseShared(int arg) {    throw new UnsupportedOperationException();}// 以后线程是否持有锁protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException();}

基于AQS手动实现一个锁

当初咱们尝试一下基于AQS手动实现一个锁:

/** 实现Lock接口*/public class MyLock implements Lock {    private final Sync sync;    public MyLock() {        sync = new Sync();    }    // 定义一个外部类Sync继承AbstractQueuedSynchronizer    private static class Sync extends AbstractQueuedSynchronizer {        // 尝试获取独占锁        @Override        protected boolean tryAcquire(int arg) {            // AQS办法:CAS更新state状态变量            if (compareAndSetState(0, 1)) {                // AQS办法:设置以后线程为持有锁的线程                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        // 尝试开释独占锁        @Override        protected boolean tryRelease(int arg) {            if (getState() == 0)                throw new IllegalMonitorStateException();            setExclusiveOwnerThread(null);            // AQS办法:以后线程已持有锁,能够间接批改state值,不须要通过CAS批改            setState(0);            return true;        }        // 锁是否已被开释        @Override        protected boolean isHeldExclusively() {            return getState() == 1;        }        Condition newCondition() {            return new ConditionObject();        }    }    @Override    public void lock() {        sync.acquire(1);    }    @Override    public void lockInterruptibly() throws InterruptedException {        sync.acquireInterruptibly(1);    }    @Override    public boolean tryLock() {        return sync.tryAcquire(1);    }    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return sync.tryAcquireNanos(1, unit.toNanos(time));    }    @Override    public void unlock() {        sync.release(1);    }    @Override    public Condition newCondition() {        return sync.newCondition();    }}

好了,咱们的锁曾经写完了,外围就是应用一个动态外部类Sync继承AQS,实现加锁、开释锁等办法,其实咱们相熟的 ReentrantLock也是这样的实现原理,当初咱们来测试一下:

public class TestLock {    private final Lock lock = new MyLock();    private volatile int count = 1;    private static class WorkThread extends Thread {        private final TestLock myLock;        public WorkThread(TestLock myLock) {            this.myLock = myLock;        }        @Override        public void run() {            myLock.execute();        }    }    public void execute() {        lock.lock();        try {            System.out.println(Thread.currentThread().getName() + "获取到的count=" + count++);        } finally {            lock.unlock();        }    }    public static void main(String[] args) {        TestLock myLock = new TestLock();        // 启动100个线程        for (int i = 1; i <= 100; i++) {            new WorkThread(myLock).start();        }    }}

咱们启动100个线程对 count 加一,看看打印后果:

能够看到最初确实是加到了100,也就是说咱们的锁是可用的。

然而,咱们当初的锁是不可重入锁,学过ReentrantLock的同学应该晓得它是可重入锁,也就是在线程持有锁的状况下能够从新取得锁,如果咱们改一下execute办法:

public void execute() {    lock.lock();    try {        System.out.println(Thread.currentThread().getName() + "获取到的count=" + count++);        if (count == 5) {            execute();        }    } finally {        lock.unlock();    }}

当 count == 5 时执行调用本身,看下执行后果:

能够看到线程被阻塞了,因为以后持有锁的线程不能从新获取锁,所以咱们须要对 tryAcquiretryRelease 办法进行革新:

        // 尝试获取独占锁        @Override        protected boolean tryAcquire(int arg) {            // 获取以后线程            Thread currentThread = Thread.currentThread();            int state = getState();            if (state == 0) {                // AQS办法:CAS更新state状态变量                if (compareAndSetState(0, 1)) {                    // AQS办法:设置以后线程为持有锁的线程                    setExclusiveOwnerThread(currentThread);                    return true;                } else if (currentThread == getExclusiveOwnerThread()) {                    // 因为是独占锁,所以同一时刻只能有一个线程能获取到锁,如果以后的锁是被以后线程获取过了,则将状态变量+1                    int newState = state + arg;                    // 设置新的状态变量                    setState(newState);                    return true;                }            }            return false;        }        // 尝试开释独占锁        @Override        protected boolean tryRelease(int arg) {            // 判断以后锁开释是以后线程锁独占的,如果判断不成立则抛出异样            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            int newState = getState() - arg;            boolean free = false;            if (newState == 0) {                // 如果状态为0了则阐明以后线程能够开释对锁的持有了                setExclusiveOwnerThread(null);                free = true;            }       // AQS办法:以后线程已持有锁,能够间接批改state值,不须要通过CAS批改            setState(newState);            return free;        }

tryAcquire 办法次要加了判断:如果 state 不为0的时候,判断以后线程是否曾经和锁绑定,曾经绑定的话则将 state+1 同时返回true

tryRelease办法中次要减少了开释锁的时候是对 state 变量逐次减一当减到0的时候才将锁与以后线程绑定的状态去除,开释锁。

从新运行下曾经不会阻塞了,如果不懂的中央看下正文就明确了。

AQS源码分析

那么当线程获取不到锁的时候是如何期待的呢?又是什么时候被唤醒的呢?接下来咱们一步步追随源码看看到底做了什么?

独占模式

AQS独占模式和共享模式,首先来看看独占模式,看下lock加锁办法:

@Overridepublic void lock() {    sync.acquire(1);}

这里能够看到并不是调用咱们重写的 tryAcquire 办法,而是调用父类 AbstractQueuedSynchronizer 的办法:

public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt(); }

此办法是独占模式下线程获取共享资源的顶层入口。如果获取锁胜利,线程间接返回,否则进入期待队列,直到获取锁为止,且整个过程疏忽中断的影响。

首先是调用 tryAcquire办法来获取尝试获取锁,跟过来看一下AQS的实现:

protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}

能够看到该办法定义为protect,也就是由咱们子类去实现的,如果获取锁失败的话那么会进入期待队列,来看看addWaiter办法:

/* * 将以后线程增加到期待队列的队尾,并返回以后线程所在的节点 */private Node addWaiter(Node mode) {    // 以独占模式把以后线程封装成一个Node节点    Node node = new Node(Thread.currentThread(), mode);    // 尝试将结点放到队列尾部    Node pred = tail;    if (pred != null) {        node.prev = pred;        // 应用CAS把node作为尾节点        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    // 尾节点为空或者利用CAS把node设为尾节点失败时通过enq办法进行入队    enq(node);    return node;}  /*   * 采纳for循环自旋的形式把node插入到队列中   */  private Node enq(final Node node) {        for (;;) {            Node t = tail;            // 队列为空,须要初始化            if (t == null) {                if (compareAndSetHead(new Node()))                    tail = head;            } else {                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }
  • addWaiter办法用于将以后线程增加到期待队列的队尾,并返回以后线程所在的节点。
  • enq 办法中采纳了十分经典的自旋操作,只有通过CAS把node设为尾节点后,以后线程能力退出该办法,否则的话,以后线程一直的尝试,直到能把节点增加到队列中为止。

持续看一下acquireQueued办法:

  /*   * 通过自旋获取锁   */  final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            // 默认线程没有被中断过            boolean interrupted = false;            for (;;) {                // 获取该节点的前驱节点                final Node p = node.predecessor();                // 如果前驱节点是头节点并且以后线程获取到锁                if (p == head && tryAcquire(arg)) {                    // 设置以后节点为头节点                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                // 挂起以后线程                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                // 移除以后节点                cancelAcquire(node);        }    }

该办法比较复杂,咱们来仔细分析一下:

  • 首先获取前一个节点,如果前驱节点是头节点的话则尝试获取锁,如果获取锁胜利的话设置以后节点为头节点
  • 否则调用 shouldParkAfterFailedAcquire 办法判断是否须要挂起以后线程

shouldParkAfterFailedAcquire办法从名字也能看进去是当获取锁失败后用来判断是否须要挂起以后线程,实现性能简略的讲就是把以后node节点的无效前驱(无效是指waitStatus不是CANCELLED的)找到,并且将无效前驱的状态设置为SIGNAL,之后便返回true代表马上能够阻塞了。来看看实现代码:

    /*     * 判断是否须要挂起以后线程     */    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)            /*             * 前驱节点曾经设置了SIGNAL,如果前驱变成了head,并且head的代表线程             * exclusiveOwnerThread开释了锁,就会来依据这个SIGNAL来唤醒本人             */            return true;        if (ws > 0) {            /*             * 发现传入的前驱的状态大于0,即CANCELLED。阐明前驱节点曾经因为超时或响应了中断,而勾销了本人。             * 所以须要向前遍历直到找到一个<=0的节点             */            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            /*             * 如果是其余状况,那么CAS尝试设置前驱节点为SIGNAL             */            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }    

shouldParkAfterFailedAcquire返回true的状况下,持续看parkAndCheckInterrupted办法

  private final boolean parkAndCheckInterrupt() {        LockSupport.park(this);        return Thread.interrupted();    }

调用LockSupport的 park 办法挂起以后线程,返回该线程是否被中断过,如果被中断过,间接设置 interrupted = true
(LockSupport类是Java6引入的一个类,提供了根本的线程同步原语,有趣味的小伙伴能够去理解一下)

最初来看下 cancelAcquire办法:

  /**   * 勾销以后节点   */  private void cancelAcquire(Node node) {        // Ignore if node doesn't exist        if (node == null)            return;        // 把以后节点的线程设为 null        node.thread = null;        // 和后面一样向前遍历找到无效的前驱节点        Node pred = node.prev;        while (pred.waitStatus > 0)            node.prev = pred = pred.prev;           Node predNext = pred.next;        // 把node节点的ws设为CANCELLED        node.waitStatus = Node.CANCELLED;        // 如果node是尾节点,利用CAS把前驱节点设为尾节点,后继节点为null不便GC        if (node == tail && compareAndSetTail(node, pred)) {            compareAndSetNext(pred, predNext, null);        } else {            // 前驱节点不是头结点 && 线程不为空 && waitStatus为singal            // 利用CAS把node的next设为pred的next节点            int ws;            if (pred != head &&                ((ws = pred.waitStatus) == Node.SIGNAL ||                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&                pred.thread != null) {                Node next = node.next;                if (next != null && next.waitStatus <= 0)                    compareAndSetNext(pred, predNext, next);            } else {                // node是头结点,唤起它的后继节点                unparkSuccessor(node);            }            node.next = node; // help GC        }    }

最初如果acquireQueued办法返回true,须要进行自我中断。因为parkAndCheckInterrupted办法不响应中断,并且外部调用了Thread.interrupted办法革除中断标记位。所以当该办法返回true(被中断)时,须要手动弥补中断标记位。

static void selfInterrupt() {     Thread.currentThread().interrupt();}

流程总结

  • tryAcquire()尝试间接去获取锁,如果胜利则间接返回;
  • addWaiter()将该线程退出期待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在队列中期待直到获取锁。如果在整个期待过程中被中断过,则返回true,否则返回false。
  • 如果线程在期待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
独占式锁获取流程

调用同步器的acquire(int arg)办法能够获取同步状态,该办法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。获取流程:

  1. 以后线程通过tryAcquire()办法尝试获取锁,胜利则间接返回,失败则进入队列排队期待,通过CAS获取同步状态。
  2. 如果尝试获取锁失败的话,结构同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)办法,将节点退出到同步队列的队列尾部。
  3. 最初调用acquireQueued(final Node node, int args)办法,使该节点以死循环的形式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued办法以后线程在死循环中获取同步状态,而只有前驱节点是头节点的时候能力尝试获取锁(同步状态)( p == head && tryAcquire(arg))。

下面看完了加锁的流程,接下来看看是如何开释锁的?

   public final boolean release(int arg) {        // tryRelease由子类实现        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                // 唤醒后继节点                unparkSuccessor(h);            return true;        }        return false;    }

代码比较简单,tryRelease 和下面一样也是有事子类去实现,如果开释锁胜利的话那么咱们须要调用 unparkSuccessor 办法去唤醒后继节点,看下具体实现:

  private void unparkSuccessor(Node node) {        int ws = node.waitStatus;        if (ws < 0)            // 将以后节点的状态批改为0            compareAndSetWaitStatus(node, ws, 0);        // 如果间接后继为空或者它的waitStatus大于0(曾经放弃获取锁了),咱们就遍历整个队列,获取第一个须要唤醒的节点        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        if (s != null)            // 唤醒节点            LockSupport.unpark(s.thread);    }

代码比较简单,这里就不开展细说了。

共享模式

下面讲完了独占模式,当初来讲下共享模式,所谓共享模式就是同一个时刻容许多个线程持有锁,比方说 ReentrantReadWriteLock 就是实现了共享模式的AQS,间接上代码:

  public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);   }      /*    * 共享模式获取锁    */    private void doAcquireShared(int arg) {        // 退出队列尾部        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                // head是曾经拿到锁的节点                if (p == head) {                    // 尝试获取锁,返回的r是残余的资源数量,如果大于0那么须要唤醒后续节点                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        // 将head指向本人,还有残余资源能够再唤醒之后的线程                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

这个办法大抵上看起来和独占模式是很相像的。区别只在于独占模式下,在本办法中获取到资源后,只是将本节点设置为head节点。而共享模式下,设置完head节点后,还须要判断是否须要唤醒多个线程,看一下如何唤醒线程:

private void setHeadAndPropagate(Node node, int propagate) {        Node h = head; // Record old head for check below        setHead(node);        /*         * 什么状况下要唤醒后继结点?         * 1.资源残余数大于0,有残余资源必定是要唤醒后继结点的         * 2.头结点不存在。         * 3.头结点状态小于0,意味着后继节点要求node(也就是以后head)唤醒后继结点         */        if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            if (s == null || s.isShared())                doReleaseShared();        }    }

doReleaseShared办法外面才是真正来唤醒线程:

private void doReleaseShared() {            for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    // 因为该办法可能被并发调用,为了防止不必要的唤醒节约,因为通过cas来抢占唤醒权力。                    // 抢占成功者执行真正的后继结点唤醒工作。如果失败则进行重试                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    unparkSuccessor(h);                }                // 如果ws==0,可能是head结点曾经没有后继结点,也有可能是因为head节点的后继结点唤醒权被其余线程刚抢占胜利。                // 如果没有后继结点,显然不须要做任何事件                // 如果是唤醒权被其余线程抢占,则不须要做任何事件。因为唤醒是在队列上进行流传的。所以这里就cas将head节点的状态值批改为 PROPAGATE。用来表白该线程唤醒操作用意曾经传播。然而会由别的线程真正的执行后续的唤醒动作。同样,如果失败了,则重试。                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }

加锁流程

tryAcquireShared办法也是由子类去实现,然而AQS曾经把其返回值的语义定义好了:负值代表获取失败;0代表获取胜利,但没有残余资源;负数示意获取胜利,还有残余资源,其余线程还能够去获取。所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,胜利则间接返回;
  2. 失败则通过doAcquireShared()进入期待队列park(),直到被unpark()/interrupt()并胜利获取到资源才返回。整个期待过程也是疏忽中断的。
  3. doAcquireShared(int)此办法用于将以后线程退出期待队列尾部劳动,直到其余线程开释锁唤醒本人,本人胜利拿到相应的资源后才返回。

看完加锁的办法,当初来看共享模式下的开释锁办法:

  public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }

能够看到解锁也是用的doReleaseShared办法,代码比较简单这里不再开展细说。

总结

明天无关AQS的知识点就介绍到这里,有什么不对的中央请多多指教!