AQS系列二源码分析公平ReentrantLock和Condition

3次阅读

共计 7824 个字符,预计需要花费 20 分钟才能阅读完成。

上篇文章 AQS 系列一:源码分析非公平 ReentrantLock 中,我们分析了 ReentrantLock 的非公平实现,本篇会承接上文,继续分析 ReentrantLock 的公平锁实现(以及 Condition 的实现)。

在此之前我们要先弄明白,“不公平”体现在哪里。

为何“不公”

好吧,我也不清楚。
于是我对比了 ReentrantLock 的非公平和公平实现,即 NonfairSync vs FairSync,发现 差别主要体现 在加锁,更确切的说是 获取锁环节

## 非公平获取锁
final boolean nonfairTryAcquire(int acquires) {
    ...
    if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
            return true;
        }
    }
    ...
}
## 公平获取锁
protected final boolean tryAcquire(int acquires) {
    ...
    if (!hasQueuedPredecessors() //### 差别体现在此处
        && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
        return true;
    }
    ...
}

显然,公平锁多执行了!hasQueuedPredecessors(),看看此方法的逻辑。

public final boolean hasQueuedPredecessors() {
    Node h, s;
    if ((h = head) != null) { ## h 头结点
        if ((s = h.next) == null ... ) { ## s 二号节点
            ...
        }
        if (s != null && s.thread != Thread.currentThread()) ## 检查 2 号节点绑定线程,是否当前线程
            return true;
    }
    return false;
}

hasQueuedPredecessors 方法只有在 2 号节点不为空,且绑定线程非当前线程的前提下,会返回 true
返回 ture 意味着!hasQueuedPredecessors() = false,没有资格获取锁(就是没机会执行 compareAndSetState——尝试修改 state)

反过来讲,没有队列(无线程正在执行),或者没有 2 号节点(取消或者临时状态),再或者 2 号节点的绑定线程就是当前线程 时,才 会尝试获取锁

我们分析下最后这种情况,2 号节点绑定的线程是第 1 个等待的线程(第 1 个获取锁失败的线程),第 1 个等待的线程 在 hasQueuedPredecessors()的运作下,成为了第 1 个有资格尝试获取锁的线程 。而这,就是 公平

那么没有 hasQueuedPredecessors 方法的非公平锁,到底“不公平”在哪儿呢?
我们回想一下,在加 / 解锁的过程中,nonfairTryAcquire 方法被调用的位置就能得到答案了。

