乐趣区

关于java:连肝4天这瞬间戳中面试官小心心的AQS大餐给大家安排上

点赞的靓仔,你时人群中最闪耀的光辉

前言

AQS,英文全称 AbstractQueuedSynchronizer,间接翻译为形象的队列同步器。是 JDK1.5 呈现的一个用于解决并发问题的工具类,由赫赫有名的 Doug Lea 打造,与 synchornized 关键字不同的是,AQS 是通过代码解决并发问题。

回顾并发问题

并发问题是指在多线程运行环境下,共享资源平安的问题。
当初的银行账户,通过银行卡和手机银行都能够操作账户,如果咱们同时拿着银行卡和存折去银行搞事件,会怎么样呢?

package demo.pattren.aqs;

public class Money {
    /**
     * 假如当初账户有 1000 块钱
     */
    private int money = 1000;
    /**
     * 取钱
     */
    public void drawMoney(){this.money--;}
    public static void main(String[] args) throws InterruptedException {Money money = new Money();
        for(int i=0; i<1000; i++){new Thread(() -> {
                try {Thread.sleep(1);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                money.drawMoney();},i + "").start();}
        Thread.sleep(2000);
        System.out.println("以后账户余额:" + money.money);
    }
}


这样想着是不是马上能够去银行搞一波事件?哈哈,你想太多了,如果能这样搞,银行早破产了。咱们次要是来剖析一下呈现这个问题的起因,JVM 内存是 JMM 构造的,每个线程操作的数据是从主内存中复制的一个和备份,而多个线程就会存在多个备份,当线程中的备份数据被批改时,会将值刷新到主内存,比方多个线程同时获取到了账户的余额为 500 元,A 线程存钱 100,线程 A 将 600 刷新到主内存,$\color{red}{主内存并不会被动告诉其余线程此时值曾经被批改}$,所以主内存的值此时与其余线程的值是不同的,如果其余线程再操作账户余额,是在 500 的根底上进行的,这显然不是咱们想要的后果。

解决并发问题

JDK 提供了多种解决多线程平安的形式。

volatile 关键字

volatile 是 JDK 提供的关键字,用来润饰变量,volatile 润饰的变量可能保障多个线程下的可见性,如上个案例,A 批改了账户的余额,而后将最新的值刷新到主内存,此时主内存会将最新的值同步到其余线程。

volatile 解决了多线程下数据读取统一的问题,$\color{red}{即保障可见性,然而其并不能保障写操作的原子性}$,

当多个线程同时写操作的时候,即多个线程同时去将线程中最新的值刷新到主内存,将会呈现问题。

通过 volatile 关键字润饰 money 变量,发下并不能解决线程平安问题。

原子操作类

原子操作类是 JDK 提供的一系列保障原子操作的工具类,原子类能够保障多线程环境下对其值的操作是平安的。

package demo.pattren.aqs;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMoney {
    /**
     * 假如当初账户有 1000 块钱
     */
    private AtomicInteger money = new AtomicInteger(1000);
    /**
     * 取钱
     */
    public void drawMoney(){
        //AtomicInteger 的自减操作
        this.money.getAndDecrement();}
    public static void main(String[] args) throws InterruptedException {AtomicMoney money = new AtomicMoney();
        for(int i=0; i<1000; i++){new Thread(() -> {
                try {Thread.sleep(1);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                money.drawMoney();},i + "").start();}
        Thread.sleep(2000);
        System.out.println("以后账户余额:" + money.money);
    }
}


屡次测试后果都是 0,与预期统一。原子操作类是应用 CAS(Compare and swap 比拟并替换)的机制来保障操作的原子性,绝对于锁,他的并发性更高。

synchronized 关键字

synchronized 关键字是 jvm 层面来保障线程平安的,通过在代码块前后增加 monitorenter 与 monitorexit 命令来保障线程的平安,而且在 JDK1.6 对 synchronized 关键字做了较大的优化,性能有了较大的晋升。能够确定的是,通过 synchronized 必定能够保障线程平安,所以应用 synchronized 也是很好的抉择,当然 synchronized 锁的降级不可逆特色,导致在高并发下性能是不能很好的保障。

Lock 锁

终于迎来了本篇文章的配角,后面的内容,其实与文章的主题 AQS 并没有间接的关联,就简略带过。后面很多都是 JVM 层面来保障线程平安的,而 AQS 则是齐全通过代码层面来解决线程平安的。
(PS:大节题目明明是 Lock 锁,怎么写 AQS 了,骗我读书少)

博主怕挨打,正在全力解释中 ~。先上类图压场!

如上图,右边是形象队列同步器,而左边则是应用队列同步器实现的性能——锁、信号量、发令枪等。
能够先不看源码,咱们本人思考,要以纯代码的形式实现该当思考哪些问题?

  1. 线程互斥:能够应用 state 状态进行判断,state=0,则能够获取到锁,state>0, 则不能获取。
  2. 排队等待:不能获取锁的线程该当存储起来,当锁开释后能够持续获取锁执行。
  3. 线程唤醒:当锁开释后,处于期待状态的线程该当被唤醒。
  4. 锁重入:如何解决同一个进入多个加锁的办法(不解决的话分分钟死锁给你看)。

对于 1、2 两点,难度应带不大,而 3、4 两点如何去设计呢?咱们通过伪代码预演操作流程。

在业务端,是这样操作的。

  加锁
  {须要被锁住的代码}
  开释锁

加锁与开释锁的逻辑

    if(state == 0)
      获取到锁
      set(state == 1)
    else
      持续期待
      while(true){if(state == 0)
             再次尝试获取锁
      }

这样设计之后,整个操作流程再次变成了串行操作。

这和咱们去食堂排队打饭是一样的,食堂不可能为每个学生都凋谢一个窗口,所以多个学生就会争抢无限的窗口,如果没有肯定的管制,那么食堂每到吃饭的时候都是乱套的,一群学生围着窗口同时去打饭,想想都是如许的恐怖。而由此呈现了排队的机制,一个窗口同一时间打饭的人只能有一个,以后一个人来到窗口后,前面排队的学生能力去打饭。

源码解读

上面咱们深刻 JDK 源码,领略大师级的代码设计。
业务调用代码:

package demo.aqs;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockMoney {Lock lock = new ReentrantLock();
    /**
     * 假如当初账户有 1000 块钱
     */
    private int money = 1000;
    //private int money = 1000;
    /**
     * 取钱
     */
    public void drawMoney(){lock.lock();
        this.money--;
        lock.unlock();}
    public static void main(String[] args) throws InterruptedException {LockMoney money = new LockMoney();
        for(int i=0; i<1000; i++){new Thread(() -> {
                try {Thread.sleep(1);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                money.drawMoney();},i + "").start();}
        Thread.sleep(2000);
        System.out.println("以后账户余额:" + money.money);
    }
}

追踪 Lock 办法:
间接看源码根本一会儿就晕车,我尝试绘制出 lock 办法的调用链路。而后联合源码解释。


大家跟着箭头走一遍源码,多多少少可能领会到 AQS 的实现机制。

NonfairSync.lock

final void lock() {
    //CAS 尝试将 state 从 0 更新为 1,更新胜利则执行 if 上面的代码。if (compareAndSetState(0, 1))
        // 获取锁胜利,执行线程执行
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 获取锁失败,线程入队列
        acquire(1);
}

看到这段代码,是不是霎时明确后面提到的 1、2 两点问题。首先 compareAndSetState 办法是应用 Unsafe 间接操作内存并且应用乐观锁的形式,可能保障有且仅有一个线程可能操作胜利,是多线程平安的。即设置将 state 设置为 1 胜利的线程可能抢占到锁 ( 线程互斥 ),而没有设置胜利的线程将进行入队操作( 排队等待),这样感觉霎时清朗了许多,那咱们接着往下看。

AbstractQueuedSynchronizor.acquire

 public final void acquire(int arg) {
    //tryAcquire 失败并且 acquireQueued 胜利,则调用 selfInterrupt
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 当线程获取锁失败并且线程阻塞失败会中断线程
        selfInterrupt();}

AbstractQueuedSynchronizor 的 tryAcquire 办法, 其最终调用到了 Sync 的 nonfairTryAcquire

 final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
    // 获取以后锁的状态值
    int c = getState();
    // state = 0, 示意以后锁为闲暇状态,其实这一段代码和后面 lock 的办法是一样的
    if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 不等于 0 则判断以后线程是否为持有锁的线程,如果是则执行代码,这里解决了重入锁问题
    else if (current == getExclusiveOwnerThread()) {// 以后状态值 + 1(能够看后面的传参)
        int nextc = c + acquires;
        // 囧,这里是超出了 int 的最大值才会呈现这样的状况
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新 state 的值
        setState(nextc);
        return true;
    }
    return false;
}

通过浏览源码,能够发现,tryAcquire 办法在以后线程获取锁胜利或者是重入锁的状况下返回 true,否则返回 false。而同时这个办法解决了下面提到的第 4 点锁重入的问题。ok,感觉越来越靠近假相了,接着看 addWaiter 办法。
了解 addWaiter 办法的代码,先看办法中用的得 Node 对象。Node 对象是对 Thread 对象的封装,使其具备线程的性能,同时他还有 prev、next 等属性。那么很明了,Node 是一个链表构造的对象

   // 前一个结点
   volatile Node prev;
   // 下一个结点
   volatile Node next;

同时 AbstractQueuedSynchronizor 中蕴含 head、tail 属性

 //Node 链表的头结点
 private transient volatile Node head;
 //Node 链表的尾结点
 private transient volatile Node tail;
private Node addWaiter(Node mode) {
    // 将以后线程包装为 Node 对象
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点,当这段代码第一次运行的时候,并没有尾结点
    // 所以必定值为 null,那么会执行上面的 enq 办法
    Node pred = tail;
    // 当再次运行代码的时候,尾结点不再为 null(enq 办法初始化了尾结点, 能够先往下看 enq 办法源码)
    if (pred != null) {
        // 以后结点的前置结点指向之前的尾结点
        node.prev = pred;
        //CAS 尝试将尾结点从 pred 设置为 node
        if (compareAndSetTail(pred, node)) {
            // 设置胜利则将 pred 的 next 结点执行 node
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

下面的解释听着有点绕脑袋。

不焦急,咱们先看 enq 办法

private Node enq(final Node node) {
    // 死循环
    for (;;) {
        // 获取尾结点
        Node t = tail;
        // 尾结点为空,则初始化尾结点和头结点为同一个新创建的 Node 对象
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将以后结点设为为尾结点,并将前一个尾结点的 next 指向以后结点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                // 退出循环
                return t;
            }
        }
    }
}

enq 具体做了什么事件呢:

  1. 第一次循环,初始化头结点与尾结点 new Node()
  2. 第二次循环,将以后线程封装的 Node 对象设置为尾结点,并将前一个尾结点的 next 指向此 Node

这里须要一些工夫 + 空间的想象力,但如果对链表构造比拟相熟的话,这里了解也是不太艰难的。
咱们动静的想一想执行过程:

  1. 第一个线程进入 lock 办法,此时是必定能够获取到锁,间接执行,不会进入到 addWaiter 办法
  2. 第二个线程进入 lock 办法,咱们假如第一个线程还没有开释锁,此时进入执行 enq 办法,enq 进行链表的初始化。

  1. 第三个线程以及更多的线程进入 lock 办法,此时不再执行 enq 办法,而是在初始化之后的链表进行链接。

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
  // 局部变量
  boolean failed = true;
  try {
      // 局部变量
      boolean interrupted = false;
      // 死循环
      for (;;) {
          // 获取前置结点
          final Node p = node.predecessor();
          // 前置结点为 head 并且尝试获取锁胜利,则不阻塞
          if (p == head && tryAcquire(arg)) {setHead(node);
              p.next = null; // help GC
              failed = false;
              return interrupted;
          }
          // 阻塞操作,判断是否应该阻塞 并且 阻塞是否胜利
          if (
                // 是否在抢占锁失败后阻塞
              shouldParkAfterFailedAcquire(p, node) &&
              //Unsafe 操作使线程阻塞
              parkAndCheckInterrupt())
              interrupted = true;
      }
  } finally {if (failed)
          cancelAcquire(node);
  }
}

shouldParkAfterFailedAcquire 剖析

//Node pred 前置结点, Node node 以后结点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前置结点的期待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 唤醒信号,即前结点失常,就设置 waitStatus 为 SIGNAL,示意前置结点能够唤醒以后结点,那          * 么以后结点才会安心的被阻塞(如果前置结点不失常,可能就会导致本人不能被唤醒,那必定不          * 能安心睡觉的)
         */
        return true;
    if (ws > 0) {
        /*
         * 找到前置结点中 waitStatus <= 0 的 Node 结点并设置为以后结点的前置结点
         * 此状态示意结点不是处于失常状态,那么将他从链表中删除,直到找到状态失常的结点
         */
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 当 waitStatus = 0 或者 PROPAGATE(-3) 时,CAS 设置值为 SIGNAL(-1)
         * 此状态示意线程失常,但没有设置唤醒,个别为 tail 的前一个结点,那么须要将其设置为可唤醒          * 状态(SIGNAL)
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

图解如下。

至此,咱们理解了 AQS 对须要期待的线程存储的过程。
而 AQS 的解锁以及偏心锁、非偏心锁,共享锁、独享锁等后续跟上。

参考资料:

https://www.cnblogs.com/water…
https://www.jianshu.com/p/d61…

退出移动版