共计 6271 个字符,预计需要花费 16 分钟才能阅读完成。
形象队列同步器(AQS-AbstractQueuedSynchronizer)
从名字上来了解:
- 形象: 是抽象类, 具体由子类实现
- 队列: 数据结构是队列, 应用队列存储数据
- 同步: 基于它能够实现同步性能
咱们就从这几个方面来动手解读, 但首先, 咱们得先晓得以下几个它的特点, 以便于了解
AbstractQueuedSynchronizer 特点
1.AQS 能够实现独占锁和共享锁。
2. 独占锁 exclusive 是一个乐观锁。保障只有一个线程通过一个阻塞点,只有一个线程能够取得锁。
3. 共享锁 shared 是一个乐观锁。能够容许多个线程阻塞点,能够多个线程同时获取到锁。它容许一个资源能够被多个读操作,或者被一个写操作拜访,然而两个操作不能同时拜访。
4.AQS 应用一个 int 类型的成员变量 state 来示意同步状态,当 state>0 时示意曾经获取了锁,当 state = 0 无锁。它提供了三个办法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态 state 进行操作,能够确保对 state 的操作是平安的。
5.AQS 是通过一个 CLH 队列实现的(CLH 锁即 Craig, Landin, and Hagersten (CLH) locks,CLH 锁是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。CLH 锁也是一种基于链表的可扩大、高性能、偏心的自旋锁,申请线程只在本地变量上自旋,它一直轮询前驱的状态,如果发现前驱开释了锁就完结自旋。)
形象
咱们来扒一扒源码能够看到它继承于 AbstractOwnableSynchronizer
它是一个抽象类.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
AQS 外部应用了一个 volatile 的变量 state
来作为资源的标识。同时定义了几个获取和扭转 state 的 protected 办法,子类能够笼罩这些办法来实现本人的逻辑.
能够看到类中为咱们提供了几个 protected 级别的办法, 它们别离是:
// 创立一个队列同步器实例, 初始 state 是 0
protected AbstractQueuedSynchronizer() {}
// 返回同步状态的以后值。protected final int getState() {return state;}
// 设置同步状态的值
protected final void setState(int newState) {state = newState;}
// 独占形式。尝试获取资源,胜利则返回 true,失败则返回 false。protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
// 独占形式。尝试开释资源,胜利则返回 true,失败则返回 false。protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
// 共享形式。尝试获取资源。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
// 共享形式。尝试开释资源,如果开释后容许唤醒后续期待结点返回 true,否则返回 false。protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
这些办法尽管都是 protected 办法,然而它们并没有在 AQS 具体实现,而是间接抛出异样,AQS 实现了一系列次要的逻辑
由此可知,AQS 是一个 形象的
用于 构建锁和同步器
的框架
,应用 AQS 能简略且高效地结构出利用宽泛的同步器,比方咱们提到的ReentrantLock
,Semaphore
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等等皆是 基于 AQS
的。
咱们本人也能利用 AQS 十分轻松容易地结构出自定义的同步器,只有子类实现它的几个 protected 办法就能够了.
队列
AQS 类自身实现的是具体线程期待队列的保护(如 获取资源失败入队
/ 唤醒出队
等)。它外部应用了一个先进先出(FIFO)的双端队列(CLH),并应用了两个指针 head 和 tail 用于标识队列的头部和尾部。其数据结构如图:
队列并不是间接贮存线程,而是贮存 领有线程的 Node 节点
。
咱们来看看 Node 的构造:
static final class Node {
// 标记一个结点(对应的线程)在共享模式下期待
static final Node SHARED = new Node();
// 标记一个结点(对应的线程)在独占模式下期待
static final Node EXCLUSIVE = null;
// waitStatus 的值,示意该结点(对应的线程)已被勾销
static final int CANCELLED = 1;
// waitStatus 的值,示意后继结点(对应的线程)须要被唤醒
static final int SIGNAL = -1;
// waitStatus 的值,示意该结点(对应的线程)在期待某一条件
static final int CONDITION = -2;
//waitStatus 的值,示意有资源可用,新 head 结点须要持续唤醒后继结点
//(共享模式下,多线程并发开释资源,而 head 唤醒其后继结点后,// 须要把多进去的资源留给前面的结点;设置新的 head 结点时,会持续唤醒其后继结点)static final int PROPAGATE = -3;
// 期待状态,取值范畴,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的线程
Node nextWaiter; // 期待队列里下一个期待条件的结点
// 判断共享模式的办法
final boolean isShared() {return nextWaiter == SHARED;}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 其它办法疏忽,能够参考具体的源码
}
// AQS 外面的 addWaiter 公有办法
private Node addWaiter(Node mode) {
// 应用了 Node 的这个构造函数
Node node = new Node(Thread.currentThread(), mode);
// 其它代码省略
}
过 Node 咱们能够实现两个队列,一是通过 prev 和 next 实现 CLH 队列(线程同步队列, 双向队列),二是 nextWaiter 实现 Condition 条件上的期待线程队列(单向队列),这个 Condition 次要用在 ReentrantLock 类中
同步
两种同步形式:
- 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如 ReentrantLock。
- 共享模式(Share):同时能够被多个线程获取,具体的资源个数能够通过参数指定。如 Semaphore/CountDownLatch。
同时实现两种模式的同步类,如 ReadWriteLock
获取资源
获取资源的入口是 acquire(int arg)办法。arg 是要获取的资源的个数,在独占模式下始终为 1。
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
首先调用 tryAcquire(arg)尝试去获取资源。后面提到了 这个办法是在子类具体实现的
如果获取资源失败,就通过 addWaiter(Node.EXCLUSIVE)办法把这个线程插入到期待队列中。其中传入的参数代表要插入的 Node 是独占式
的。这个办法的具体实现:
private Node addWaiter(Node mode) {
// 生成该线程对应的 Node 节点
Node node = new Node(Thread.currentThread(), mode);
// 将 Node 插入队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 应用 CAS 尝试,如果胜利就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果期待队列为空或者上述 CAS 失败,再自旋 CAS 插入
enq(node);
return node;
}
//AQS 中会存在多个线程同时抢夺资源的状况,// 因而必定会呈现多个线程同时插入节点的操作,// 在这里是通过 CAS 自旋的形式保障了操作的线程安全性。// 自旋 CAS 插入期待队列
private Node enq(final Node node) {for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
若设置胜利就代表本人获取到了锁,返回 true。状态为 0 设置 1 的动作在内部就有做过一次,外部再一次做只是晋升概率,而且这样的操作绝对锁来讲不占开销。
如果状态不是 0,则断定以后线程是否为排它锁的 Owner,如果是 Owner 则尝试将状态减少 acquires(也就是减少 1),如果这个状态值越界,则会抛出异样提醒,若没有越界,将状态设置进去后返回 true(实现了相似于偏差的性能,可重入,然而无需进一步征用)。
如果状态不是 0,且本身不是 owner,则返回 false。
当初通过 addWaiter 办法,曾经把一个 Node放到期待队列尾部
了。而处于期待队列的结点是 从头结点一个一个去获取资源的
。具体的实现咱们来看看 acquireQueued 办法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {final Node p = node.predecessor();
// 如果 node 的前驱结点 p 是 head,示意 node 是第二个结点,就能够尝试去获取资源了
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将 head 指向该结点。// 所以 head 所指的结点,就是以后获取到资源的那个结点或 null。setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果本人能够劳动了,就进入 waiting 状态,直到被 unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}
这里 parkAndCheckInterrupt 办法外部应用到了 LockSupport.park(this),顺便简略介绍一下 park。
LockSupport 类是 Java 6 引入的一个类,提供了根本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数:
park(boolean isAbsolute, long time):阻塞以后线程
unpark(Thread jthread):使给定的线程进行阻塞
所以结点进入期待队列后,是 调用 park 使它进入阻塞状态
的。只有头结点的线程是处于沉闷状态的。
acquire 办法 获取资源的流程:
当然,获取资源的办法除了 acquire 外,还有以下三个:
- acquireInterruptibly:申请可中断的资源(独占模式)
- acquireShared:申请共享模式的资源
-
acquireSharedInterruptibly:申请可中断的资源(共享模式)
可中断的意思是,在线程中断时可能会抛出 InterruptedException
开释资源
开释资源相比于获取资源来说,会简略许多。在 AQS 中只有一小段实现。
源码:
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease 办法
这个动作能够认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是 1),如果后果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它的线程有机会进行执行。
在排它锁中,加锁的时候状态会减少 1(当然能够本人批改这个值),在解锁的时候减掉 1,同一个锁,在能够重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种状况下才会返回 true。
这一点大家写代码要留神,如果是在循环体中 lock()或成心应用两次以上的 lock(), 而最终只有一次 unlock(),最终可能无奈开释锁。导致死锁.
private void unparkSuccessor(Node node) {
// 如果状态是正数,尝试把它设置为 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 失去头结点的后继结点 head.next
Node s = node.next;
// 如果这个后继结点为空或者状态大于 0
// 通过后面的定义咱们晓得,大于 0 只有一种可能,就是这个结点已被勾销
if (s == null || s.waitStatus > 0) {
s = null;
// 期待队列中所有还有用的结点,都向前挪动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为空,if (s != null)
LockSupport.unpark(s.thread);
}
办法 unparkSuccessor(Node),意味着真正要开释锁了,它传入的是 head 节点, 外部首先会产生的动作是获取 head 节点的 next 节点,如果获取到的节点不为空,则间接通过:“LockSupport.unpark()”办法来开释对应的被挂起的线程.
关注公众号:java 宝典