关于java:JDK源码分析AbstractQueuedSynchronizer2

35次阅读

共计 8506 个字符,预计需要花费 22 分钟才能阅读完成。

1. 概述

前文「JDK 源码剖析 -AbstractQueuedSynchronizer(1)」初步剖析了 AQS,其中提到了 Node 节点的「独占模式」和「共享模式」,其实 AQS 也次要是围绕对这两种模式的操作进行的。

Node 节点是对线程 Thread 类的封装,因而两种模式能够了解如下:

  • 独占模式(exclusive):线程对资源的拜访是排他的,即某个工夫只能一个线程独自拜访资源;
  • 共享模式(shared):与独占模式不同,多个线程能够同时拜访资源。

本文先剖析独占模式下的各种操作,前面再剖析共享模式。

2. 独占模式

2.1 办法概述

独占模式下的操作次要有以下几个办法(可与后面剖析的 Lock 接口的办法类比):

  1. acquire(int arg)

    以独占模式获取资源,疏忽中断;能够类比 Lock 接口的 lock 办法;

  2. acquireInterruptibly(int arg)

    以独占模式获取资源,响应中断;能够类比 Lock 接口的 lockInterruptibly 办法;

  3. tryAcquireNanos(int arg, long nanosTimeout)

    以独占模式获取资源,响应中断,且有超时期待;能够类比 Lock 接口的 tryLock(long, TimeUnit) 办法;

  4. release(int arg)

    开释资源,能够类比 Lock 接口的 unlock 办法。

2.2 办法剖析

2.2.1 独占模式获取资源(疏忽中断)

这几种获取资源的办法很多中央是相似的。咱们先从 acquire 办法开始剖析,如下:

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

该办法看似很短,其实是外部做了封装。这几行代码蕴含了如下四个操作步骤:

  1. tryAcquire
  2. addWaiter(Node.EXECUSIVE)
  3. acquireQueued(final Node node, arg))
  4. selfInterrupt

下面的四个步骤不肯定全副执行,上面顺次进行剖析。

  • step 1: tryAcquire
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

该办法的作用是尝试以独占模式获取资源,若胜利则返回 true。

能够看到该办法是一个 protected 办法,而且 AQS 中该办法间接抛出了异样,其实是它把实现委托给了子类。这也是 ReentrantLock、CountdownLatch 等类(严格来说是其内部类 Sync)的实现性能不同的中央,这些类正是通过对该办法的不同实现来制订了本人的“游戏规则”。

若 step 1 中的 tryAcquire 办法返回 true,则示意以后线程获取资源胜利,办法间接返回,该线程接下来就能够“随心所欲”了;否则示意获取失败,接下来会顺次执行 step 2 和 step 3。

  • step 2: addWaiter(Node.EXECUSIVE)
private Node addWaiter(Node mode) {
    // 将以后线程封装为一个 Node 节点,指定 mode
    // PS: 独占模式 Node.EXECUSIVE, 共享模式 Node.SHARED
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 通过 CAS 操作设置主队列的尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 尾节点 tail 为 null,示意主队列未初始化
    enq(node);
    return node;
}

enq 办法:

