AQS的源码剖析

NonfairSync 加锁源码剖析

    static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        /**         * Performs lock.  Try immediate barge, backing up to normal         * acquire on failure.         */        final void lock() {            // 插队获取,不必排队,能够进步吞吐量            if (compareAndSetState(0, 1))                // 获取锁胜利 设置 ExclusiveOwnerThread = currentThread                setExclusiveOwnerThread(Thread.currentThread());            else                // 重要 AQS 排队                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }
    /**     * Acquires in exclusive mode, ignoring interrupts.  Implemented     * by invoking at least once {@link #tryAcquire},     * returning on success.  Otherwise the thread is queued, possibly     * repeatedly blocking and unblocking, invoking {@link     * #tryAcquire} until success.  This method can be used     * to implement method {@link Lock#lock}.     *     * @param arg the acquire argument.  This value is conveyed to     *        {@link #tryAcquire} but is otherwise uninterpreted and     *        can represent anything you like.     */    public final void acquire(int arg) {        // tryAcquire(1) 模版办法 , 再次尝试加锁,加锁胜利返回Ture,失败返回False        // addWaiter(null) CAS自旋 排到队尾        // acquireQueued CAS自旋 获取锁 / 中断(park)        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            // 获取锁后 检测到interrept = true,自我中断一次            selfInterrupt();    }````
    /**     * Performs non-fair tryLock.  tryAcquire is implemented in     * subclasses, but both need nonfair try for trylock method.     */    final boolean nonfairTryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        // 再次获取锁 ,偏心锁须要判断是不是第一个期待的Node        if (c == 0) {            if (compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        }        // 获取重入锁,重入锁间接 state + 1 就好        else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            if (nextc < 0) // overflow                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */// 插入 队列尾部private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    // 疾速入队 优化    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    // 自旋 入队    enq(node);    return node;}/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */private Node enq(final Node node) {    for (;;) {        Node t = tail;        // Head 为 null 时,先建设Head        if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;        // 把以后节点插入最初        } else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */// CAS 自旋 要么胜利,要么中断final boolean acquireQueued(final Node node, int arg) {    // 加锁失败 标记    boolean failed = true;    try {        // 加锁过程中 是否中断过        boolean interrupted = false;        for (;;) {            // node.predecessor 前置节点            final Node p = node.predecessor();            // 判断 是否是 第一个排队期待的线程            // head 个别是 正在解决的线程节点,队列中第二个节点才是第一个排队期待的线程节点            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            // 没有加锁胜利,判断是否中断            // 前一个Node.waitStatus == -1 (前一个线程正在期待锁资源)            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops.  Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */// 通过 pred.waitStatus 判断 node 是否须要挂起private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    // 前继节点(上一个线程)还没拿到锁,挂起    // pred.waitStatus == Node.SIGNAL , 前继节点 在期待资源    if (ws == Node.SIGNAL)        /*         * This node has already set status asking a release         * to signal it, so it can safely park.         */        return true;    if (ws > 0) {        /*         * Predecessor was cancelled. Skip over predecessors and         * indicate retry.         */        // 删除 node 后面的有效节点        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {        /*         * waitStatus must be 0 or PROPAGATE.  Indicate that we         * need a signal, but don't park yet.  Caller will need to         * retry to make sure it cannot acquire before parking.         */        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */private final boolean parkAndCheckInterrupt() {    // LockSupport.park 中断    LockSupport.park(this);    // 返回是否被中断标记    return Thread.interrupted();}