关于java:AQS

90次阅读

共计 8178 个字符,预计需要花费 21 分钟才能阅读完成。

文章会首发公众号,搜寻关注:Different Java

摘要

  1. 如何保障线程平安
  2. JDK 并发包中的同步控制
  3. JDK 并发包中锁的特点
  4. 为什么要讲 AQS
  5. AQS 的外围数据结构
  6. AQS 排他锁如何申请
  7. AQS 排他锁如何开释

1. 如何保障线程平安

Java 多线程在对共享资源进行拜访时,如果不加以控制会存在线程平安问题,当咱们应用多线程对共享资源拜访时,通常会线程共享资源的进行拜访线程数的管制:

  • 共享锁:咱们会对共享资源分派许可,只有拿到许可的线程才能够拜访到共享资源,否则就须要期待有许可可用 (当然这里也能够放弃期待)。拿到许可的线程在来到共享区间(不再访问共享资源)时必须要偿还许可,这样处于期待的线程才有可能拿到许可执行。(信号量采纳的就是这种机制)
  • 排他锁:只有一个线程能够拜访到共享资源,其余线程在未拿到排他锁时只能在共享区间外期待。当持有锁的线程开释锁当前,其余期待的线程才有可能获取到锁执行

2. JDK 并发包中的同步控制

Java 提供了大量的工具让咱们对共享资源进行多线程访问控制,其中很多人最相熟的是 synchronized 关键字,除了该关键字,JDK 的并发包中也提供了大量的类来进行同步控制,次要有:

  • ReentrantLock
  • Semaphore
  • ReadWriteLock
  • CountDownLatch
  • CyclicBarrier
  • LockSupport
  • Condition

3. JDK 包中的锁的特点

3.1 ReentrantLock

ReentrantLock 的作用和 synchronized 的作用基本一致,然而 ReentrantLock 应该须要手动 lock(如果锁已被占用则须要期待)和 unlock,并且反对线程中断,反对偏心锁 (依照申请锁的程序取得锁),尝试取得锁的时候能够指定最大等待时间 (如果不指定工夫则不进行期待,拿到锁返回 true,否则返回 false)。

ReentrantLock 搭配 Condition 的 await() 以及 signal() 相当于 synchronized 搭配 Object.wait() 以及 notify()。

3.2 Semaphore

Semaphore(信号量) 容许多个线程同时拜访一个共享资源,在结构信号量时必须指定容许的最大线程数,在应用信号量时,咱们会尝试获取一个许可,如果获取失败,则须要期待,直到有线程开释许可或者以后线程被中断。

3.3 ReadWriteLock

ReadWriteLock(读写锁) 是读写拆散的锁,读与读之间不须要阻塞互斥,但读写、写写之间须要阻塞互斥。ReadWriteLock 中能够获取一个读锁和一个写锁,通过读写锁能够进步读多写少的程序的并发运行效率。

3.4 CountDownLatch

CountDownLatch 是一个倒计数器,在结构 CountDownLatch 时咱们须要传一个数字,通过调用 CountDownLatch 的 await 办法,咱们能够阻塞以后线程,直至倒计数器减到 0,通过 CountDownLatch 的 countdown() 办法,咱们能够将倒计数器的值减 1。

CountDownLatch 的作用就好比乘坐飞机,咱们必须等所有的旅客检票实现当前,而后飞机能力腾飞。

3.4 CyclicBarrier

CyclicBarrier(循环栅栏),循环栅栏的作用相似 CountDownLatch,然而他会在将计数器减到 0 当前从新复原原始计数器的值,循环的概念也就是这么来的,而且循环栅栏反对在计数器减到 0 当前触发一个动作 (Runnable 接口的实现类)。

3.5 LockSupport

LockSupport 是一个线程阻塞工具,机制相似于信号量,通过 park() 能够生产一个许可,如果以后没有许可,线程会被阻塞,unpark() 开释一个许可。LockSupport 中的许可最多只有一个。

LockSupport 不会造成死锁,假如咱们的线程限先执行了 unpark,而后再调用 park 也不会阻塞 (因为 unpark 曾经开释了一个许可可被取得应用)。

