如何对待AQS?

看了Synchronized的实现形式之后,再来看JDK的AQS,感觉就比较简单了,它的行为有点像银行排队,银行有很多窗口解决很多业务,不同的窗口解决不同的业务,比方有集体业务,也有金融业务,也有公司业务等,每个窗口都有很多人排队。一般来讲当你在窗口解决业务的时候是不会有人来打搅你的,不个别的时候是什么时候呢?那就是材料散发窗口,谁来了都给你一张材料,一边看去吧。

而且当你去解决业务的时候,你可能有些材料遗记带了,而后你又要从新去取材料,取完之后回来持续排队。

所以说,代码源于生存,今人诚不欺我。

带着问题看源码

  1. AQS中是如何实现线程阻塞的?
  2. AQS为什么反对多个Condition?
  3. 线程唤醒的时候程序是怎么的?

带着问题去看源码比一猛子扎进源码的陆地好,所以我倡议你带着这几个问题去看AQS源码。

CLH队列

AQS是一个抽象类,很多类都是基于它来实现本人的特有的性能的,比方Lock,Semaphore,CountDownLatch等,咱们都晓得对于一个共享变量,为了线程平安,同一时刻必定只能有一个线程线程能够写这个变量,也就是只有一个线程能拿到锁。那么那些没有拿到锁的线程是怎么被阻塞的呢?

AQS中保护了一个基于链表实现的FIFO队列(官网叫它CLH锁),那些没有获取到锁的线程会被封装后放入都在这个队列外面,咱们来看下这个队列的定义

static final class Node {  // 共享模式  static final Node SHARED = new Node();    // 排它模式,默认  static final Node EXCLUSIVE = null;  // 以后线程的一个期待状态,默认值为0   volatile int waitStatus;  // 前一个节点  volatile Node prev;  // 后一个节点  volatile Node next;  // 入队到这个节点的线程  volatile Thread thread;  // 下一个期待condition的节点或者是非凡的节点值: SHARED  Node nextWaiter;}

这里共享模式和排它模式是什么意思呢?排它模式是示意同时只能有一个线程获取到锁执行,而共享模式示意能够同时有多个线程执行,比方Semaphore/CountDownLatch。

waitStatus除了默认值外还有三个值,别离代表不同的含意

  1. CANCELLED = 1 : 示意以后节点曾经不能在获取锁了,以后节点因为超时或者中断而进入该状态,进入该状态的节点状态不会再发生变化,同时后续还会从队列中移除。
  2. SIGNAL = -1 : 示意以后节点须要去唤醒后继节点。 后继节点入队时,会将以后节点的状态更新为SIGNAL
  3. CONDITION = -2 : 以后节点处于条件队列中,当其余线程调用了condition.signal()后,节点会从条件队列本义到同步队列中
  4. PROPAGATE = -3 : 当共享锁开释的时候,这个状态会被传递到其余后继节点

其实本来的CLH队列是没有prev这个指针的,它存在的意义是那些勾销锁获取的线程须要被移除掉。

Synchronized的wait只能有一个条件队列(WaitSet),而AQS则是反对多个条件队列,其中条件队列的要害数据结构如下:

 public class ConditionObject implements Condition, java.io.Serializable {    // condition queue中的头节点  private transient Node firstWaiter;    // condition queue中的尾节点  private transient Node lastWaiter;  /**   * 创立一个新的condition   */  public ConditionObject() { }  // 相似于Object的notify  public final void signal() {    ...  }    // 相似于Object的wait  public final void await() throws InterruptedException {    ...  } }

AQS反对多个Condition的起因就是每个ConditionObject都对应一个队列,所以它能够反对多个Condition。

如果多个线程申请同一把锁,有的线程阻塞在申请锁下面,有的锁阻塞到期待某个条件成立。 那么当持有锁的线程让出锁的时候,哪个线程应该获取到锁呢?

当获取了锁的线程调用了signal()时,它又会不会从条件队列(condition queue)中出队一个node,退出到同步队列中呢?答案是会的。

如果我说AQS剖析结束,你们会不会打我?

哈哈哈,各位放下键盘,扶我起来,我还能在水一会儿。

AQS中除了这两个类之外,还有AQS自身还有几个重要的属性,它们独特形成了AQS这个框架类的根底构造