private Node enq(final Node node) {for (;;) {
        Node t = tail;
        // 尾节点为空,表明以后队列未初始化
        if (t == null) { // Must initialize
            // 将队列的头尾节点都设置为一个新的节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将 node 节点插入主队列开端
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

能够看到 addWaiter(Node.EXECUSIVE) 办法的作用是:把以后线程封装成一个独占模式的 Node 节点,并插入到主队列开端(若主队列未初始化,则将其初始化后再插入)。

  • step 3: acquireQueued(final Node node, arg))
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 中断标记位
        boolean interrupted = false;
        for (;;) {
            // 获取该节点的前驱节点
            final Node p = node.predecessor();
            // 若前驱节点为头节点,则尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 若获取胜利,则将该节点设置为头节点并返回
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 若下面条件不满足,即前驱节点不是头节点,或尝试获取失败
            // 判断以后线程是否能够休眠
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

若以后节点的前驱节点为头节点,则会再次尝试获取资源(tryAcuqire),若获取胜利,则将以后节点设置为头节点并返回;否则,若前驱节点不是头节点,或者获取资源失败,则执行如下两个办法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点的期待状态
    int ws = pred.waitStatus;
    // 若前驱节点的期待状态为 SIGNAL,返回 true,示意以后线程能够休眠
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // 若前驱节点的状态大于 0,示意前驱节点处于勾销(CANCELLED)状态
    // 则将前驱节点跳过(相当于踢出队列)if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         // 此时 waitStatus 只能为 0 或 PROPAGATE 状态,将前驱节点的等着状态设置为 SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

该办法的流程:

  1. 若前驱节点的期待状态为 SIGNAL,返回 true,示意以后线程能够休眠(park);
  2. 若前驱节点是勾销状态 (ws > 0),则将其清理出队列,以此类推;
  3. 若前驱节点为 0 或 PROPAGATE,则将其设置为 SIGNAL 状态。

正如其名,该办法(shouldParkAfterFailedAcquire)的作用就是判断以后线程在获取资源失败后,是否能够休眠(park)。

parkAndCheckInterrupt:

private final boolean parkAndCheckInterrupt() {
    // 将以后线程休眠
    LockSupport.park(this);
    return Thread.interrupted();}

该办法的作用:

  1. 使以后线程休眠(park);
  2. 返回该线程是否被中断(其余线程对其发过中断信号)。

下面就是 acquireQueued(final Node node, arg)) 办法的执行过程,为了便于了解,可参考上面的流程图:

若此期间被其余线程中断过,则此时再去执行 selfInterrupt 办法去响应中断请求:

static void selfInterrupt() {Thread.currentThread().interrupt();}

以上就是 acquire 办法执行的整体流程。

2.2.2 以独占模式获取资源(响应中断)

该操作其实与后面的过程相似,因而剖析绝对简略些,代码如下:

public final void acquireInterruptibly(int arg)
        throws InterruptedException 
    // 若线程被中断过,则抛出异样
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取资源
    if (!tryAcquire(arg))
        // 尝试获取资源失败
        doAcquireInterruptibly(arg);
}

tryAcquire 与后面的操作一样,若尝试获取资源胜利则间接返回;否则,执行 doAcquireInterruptibly:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException 
    // 将以后线程封装成 Node 节点插入主队列开端
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 抛出中断异样
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

通过与后面的 acquire 办法比照能够发现,二者代码简直一样,区别在于 acquire 办法检测到中断(parkAndCheckInterrupt)时只是记录了标记位,并未响应;而此处间接抛出了异样。这也是二者仅有的区别,此处不再详细分析。

2.2.3 以独占模式获取资源(响应中断,且有超时)

该操作与前者也是相似的,代码如下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException 
    // 若被中断,则响应
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

doAcquireNanos:

static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 若超时工夫小于等于 0,间接获取失败
    if (nanosTimeout <= 0L)
        return false;
    // 计算截止工夫
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            // 曾经超时了,获取失败
            if (nanosTimeout <= 0L)
                return false;
            // 若大于自旋工夫,则线程休眠;否则自旋
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 若被中断,则响应
            if (Thread.interrupted())
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

这里有个变量 spinForTimeoutThreshold,示意自旋工夫,若大于该值则将线程休眠,否则持续自旋。集体了解这里减少该工夫是为了提高效率,即,只有在等待时间较长的时候才让线程休眠。

该办法与 acquireInterruptibly 也是相似的,在前者的根底上减少了 timeout,不再详细分析。

2.2.4 开释资源

后面剖析了三种获取资源的形式,天然也有开释资源。上面剖析开释资源的 release 操作:

public final boolean release(int arg) {
    // 尝试开释资源,若胜利则返回 true
    if (tryRelease(arg)) {
        Node h = head;
        // 若头节点不为空,且期待状态不为 0(此时为 SIGNAL)// 则唤醒其后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

与 tryAcquire 办法相似,tryRelease 办法在 AQS 中也是抛出异样,同样交由子类实现:

protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}

unparkSuccessor 的次要作用是唤醒 node 的后继节点,代码如下:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 后继节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // 若后继节点是勾销状态,则从尾节点向前遍历,找到 node 节点前面一个未勾销状态的节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒 node 节点的后继节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

若 node 节点的后继节点是勾销状态(ws > 0),则从主队列中取其前面一个非勾销状态的线程唤醒。

后面三个获取资源的办法中,finally 代码块中都用到了 cancelAcquire 办法,都是获取失败时的操作,这里也剖析一下:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
        
    node.thread = null;
    
    // Skip cancelled predecessors
    // 跳过勾销状态的前驱节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
        
    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 前驱节点的后继节点援用
    Node predNext = pred.next;
    
    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 将以后节点设置为勾销状态
    node.waitStatus = Node.CANCELLED;
    
    // If we are the tail, remove ourselves.
    // 若该节点为尾节点(前面没其余节点了),将 predNext 指向 null
    if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 前驱节点为头节点,表明以后节点为第一个,勾销时唤醒它的下一个节点
            unparkSuccessor(node);
        }
        
        node.next = node; // help GC
    }
}

该办法的次要操作:

  1. 将 node 节点设置为勾销(CANCELLED)状态;
  2. 找到它在队列中非勾销状态的前驱节点 pred:

    1. 若 node 节点是尾节点,则前驱节点的后继设为空,
    2. 若 pred 不是头节点,且状态为 SIGNAL,则后继节点设为 node 的后继节点;
    3. 若 pred 是头节点,则唤醒 node 的后继节点。

PS: 该过程能够跟双链表删除一个节点的过程进行比照剖析。

3. 小结

本文剖析了以独占模式获取资源的三种形式,以及开释资源的操作。别离为:

  1. acquire: 独占模式获取资源,疏忽中断;
  2. acquireInterruptibly: 独占模式获取资源,响应中断;
  3. tryAcquireNanos: 独占模式获取资源,响应中断,有超时;
  4. release: 开释资源,唤醒主队列中的下一个线程。

这几个办法都能够类比 Lock 接口的相干办法定义。

相干浏览:

JDK 源码剖析 -AbstractQueuedSynchronizer(1)

JDK 源码剖析 -Lock&Condition

正文完
 0