文章会首发公众号,搜寻关注:Different Java

摘要

  1. 如何保障线程平安
  2. JDK并发包中的同步控制
  3. JDK并发包中锁的特点
  4. 为什么要讲AQS
  5. AQS的外围数据结构
  6. AQS排他锁如何申请
  7. AQS排他锁如何开释

1. 如何保障线程平安

Java多线程在对共享资源进行拜访时,如果不加以控制会存在线程平安问题,当咱们应用多线程对共享资源拜访时,通常会线程共享资源的进行拜访线程数的管制:

  • 共享锁:咱们会对共享资源分派许可,只有拿到许可的线程才能够拜访到共享资源,否则就须要期待有许可可用(当然这里也能够放弃期待)。拿到许可的线程在来到共享区间(不再访问共享资源)时必须要偿还许可,这样处于期待的线程才有可能拿到许可执行。(信号量采纳的就是这种机制)
  • 排他锁:只有一个线程能够拜访到共享资源,其余线程在未拿到排他锁时只能在共享区间外期待。当持有锁的线程开释锁当前,其余期待的线程才有可能获取到锁执行

2. JDK并发包中的同步控制

Java提供了大量的工具让咱们对共享资源进行多线程访问控制,其中很多人最相熟的是synchronized关键字,除了该关键字,JDK的并发包中也提供了大量的类来进行同步控制,次要有:

  • ReentrantLock
  • Semaphore
  • ReadWriteLock
  • CountDownLatch
  • CyclicBarrier
  • LockSupport
  • Condition

3. JDK包中的锁的特点

3.1 ReentrantLock

ReentrantLock的作用和synchronized的作用基本一致,然而ReentrantLock应该须要手动lock(如果锁已被占用则须要期待)和unlock,并且反对线程中断,反对偏心锁(依照申请锁的程序取得锁),尝试取得锁的时候能够指定最大等待时间(如果不指定工夫则不进行期待,拿到锁返回true,否则返回false)。

ReentrantLock搭配Condition的await()以及signal()相当于synchronized搭配Object.wait()以及notify()。

3.2 Semaphore

Semaphore(信号量)容许多个线程同时拜访一个共享资源,在结构信号量时必须指定容许的最大线程数,在应用信号量时,咱们会尝试获取一个许可,如果获取失败,则须要期待,直到有线程开释许可或者以后线程被中断。

3.3 ReadWriteLock

ReadWriteLock(读写锁)是读写拆散的锁,读与读之间不须要阻塞互斥,但读写、写写之间须要阻塞互斥。ReadWriteLock中能够获取一个读锁和一个写锁,通过读写锁能够进步读多写少的程序的并发运行效率。

3.4 CountDownLatch

CountDownLatch是一个倒计数器,在结构CountDownLatch时咱们须要传一个数字,通过调用CountDownLatch的await办法,咱们能够阻塞以后线程,直至倒计数器减到0,通过CountDownLatch的countdown()办法,咱们能够将倒计数器的值减1。

CountDownLatch的作用就好比乘坐飞机,咱们必须等所有的旅客检票实现当前,而后飞机能力腾飞。

3.4 CyclicBarrier

CyclicBarrier(循环栅栏),循环栅栏的作用相似CountDownLatch,然而他会在将计数器减到0当前从新复原原始计数器的值,循环的概念也就是这么来的,而且循环栅栏反对在计数器减到0当前触发一个动作(Runnable接口的实现类)。

3.5 LockSupport

LockSupport是一个线程阻塞工具,机制相似于信号量,通过park()能够生产一个许可,如果以后没有许可,线程会被阻塞,unpark()开释一个许可。LockSupport中的许可最多只有一个。

LockSupport不会造成死锁,假如咱们的线程限先执行了unpark,而后再调用park也不会阻塞(因为unpark曾经开释了一个许可可被取得应用)。

Thread.resume()假如先于Thread.suspend()办法先执行,则会导致线程死锁。

4. 为什么要讲AQS

通过上图咱们能够看出,前文咱们所提到的锁在其外部都有一个Sync类(采纳了组合的形式),而这个Sync类都继承自AbstractQueuedSynchronizer类。

4.1 AQS外围数据结构

在AQS中与两个外部类:

  • ConditionObject:保留着条件变量期待队列(由Condition.await()引起的期待),最终实现也是Node
  • Node:同步期待队列的具体实现类(保留在期待在这个锁上的线程)