  // wait queue(或者叫做同步队列)的头节点  private transient volatile Node head;  // wait queue(或者叫同步队列)的尾节点  private transient volatile Node tail;  // 资源  private volatile int state;

看到了head和tail,是不是一个双向链表的含意跃然纸上了。

AQS 中的state变量,对不同的子类有不同的含意,该变量对 ReentrantLock 来说示意加锁状态,对Semaphore来说则是信号量的数量。总之它们都是基于AQS中的state来实现各自独有性能的。

这里的state我感觉更应该叫做资源,就相似于去银行排队时取的票。对于ReentrantLock来说,这个资源只有1个,对于Semaphore或者CountDownLatch来说资源能够有很多个。

Unsafe解说

unsafe是一个十分弱小的类,通过它提供的API,咱们能够操作内存,批改对象属性值,甚至内存屏障,加锁解锁都能通过它实现

图片来自于: https://tech.meituan.com/2019...

AQS中就应用了unsafe来实现线程的阻塞以及批改属性的值,这里把AQS中波及到unsafe的办法列出来

// 阻塞以后线程public native void park(boolean isAbsolute, long time);// 解锁指定的线程public native void unpark(Object thread);// 获取字段的偏移地址public native long objectFieldOffset(Field f);// CAS更新对象的值为x,当当初的值是expectedpublic final native boolean compareAndSwapObject(Object o, long offset,                                                     Object expected,                                                     Object x);// CAS更新public final native boolean compareAndSwapInt(Object o, long offset,                                                  int expected,                                                  int x);

park和unpark的底层实现其实还是之前咱们解说过的pthread,这里不再解说,感兴趣的能够去看看之前对于synchronized的文章。

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))... thread->parker()->park(isAbsolute != 0, time);...UNSAFE_END

批改对象的属性值,咱们用一段代码来演示

public class UnSafeDemo {  public static void main(String[] args) throws Exception {    Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");    theUnsafe.setAccessible(true);    Unsafe unsafe = (Unsafe) theUnsafe.get(null);    Student student = new Student();    student.setAge(18);    long ageOffset = unsafe.objectFieldOffset(Student.class.getDeclaredField("age"));    // cas批改student的age为20    unsafe.compareAndSwapInt(student, ageOffset, 18, 20);    System.out.println(student.getAge());  }}class Student {  int age;  public int getAge() {      return age;  }  public void setAge(int age) {      this.age = age;  }}

下面代码的输入后果为 20,咱们通过首先通过objectFieldOffset获取到age属性的内存偏移地址,而后通过compareAndSwapInt将age属性的值批改为20。

比方对于一维数组a[10],基地址是0x000000,那么a[1]的内存地址就是0x000001 = 0x000000(基地址) + 1(偏移量)

源码剖析

其余类如果要想将AQS作为基类以实现本人的性能,只须要依据需要实现以下办法就行了

