AQS的源码剖析
NonfairSync 加锁源码剖析
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { // 插队获取,不必排队,能够进步吞吐量 if (compareAndSetState(0, 1)) // 获取锁胜利 设置 ExclusiveOwnerThread = currentThread setExclusiveOwnerThread(Thread.currentThread()); else // 重要 AQS 排队 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { // tryAcquire(1) 模版办法 , 再次尝试加锁,加锁胜利返回Ture,失败返回False // addWaiter(null) CAS自旋 排到队尾 // acquireQueued CAS自旋 获取锁 / 中断(park) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取锁后 检测到interrept = true,自我中断一次 selfInterrupt(); }````
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 再次获取锁 ,偏心锁须要判断是不是第一个期待的Node if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 获取重入锁,重入锁间接 state + 1 就好 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */// 插入 队列尾部private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 疾速入队 优化 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 自旋 入队 enq(node); return node;}/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */private Node enq(final Node node) { for (;;) { Node t = tail; // Head 为 null 时,先建设Head if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; // 把以后节点插入最初 } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */// CAS 自旋 要么胜利,要么中断final boolean acquireQueued(final Node node, int arg) { // 加锁失败 标记 boolean failed = true; try { // 加锁过程中 是否中断过 boolean interrupted = false; for (;;) { // node.predecessor 前置节点 final Node p = node.predecessor(); // 判断 是否是 第一个排队期待的线程 // head 个别是 正在解决的线程节点,队列中第二个节点才是第一个排队期待的线程节点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 没有加锁胜利,判断是否中断 // 前一个Node.waitStatus == -1 (前一个线程正在期待锁资源) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */// 通过 pred.waitStatus 判断 node 是否须要挂起private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前继节点(上一个线程)还没拿到锁,挂起 // pred.waitStatus == Node.SIGNAL , 前继节点 在期待资源 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 删除 node 后面的有效节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */private final boolean parkAndCheckInterrupt() { // LockSupport.park 中断 LockSupport.park(this); // 返回是否被中断标记 return Thread.interrupted();}