关于java:☕JAVA技术之旅让我们认识一下AQS的原理2

30次阅读

共计 7958 个字符,预计需要花费 20 分钟才能阅读完成。

前提概要

理解过 JUC 的源码,咱们就能够晓得 JUC 上面很多工具的实现都是依附 AQS,而AQS 中用于保留期待线程的队列就是CLH,下图就是并发编程 AQS 的根底家族谱图。

CLH 队列

定义概念

CLH 是一个 FIFO 的队列 ,队列的每一个节点都是一个Node 对象。以后线程获取同步状态失败的时候就会进入 CLH 队列。而当首节点同步状态被开释的时候会告诉后置节点再次去获取同步状态。

CLH 也是一种基于单向链表 (隐式创立) 的高性能、偏心的自旋锁 。申请加锁的线程只须要在其前驱节点的本地变量上自旋, 极大地缩小了不必要的处理器缓存同步的次数,升高了总线和内存的开销。

CLH锁即Craig, Landin, and Hagersten (CLH) locks。CLH 锁是一个自旋锁。能确保无饥饿性。提供先来先服务的公平性。

算法设计

算法思路

放弃时序的锁基本思路就是将期待获取锁的线程放入汇合,锁开释后,期待线程之一获取到锁

如何排队

CLH 应用反向单链表的模式进行排队。也就是后继节点被动询问,而不是前继节点被动告诉。

是否偏心

CLH 是偏心锁。即后申请获取锁的排在队列开端。

如何唤醒

CLH 通过每个线程自旋。每个期待线程通过一直自旋前继节点状态判断是否能获取到锁。


算法实现

CLH 排队应用偏心形式,锁外部须要保留【尾节点】的援用,每次排队线程退出到队列最开端

CLH 锁应用反向链表形式进行排队,那么每个线程就须要保护【本人的状态】和放弃一个【前向节点的援用】

CLH 应用一个【boolean 值】示意以后线程获取锁状态,并且此状态存储在前置节点中,而非本节点外部。false 示意以后线程开释锁;true 示意以后线程期待获取锁或曾经获取到锁。


AQS 根本介绍

AQS 将线程封装到一个 Node 外面,并保护一个 CLH Node FIFO 队列,它是一个非阻塞的 FIFO 队列,也就是说在并发条件下往此队列做插入或移除操作不会阻塞。是通过自旋锁和 CAS 保障节点插入和移除的原子性,实现无锁疾速插入。

AbstractQueuedSynchronizer 次要就是保护了一个 state 属性、一个 FIFO 队列和线程的阻塞与解除阻塞操作

  • state 示意同步状态,它的类型为 32 位整型,对 state 的更新必须要保障原子性
  • 这里的队列是一个双向链表,每个节点外面都有一个 prev 和 next,它们别离是前一个节点和后一个节点的援用
  • 留神的是此双向链表除了链头其余每个节点外部都蕴含一个线程,而链头能够了解为一个空节点。

具体构造如下图所示:

每一个节点的根本构造组成部分为:


WaitStatus 的为节点状态:

waitStatus 示意的是后续节点状态 ,这是因为 AQS 中应用 CLH 队列实现线程的构造治理, 而 CLH 构造正是用前一节点某一属性示意以后节点的状态,这样更容易实现勾销和超时性能

有五种状态:
SIGNAL:值为 -1,示意以后节点的后续节点中的线程通过 park 被阻塞了,以后节点在

开释或勾销时要通过 unpark 解除它的阻塞。

CANCELLED:值为 1,示意以后节点的线程因为超时或中断被勾销了
CONDITION:值为 -2,示意以后节点在 condition 队列中
PROPAGATE:值为 -3,共享模式的头结点可能处于此状态,示意无条件往下流传,引入此状态是为了优化锁竞争,使队列中线程有序地一个一个唤醒
INITIAL: 值为 0,除了以上四种状态的第五种状态,个别是节点初始状态

前驱节点 prev

