关于java:JUC解析-抽象队列同步器AQSAbstractQueuedSynchronizer

2次阅读

共计 6271 个字符,预计需要花费 16 分钟才能阅读完成。

形象队列同步器(AQS-AbstractQueuedSynchronizer)

从名字上来了解:

  • 形象: 是抽象类, 具体由子类实现
  • 队列: 数据结构是队列, 应用队列存储数据
  • 同步: 基于它能够实现同步性能

咱们就从这几个方面来动手解读, 但首先, 咱们得先晓得以下几个它的特点, 以便于了解

AbstractQueuedSynchronizer 特点

1.AQS 能够实现独占锁和共享锁。

2. 独占锁 exclusive 是一个乐观锁。保障只有一个线程通过一个阻塞点,只有一个线程能够取得锁。

3. 共享锁 shared 是一个乐观锁。能够容许多个线程阻塞点,能够多个线程同时获取到锁。它容许一个资源能够被多个读操作,或者被一个写操作拜访,然而两个操作不能同时拜访。

4.AQS 应用一个 int 类型的成员变量 state 来示意同步状态,当 state>0 时示意曾经获取了锁,当 state = 0 无锁。它提供了三个办法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态 state 进行操作,能够确保对 state 的操作是平安的。

5.AQS 是通过一个 CLH 队列实现的(CLH 锁即 Craig, Landin, and Hagersten (CLH) locks,CLH 锁是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。CLH 锁也是一种基于链表的可扩大、高性能、偏心的自旋锁,申请线程只在本地变量上自旋,它一直轮询前驱的状态,如果发现前驱开释了锁就完结自旋。)

形象

咱们来扒一扒源码能够看到它继承于 AbstractOwnableSynchronizer 它是一个抽象类.

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

AQS 外部应用了一个 volatile 的变量 state 来作为资源的标识。同时定义了几个获取和扭转 state 的 protected 办法,子类能够笼罩这些办法来实现本人的逻辑.

能够看到类中为咱们提供了几个 protected 级别的办法, 它们别离是:

// 创立一个队列同步器实例, 初始 state 是 0
protected AbstractQueuedSynchronizer() {}

// 返回同步状态的以后值。protected final int getState() {return state;}

// 设置同步状态的值
protected final void setState(int newState) {state = newState;}

// 独占形式。尝试获取资源,胜利则返回 true,失败则返回 false。protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
    }    
    
    
// 独占形式。尝试开释资源,胜利则返回 true,失败则返回 false。protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
    }
    
    
// 共享形式。尝试获取资源。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
    }    
    
    
// 共享形式。尝试开释资源,如果开释后容许唤醒后续期待结点返回 true,否则返回 false。protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
    }
    

这些办法尽管都是 protected 办法,然而它们并没有在 AQS 具体实现,而是间接抛出异样,AQS 实现了一系列次要的逻辑
由此可知,AQS 是一个 形象的 用于 构建锁和同步器 框架 ,应用 AQS 能简略且高效地结构出利用宽泛的同步器,比方咱们提到的ReentrantLockSemaphoreReentrantReadWriteLockSynchronousQueueFutureTask 等等皆是 基于 AQS的。

咱们本人也能利用 AQS 十分轻松容易地结构出自定义的同步器,只有子类实现它的几个 protected 办法就能够了.

队列

AQS 类自身实现的是具体线程期待队列的保护(如 获取资源失败入队 / 唤醒出队 等)。它外部应用了一个先进先出(FIFO)的双端队列(CLH),并应用了两个指针 head 和 tail 用于标识队列的头部和尾部。其数据结构如图:


队列并不是间接贮存线程,而是贮存 领有线程的 Node 节点

咱们来看看 Node 的构造:

static final class Node {
    // 标记一个结点(对应的线程)在共享模式下期待
    static final Node SHARED = new Node();
    // 标记一个结点(对应的线程)在独占模式下期待
    static final Node EXCLUSIVE = null; 

    // waitStatus 的值,示意该结点(对应的线程)已被勾销
    static final int CANCELLED = 1; 
    // waitStatus 的值,示意后继结点(对应的线程)须要被唤醒
    static final int SIGNAL = -1;
    // waitStatus 的值,示意该结点(对应的线程)在期待某一条件
    static final int CONDITION = -2;
    
    //waitStatus 的值,示意有资源可用,新 head 结点须要持续唤醒后继结点
    //(共享模式下,多线程并发开释资源,而 head 唤醒其后继结点后,// 须要把多进去的资源留给前面的结点;设置新的 head 结点时,会持续唤醒其后继结点)static final int PROPAGATE = -3;

    // 期待状态,取值范畴,-3,-2,-1,0,1
    volatile int waitStatus;
    volatile Node prev; // 前驱结点
    volatile Node next; // 后继结点
    volatile Thread thread; // 结点对应的线程
    Node nextWaiter; // 期待队列里下一个期待条件的结点


    // 判断共享模式的办法
    final boolean isShared() {return nextWaiter == SHARED;}

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 其它办法疏忽,能够参考具体的源码
}

