AQS 的源码剖析
NonfairSync 加锁源码剖析
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Acquires the lock. A single CAS on the state (0 -> 1) is attempted
     * first; only when that attempt loses does the call fall back to the
     * regular {@code acquire} path.
     */
    final void lock() {
        // Barge in without queueing; per the original author's note this
        // is done to improve throughput.
        if (!compareAndSetState(0, 1)) {
            // The immediate attempt failed: go through the normal AQS acquire.
            acquire(1);
            return;
        }
        // The immediate attempt succeeded: record the current thread as
        // the exclusive owner.
        setExclusiveOwnerThread(Thread.currentThread());
    }

    /**
     * Delegates to {@code nonfairTryAcquire} with the same argument.
     */
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {// tryAcquire(1) 模版办法 , 再次尝试加锁, 加锁胜利返回 Ture,失败返回 False
// addWaiter(null) CAS 自旋 排到队尾
// acquireQueued CAS 自旋 获取锁 / 中断(park)if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 获取锁后 检测到 interrept = true,自我中断一次
selfInterrupt();}
````
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
    final Thread self = Thread.currentThread();
    final int held = getState();

    if (held == 0) {
        // State is 0: try to take the lock right away with a CAS.
        // NOTE(review): the original author remarks that the fair variant
        // additionally checks whether the caller is the first waiting node;
        // that code is not visible here.
        if (!compareAndSetState(0, acquires)) {
            return false;
        }
        setExclusiveOwnerThread(self);
        return true;
    }

    if (self != getExclusiveOwnerThread()) {
        // State is non-zero and the recorded owner is another thread.
        return false;
    }

    // Re-entrant acquire by the owning thread: just add to the state.
    final int updated = held + acquires;
    if (updated < 0) { // overflow
        throw new Error("Maximum lock count exceeded");
    }
    setState(updated);
    return true;
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
// Appends the new node to the tail of the queue.
private Node addWaiter(Node mode) {
    final Node fresh = new Node(Thread.currentThread(), mode);

    // Fast path: when a tail already exists, try one CAS to append
    // directly; anything else is left to enq() below.
    final Node last = tail;
    if (last != null) {
        // prev is linked before the CAS publishes the node as the tail.
        fresh.prev = last;
        final boolean appended = compareAndSetTail(last, fresh);
        if (appended) {
            last.next = fresh;
            return fresh;
        }
    }

    // Slow path: no tail yet, or the CAS lost; enq() retries in a loop.
    enq(fresh);
    return fresh;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
    while (true) {
        final Node last = tail;

        if (last == null) { // Must initialize
            // No tail yet: install a fresh empty node as head, then point
            // tail at it, and go around again to append the real node.
            if (compareAndSetHead(new Node())) {
                tail = head;
            }
            continue;
        }

        // Link backwards first, then try to swing tail to the new node.
        node.prev = last;
        if (compareAndSetTail(last, node)) {
            last.next = node;
            // The previous tail is now node's predecessor.
            return last;
        }
    }
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
// Loops inside the queue until the acquire succeeds, parking between attempts.
final boolean acquireQueued(final Node node, int arg) {
// Stays true until the acquire succeeds; while it is still true on exit,
// the finally block below cancels the node.
boolean failed = true;
try {
// Records whether an interrupt was observed while waiting.
boolean interrupted = false;
for (;;) {
// The node returned by node.predecessor().
final Node p = node.predecessor();
// Only attempt tryAcquire when the predecessor is head.
// NOTE(review): per the original author, head is normally the node of the
// thread currently being served, so the node right after head is the first
// real waiter -- confirm against the queue initialisation code.
if (p == head && tryAcquire(arg)) {setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// The acquire did not succeed this round: ask shouldParkAfterFailedAcquire
// whether to block; if so, park, and remember an interrupt when
// parkAndCheckInterrupt reports one.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
// Uses pred.waitStatus to decide whether node should be parked (suspended).
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    final int status = pred.waitStatus;

    if (status == Node.SIGNAL) {
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    }

    if (status <= 0) {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, status, Node.SIGNAL);
        return false;
    }

    /*
     * Predecessor was cancelled. Skip over predecessors and
     * indicate retry.
     */
    // Walk backwards, relinking node.prev at each step, until a
    // predecessor whose waitStatus is not positive is found.
    do {
        pred = pred.prev;
        node.prev = pred;
    } while (pred.waitStatus > 0);
    pred.next = node;
    return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
    // Block the current thread, passing this object as the blocker.
    LockSupport.park(this);
    // Thread.interrupted() reports the interrupt status and clears it.
    final boolean wasInterrupted = Thread.interrupted();
    return wasInterrupted;
}