共计 9312 个字符,预计需要花费 24 分钟才能阅读完成。
详解 AQS(AbstractQueuedSynchronizer)
一, 是什么, 有什么作用?
概念: 形象队列同步器, 是 Java 一系列锁以及同步器的底层实现框架
作用: 实现像 ReentrantLock,CountDownLatch,Semaphore 这样的工具
二, 类的架构以及实现逻辑图
类架构图
类逻辑图
类中根本属性
/**
* 头节点
*/
private transient volatile Node head;
/**
* 尾节点
*/
private transient volatile Node tail;
/**
* 用户自定义线程状态, 该状态用于各种同步器的实现, 例如 ReentrantLock 的 state 就代表是否获取到资源和已重入 * 次数
*/
private volatile int state;
/**
* 阻塞队列节点对象, 能够看出 AQS 是一个 FIFO 的双向队列
*/
static final class Node {
/**
* 标记该线程是获取共享资源时被阻塞放入 AQS 队列的
*/
static final Node SHARED = new Node();
/**
* 标记该线程时获取独占资源时被阻塞放入 AQS 队列的
*/
static final Node EXCLUSIVE = null;
/**
* 线程被勾销了
*/
static final int CANCELLED = 1;
/**
* 线程须要被唤醒
*/
static final int SIGNAL = -1;
/**
* 线程在条件队列外面期待
*/
static final int CONDITION = -2;
/**
* 开释共享资源时须要告诉其余结点
*/
static final int PROPAGATE = -3;
/**
* 记录以后线程期待状态:
* 1, CANCELLED: 线程被勾销了
* -1, SIGNAL: 线程须要被唤醒
* -2, CONDITION: 线程在条件队列外面期待
* -3, PROPAGATE: 开释共享资源时须要告诉其余结点
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继结点
*/
volatile Node next;
/**
* 用来寄存进入 AQS 外面的线程
*/
volatile Thread thread;
/**
* 下一个期待者
*/
Node nextWaiter;
/**
* 以后线程是否是共享线程
*/
final boolean isShared() {return nextWaiter == SHARED;}
/**
* 获取前一个节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
/**
* 设置线程类型:
* 1, shared
* 2, exclusive
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/**
* 设置线程状态:
* 1, cancelled
* -1, signel
* -2, condition
* -3, propagate
*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
三, 共享操作和独占操作
在 AQS 中, 对于资源的获取能够分为共享式和独占式
共享式: 该资源能够同时被多个线程所持有, 然而如果超出所定义的范畴就会被放入到阻塞队列中
独占式: 该资源每次只能有一个线程所持有, 其余申请该资源的线程都会被退出到阻塞队列中
1, 独占式操作
获取资源: 由子类实现获取资源的逻辑, 如果获取胜利, 间接批改 state 值
获取失败, 首先将以后线程包装为一个独占式的线程节点, 而后插入到队列尾部, 再应用 CLH 算法, 一直轮询获取资源, 如果胜利则返回, 在过程中如果线程被中断是不会响应的(除了应用 Condition 将以后线程放到条件队列中, 而后勾销以后线程的状态)
public final void acquire(int arg) {
/**
* 先试图获取资源, 如果获取不到, 则将其放入队列, 并且应用 LockSupport 的 park 阻塞以后线程
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 尾节点不为空, 将以后节点插入到队列尾部
if (pred != null) {
node.prev = pred;
//CAS 将以后线程插入到队列尾部, 如果胜利间接返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
/**
* 尾节点为空或者插入到尾节点失败, 则应用死循环始终尝试往队列中插入节点, 直至胜利
*/
enq(node);
return node;
}
private Node enq(final Node node) {for (; ;) {
Node t = tail;
//1, 尾节点为空
if (t == null) {//2, 设置头节点(CAS), 并且将头节点和尾节点置为统一
if (compareAndSetHead(new Node()))
tail = head;
} else {
//3, 连贯以后节点和尾节点
node.prev = t;
//4, 将尾节点设置为 t, 并且再连贯 t 和尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 该办法次要用来已独占不间断的形式获取队列中已存在的线程.
* 使线程在队列中获取资源, 始终获取到资源后再返回, 如果在过程中被中断, 则返回 true, 返回到上一级被阻塞, 否则返回 false
* CLH 锁: 是一个基于单链表的高性能, 偏心的自旋锁, 申请线程只在本地变量上自旋, 它一直轮询前驱的状态, 如果发现前驱开释了锁就完结自旋
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ;) {
/**
* 获取以后节点的前一个节点,
* CLH 要害, 始终获取以后节点的前驱节点并处于自旋状态
*/
final Node p = node.predecessor();
/**
* 如果 p 为头节点, 而后以后节点尝试去获取资源,
* 如果获取胜利, 而后设置以后节点为头节点
*/
if (p == head && tryAcquire(arg)) {
// 头节点为以后节点
setHead(node);
// 出队
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 依据以后线程的状态值判断以后线程是否应该被阻塞,
* 如果为 true 则将以后线程阻塞并且将以后线程的终端标记位擦除
* 再到条件块内设置中断标记(并非本线程)
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/**
* 以后线程被中断, 设置以后线程的状态为被勾销
*/
if (failed)
cancelAcquire(node);
}
}
private void cancelAcquire(Node node) {if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors, 跳过被勾销的线程
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 以后节点的后一个节点
Node predNext = pred.next;
// 设置以后线程的状态为被勾销
node.waitStatus = Node.CANCELLED;
// 如果以后节点为尾节点, 删除掉以后节点
if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
} else {
/**
* 以后节点的前一个节点不为头节点并且前一个节点的线程不为空,
* 并且前一个节点的状态为期待唤醒或者前一个节点状态不为被勾销并且设置为期待唤醒胜利
*/
int ws;
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
// 以后节点的下一个节点不为空并且下一个节点的状态为期待唤醒状态, 删除以后节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/**
* 找到一个以后节点之前的状态不为被勾销了的线程
* 能够为须要被唤醒, 在条件队列中期待, 开释资源是须要告诉其余节点
*/
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 如果以后线程的状态不为期待唤醒, 将其设置为期待唤醒
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
return Thread.interrupted();}
开释资源: 调用子类实现的办法开释资源, 开释胜利批改 state, 而后唤醒间隔头节点最近的一个阻塞节点
public final boolean release(int arg) {
/**
* 在这一步就开释资源, 并且开释后唤醒队列的第一个节点
* 应用 LockSupport 的 unpark()办法开释以后线程
*/
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 找出与头节点最近的一个阻塞节点, 并开释
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
/**
* 线程处于阻塞或须要被唤醒的状态
*/
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取以后节点的下一个节点
Node s = node.next;
// 如果以后节点为空或者其不在阻塞队列中
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到间隔尾节点最远的一个阻塞的线程(间隔头节点最近)
if (t.waitStatus <= 0)
s = t;
}
// 找到间隔头节点最近的一个阻塞节点, 将其与 LockSupport 关联, 进入期待状态
if (s != null)
LockSupport.unpark(s.thread);
}
2, 共享式操作
获取资源: 和独占式类似, 不同之处在于, 共享式会对批改后的 state 值进行判断, 如果大于 0 示意还能够持续获取(即共享也不是所有人都能够获取, 有肯定的限量)
开释资源: 和独占式的区别在于, 只有头节点处于待唤醒状态时才会开释
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 将以后线程退出到队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (; ;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头节点, 则持续尝试获取资源
if (p == head) {int r = tryAcquireShared(arg);
// 获取资源胜利, 共享所在
if (r >= 0) {
// 设置头节点, 并且唤醒其余节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 以后线程在获取资源的过程中被中断, 则设置以后线程中断标记
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 前驱节点不为头节点或者获取资源失败, 须要判断以后线程是否须要阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 获取资源失败则将以后线程置为勾销状态 waitStatus=CANCELLED
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/**
* 流传变量大于 0 或者头节点为空或者头节点处于被唤醒状态
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
/**
* 以后节点的后继节点为空或者其是共享式节点, 则
*/
if (s == null || s.isShared())
doReleaseShared();}
}
private void doReleaseShared() {for (; ;) {
Node h = head;
if (h != null && h != tail) {
// 获取头节点期待状态
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 设置头节点的状态为 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点(从后继节点中找出一个状态值小于 0 的节点)
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
四, 条件变量
1, 是什么?
相似于 Object 类的 wait 和 notify 操作, 然而更加的灵便, 能够指定唤醒具体的线程
2, 类构造
3, 次要办法
挂起线程: await, 将以后线程节点的状态批改为 CONDITION 而后退出到条件队列中
public final void await() throws InterruptedException {if (Thread.interrupted())
throw new InterruptedException();
// 创立新的节点, 并插入到条件队列尾部
Node node = addConditionWaiter();
// 开释以后线程获取到的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果以后线程的状态为 CONDITION 则将其阻塞
while (!isOnSyncQueue(node)) {
// 将以后线程阻塞
LockSupport.park(this);
/**
* 如果以后线程没有被中断, 则尝试着将 node 节点的状态批改为勾销, 并且放入到 AQS 队列中
* 放入胜利: 抛出异样
* 放入失败: 尝试获取资源, 获取胜利执行中断
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 革除掉曾经勾销了的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 尾节点不为空并且不在条件队列中
if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
// 从新定位尾节点
t = lastWaiter;
}
// 创立一个节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 向链表尾部插入元素
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* 尝试开释锁, 开释胜利, 返回之前的状态
* 如果开释失败将以后 node 线程的状态批改为已勾销 (1) 并抛出异样
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
//
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {throw new IllegalMonitorStateException();
}
} finally {if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
// 以后节点的状态的 CONDITION 并且其为头节点则肯定不在 AQS 队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果以后节点的后继节点不为空则肯定在 AQS 队列中
if (node.next != null) // If has successor, it must be on queue
return true;
// 从后向前遍历以后节点是否在队列中
return findNodeFromTail(node);
}
/**
* 从后向前寻找该节点
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (; ;) {if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
/**
* 删除所有节点状态不为 CONDITION 的节点
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
//help GC
t.nextWaiter = null;
if (trail == null)
// 获取头节点
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = t;
// 指针向后挪动
t = next;
}
}
唤醒线程: 每次唤醒条件队列的队首元素
public final void signal() {
// 如果以后线程没有获取到锁, 间接抛出异样
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 将条件队列的头元素挪动到 AQS 队列
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 判断头节点是否为最初一个节点
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//help GC
first.nextWaiter = null;
// 将头节点的状态批改为 0 并且插入到 AQS 队列中
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 将节点从条件队列放入到期待队列中
*/
final boolean transferForSignal(Node node) {
// 设置 node 节点状态为 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 设置胜利插入到 AQS 队列尾部
Node p = enq(node);
int ws = p.waitStatus;
// 如果 node 线程被勾销 (只有 CANCELLED 大于 0) 或者设置 node 状态为期待唤醒失败, 则持续挂起
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
正文完