关于java:JAVA并发编程AbstractQueuedSynchronizer之AQS概念和源码阅读

3次阅读

共计 4215 个字符,预计需要花费 11 分钟才能阅读完成。

1.AQS 是什么

2. 能干什么

3.AQS 为什么是 JUC 内容中最重要的基石

4.AQS 外部体系架构

5. 从咱们的 ReentrantLock 开始解读 AQS

6. 总结

1.AQS 是什么
AQS————全称 AbstractQueuedSynchronizer, 形象队列同步器
咱们能够看一下源码中的解释:

意思就是说,它是用来构建锁或者其它同步器组件的重量级根底框架及整个 JUC 的基石 ,通过 内置的 FIFO 队列来实现资源获取线程的排队工作 ,并通过 一个 int 类变量示意持有锁的状态。

简而言之
1) 这是一个 基石框架 (模板办法设计模式)
2) 就是将获取资源的线程排了个队,让它有序获取资源。

2. 能干什么
假如咱们在应用 ReentrantLock 的时候,必然会有线程阻塞,有线程阻塞那就必须排队,就得须要队列。
抢占资源 失败 的线程 持续去期待 ,然而 等待线程任然保留获取锁的可能 获取锁的流程还在持续。
这就是 AQS,题咱们实现了构建锁和队列,以及让线程有序获取资源。

3.AQS 为什么是 JUC 内容中最重要的基石

咱们先来看看实现 AQS 的类具体有哪些:

有 ReentrantLock,CountDownLatch,ReentrantReadWriteLock,Semaphore 等等。

此时咱们要理清一个关系:
锁:面向的都是锁的使用者,定义了程序员和锁交互的应用层 api,暗藏了实现细节,开发者调用即可。

同步器:面向锁的实现者,比方并发大神 DougLee, 提出一套标准,并简化了锁的实现,屏蔽了同步状态的治理,阻塞线程排队和告诉,唤醒机制等等。

4.AQS 外部体系架构

让咱们点到源码外面看一下:

咱们搜一下上面这个变量

/**
* The synchronization state.
*/
private volatile int state;

AQS 就是应用下面这一个 volatile 的 int 类型的变量来示意被占用的锁的同步状态

再搜一下上面这个变量:

static final class Node {

// 还有一些变量没有放进去,重点解释这几个

volatile Node prev;

volatile Node next;

volatile Thread thread;
//....

}

AQS 应用这个 链表的 FIFO 对垒来实现资源获取的排队工作,将每条要去抢占的线程封装成一个 Node 节点来实现锁的调配,通过 CAS 实现对 state 值的批改。

也就是说,咱们通过 队列(治理须要排队的线程)+state 变量(治理公共资源类)实现了 AQS 的根本构造。

咱们再来看一下 Node 外部:

volatile int waitStatus;

Node 通过 waitState 的成员变量来阻塞或者唤醒队列中的线程,waitStatus 的状态如图所示,不过也能够看源码正文

最初吗,咱们用一张图来展现,AQS 的根本构造

5. 从咱们的 ReentrantLock 开始解读 AQS

咱们从 ReentrantLock 作为一个突破点,来浏览一下 AQS 的源码。

ReentrantLock lock = new ReentrantLock();
lock.lock();
try {TimeUnit.MINUTES.sleep(60);   
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {lock.unlock();
        }

咱们点击 lock,能够看出,lock 底层是调用了一个 sync 变量的一个 lock 办法

    public void lock() {sync.lock();
    }

而 sync:

abstract static class Sync extends AbstractQueuedSynchronizer {//......}

能够看出:
Lock 接口的实现类,根本就是通过聚合了一个队列同步器的子类实现线程的访问控制的。

咱们持续往 lock 外面点,会发现有一个偏心锁和一个非偏心锁:

而后咱们持续往里点击,查看抢占锁的 acquire 办法,会发现会有图上的区别

非偏心锁抢占资源的时候,不须要断定队列后面是否还有排队线程。

偏心锁抢占资源的时候,须要断定度列前是否有排队线程。

那么其实,咱们只须要看懂非偏心锁的源码,其实也就看懂了偏心锁的源码。

咱们先梳理一下获取锁的整体流程:

咱们从 NonfairSync 的 lock 办法开始看:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            // 以后线程尝试着将锁资源的状态从 0 变为 1
            // 如果胜利
            // 将资源占用的线程设置为本线程
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            // 如果失败
            // 走 acquire 办法
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
        }
    }

持续查看 acquire(1)办法

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

tryAcquire:

        protected final boolean tryAcquire(int acquires) {
            // 点击这个办法来到上面这个办法
            return nonfairTryAcquire(acquires);
        }


        final boolean nonfairTryAcquire(int acquires) {
            // 先取得以后线程
            final Thread current = Thread.currentThread();
            int c = getState();
            // 如果以后锁的状态为 0(未被占用状态)if (c == 0) {
                // 尝试应用 cas 去占用
                if (compareAndSetState(0, acquires)) {
                    // 设置占用该资源的线程为本线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果以后锁的状态不为 0,然而占用该锁的线程是本线程
            else if (current == getExclusiveOwnerThread()) {
                // 相当于可重入锁了,而后状态标记位 +1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

addWaiter:

    private Node addWaiter(Node mode) {
       // 将以后线程设置为一个 Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
       // 如果尾节点不为空
        Node pred = tail;
        if (pred != null) {
            // 将本线程的上一个节点设置为尾节点
            node.prev = pred;
            // 应用 cas 替换尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果尾节点为空,则初始化
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        // 死循环
        for (;;) {
            Node t = tail;
            // 如果尾节点是空,则创立一个傀儡节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
             // 如果尾节点不为空,则替换尾节点并返回,跳出循环
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {final Node p = node.predecessor();
                // 如果 node 的前一个节点是头节点(队列前再无其它节点),且抢锁胜利
                if (p == head && tryAcquire(arg)) { 
                    // 设置以后的节点为头结点
                    setHead(node);
                    // 原来的头结点的下一个节点置空,帮忙 GC
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果 node 的前一个节点不是投节点
                // 更改以后节点的运行状态
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 暂停该线程
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

selfInterrupt:

    返回该线程的中断状态
    static void selfInterrupt() {Thread.currentThread().interrupt();}

再来看看 unlock 办法:

    public final boolean release(int arg) {
         // 尝试开释锁,如果锁的状态是 0,则胜利开释
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒头结点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

6. 总结

明天咱们理解了 AQS 概念和源码浏览,AQS 作为整个 JUC 的基石框架,是十分重要的。

正文完
 0