public final void acquire(int arg) {if (!tryAcquire(arg) ### 位置 1,尝试获取
        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    ...
        for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { ### 位置 2,尝试获取
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();}
    ...
}

在上述代码中,tryAcquire(非公平实现会调用 nonfairTryAcquire)会在位置 1、2 两处触发。试想如下场景:

  • 线程 T -3执行完毕,调用了 unlock;随着 线程 T -2被唤醒,位置 2 处代码可能会被执行。
  • 与此同时,随着新的 线程 T -1的介入,位置 1 处的代码也有可能被执行。

因此 线程 T -2T-1 谁能在并发中抢到锁,存在不确定性

  • 如果线程 T - 2 先执行了,T- 1 失败于位置 1 处,后续会阻塞于队列尾部;
  • 如果线程 T - 1 先执行了,T- 2 失败于位置 2 处,面临又一轮阻塞,这种情况就不怎么“公平”——新来的线程 T - 1 抢先了!

原理说完了,那具体怎么构建公平的 ReentrantLock 呢?构造函数传参即可:

public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();  ## 入参 fair 传入 true,构建公平锁}

Condition 解析

使用

ReentrantLock 的加解锁过程已详细分析了一遍,如果你经常使用这个工具,肯定对衍生出另一个大咖 condition 有所了解陌生。
二话不说,先甩出 demo:

static Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();;

public void doSomething(){lock.lock();
    System.out.println(String.format("%s 线程,获取到锁了",Thread.currentThread().getName()));
    try {System.out.println(String.format("%s 线程,await",Thread.currentThread().getName()));
        TimeUnit.SECONDS.sleep(2L); // 模拟耗时业务逻辑执行
        condition.await();  //await
        System.out.println(String.format("%s 线程,await 被唤醒",Thread.currentThread().getName()));
    } catch (InterruptedException e) {e.printStackTrace();
    }
    System.out.println(String.format("%s 线程,业务执行完毕",Thread.currentThread().getName()));
    lock.unlock();}

public static void main(String[] args) throws InterruptedException {ReentrantLockTest test = new ReentrantLockTest();
    int total = 1;
    while (total>0){Thread t = new Thread(()->{test.doSomething();
        },"T-"+total);
        t.start();

        TimeUnit.MILLISECONDS.sleep(200L);  // 让子线程 T - 1 率先获取到锁
        lock.lock();
        System.out.println(String.format("%s 线程,获取到锁了",Thread.currentThread().getName()));
        test.condition.signal();
        System.out.println(String.format("%s 线程,signal",Thread.currentThread().getName()));
        lock.unlock();
        total--;
    }
}

结合已掌握的加解锁原理,分析 demo 执行过程:

  • 人为控制让子 线程 T -1先获取到锁,200ms 后 main 线程 也会尝试获取锁,当然 main 线程 获取不到——由于耗时达 2s 的业务逻辑疯狂执行中。(sleep 处,此时 main 线程应该构建了同步队列,main 线程 作为 2 号节点的绑定线程被无情阻塞,下图)

  • 2s 后,线程 T -1搞定了难缠的业务逻辑,却又遭遇 condition.await() 的伏击
  • 此时,线程 main发现自己神奇的不被阻塞了,又神奇的获取到了锁。于是投桃报李,condition.signal()unlock 二连招唤醒了 线程 T -1
  • 线程 T -1觉醒于 await 处,执行完剩余逻辑

demo 的执行结果,能初步证明上述分析:

T- 1 线程,获取到锁了
T- 1 线程,await
main 线程,获取到锁了
main 线程,signal
T- 1 线程,await 被唤醒
T- 1 线程,业务执行完毕

原理

构造器

从构造函数出发:

public Condition newCondition() {return sync.newCondition();
}

## Sync 类创建 ConditionObject
final ConditionObject newCondition() {return new ConditionObject();
}

ConditionObject 是 AQS 中的另一内部类,看看它的属性:

## ConditionObject 类
private transient Node firstWaiter;
private transient Node lastWaiter;

感觉上和 AQS 的设定上有些像?

## AQS 类
private transient volatile Node head;
private transient volatile Node tail;

先大胆猜测一下,condition 中很可能会再次构建同步队列。

await()

接下来就是验证我们的猜测的过程:

public final void await() throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();    ## 创建等待队列,node 是尾节点。详情参看[addConditionWaiter 详情]
    int savedState = fullyRelease(node);    ## 重置 state,返回重置前的 state 值。详情参看[fullyRelease 详情]
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {    ## 是否在 AQS 同步队列中
        LockSupport.park(this);    ## 不在 AQS 同步队列的节点,阻塞当前线程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • addConditionWaiter 详情
private Node addConditionWaiter() {if (!isHeldExclusively())    ## 当前线程是否 owner 线程,如果不是,抛异常——这儿决定了 await 必须用在 lock()方法之后
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
        t = lastWaiter;
    }

    Node node = new Node(Node.CONDITION); ## 创建新节点,原子形赋值 waitStatus=CONDITION=-2,并绑定当前线程到 node 节点

    ## node 会作为尾节点,置于队列最后
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node; 
}
  • fullyRelease 详情
final int fullyRelease(Node node) {
    try {int savedState = getState();    ## 获取当前 state
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();} catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

public final boolean release(int arg) {if (tryRelease(arg)) {    ## 尝试“清 0”state
        Node h = head;    ## 此处 head 不为空,unpark 线程 main,return true
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())    ## 当前线程验证,如果当前线程!=owner,抛异常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {    ## 如果 state 清 0,同事清空 owner 线程,return true
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

有了之前分析 ReentrantLock 的经验,与之非常相像的 condition 代码应该不难拿下。
这里画出 await 方法中 fullyRelease(node)执行前后的节点和关键属性的变化:

图右侧(await 方法执行到了 LockSupport.park(this)时),线程 T -1已经阻塞,线程 main则解除阻塞状态。

通过上图很容易看出,我们之前的猜测是正确的:await方法又构建了一个同步队列,不过这次的头、尾指针在 ConditionObject 类中

signal()

再来看看 signal 方法作了什么:

public final void signal() {if (!isHeldExclusively())    ## 和 await()方法中的一样,先验证 owner 线程
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first)    ## condition 头结点传递
             && (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    Node p = enq(node);    ## 将 ConditionObject 头结点移动到 AQS 队列尾部。详情参看[enq 详情]
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))    ## 取消或修改 waitStatus 失败才作 unpark 操作,此处 unpark 不会触发
        LockSupport.unpark(node.thread);
    return true;
}
  • enq 详情
private Node enq(Node node) {for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {node.setPrevRelaxed(oldTail);    ## 入参 node,成为了 AQS 队列新的尾节点
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {initializeSyncQueue();    ## 初始化 AQS 队列
        }
    }
}

signal 中最神奇的莫过于 enq(node)方法,它完成了节点的转移,condition 队列头结点 -> AQS 队列尾节点。

通过下图观察整个 signal 方法产生的各对象结构和属性变化:

观察可知,signal 执行后节点转移已经完成,线程 T - 1 依然阻塞,此时 ConditionObject 已经完成了它的历史使命。

线程 T - 1 什么时候解除阻塞呢?其实这部分上篇文章已经分析过了,就是我们的老朋友 unlock()。
区别在于线程 T - 1 被唤醒后,执行的是 await 后续的逻辑:

public final void await() throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {    ## 2. 下次循环,node 已经在 AQS 队列中,返回 true,跳出循环
        LockSupport.park(this);    ## 1. 线程 T - 1 觉醒于此
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) ## 3. 再度获取到锁
            && interruptMode != THROW_IE)    
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

后记

至此,我们已经了解了 ReentrantLock 的主逻辑的源码实现(公平、非公平、condition),本系列的下篇文章将进入下一副本——CountDownLatch,敬请期待!

正文完
 0