4.2 Node

Node的数据结构如下:

  • volatile Node prev:期待链表中的上一个元素
  • volatile Node next:期待链表中的下一个元素
  • volatile Thread thread:以后线程对象
  • Node nextWaiter:下一个期待在条件变量队列中的节点

除了上述4个属性外,还有一个重要的属性就是:

  • volatile int waitStatus

该属性示意的是节点在队列中的状态,括号中的为状态的int值:

  • 初始状态(0)
  • CANCELLED(1):线程勾销了期待,如果在获得锁的过程中产生了一些异样,则可能呈现勾销的状况,比方期待过程中呈现了中断异样或者timeout
  • SIGNAL(-1):示意节点须要被唤醒
  • CONDITION(-2):示意线程期待在条件变量中
  • PROPAGATE(-3)

4.3 ConditionObject

因为咱们这篇文章咱们只专一剖析一般的锁,不波及条件变量期待,所以读者敌人们可自行浏览源码。

5. 排他锁申请

上面这张图是ReentrantLock应用排他锁的流程图:

上面咱们剖析一下排他锁的应用过程,咱们能够以ReentrantLock为切入点,剖析一下AQS的实现:

public void lock() {    sync.lock();}

咱们在调用ReentrantLock的lock办法时,将会调用Sync的lock办法(),这里的Sync是一个抽象类,咱们这里次要看非偏心锁的实现形式,也就是NonfairSync的lock办法,如下:

