靓仔靓女们好,咱们又见面了,我是公众号:java小杰要加油,现就职于京东,致力于分享java相干常识,包含但不限于并发、多线程、锁、mysql以及京东面试真题

AQS介绍

  • AQS全称是AbstractQueuedSynchronizer,是一个形象队列同步器,JUC并发包中的大部分的并发工具类,都是基于AQS实现的,所以了解了AQS就算是四舍五入把握了JUC了(好一个四舍五入学习法)那么AQS到底有什么神奇之处呢?有什么特点呢?让咱们明天就来拔光它,一探到底!

    • state:代表被抢占的锁的状态
    • 队列:没有抢到锁的线程会包装成一个node节点寄存到一个双向链表中

AQS大略长这样,如图所示:

你说我轻易画的,我可不是轻易画的啊,我是有bear而来,来看下AQS根本属性的代码

那么这个Node节点又蕴含什么呢?来吧,展现。

那么咱们就能够把这个队列变的更具体一点

怎么忽然进去个exclusiveOwnerThread?还是保留以后取得锁的线程,哪里来的呢
还记得咱们AQS一开始继承了一个类吗

这个exclusiveOwnerThread就是它外面的属性

再次回顾总结一下,AQS属性如下:

  1. state:代表被抢占的锁的状态
  2. exclusiveOwnerThread:以后取得锁的线程
  3. 队列:没有抢到锁的线程会包装成一个node节点寄存到一个双向链表中

    • Node节点 :

       * thread:  以后node节点包装的线程 * waitStatus:以后节点的状态 * pre: 以后节点的前驱节点 * next: 以后节点的后继节点 * nextWaiter:示意以后节点对锁的模式,独占锁的话就是null,共享锁为Node() 

好了,咱们对AQS大略是什么货色什么构造长什么样子有了个分明的认知,上面咱们间接上硬菜,从源码角度剖析下,AQS加锁,它这个构造到底是怎么变动的呢?

注:以下剖析的都是独占模式下的加锁

  • 独占模式 : 锁只容许一个线程取得 NODE.EXCLUSIVE
  • 共享模式 :锁容许多个线程取得 NODE.SHARED

