关于 Java:AQS 的源码逐行分析

40次阅读

共计 4225 个字符,预计需要花费 11 分钟才能阅读完成。

AQS 的源码剖析

NonfairSync 加锁源码剖析


    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Acquires the lock. First tries to barge in with a single CAS on
         * the state; if that fails, falls back to the regular
         * {@code acquire(1)} path.
         */
        final void lock() {
            // Barge: try to flip the state from 0 to 1 without queueing first.
            final boolean barged = compareAndSetState(0, 1);
            if (!barged) {
                // Barging failed: take the normal acquire path.
                acquire(1);
                return;
            }
            // Barging succeeded: record the current thread as the exclusive owner.
            setExclusiveOwnerThread(Thread.currentThread());
        }

        /**
         * Delegates to {@code nonfairTryAcquire}.
         *
         * @param acquires the acquire argument, passed through unchanged
         * @return whatever {@code nonfairTryAcquire(acquires)} returns
         */
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {// tryAcquire(1) 模版办法 , 再次尝试加锁, 加锁胜利返回 Ture,失败返回 False
        // addWaiter(null) CAS 自旋 排到队尾
        // acquireQueued CAS 自旋 获取锁 / 中断(park)if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 获取锁后 检测到 interrept = true,自我中断一次
            selfInterrupt();}

````
    /**
     * Non-fair try-acquire shared by the tryLock path and by
     * {@code NonfairSync.tryAcquire}.
     *
     * @param acquires the amount to add to the state on success
     * @return {@code true} if the state was free and this thread won the
     *         CAS, or if this thread already owns the lock (reentrant
     *         acquire); {@code false} otherwise
     * @throws Error if the reentrant hold count would overflow
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread caller = Thread.currentThread();
        final int held = getState();

        if (held == 0) {
            // The lock looks free: race for it with one CAS. No check of the
            // wait queue is made here before trying.
            if (!compareAndSetState(0, acquires)) {
                return false;
            }
            setExclusiveOwnerThread(caller);
            return true;
        }

        if (caller != getExclusiveOwnerThread()) {
            // Held by some other thread.
            return false;
        }

        // Reentrant acquire by the owner: just add to the state.
        final int total = held + acquires;
        if (total < 0) { // overflow
            throw new Error("Maximum lock count exceeded");
        }
        setState(total);
        return true;
    }
/**
 * Builds a node for the current thread in the given mode and appends
 * it to the tail of the queue.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the newly enqueued node
 */
private Node addWaiter(Node mode) {
    final Node waiter = new Node(Thread.currentThread(), mode);
    final Node oldTail = tail;

    // Fast path: when a tail already exists, try a single CAS to append
    // before falling back to the full enq loop.
    if (oldTail != null) {
        waiter.prev = oldTail;
        final boolean appended = compareAndSetTail(oldTail, waiter);
        if (appended) {
            oldTail.next = waiter;
            return waiter;
        }
    }

    // Slow path: the queue had no tail yet, or the CAS lost a race.
    // enq retries in a loop until the node is linked in.
    enq(waiter);
    return waiter;
}



/**
 * Appends the node to the queue, creating the initial head/tail node
 * first when the queue has not been initialized. Retries until the
 * node has been linked in.
 *
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    while (true) {
        final Node last = tail;
        if (last != null) {
            // Link backwards first, then publish the node as the new tail.
            node.prev = last;
            if (compareAndSetTail(last, node)) {
                last.next = node;
                return last;
            }
        } else if (compareAndSetHead(new Node())) {
            // Must initialize: the tail is null, so install a fresh empty
            // node as head and point the tail at it, then loop again.
            tail = head;
        }
    }
}

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * <p>Loops until {@code tryAcquire} succeeds for this node while its
 * predecessor is the head; between attempts the thread may park.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
// Retry loop: the only normal exit is a successful acquire.
final boolean acquireQueued(final Node node, int arg) {
    // Stays true unless the acquire below succeeds; checked in the finally block.
    boolean failed = true;
    try {
        // Records whether an interrupt was observed while waiting.
        boolean interrupted = false;
        for (;;) {
            // p is this node's predecessor in the queue.
            final Node p = node.predecessor();
            // Only a node whose predecessor is head attempts to acquire.
            // NOTE(review): head is presumably the node of the thread currently
            // holding the lock, so the node behind it is the first real waiter.
            // On success this node becomes the new head and the old head is unlinked.
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Acquire not possible right now: ask shouldParkAfterFailedAcquire whether
            // to block; if so, park, and remember an interrupt if
            // parkAndCheckInterrupt reports one.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    // failed is still true only when the loop was left abnormally (the sole
    // return above sets it to false); in that case the node is cancelled.
    } finally {if (failed)
            cancelAcquire(node);
    }
}



/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
// Decides from pred.waitStatus whether node's thread may park now.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // SIGNAL on the predecessor means a release has already been asked to
    // signal this node, so parking is safe.
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // Walk backwards past every cancelled predecessor (waitStatus > 0),
        // then relink the first non-cancelled one directly to node.
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // Not safe to park yet; the caller retries.
    return false;
}


/**
 * Parks the current thread, then reports whether it was interrupted.
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // Block the current thread here; this object is passed as the blocker.
    LockSupport.park(this);
    // Thread.interrupted() returns the interrupt status and clears it.
    final boolean wasInterrupted = Thread.interrupted();
    return wasInterrupted;
}


正文完
 0