// AQS 外面的 addWaiter 公有办法
private Node addWaiter(Node mode) {
    // 应用了 Node 的这个构造函数
    Node node = new Node(Thread.currentThread(), mode);
    // 其它代码省略
}

过 Node 咱们能够实现两个队列,一是通过 prev 和 next 实现 CLH 队列(线程同步队列, 双向队列),二是 nextWaiter 实现 Condition 条件上的期待线程队列(单向队列),这个 Condition 次要用在 ReentrantLock 类中

同步

两种同步形式:

  • 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如 ReentrantLock。
  • 共享模式(Share):同时能够被多个线程获取,具体的资源个数能够通过参数指定。如 Semaphore/CountDownLatch。

同时实现两种模式的同步类,如 ReadWriteLock

获取资源

获取资源的入口是 acquire(int arg)办法。arg 是要获取的资源的个数,在独占模式下始终为 1。

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

首先调用 tryAcquire(arg)尝试去获取资源。后面提到了 这个办法是在子类具体实现的
如果获取资源失败,就通过 addWaiter(Node.EXCLUSIVE)办法把这个线程插入到期待队列中。其中传入的参数代表要插入的 Node 是独占式 的。这个办法的具体实现:

private Node addWaiter(Node mode) {
    // 生成该线程对应的 Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    // 将 Node 插入队列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 应用 CAS 尝试,如果胜利就返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果期待队列为空或者上述 CAS 失败,再自旋 CAS 插入
    enq(node);
    return node;
}

//AQS 中会存在多个线程同时抢夺资源的状况,// 因而必定会呈现多个线程同时插入节点的操作,// 在这里是通过 CAS 自旋的形式保障了操作的线程安全性。// 自旋 CAS 插入期待队列
private Node enq(final Node node) {for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

若设置胜利就代表本人获取到了锁,返回 true。状态为 0 设置 1 的动作在内部就有做过一次,外部再一次做只是晋升概率,而且这样的操作绝对锁来讲不占开销。
如果状态不是 0,则断定以后线程是否为排它锁的 Owner,如果是 Owner 则尝试将状态减少 acquires(也就是减少 1),如果这个状态值越界,则会抛出异样提醒,若没有越界,将状态设置进去后返回 true(实现了相似于偏差的性能,可重入,然而无需进一步征用)。
如果状态不是 0,且本身不是 owner,则返回 false。

当初通过 addWaiter 办法,曾经把一个 Node放到期待队列尾部 了。而处于期待队列的结点是 从头结点一个一个去获取资源的。具体的实现咱们来看看 acquireQueued 办法:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {final Node p = node.predecessor();
            // 如果 node 的前驱结点 p 是 head,示意 node 是第二个结点,就能够尝试去获取资源了
            if (p == head && tryAcquire(arg)) {
                // 拿到资源后,将 head 指向该结点。// 所以 head 所指的结点,就是以后获取到资源的那个结点或 null。setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果本人能够劳动了,就进入 waiting 状态,直到被 unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

这里 parkAndCheckInterrupt 办法外部应用到了 LockSupport.park(this),顺便简略介绍一下 park。
LockSupport 类是 Java 6 引入的一个类,提供了根本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数:
park(boolean isAbsolute, long time):阻塞以后线程
unpark(Thread jthread):使给定的线程进行阻塞

所以结点进入期待队列后,是 调用 park 使它进入阻塞状态 的。只有头结点的线程是处于沉闷状态的。

acquire 办法 获取资源的流程:

当然,获取资源的办法除了 acquire 外,还有以下三个:

  • acquireInterruptibly:申请可中断的资源(独占模式)
  • acquireShared:申请共享模式的资源
  • acquireSharedInterruptibly:申请可中断的资源(共享模式)

    可中断的意思是,在线程中断时可能会抛出 InterruptedException

开释资源

开释资源相比于获取资源来说,会简略许多。在 AQS 中只有一小段实现。

源码:

public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease 办法
这个动作能够认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是 1),如果后果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它的线程有机会进行执行。

在排它锁中,加锁的时候状态会减少 1(当然能够本人批改这个值),在解锁的时候减掉 1,同一个锁,在能够重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种状况下才会返回 true。
这一点大家写代码要留神,如果是在循环体中 lock()或成心应用两次以上的 lock(), 而最终只有一次 unlock(),最终可能无奈开释锁。导致死锁.


private void unparkSuccessor(Node node) {
    // 如果状态是正数,尝试把它设置为 0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 失去头结点的后继结点 head.next
    Node s = node.next;
    // 如果这个后继结点为空或者状态大于 0
    // 通过后面的定义咱们晓得,大于 0 只有一种可能,就是这个结点已被勾销
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 期待队列中所有还有用的结点,都向前挪动
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后继结点不为空,if (s != null)
        LockSupport.unpark(s.thread);
}

办法 unparkSuccessor(Node),意味着真正要开释锁了,它传入的是 head 节点, 外部首先会产生的动作是获取 head 节点的 next 节点,如果获取到的节点不为空,则间接通过:“LockSupport.unpark()”办法来开释对应的被挂起的线程.

关注公众号:java 宝典

正文完
 0