次要是为了实现超时及勾销语义、解锁本身等语义,前驱节点勾销后,后驱节点只需向前找到一个未勾销的前驱节点即可

后续节点 next

次要是为了优化后续节点的查找,防止每次从尾部向前查找;

nextWaiter

用于 示意 condition 队列的后续节点,此时 prev 和 next 属性将不再应用,而且节点状态处于 Node.CONDITION;

下面是对节点及节点组成队列的构造的介绍,接着介绍 AQS 相干的一些操作,包含锁的获取与开释、队列的治理、同步状态的更新、线程阻塞与唤醒、勾销中断与超时中断等等。


AQS 运作原理

依据下面的学习,咱们晓得一个线程在尝试获取锁失败后将被阻塞并退出期待队列中,它是一个怎么的队列?又是如何治理此队列,咱们先简略的看看大略的逻辑和思路。

什么是同步器

  • 多线程并发的执行之间通过某种共享状态来同步,只有当状态满足 A 条件,能力触发线程执行 B。这个独特的语义能够称之为同步器。能够认为大多数的锁机制都能够基于同步器定制来实现的。而 JUC(java.util.concurrent)里的思维是,将这些场景形象进去的语义通过对立的同步框架来反对。
  • JUC 里所有的这些锁机制都是基于 AQS(AbstractQueuedSynchronizer)框架上构建的。上面简略介绍下 AQS(AbstractQueuedSynchronizer)能够参考 Doug Lea 的论文 The java.util.concurrent Synchronizer Framework。
  • Lock 的实现类其实都是构建在 AbstractQueuedSynchronizer 上,每个 Lock 实现类都持有本人外部类 Sync 的实例,而这个 Sync 就是继承AbstractQueuedSynchronizer(AQS)。 为何要实现不同的 Sync 呢?另外还有 AQS 的 State 机制。当前文章会举例说明不同同步器内的 Sync 与 state 实现。

AQS 框架如何构建同步器

同步器的基本功能

一个同步器至多须要蕴含两个性能

(1)获取(设置)同步状态:如果容许,则获取锁,如果不容许就阻塞线程,直到同步状态容许获取

(2)开释同步状态:批改同步状态,并且唤醒期待线程

AQS 同步机制同时思考了如下需要

(1)独占锁和共享锁两种机制

(2)线程阻塞后,如果须要勾销,须要反对中断

(3)线程阻塞后,如果有超时要求,应该反对超时后中断的机制

同步状态的获取与开释

AQS 实现了一个同步器的根本构造,上面以独占锁与共享锁离开探讨,来阐明 AQS 怎么实现获取、开释同步状态

独占模式

独占获取:tryAcquire 自身不会阻塞线程,如果返回 true 胜利就持续,如果返回 false 那么就阻塞线程并退出阻塞队列。

public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 获取失败,则退出期待队列 
        selfInterrupt();} 

独占且可中断模式获取:反对中断勾销

public final void acquireInterruptibly(int arg) 
                         throws InterruptedException {if (Thread.interrupted()) 
        throw new InterruptedException(); 
        if (!tryAcquire(arg)) 
          doAcquireInterruptibly(arg); 
 } 

独占且反对超时模式获取:带有超时工夫,如果通过超时工夫则会退出。

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws 
                            InterruptedException {if (Thread.interrupted()) 
       throw new InterruptedException(); 
       return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); 
}

独占模式开释:开释胜利会唤醒后续节点


public final boolean release(int arg) {if (tryRelease(arg)) { 
        Node h = head; 
        if (h != null && h.waitStatus != 0) 
            unparkSuccessor(h); 
            return true; 
    } 
    return false; 
} 

共享模式

共享模式获取

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0) 
        doAcquireShared(arg); 
 }

可中断模式共享获取

public final void acquireSharedInterruptibly(int arg) throws 
       InterruptedException {if (Thread.interrupted()) 
            throw new InterruptedException(); 
        if (tryAcquireShared(arg) < 0) 
            doAcquireSharedInterruptibly(arg); 
 } 
 