final void lock() {    // 1    if (compareAndSetState(0, 1))    // 2        setExclusiveOwnerThread(Thread.currentThread());    else    // 3        acquire(1);}
  1. 通过CAS尝试性取得锁(扭转AQS中的state属性值为1),如果尝试取得锁胜利,执行第2步骤,否则执行第三步骤
  2. CAS获取锁胜利当前,设置持有锁的线程为以后线程
  3. 如果没有获取到锁,则调用acquire尝试性取得许可,因为这里是排他锁,有且只有一个线程能够持有该锁

5.1 acquire

acquire办法是AQS中的一个办法,如下:

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

首先调用tryAcquire再次尝试获取锁,该办法在AQS中的实现是抛出UnsupportedOperationException异样,也就是说子类必须要实现这个办法,上面咱们看一下tryAcquire在ReentrantLock$NonfairSync办法中的实现,该办法最终会调用ReentrantLock$Sync中的nonfairTryAcquire(int acquires)办法如下:

final boolean nonfairTryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {        if (compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}

这里的次要逻辑就是先获取一下锁的状态,如果锁的状态是未占有(state值为0),则再次应用CAS尝试去占有锁,如果尝试胜利,则返回true。

通过CAS尝试获取锁失败当前,判断锁是否被以后线程持有,如果是,调用setState(nextc)将AQS中的state的值+1,而后返回true(这里解决了重入的问题)。

上述两种状况如果都不成立,则返回false,示意以后线程获取锁失败。

5.2 acquireQueued办法

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

当咱们尝试获取锁失败当前,会调用acquireQueued该线程增加至期待队列,那么这个期待队列中的Node是怎么构建进去的呢?答案就是addWaiter办法。

private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    enq(node);    return node;}

addWaiter办法中首先构建了一个Node节点,Node节点中的线程为以后线程,而后尝试将该节点增加到AQS的期待队列中,如果原来期待队列的尾节点不为NULL的会采纳CAS疾速失败的办法进行增加,如果胜利增加,则返回这个新节点,否则调用enq增加新节点到期待队列中,办法如下:

private Node enq(final Node node) {    for (;;) {        Node t = tail;        if (t == null) { // Must initialize            // 如果队列中还没有节点,CAS初始化一个空的头结点,并把尾节点设置为头结点            if (compareAndSetHead(new Node()))                tail = head;        } else {            node.prev = t;            // CAS设置以后节点node为尾节点,并把原来的尾节点的next指向以后节点node            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

如果队列中临时还没有节点的话,就初始化一个空的头节点,并将尾节点设置为头结点,而后将以后节点node的前一个节点改成tail节点,并通过CAS直至胜利将node改为tail节点,并且原先的tail节点的next批改为以后节点note,而后返回以后节点node的前置节点(也就是原来的tail节点)。

当节点增加胜利当前,咱们会被动调用acquireQueued尝试获取锁,办法如下:

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        // 开启自旋        for (;;) {            // 获取以后节点node的前继节点            final Node p = node.predecessor();            // 如果前继节点是头结点(虚节点),那么意味着以后节点是第一个数据节点,那么久尝试获取锁,如果获取锁胜利,将以后节点设置为头结点,setHead办法会将节点中的线程和prev节点都置为空            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            // 如果node的前继节点不是头结点或者获取锁失败,那么须要判断以后节点是否须要挂起            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        // 获取如果产生异样,则勾销获取锁申请        if (failed)            cancelAcquire(node);    }}

如果判断以后节点线程是否须要挂起,次要依附shouldParkAfterFailedAcquire来判断,如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)        /*         * This node has already set status asking a release         * to signal it, so it can safely park.         */        return true;    if (ws > 0) {        /*         * Predecessor was cancelled. Skip over predecessors and         * indicate retry.         */        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {        /*         * waitStatus must be 0 or PROPAGATE.  Indicate that we         * need a signal, but don't park yet.  Caller will need to         * retry to make sure it cannot acquire before parking.         */        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

通过上述源码,咱们能够看到,只有以后节点的前继节点的状态是SINGAL(待唤醒状态)时,以后节点线程能够被挂起(这里次要是为了避免有限循环资源节约)。上述代码还解决前置节点被勾销了非凡状况,将以后节点的前置节点批改为队列中最初一个waitSatus不是勾销状态的节点。

将以后节点的线程挂起办法如下,实现采纳的LockSupport。

private final boolean parkAndCheckInterrupt() {    LockSupport.park(this);    return Thread.interrupted();}

勾销获取所申请的办法是cancelAcquire,如下:

private void cancelAcquire(Node node) {    // 过滤有效节点    if (node == null)        return;    node.thread = null;    // 跳过所有处于勾销的节点    Node pred = node.prev;    while (pred.waitStatus > 0)        node.prev = pred = pred.prev;    // 找到最初一个无效节点的后继节点    Node predNext = pred.next;    // 将以后节点的状态设置为勾销    node.waitStatus = Node.CANCELLED;    // 如果以后节点是尾节点,设置尾节点为最初一个无效节点    if (node == tail && compareAndSetTail(node, pred)) {        // 将最初一个无效节点的next设置为null        compareAndSetNext(pred, predNext, null);    } else {        // 如果最初一个无效节点不是头结点并且(最初一个无效节点是SINGAL状态或者能够被设置为SGINAL状态),并且最初一个无效节点的线程不为null        int ws;        if (pred != head &&            ((ws = pred.waitStatus) == Node.SIGNAL ||             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&            pred.thread != null) {            Node next = node.next;            // 如果以后节点后的后置节点是无效节点(存在且状态不为勾销状态),设置最初一个无效节点的next为以后节点的后继节点            if (next != null && next.waitStatus <= 0)                compareAndSetNext(pred, predNext, next);        } else {            // 上述条件如果均不成立,唤醒以后节点的后继节点            unparkSuccessor(node);        }        node.next = node; // help GC    }}

6. 排他锁开释

public void unlock() {    sync.release(1);}

ReentrantLock通过unlock办法来解锁,该办法会调用AQS中的release办法,如下:

public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

tryRelease是由具体的AQS的子类来实现,上面看一下ReentrantLock中的Sync中的实现,如下:

protected final boolean tryRelease(int releases) {    int c = getState() - releases;    // 只有持有锁的线程才能够执行unlock操作    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    boolean free = false;    if (c == 0) {        // 解锁胜利,设置独占线程为空        free = true;        setExclusiveOwnerThread(null);    }    setState(c);    return free;}

上面再回到AQS的release办法,如下:

public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

如果解锁胜利,并且以后节点状态不是初始状态,而后就调用unparkSuccessor办法唤醒head后继节点的线程,办法如下:

private void unparkSuccessor(Node node) {    // 获取以后节点状态    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 获取节点的后继节点    Node s = node.next;    // 如果节点的后继节点为null或者曾经被勾销,则从尾节点找到一个未勾销的节点进行唤醒    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)        // 唤醒线程        LockSupport.unpark(s.thread);}

解锁的过程还是绝对比较简单,当一个线程在开释ReentrantLock的时候,须要从期待队列中唤醒其余须要唤醒的节点。

本期的Java AQS介绍到这,我是shysh95,咱们下期再见!