关于java:Java并发编程-Lock-Condition-ReentrantLock二

55次阅读

共计 8444 个字符,预计需要花费 22 分钟才能阅读完成。

实现上一篇文章的未尽事宜:

  1. ReentrantLock 的 lock、unlock 源码剖析
  2. Condition 的 await、signal 源码剖析

ReentrantLock#lock

lock 办法最终是由 sync 实现的,偏心锁的 sync 是 FairSync,非偏心锁是 UnfairSync。

两者 lock 办法的区别是,偏心锁 FairSync 间接调用 acquire(1)办法,非偏心锁 UnfairSync 则首先尝试取得锁资源(间接尝试批改锁状态)、获取不到才调用 acquire(1)办法。

acquire 办法由 AQS 实现。

AbstractQueuedSynchronizer#acquire

首先调用 tryAcquire 办法尝试取得锁资源,如果获取不胜利的话,调用 acquireQueued 办法进入队列排队期待。

如果是通过 acquireQueued 办法通过排队获取到锁资源、并且办法返回 true 的话,阐明在线程排队期待锁资源的过程中收到了中断信号,然而因为线程处于挂起状态、尚未取得锁资源,不能对 CLH 队列做操作,所以须要等到获取到锁资源之后、再调用 selfInterrupt()中断。

public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

上面咱们看一下 tryAcquire 及 acquireQueued 办法。

FairSync#tryAcquire 以及 UnFairSync#tryAcquire

偏心锁与非偏心锁的 tryAcquire 办法的实现逻辑不同。

偏心锁 FairSync#tryAcquire 办法判断锁状态处于闲暇的话,首先调用 hasQueuedPredecessors 办法,判断以后 CLH 队列中是否存在比以后线程等待时间更久的线程,没有的话才尝试获取锁资源(CAS 形式批改锁状态)。

非偏心锁 UnFairSync#tryAcquire 办法在判断锁处于闲暇状态的话,间接尝试获取锁资源(CAS 形式批改锁状态)。

不论是 FairSync 还是 UnFairSync 的 tryAcquire 办法,如果判断锁资源是被以后线程独占,则能够间接再次获取锁资源,锁状态 state 在以后值的根底上加 1。这也就是可重入锁的意思,同一线程能够屡次取得锁资源。

AbstractQueuedSynchronizer#acquireQueued

在 tryAcquire 获取锁资源失败后,调用 acquireQueued 办法,办法名很好的阐明了办法的作用:通过排队取得锁资源。

办法的参数 addWaiter(Node.EXCLUSIVE), arg),咱们先看一下这个办法。

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

代码其实很简略,首先将以后线程封装成 Node,因为 ReentrantLock 是独占锁,所以 Node 的 mode 是独占。

而后如果队列不空的话,将以后节点的 prev 设置为尾结点(留神这个时候的操作不是 CAS 的,也没有取得锁),之后才利用 CAS 操作将尾结点设置为以后节点(到这一步以后节点才正式退出队列),返回。

否则如果队列空的话,调用 enq 办法把以后节点退出队列,enq 办法通过自旋 +CAS 的形式将以后节点入队(退出到尾结点),如果以后队列空的话,为 CLH 队列创立一个 空的头节点 后再将以后节点作为尾结点退出到队列中。

值得关注的是 enq 办法的操作形式,与 addWaiter 在队列不空的时候尝试间接将以后节点退出队列尾部的操作逻辑统一(关注点 1):操作是在没有获取到锁资源的前提下进行的,在不应用 CAS 的状况下首先将以后节点的 prev 指向尾结点,而后再尝试 CAS 扭转队列的尾结点为以后节点。如果本次 CAS 操作失败(有其余线程曾经退出队列,队列的尾结点有变动),则自旋再来一次,如此重复直到胜利退出队列。

而后再来看 acquireQueued 办法:
先上代码:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 以后节点的前一个节点
                final Node p = node.predecessor();
                
                if (p == head && tryAcquire(arg)) {setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

定义了一个 interrupted 变量,表明以后线程排队挂起期待的过程中是否收到了中断信号。

判断以后节点的前一个节点如果是头节点的话,阐明以后节点排队排到了,该出队获取锁资源了,所以就调用 tryAcquire 尝试,如果的确拿到锁资源了,则头节点出队,以后节点变成头节点,线程胜利拿到了锁资源,返回 interrupted。

否则,前一节点不是头节点的话,阐明还得排队期待,所以首先调用shouldParkAfterFailedAcquire:办法名真的是太好了,倡议大家学习一下,不要胆怯办法名称太长,相比之下可能表白意思更重要!这个办法是判断在没有拿到锁资源后是否须要通过 park 挂起以后线程。

shouldParkAfterFailedAcquire 的代码逻辑:

上一节点的 waitStatus=Node.SIGNAL 的话,阐明上一节点处于期待状态,则返回 true,能够挂起。

如果上一节点曾经被勾销(waitStatus>0)则循环查看所有勾销状态的上一节点,将所有勾销状态的上一节点全副出队列。

如果上一节点状态为 0,则尝试 CAS 批改上一节点状态为 Node.SIGNAL,返回 false(还不能挂起,始终要等到上一节点状态为 Node.SIGNAL 才能够挂起),返回 false 之后调用方不挂起线程、在下次自旋过程中挂起。

批改上一节点状态为 SIGNAL 后才挂起是一种“负责任的挂起”态度(关注点 2 ),是为了挂起之后可能顺利的被唤醒,因为锁占用线程在实现操作调用 unlock 开释锁资源之后,是针对队列中状态为 Node.SIGNAL 的节点做唤醒。

接下来,线程能够被挂起了,调用parkAndCheckInterrupt 办法:

 private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
        return Thread.interrupted();}