共享模式带定时获取

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws 
             InterruptedException {if (Thread.interrupted()) 
        throw new InterruptedException(); 
    return tryAcquireShared(arg) >= 0 || 
        doAcquireSharedNanos(arg, nanosTimeout); 
} 

共享锁开释

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared(); 
        return true; 
    } 
    return false; 
} 

留神以上框架只定义了一个同步器的根本构造框架,的根本办法里依赖的 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 四个办法在 AQS 里没有实现,这四个办法不会波及线程阻塞,而是由各自不同的应用场景依据状况来定制:


protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException(); 
} 

protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
} 

protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException(); 
} 

protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException(); 
} 
状态获取、开释胜利或失败的后续行为:线程的阻塞、唤醒机制

有别于 wait 和 notify。这里利用 jdk1.5 开始提供的 LockSupport.park()和 LockSupport.unpark() 的本地办法实现,实现线程的阻塞和唤醒。

失去锁的线程禁用 (park) 和唤醒(unpark),也是间接 native 实现(这几个 native 办法的实现代码在 hotspot\src\share\vm\prims\unsafe.cpp 文件中,然而要害代码 park 的最终实现是和操作系统相干的,比方 windows 下实现是在 os_windows.cpp 中,有趣味的同学能够下载 jdk 源码查看)。

唤醒一个被 park()线程次要伎俩包含以下几种:

  1. 其余线程调用以被 park()线程为,参数的 unpark(Thread thread)。
  2. 其余线程中断被 park()线程, 如:waiters.peek().interrupt()waiters 为存储线程对象的队列.
  3. 不知起因的返回。

park()办法返回并不会报告到底是上诉哪种返回,所以返回好最好查看下线程状态,如

   LockSupport.park();  // 禁用以后线程
   if(Thread.interrupted){//doSomething}

AbstractQueuedSynchronizer(AQS)对于这点实现得相当奇妙,如下所示

private void doAcquireSharedInterruptibly(int arg) 
             throws InterruptedException {final Node node = addWaiter(Node.SHARED);
    try {for (;;) {final Node p = node.predecessor();
         if (p == head) {int r = tryAcquireShared(arg);
            if (r >= 0) {setHeadAndPropagate(node, r);
                // help GC
                p.next = null; 
                return;
            }    
      }
    //parkAndCheckInterrupt()会返回 park 住的线程在被 unpark 后的线程状态,如果线程中断,跳出循环。if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            break;
      }
    } catch (RuntimeException ex) {cancelAcquire(node);
      throw ex;
    }    
    // 只有线程被 interrupt 后才会走到这里
    cancelAcquire(node);
    throw new InterruptedException();}


// 在 park()住的线程被 unpark()后,第一工夫返回以后线程是否被打断
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
    return Thread.interrupted();}

线程阻塞队列的保护

阻塞线程节点队列 CHL Node queue

AQS 里将阻塞线程封装到一个外部类 Node 里。并保护一个 CHL Node FIFO 队列。CHL 队列是一个非阻塞的 FIFO 队列,也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保障节点插入和移除的原子性。实现无锁且疾速的插入。对于非阻塞算法能够参考 Java 实践与实际: 非阻塞算法简介。CHL 队列对应代码如下:

/**

* CHL 头节点

*/ 

private transient volatile Node head; 

/**

* CHL 尾节点

*/ 

private transient volatile Node tail; 

Node 节点是对 Thread 的一个封装,构造大略如下:static final class Node { 

 /** 代表线程曾经被勾销 */

    static final int CANCELLED =  1; 

    /** 代表后续节点须要唤醒 */ 

    static final int SIGNAL    = -1; 

    /** 代表线程在期待某一条件 /

    static final int CONDITION = -2;

    /** 标记是共享模式 */ 

    static final Node SHARED = new Node(); 

    /** 标记是独占模式 */ 

    static final Node EXCLUSIVE = null; 

    /**
     * 状态位,别离能够使 CANCELLED、SINGNAL、CONDITION、0
     */ 

    volatile int waitStatus; 

    /**

    * 前置节点

    */ 

    volatile Node prev; 

    /**
    * 后续节点
    */ 

    volatile Node next; 

    /**

    * 节点代表的线程

    */ 

    volatile Thread thread; 

    /**

    * 连贯到期待 condition 的下一个节点

    */ 

    Node nextWaiter; 

} 