  // 尝试获取锁  protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();  }  // 尝试开释锁  protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException();  }  // 尝试获取共享锁  protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException();  }  // 尝试开释共享锁  protected boolean tryReleaseShared(int arg) {    throw new UnsupportedOperationException();  }  // 锁是否被独占  protected boolean isHeldExclusively() {    throw new UnsupportedOperationException();  }

AQS中获取锁的办法是在acquire()中,开释锁的办法是release(),如果咱们想要实现自定义的锁的时候,只须要依据本人的需要实现对应的tryXXX办法就行了,为了简化剖析,咱们只剖析独占锁的源码。

获取锁

public final void acquire(int arg) {  if (!tryAcquire(arg) &&    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    selfInterrupt();}

咱们假如tryAcquire返回false(不关怀子类实现),也就是当初锁曾经被其余线程占有了。当初又有线程来获取锁必定是会获取失败的,所以失败的线程会被封装成Node插入到队列中

private Node addWaiter(Node mode) {    // 将nextWaiter设置为EXCLUSIVE,示意节点正以独占模式期待  Node node = new Node(Thread.currentThread(), mode);  // Try the fast path of enq; backup to full enq on failure  Node pred = tail;  if (pred != null) {      node.prev = pred;      // CAS将tail节点更新为node      if (compareAndSetTail(pred, node)) {          pred.next = node;          return node;      }  }  // 如果执行到这里就存在两种状况  // 1. pred == null,示意是第一次插入,以后同步队列中没有其余线程  // 2. 表明有其余线程批改了tail的值,导致CAS批改tail失败  enq(node);  return node;}// CAS更新tail节点private Node enq(final Node node) {  for (;;) {      Node t = tail;      // 解决第一次插入的状况,这里head = new Node()      if (t == null) {          if (compareAndSetHead(new Node()))              tail = head;      } else {          // 解决竞争强烈的状况或者第一次插入后未建设链路关系的状况          node.prev = t;          if (compareAndSetTail(t, node)) {              t.next = node;              return t;          }      }  }}

下面逻辑很简略,就是结构一个排它锁模式的node,插入队列中(尾插入)。

然而你尤其须要留神到enq中,当节点第一次插入的时候,head的值是new Node(),它是一个虚构节点,这个节点自身没有可运行的线程。

在链表中,应用这种虚构节点很失常,有时候更加有利于操作。

enq执行实现后就造成了这样的链路关系:

acquireQueued的源码如下

final boolean acquireQueued(final Node node, int arg) {  boolean failed = true;  try {    boolean interrupted = false;    for (;;) {      final Node p = node.predecessor();      // 1. 如果前一个节点是头节点,则尝试再次获取锁(机会更大)      if (p == head && tryAcquire(arg)) {          setHead(node);          p.next = null; // help GC          failed = false;          return interrupted;      }      // 2. 获取锁失败,须要查看并更新节点状态      if (shouldParkAfterFailedAcquire(p, node) &&          parkAndCheckInterrupt())          interrupted = true;    }  } finally {      // 3. 如果线程被中断,须要将节点从队列中移除      if (failed)          cancelAcquire(node);  }}

逻辑也比拟简单明了,在线程被阻塞之前,如果前一个节点是头节点(head),那么在尝试去获取一次锁,如果胜利了就间接返回。

如果还是失败的话,就更新下节点状态,而后将线程阻塞

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  int ws = pred.waitStatus;  // 前置节点曾经是SIGNAL了,能够释怀的park了  if (ws == Node.SIGNAL)      return true;  if (ws > 0) {      // 如果前置节点状态是CANCELLED,须要将节点移除      do {          node.prev = pred = pred.prev;      } while (pred.waitStatus > 0);      pred.next = node;  } else {      // 这就是咱们之前说的在入队的时候,会通过CAS将前置节点的状态设置为SIGNAL      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  }  return false;}

线程阻塞的代码更简略

private final boolean parkAndCheckInterrupt() {  // 调用了unsafe的park阻塞以后线程  LockSupport.park(this);  return Thread.interrupted();}

至此,获取锁的源码就解析实现了。

从下面的代码,咱们能够晓得 head --> next --> next --> tail 这样的数据结构组成了同步队列,期待获取锁的线程会被封装成node插入到队列中去。

开释锁

release()中次要调用了unparkSuccessor()办法来唤醒后继节点,源码如下

// 传入的node是head节点private void unparkSuccessor(Node node) {    // 1. 获取结点状态  int ws = node.waitStatus;  // 2. 应用CAS更新waitStatus为默认值0  if (ws < 0)      compareAndSetWaitStatus(node, ws, 0);  // 3. 通过循环找到后继节点中状态不为CANCELLED的节点  Node s = node.next;  if (s == null || s.waitStatus > 0) {      s = null;      for (Node t = tail; t != null && t != node; t = t.prev)          if (t.waitStatus <= 0)              s = t;  }  // 4. 调用unsafe解锁线程  if (s != null)      LockSupport.unpark(s.thread);}

之前提到过head节点是虚构节点,所以会调用unsafe.park去解锁head.next节点

条件变量(Condition)

Java 语言内置的管程(synchronized)里只有一个条件变量,而咱们的AQS是反对多个条件变量的,Java中定义了一个接口,叫做Condition,AQS的外部类ConditionObject实现了这个接口。

public interface Condition { // 使得以后线程被阻塞,直到收到信号(signal)或者线程被中断 void await() throws InterruptedException; // 唤醒一个期待线程 void signal();  // 唤醒所有的期待线程 void signalAll(); // 省略其余办法}

Condition中线程期待和告诉须要调用await()signal()signalAll(),它们的语义和 wait()notify()notifyAll() 是雷同的。然而不一样的是,Condition里只能应用后面的 await()signal()signalAll(),而前面的 wait()notify()notifyAll() 只有在 synchronized 实现的管程里能力应用。

await

public final void await() throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    // 将以后线程结构成一个waiter节点,waitStatue的值为-2(CONDITION),将它插入到条件队列中    Node node = addConditionWaiter();    // 开释持有的锁    int savedState = fullyRelease(node);    int interruptMode = 0;    // 如果不在同步队列中,则阻塞以后线程    while (!isOnSyncQueue(node)) {        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    // 在同步队列中,有可能被唤醒了,须要去从新获取锁。并解决中断逻辑    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);} private Node addConditionWaiter() {  Node t = lastWaiter;    // 如果lastWaiter不是出于condition状态,则须要移除掉  if (t != null && t.waitStatus != Node.CONDITION) {      unlinkCancelledWaiters();      t = lastWaiter;  }  // 将以后线程构建为一个condition node  Node node = new Node(Thread.currentThread(), Node.CONDITION);  if (t == null)      firstWaiter = node;  else      t.nextWaiter = node;  // 批改lastWaiter的指向  lastWaiter = node;  return node;}

await()办法的逻辑也比较简单,其执行的次要流程如下

  1. 如果以后线程曾经被中断则抛出异样,未中断则执行步骤2
  2. 将以后线程结构为condition node,并插入到条件队列中
  3. 如果condition node不在同步队列中(有可能被唤醒后,移出条件队列了),则调用unsafe.park阻塞以后线程

通过以上代码剖析,await之后条件队列会造成上面这样的数据结构

firstWaiter --> nextWaiter --> nextWaiter --> lastWaiter
这里的firsetWaiter并不是虚构节点,当只有一个线程在条件队列中时,firstWaiter == node == lastWaiter

signal

public final void signal() {  if (!isHeldExclusively())      throw new IllegalMonitorStateException();  // 获取第一个期待节点,进行唤醒  Node first = firstWaiter;  if (first != null)      doSignal(first);}private void doSignal(Node first) {  do {      // 将节点从条件队列中移除      if ( (firstWaiter = first.nextWaiter) == null)          lastWaiter = null;      first.nextWaiter = null;  } while (!transferForSignal(first) &&           (first = firstWaiter) != null);}// 将节点从条件队列挪动到同步队列final boolean transferForSignal(Node node) {   // CAS批改waitStatus失败,就证实node处于CANCELLED状态  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))      return false;  // enq的源码下面下面获取锁有提到过,就是将节点插入同步队列并返回前置节点  Node p = enq(node);  int ws = p.waitStatus;  // 如果前置节点状态是CANCELLED或者批改waitStatus状态为SIGNAL失败,那么须要唤醒刚插入的节点  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))      LockSupport.unpark(node.thread);  return true;}

其实signal的逻辑也是很简略的,不晓得为什么写到signal的时候,我就想起了我过后写synchronized的时候,一样的索然无味,无欲无求。我想可能是没人给我点赞。

说回signal,它的次要逻辑如下

  1. 将第一个期待节点从期待队列中移除
  2. 批改第一个期待节点的waitStatus为0
  3. 将节点入队到同步队列,并返回前一个节点
  4. 判断前一个节点是否处于CANCELLED或者waitStatus是否失常批改,如果不能则解锁刚入队的节点

你须要留神的是,signal并没有开释本人取得的锁。开释锁的操作依然是通过release的。

总结

通过源码的剖析,我置信你肯定能够答复上之前的三个问题。也让咱们晓得了AQS和Synchronized其实外在实现形式是很相似的。

AQS中有同步队列和条件队列,signal的时候是将节点从条件队列转移到同步队列。
Synchronized中有EntryList和WaitSet。notify的时候将线程从WaitSet挪动到EntryList中。

同样的,看源码我集体更喜爱沿着一条线去看,而不是大而全的看,这样更容易把控整个脉络。