Thread.resume() 假如先于 Thread.suspend() 办法先执行,则会导致线程死锁。

4. 为什么要讲 AQS

通过上图咱们能够看出,前文咱们所提到的锁在其外部都有一个 Sync 类 (采纳了组合的形式),而这个 Sync 类都继承自 AbstractQueuedSynchronizer 类。

4.1 AQS 外围数据结构

在 AQS 中与两个外部类:

  • ConditionObject:保留着条件变量期待队列(由 Condition.await() 引起的期待),最终实现也是 Node
  • Node:同步期待队列的具体实现类 (保留在期待在这个锁上的线程)

4.2 Node

Node 的数据结构如下:

  • volatile Node prev:期待链表中的上一个元素
  • volatile Node next:期待链表中的下一个元素
  • volatile Thread thread:以后线程对象
  • Node nextWaiter:下一个期待在条件变量队列中的节点

除了上述 4 个属性外,还有一个重要的属性就是:

  • volatile int waitStatus

该属性示意的是节点在队列中的状态,括号中的为状态的 int 值:

  • 初始状态(0)
  • CANCELLED(1):线程勾销了期待,如果在获得锁的过程中产生了一些异样,则可能呈现勾销的状况,比方期待过程中呈现了中断异样或者 timeout
  • SIGNAL(-1):示意节点须要被唤醒
  • CONDITION(-2):示意线程期待在条件变量中
  • PROPAGATE(-3)

4.3 ConditionObject

因为咱们这篇文章咱们只专一剖析一般的锁,不波及条件变量期待,所以读者敌人们可自行浏览源码。

5. 排他锁申请

上面这张图是 ReentrantLock 应用排他锁的流程图:

上面咱们剖析一下排他锁的应用过程,咱们能够以 ReentrantLock 为切入点,剖析一下 AQS 的实现:

public void lock() {sync.lock();
}

咱们在调用 ReentrantLock 的 lock 办法时,将会调用 Sync 的 lock 办法 (),这里的 Sync 是一个抽象类,咱们这里次要看非偏心锁的实现形式,也就是 NonfairSync 的 lock 办法,如下:

final void lock() {
    // 1
    if (compareAndSetState(0, 1))
    // 2
        setExclusiveOwnerThread(Thread.currentThread());
    else
    // 3
        acquire(1);
}
  1. 通过 CAS 尝试性取得锁(扭转 AQS 中的 state 属性值为 1),如果尝试取得锁胜利,执行第 2 步骤,否则执行第三步骤
  2. CAS 获取锁胜利当前,设置持有锁的线程为以后线程
  3. 如果没有获取到锁,则调用 acquire 尝试性取得许可,因为这里是排他锁,有且只有一个线程能够持有该锁

5.1 acquire

acquire 办法是 AQS 中的一个办法,如下:

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

首先调用 tryAcquire 再次尝试获取锁,该办法在 AQS 中的实现是抛出 UnsupportedOperationException 异样,也就是说子类必须要实现这个办法,上面咱们看一下 tryAcquire 在 ReentrantLock$NonfairSync 办法中的实现,该办法最终会调用 ReentrantLock$Sync 中的 nonfairTryAcquire(int acquires) 办法如下:

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

这里的次要逻辑就是先获取一下锁的状态,如果锁的状态是未占有 (state 值为 0),则再次应用 CAS 尝试去占有锁,如果尝试胜利,则返回 true。

通过 CAS 尝试获取锁失败当前,判断锁是否被以后线程持有,如果是,调用 setState(nextc) 将 AQS 中的 state 的值 +1,而后返回 true(这里解决了重入的问题)。

上述两种状况如果都不成立,则返回 false,示意以后线程获取锁失败。

5.2 acquireQueued 办法

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

当咱们尝试获取锁失败当前,会调用 acquireQueued 该线程增加至期待队列,那么这个期待队列中的 Node 是怎么构建进去的呢?答案就是 addWaiter 办法。

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter 办法中首先构建了一个 Node 节点,Node 节点中的线程为以后线程,而后尝试将该节点增加到 AQS 的期待队列中,如果原来期待队列的尾节点不为 NULL 的会采纳 CAS 疾速失败的办法进行增加,如果胜利增加,则返回这个新节点,否则调用 enq 增加新节点到期待队列中,办法如下:

private Node enq(final Node node) {for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果队列中还没有节点,CAS 初始化一个空的头结点,并把尾节点设置为头结点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // CAS 设置以后节点 node 为尾节点,并把原来的尾节点的 next 指向以后节点 node
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如果队列中临时还没有节点的话,就初始化一个空的头节点,并将尾节点设置为头结点,而后将以后节点 node 的前一个节点改成 tail 节点,并通过 CAS 直至胜利将 node 改为 tail 节点,并且原先的 tail 节点的 next 批改为以后节点 note,而后返回以后节点 node 的前置节点(也就是原来的 tail 节点)。

当节点增加胜利当前,咱们会被动调用 acquireQueued 尝试获取锁,办法如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 开启自旋
        for (;;) {
            // 获取以后节点 node 的前继节点
            final Node p = node.predecessor();
            // 如果前继节点是头结点(虚节点),那么意味着以后节点是第一个数据节点,那么久尝试获取锁,如果获取锁胜利,将以后节点设置为头结点,setHead 办法会将节点中的线程和 prev 节点都置为空
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果 node 的前继节点不是头结点或者获取锁失败,那么须要判断以后节点是否须要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 获取如果产生异样,则勾销获取锁申请
        if (failed)
            cancelAcquire(node);
    }
}

如果判断以后节点线程是否须要挂起,次要依附 shouldParkAfterFailedAcquire 来判断,如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

通过上述源码,咱们能够看到,只有以后节点的前继节点的状态是 SINGAL(待唤醒状态)时,以后节点线程能够被挂起( 这里次要是为了避免有限循环资源节约 )。上述代码还解决前置节点被勾销了非凡状况,将以后节点的前置节点批改为队列中最初一个 waitSatus 不是勾销状态的节点。

将以后节点的线程挂起办法如下,实现采纳的 LockSupport。

private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
    return Thread.interrupted();}

勾销获取所申请的办法是 cancelAcquire,如下:

private void cancelAcquire(Node node) {
    // 过滤有效节点
    if (node == null)
        return;

    node.thread = null;

    // 跳过所有处于勾销的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 找到最初一个无效节点的后继节点
    Node predNext = pred.next;

    // 将以后节点的状态设置为勾销
    node.waitStatus = Node.CANCELLED;

    // 如果以后节点是尾节点,设置尾节点为最初一个无效节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 将最初一个无效节点的 next 设置为 null
        compareAndSetNext(pred, predNext, null);
    } else {
        // 如果最初一个无效节点不是头结点并且(最初一个无效节点是 SINGAL 状态或者能够被设置为 SGINAL 状态),并且最初一个无效节点的线程不为 null
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            // 如果以后节点后的后置节点是无效节点(存在且状态不为勾销状态),设置最初一个无效节点的 next 为以后节点的后继节点
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 上述条件如果均不成立,唤醒以后节点的后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

6. 排他锁开释

public void unlock() {sync.release(1);
}

ReentrantLock 通过 unlock 办法来解锁,该办法会调用 AQS 中的 release 办法,如下:

public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease 是由具体的 AQS 的子类来实现,上面看一下 ReentrantLock 中的 Sync 中的实现,如下:

protected final boolean tryRelease(int releases) {int c = getState() - releases;
    // 只有持有锁的线程才能够执行 unlock 操作
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 解锁胜利,设置独占线程为空
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

上面再回到 AQS 的 release 办法,如下:

public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

如果解锁胜利,并且以后节点状态不是初始状态,而后就调用 unparkSuccessor 办法唤醒 head 后继节点的线程,办法如下:

private void unparkSuccessor(Node node) {

    // 获取以后节点状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 获取节点的后继节点
    Node s = node.next;
    // 如果节点的后继节点为 null 或者曾经被勾销,则从尾节点找到一个未勾销的节点进行唤醒
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

解锁的过程还是绝对比较简单,当一个线程在开释 ReentrantLock 的时候,须要从期待队列中唤醒其余须要唤醒的节点。

本期的 Java AQS 介绍到这,我是 shysh95,咱们下期再见!

正文完
 0