很简略,挂起线程,(关注点 3)被唤醒后返回线程的中断状态(挂起过程中如果收到中断信号则返回 true,否则,没有收到中断信号则返回 false)。

所以咱们能够看到,如果线程排队、排队挂起期待锁资源的过程中如果收到了中断信号,则 acquireQueued 办法会返回 true。

所以咱们也就应该可能了解 acquire 办法中,调用 acquireQueued 办法返回为 true 的话,通过 selfInterrupt()办法补发一个中断的起因了。

如果没有中断的话,持续自旋,再次判断以后节点的上一个节点是否是头节点,是的话就 tryAcquire,不是的话就持续 shouldParkAfterFailedAcquire、parkAndCheckInterrupt…… 如此重复,直到 tryAcquire 胜利,获取到锁资源!

lock 办法源码剖析结束!

ReentrantLock#unlock

unlock 办法间接调用 sync 的 release:

public void unlock() {sync.release(1);
    }

release 办法是 AQS 实现的:

  public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先调用 ReentranceLock 的 tryRelease 办法。之后查看 CLH 队列不空、且首节点 waitStatus!= 0 的话 ( 参见关注点 2 )则调用 unparkSuccessor(h)。

先看 tryRelease:

protected final boolean tryRelease(int releases) {int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

tryRelease 办法更新锁状态,如果更新后锁状态为 0 则开释锁 setExclusiveOwnerThread(null),返回 true。

开释锁资源胜利,调用 AQS 的 unparkSuccessor(h):

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

如果以后节点(调用时传入的是头节点)waitStatus<0,CANCELLED、SIGNAL、CONDITION 或 PROPAGATE 都是小于 0 的状态,然而这里可能的也就是 SIGNAL,则批改状态为 0。

一般来讲须要唤醒的节点就是以后节点的下一节点,然而如果下一节点被勾销了,就只能找前面的没有被勾销的节点。不过 unparkSuccessor 采纳的算法是从尾结点向前找,始终找到以后节点,最初被找到的那个节点就是间隔以后节点最近的那个未被勾销的节点。

有一个疑难是:既然是双向队列,这里为什么要从尾结点向前查找,为什么不从以后节点向后查找?两者相比显然向后查找效率更高而且更容易了解。

倡议联合 关注点 1 来了解,因为节点退出队列的时候蕴含 3 步:

  1. 首先是设置以后节点的 prev 为原尾结点
  2. CAS 批改尾结点为以后节点
  3. 批改原尾结点的 next 为以后节点

这 3 步操作并没有锁爱护,3 步一起看不是原子操作,只有独自看第 2 步是原子操作。所以执行完第 1 步、执行完第 2 步以后线程都有可能被 cpu 挂起。

为了不便阐明问题,咱们假如以后队列有 H 头节点,3 个节点 N1、N2、N3,N3 是尾结点:

H -> N1 -> N2 -> N3(T)

假如线程 1 申请获取锁资源,然而锁资源被线程 2 独占了,线程 1 只能封装为 N4 申请加入队列。假如线程 1 的退出队列操作执行到上述第 2 步实现之后被挂起了。队列变成:

H -> N1 -> N2 -> N3 -> N4(T)

因为线程 1 还没有执行完退出队列的第 3 步,所以 N4 变成了尾结点,N4 的 prev 是 N3,然而 N3 的 next 是 null。

假如这个时候碰巧,N1、N2、N3 都被勾销了。

好了,场景筹备实现了。

线程 2 开释锁资源之后,该调用 unparkSuccessor 唤醒期待线程了。

如果向后查找,从头节点 H 开始,N1、N2、N3 都被勾销所以就被跳过了,N3 的 next 是 null,找不到尾结点 N4…… 就出问题了。

推导一下向前查找应该是不存在这个问题的,这个算法应该是和退出队列算法 (关注点 1) 匹配的。

好了,unparkSuccessor 找到了应该被唤醒的排队节点后,唤醒该节点的线程,被唤醒的线程会从 关注点 3 继续执行。

unlock 剖析结束!

Condition#await

Conditon 提供了 await()、awaitUninterruptibly()、awaitNanos(long nanosTimeout)、await(long time, TimeUnit unit)、awaitUntil(Date deadline)等期待办法,其实次要也就是期待时长、可中断期待或不可中断期待的区别,代码逻辑大同小异。所以咱们就以 await()作为例子来剖析。

AQS 中蕴含了 Condition 接口的一个默认实现 ConditionObject,await()办法是 ConditionObject 实现的:

public final void await() throws InterruptedException {if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

第一步,以后节点入 Condition 队列排队:调用 addConditionWaiter 办法,addConditionWaiter 办法负责将以后线程封装为 mode 为 CONDITION 的 Node,并将该 Node 放入 Condition 的队列中排队。

入 Condition 队列的逻辑也很简略,由 lastWaiter 指定的队尾节点的 nextWaiter 指定为以后节点,以后节点变为 lastWaiter。addConditionWaiter 同时也会清理队列中被勾销的节点。

须要留神的是,Condition 队列中的首节点不是空架子,是实实在在排队期待的线程!

第二步,用刚创立的节点 node 调用 fullyRelease 办法。

fullRelease 办法的逻辑是:以后线程放弃对 ReentrantLock 的锁定,开释曾经获取的锁资源。放弃锁资源的起因是,以后线程须要挂起期待,必须等其余线程唤醒之后能力继续执行操作,持有锁资源去睡觉显然是不适合的,是对锁资源的节约,或者会造成死锁,所以就肯定要开释锁资源,只有把锁资源交给其余线程,其余线程有机会获取锁资源,才有可能会对以后线程收回 signal 信号、从新唤醒以后线程。

fullRelease 办法最终会调用 release 办法。

public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先调用 tryRelease 办法,胜利则意味着以后线程开释掉了锁资源,间接查看 CLH 队列头节点如果不空、状态为期待唤醒的话,唤醒该节点。

第三步,循环查看刚刚退出到 Condtion 队列中的节点 node 如果不在 CLH 队列中,则挂起以后线程(关注点 4)。直到以后线程被其余线程通过 signal 操作唤醒之后,则该节点 node 会被放入到 CLH 队列中(满足循环完结条件),完结循环。

第四步,曾经完结 Condition 排队了,节点曾经在 CLH 队列中了。咱们晓得这个时候以后线程必须要再次获取到锁资源能力持续。所以,应用以后节点 node 调用 acquireQueued 办法(这个办法后面剖析过,咱们曾经很相熟了)

第五步,如果 Condition 队列中以后节点 node 的下一节点 nextWaiter 不空的话,调用 unlinkCancelledWaiters()革除被勾销的节点。

第六步,挂起期待过程中如果产生中断的话,调用 reportInterruptAfterWait 解决中断。

await 办法剖析实现,接下来看 signal 办法。

Condition#signal

signal 办法的调用场景是:以后业务实现操作之后,可能会导致在 Condition 对象的队列中排队的线程满足相应条件,因而须要调用该 COndition 对象的 signal 办法唤醒期待线程。

public final void signal() {if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

signal 办法首先获取到队列中的第一个节点,而后调用 doSignal 办法:

 private void doSignal(Node first) {
            do {if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

首先解决好排队队列,首节点出队列,如果队列空的话,做相应的解决。

而后应用以后节点调用 transferForSignal 办法,如果 transferForSignal 返回 false,则获取 fisrt 为 firstWaiter(其实就是用队列中的下一个节点持续循环)。

transferForSignal 办法

final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

如果将以后节点从 CONDITION 状态批改为 0 失败,*表明以后节点状态不正确,可能被勾销,或者曾经被其余线程唤醒中(曾经批改状态为 0 了),则返回 false。

否则,如果状态批改胜利,该节点入 CLH 队列,并查看 CLH 队列中该节点的上一节点如果曾经被勾销、或者批改上一节点状态为 SIGNAL 失败的话(非凡状况),间接唤醒以后节点的线程从新同步(关注点 4),返回 true。

下面所说的非凡状况下,上一节点可能曾经被勾销,或者曾经被其余线程批改了状态,所以以后节点可能取得了出队列从而取得锁资源的机会,所以就唤醒线程做一次尝试。

否则,失常批改上一节点状态为 SIGNAL 后,以后节点退出 CLH 队列中失常排队,期待被 ReentranceLock 的 unlock 办法唤醒(唤醒后程序逻辑仍然会到关注点 4)

小结

Finally,ReentranceLock 以及 Condition 剖析完了,欢送斧正谬误!

Thanks a lot!

上一篇 Java 并发编程 Lock Condition & ReentrantLock(一)

正文完
 0