AQS加锁源码——acquire

 public final void acquire(int arg) {        if (!tryAcquire(arg) &&                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

乍一看这是什么啊,没关系,咱们能够把它画成流程图不便咱们了解,流程图如下

上面咱们来一个一个剖析,图文并茂,来吧宝贝儿。

AQS加锁源码——tryAcquire

 protected boolean tryAcquire(int arg) {        throw new UnsupportedOperationException();    }

这是什么状况?怎么间接抛出了异样?其实这是由AQS子类重写的办法,就相似lock锁,由子类定义尝试获取锁的具体逻辑

咱们平时应用lock锁时往往如下 (若不想看lock锁怎么实现的能够间接跳转到下一节

ReentrantLock lock = new ReentrantLock();        lock.lock();        try{           //todo        }finally {            lock.unlock();        }

咱们看下lock.lock()源码

    public void lock() {        sync.lock();    }

这个sync又是什么呢,咱们来看下lock类的总体属性就好了

所以咱们来看下 默认非偏心锁的加锁实现

static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;                final void lock() {            //将state状态从0设为1 CAS形式            if (compareAndSetState(0, 1))                //如果设定胜利的话,则将以后线程(就是本人)设为占有锁的线程                setExclusiveOwnerThread(Thread.currentThread());            else                //设置失败的话,就以后线程没有抢到锁,而后进行AQS父类的这个办法                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            //调用非偏心锁的办法            return nonfairTryAcquire(acquires);        }    }

当初压力又来到了nonfairTryAcquire(acquires)这里

 final boolean nonfairTryAcquire(int acquires) {            //取得以后线程            final Thread current = Thread.currentThread();            //取得以后锁的状态            int c = getState();            //如果锁的状态是0的话,就表明还没有线程获取到这个锁            if (c == 0) {                //进行CAS操作,将锁的状态改为acquires,因为是可重入锁,所以这个数字可能是>0的数字                if (compareAndSetState(0, acquires)) {                    //将以后持有锁的线程设为本人                    setExclusiveOwnerThread(current);                    //返回 获取锁胜利                    return true;                }            }// 如果以后锁的状态不是0,判断以后获取锁的线程是不是本人,如果是的话            else if (current == getExclusiveOwnerThread()) {                //则重入数加acquires  (这里acquires是1)  1->2  3->4 这样                int nextc = c + acquires;                if (nextc < 0) // overflow   异样检测                    throw new Error("Maximum lock count exceeded");                //将锁的状态设为以后值                setState(nextc);                //返回获取锁胜利                return true;            }            //以后获取锁的线程不是本人,获取锁失败,返回            return false;        }

由此可见,回到方才的问题,AQS中的tryAcquire是由子类实现具体逻辑的

AQS加锁源码——addWaiter

如果咱们获取锁失败的话,就要把以后线程包装成一个Node节点,那么具体是怎么包装的呢,也须要化妆师经纪人吗?
咱们来看下源码就晓得了
addWaiter(Node.EXCLUSIVE), arg) 这就代表增加的是独占模式的节点

 private Node addWaiter(Node mode) {        //将以后线程包装成一个Node节点        Node node = new Node(Thread.currentThread(), mode);        // 申明一个pred指针指向尾节点        Node pred = tail;        //尾节点不为空        if (pred != null) {            //将以后节点的前置指针指向pred            node.prev = pred;            //CAS操作将以后节点设为尾节点,tail指向以后节点            if (compareAndSetTail(pred, node)) {                //pred下一节点指针指向以后节点                pred.next = node;                //返回以后节点 (此时以后节点就曾经是尾节点)                return node;            }        }        //如果尾节点为空或者CAS操作失败        enq(node);        return node;    }

其中node的构造函数是这样的

 Node(Thread thread, Node mode) {     // Used by addWaiter      this.nextWaiter = mode;      this.thread = thread;  }

咱们能够通过图解的办法来更直观的来看下addWaiter做了什么

由图可知,如果已经尾节点不为空的时候,node节点会退出到队列开端,那么如果已经尾节点为空或者CAS失败调用
enq(node);会怎么样呢?

AQS加锁源码——enq

 private Node enq(final Node node) {        //死循环,直到有返回值        for (;;) {            //申明一个t的指针指向tail            Node t = tail;            //如果尾巴节点为空            if (t == null) { // Must initialize                //则CAS设置一个节点为头节点(头节点并没有包装线程!)这也是提早初始化头节点                if (compareAndSetHead(new Node()))                    //将尾指针指向头节点                    tail = head;            } else {  //如果尾节点不为空,则阐明这是CAS失败              // 将node节点前驱节点指向t                node.prev = t;                //持续CAS操作将本人设为尾节点                if (compareAndSetTail(t, node)) {                    //将t的next指针指向本人 (此时本人真的是尾节点了)                    t.next = node;                    //返回本人节点的前置节点,队列的倒数第二个                    return t;                }            }        }    }
  • 队列中的头节点,是提早初始化的,加锁时用到的时候才去输入话,并不是一开始就有这个头节点的
  • 头节点并不保留任何线程

end 尾分叉

     // 将node节点前驱节点指向t                    node.prev = t;                    1      //持续CAS操作将本人设为尾节点      if (compareAndSetTail(t, node)) { 2          //将t的next指针指向本人 (此时本人真的是尾节点了)          t.next = node;                3          //返回本人节点的前置节点,队列的倒数第二个          return t;      }

咱们留神到,enq函数有下面三行代码,3是在2执行胜利后才会执行的,因为咱们这个代码无时无刻都在并发执行,存在一种可能就是

1执行胜利,2执行失败(cas并发操作),3没有执行,所以就只有一个线程1,2,3都执行胜利,其余线程1执行胜利,2,3没有执行胜利,呈现尾分叉状况,如图所示

这些分叉失败的节点,在当前的循环中他们还会执行1,直总会指向新的尾节点,1,2,3这么执行,早晚会入队

AQS加锁源码——acquireQueued

final boolean acquireQueued(final Node node, int arg) {        // 是否有异样产生        boolean failed = true;        try {            //中断标记            boolean interrupted = false;            //开始自旋,要么以后线程尝试去拿到锁,要么抛出异样            for (;;) {                //取得以后节点的上一个节点                final Node p = node.predecessor();                //判断这个节点是不是头节点,如果是头节点则获取锁                if (p == head && tryAcquire(arg)) {                    //将此节点设为头节点                    setHead(node);                    //原来的头节点出队                    p.next = null; // help GC                    failed = false;                    //返回是否中断                    return interrupted;                }                // 阐明p不是头节点                // 或者                // p是头节点然而获取锁失败                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    // 中断标记设为true                    interrupted = true;            }        } finally {           //如果有异样产生的话            if (failed)            //勾销以后线程竞争锁,将以后node节点状态设置为cancel                cancelAcquire(node);        }    }

其中有一行代码是setHead(node);

 private void setHead(Node node) {        head = node;        node.thread = null;  //将head节点的线程置为空        node.prev = null;    }
  • 为什么要将头节点的线程置为空呢,是因为在 tryAcquire(arg)中就曾经记录了以后获取锁的线程了,在记录就多此一举了,咱们看前文中提到的nonfairTryAcquire(acquires)其中有一段代码
   if (compareAndSetState(0, acquires)) {          //将以后持有锁的线程设为本人             setExclusiveOwnerThread(current);            //返回 获取锁胜利          return true;      }

可见 setExclusiveOwnerThread(current);就曾经记录了取得锁的线程了

咱们acquireQueued返回值是中断标记,true示意中断过,false示意没有中断过,还记得咱们一开始吗,回到最后的终点

public final void acquire(int arg) {        if (!tryAcquire(arg) &&                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

如果返回了true,代表此线程有中断过,那么调用 selfInterrupt();办法,将以后线程中断一下

 static void selfInterrupt() {        Thread.currentThread().interrupt();    }

AQS加锁源码——shouldParkAfterFailedAcquire

程序运行到这里就阐明

 // 阐明p不是头节点// 或者// p是头节点然而获取锁失败if (shouldParkAfterFailedAcquire(p, node) &&    parkAndCheckInterrupt())    // 中断标记设为true    interrupted = true;}

咱们来剖析下shouldParkAfterFailedAcquire(p, node)的源码外面到底做了什么?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        //获取以后节点的前置节点的状态        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)           //如果是SIGNAL(-1)状态间接返回true,代表此节点能够挂起            //因为前置节点状态为SIGNAL在适当状态 会唤醒后继节点            return true;        if (ws > 0) {            //如果是cancelled            do {                //则从后往前依此跳过cancelled状态的节点                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            //将找到的符合标准的节点的后置节点指向以后节点            pred.next = node;        } else {            //否则将前置节点期待状态设置为SIGNAL            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }

其中的node.prev = pred = pred.prev;能够看成

pred = pred.prev;node.prev = pred;

可见一顿操作后,队列中跳过了节点状态为cancelled的节点

AQS加锁源码——parkAndCheckInterrupt

shouldParkAfterFailedAcquire返回true时就代表容许以后线程挂起而后就执行 parkAndCheckInterrupt()这个函数

 private final boolean parkAndCheckInterrupt() {        // 挂起以后线程   线程卡在这里不再下执行,直到unpark唤醒        LockSupport.park(this);        return Thread.interrupted();    }

所以以后线程就被挂起啦

AQS加锁源码——cancelAcquire

咱们还记得前文中提到acquireQueued中的一段代码

  try {                  } finally {            if (failed)                cancelAcquire(node);        }

这是抛出异样时解决节点的代码,上面来看下源代码

private void cancelAcquire(Node node) {        //过滤掉有效节点        if (node == null)            return;        //以后节点线程置为空        node.thread = null;        //获取以后节点的前一个节点        Node pred = node.prev;        //跳过勾销的节点        while (pred.waitStatus > 0)            node.prev = pred = pred.prev;        //记录过滤后的节点的后置节点        Node predNext = pred.next;        //将以后节点状态改为CANCELLED        node.waitStatus = Node.CANCELLED;        // 如果以后节点是tail尾节点 则将从后往前找到第一个非勾销状态的节点设为tail尾节点        if (node == tail && compareAndSetTail(node, pred)) {            //如果设置胜利,则tail节点前面的节点会被设置为null            compareAndSetNext(pred, predNext, null);        } else {            int ws;            //如果以后节点不是首节点的后置节点            if (pred != head &&  //并且                    //如果前置节点的状态是SIGNAL                ((ws = pred.waitStatus) == Node.SIGNAL || //或者                        //状态小于0 并且设置状态为SIGNAL胜利                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&                    //并且前置节点线程不为null时                pred.thread != null) {                //记录下以后节点的后置节点                Node next = node.next;                //如果后置节点不为空 并且后置节点的状态小于0                if (next != null && next.waitStatus <= 0)                    //把以后节点的前驱节点的后继指针指向以后节点的后继节点                    compareAndSetNext(pred, predNext, next);            } else {                //唤醒以后节点的下一个节点                unparkSuccessor(node);            }            //将以后节点下一节点指向本人            node.next = node; // help GC        }    }

看起来太简单了,不过没关系,咱们能够拆开看,其中有这一段代码

       //以后节点线程置为空        node.thread = null;        //获取以后节点的前一个节点        Node pred = node.prev;        //跳过勾销的节点        while (pred.waitStatus > 0)            node.prev = pred = pred.prev;        //记录过滤后的节点的后置节点        Node predNext = pred.next;        //将以后节点状态改为CANCELLED        node.waitStatus = Node.CANCELLED;

如图所示

通过while循环从后往前找到signal状态的节点,跳过两头cancelled状态的节点,同时将以后节点状态改为CANCELLED

咱们能够把这简单的判断条件转换成图来直观的看一下

  • 以后节点是尾节点时,队列变成这样

  • 以后节点是head后继节点

  • 以后节点不是尾节点也不是头节点的后继节点(队列中的某个一般节点)

总结

太不容易了家人们,终于到了这里,咱们再来总结一下整体的流程

    public final void acquire(int arg) {        if (!tryAcquire(arg) &&                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

1.基于AQS实现的子类去实现tryAcquire尝试获取锁

2.如果获取锁失败,则把以后节点通过addWaiter办法包装成node节点插入队列

  • 如果尾节点为空或者CAS操作失败则调用enq办法保障胜利插入到队列,若节点为空则初始化头节点

3.acquireQueued办法,入队后的节点持续获取锁(此节点的前置节点是头节点)或者挂起

    • shouldParkAfterFailedAcquire判断节点是否应该挂起

      • 如果以后节点的前置节点是signal状态,则返回true,能够挂起
      • 如果以后节点的前置节点是cancelled,则队列会从以后节点的前一个节点开始从后向前遍历跳过cacelled状态的节点,将以后节点和非cacelled状态的节点连接起来,返回false,不能够挂起
      • 否则将前置节点期待状态设置为SIGNAL,返回false,不能够挂起
    • parkAndCheckInterrupt挂起以后线程
    • cancelAcquire将以后节点状态改为cancelld
    1. selfInterrupt(); 设置中断标记,将中断补上

    往期精彩举荐

    • 京东这道面试题你会吗?
    • ?线程池为什么能够复用,我是蒙圈了。。。
    • 学会了volatile,你变心了,我看到了
    • mysql能够靠索引,而我只能靠打工,加油,打工人!

    絮絮叨叨

    如果大家感觉这篇文章对本人有一点点帮忙的话

    若文章有误欢送指出,靓仔靓女,咱们下篇文章见,扫一扫,关注我,开启咱们的故事