入列

通过下面的 CLH 同步队列结构图咱们根本能够猜到,CLH 同步队列入列是怎么回事。就是将以后同步队列的最初一个节点的 next 指向增加的节点,并将增加节点的 prev 指向最初一个节点。同时须要将 tail 指向新增加的节点。

上面咱们来查看一下具体的办法 addWaiter(Node mode)的源码。


private Node addWaiter(Node mode){
    // 新建 node
    Node node =new Node(Thread.currentThread(), mode);
    // 疾速尝试增加尾节点
    Node pred = tail;
    if(pred !=null) {
      node.prev = pred;
      // 应用 cas 设置尾节点
      if(compareAndSetTail(pred, node)) {
           pred.next = node;
            return node;
      }
    }
    // 循环设置尾节点
    enq(node);
    return node;
}

在 addWaiter 办法中咱们会尝试获取尾节点并进行尾节点的设置,如果胜利就间接返回,如果没有胜利就调用 enq(final Node node)进行设置,上面咱们来看看 enq(final Node node)办法的具体实现。


private Node enq(final Node node){
  // 死循环尝试,直到胜利
    for(;;) {
      Node t = tail;
     //tail 不存在,设置成首节点  
     if(t ==null) {
        // Must initialize
         if(compareAndSetHead(new Node()))
            tail = head;
     }else{
      // 设置尾尾节点
       node.prev = t;
       if(compareAndSetTail(t, node)) {
          t.next = node;
          return t;
       }
    }
  }
}

AQS 尽管实现了 acquire,和 release 办法是可能阻塞的,然而外面调用的 tryAcquire 和 tryRelease 是由子类来定制的且是不阻塞的可。以认为同步状态的保护、获取、开释动作是由子类实现的性能,而动作胜利与否的后续行为时有 AQS 框架来实现。

咱们能够看见下面应用了 CAS 来进行首节点和尾节点的设置,以达到线程平安的目标。只有尾节点设置胜利了才会返回,否则会始终进行上来。

出列

CLH 同步队列遵循 FIFO 的规定,首节点的线程开释同步状态后。将会唤醒后继结点 next,而后继节点将会尝试获取同步状态,如果获取同步状态胜利,将会将本人设置成首节点 同时须要留神的是,这个过程后继节点会断开需前节点的关联,具体流程形如下图。

总结

从源码能够看出 AQS 实现根本的性能:

1. 同步器根本范式、构造

2. 线程的阻塞、唤醒机制

3. 线程阻塞队列的保护

AQS 尽管实现了 acquire,和 release 办法,然而外面调用的 tryAcquire 和 tryRelease 是由子类来定制的。能够认为同步状态的保护、获取、开释动作是由子类实现的性能,而动作胜利与否的后续行为时有 AQS 框架来实现,还有以下一些公有办法,用于辅助实现以上的性能:


final boolean acquireQueued(final Node node, int arg):申请队列

private Node enq(final Node node) : 入队

private Node addWaiter(Node mode):以 mode 创立创立节点,并退出到队列

private void unparkSuccessor(Node node):唤醒节点的后续节点,如果存在的话。private void doReleaseShared():开释共享锁

private void setHeadAndPropagate(Node node, int propagate):设置头,并且如果是共享模式且 propagate 大于 0,则唤醒后续节点。private void cancelAcquire(Node node):勾销正在获取的节点

private static void selfInterrupt():自我中断

private final boolean parkAndCheckInterrupt():park 并判断线程是否中断

正文完
 0