前几篇文章介绍了 AQS(AbstractQueuedSynchronizer)中的独占模式和对 Condition 的实现,这一篇文章来聊聊基于 AQS 框架实现的锁工具:ReentrantLock。
ReentrantLock 是一个可重入的互斥锁,也被称为独占锁,具备与 synchronized 雷同的一些根本行为和语义,但性能更弱小。
本文基于 jdk1.8.0_91
1. 继承体系
public class ReentrantLock implements Lock, java.io.Serializable
ReentrantLock 是 Lock 接口的实现,办法对照如下:
Lock 接口 ReentrantLock 实现lock() sync.lock()lockInterruptibly() sync.acquireInterruptibly(1)tryLock() sync.nonfairTryAcquire(1)tryLock(long time, TimeUnit unit) sync.tryAcquireNanos(1, unit.toNanos(timeout))unlock() sync.release(1)newCondition() sync.newCondition()
可知 ReentrantLock 对 Lock 的实现都是调用外部类 Sync 来做的。
1.1 AQS
Sync 继承了 AQS,也就是说 ReentrantLock 的大部分实现都曾经由 AQS 实现了。
AQS 提供了一系列模板办法,对于互斥锁来说,须要实现其中的 tryAcquire、tryRelease、isHeldExclusively 办法。
java.util.concurrent.locks.AbstractQueuedSynchronizer
// 独占获取(资源数)protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}// 独占开释(资源数)protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}// 共享获取(资源数)protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}// 共享获取(资源数)protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException();}// 是否排它状态protected boolean isHeldExclusively() { throw new UnsupportedOperationException();}
1.2 Sync
Sync 重写了 AQS 的 tryRelease 和 isHeldExclusively 办法,而 AQS 的 tryAcquire 办法交由 Sync 的子类来实现。
Sync 中还具备了一个形象的 lock 办法,强制子类实现。
java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer { /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); protected final boolean tryRelease(int releases) {...} protected final boolean isHeldExclusively() {...}}
1.3 FailSync/NonfairSync
Sync 具备两个子类 FailSync 和 NonfairSync,对应的是 ReentrantLock 的两种实现:偏心锁(fair lock)、非偏心锁(non-fair lock)。
Sync 中蕴含了一个形象办法 lock 须要子类来实现,设计该形象办法的目标是,给非偏心模式加锁提供入口。
因为偏心锁和非偏心锁的区别,次要体现在获取锁的机制不同:
- 偏心模式,先发动申请的线程先获取锁,后续线程严格排队顺次期待获取锁。
- 非偏心模式,线程首次发动锁申请时,只有锁是可用状态,线程就能够尝试获取锁。如果锁不可用,以后线程只能进入同步队列,以偏心模式排队期待获取锁。
也就是说,非偏心锁只有在以后线程未进入同步队列之前,才能够去抢夺锁,一旦进入同步队列,只能按排队程序期待锁。
2. 构造方法
/** Synchronizer providing all implementation mechanics */// 提供所有实现机制的同步器private final Sync sync;/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */public ReentrantLock() { sync = new NonfairSync();}/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
为什么默认创立的是非偏心锁?
因为偏心模式下,队列头部的线程从阻塞中被唤醒到真正运行,波及到线程调度和 CPU 上下文的切换,比拟耗时,这个过程中锁处于闲暇状态,浪费资源。
而非偏心模式下,多个线程之间采纳 CAS 来抢夺锁,这两头没有时间延迟,可能进步吞吐量。
3. Lock 接口实现
// 获取锁。 void lock() // 如果以后线程未被中断,则获取锁。void lockInterruptibly() // 仅在调用时锁未被另一个线程放弃的状况下,才获取该锁。boolean tryLock() // 如果锁在给定等待时间内没有被另一个线程放弃,且以后线程未被中断,则获取该锁。boolean tryLock(long timeout, TimeUnit unit) // 试图开释此锁。 void unlock() // 返回用来与此 Lock 实例一起应用的 Condition 实例。Condition newCondition()
3.1 Lock#lock
获取锁:
- 能够取得锁:如果没有其余线程取得锁,则立刻返回,设置持有锁的数量为 1(tryAcquire)。
- 曾经取得锁:如果以后线程曾经持有锁,则立刻返回,设置持有锁的数量加 1(tryAcquire)。
- 期待取得锁:如果其余线程持有锁,则以后线程会进入阻塞直到取得锁胜利(acquireQueued)。
java.util.concurrent.locks.ReentrantLock#lock
/** * Acquires the lock. * * <p>Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * * <p>If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * * <p>If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */public void lock() { sync.lock();}
3.1.1 Sync#lock
在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 Sync#lock 办法。
偏心锁
java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() { acquire(1);}
非偏心锁
java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);}
偏心锁的 lock 办法间接调用 AQS#acquire(1);
非偏心锁首先通过 CAS 批改 state 值来获取锁,当获取失败时才会调用 AQS#acquire(1) 来获取锁。
3.1.2 AQS#acquire
AQS 中的 acquire 办法中,只有 tryAcquire 办法须要子类来实现。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
3.1.3 tryAcquire
在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 AQS#tryAcquire 办法。
偏心锁
java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && // 判断同步队列中是否有等待时间更长的节点 compareAndSetState(0, acquires)) { // 进入这里,阐明以后线程期待锁工夫最长,则CAS批改state setExclusiveOwnerThread(current); // 将以后线程设置为持有锁的线程 return true; } } else if (current == getExclusiveOwnerThread()) { // 以后线程已持有锁 int nextc = c + acquires; // 重入次数 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 可重入的最大次数 Integer.MAX_VALUE setState(nextc); return true; } return false;}
代码流程:
- 如果锁是可用状态,且同步队列中没有比以后线程等待时间还长的节点,则尝试获取锁,胜利则设为锁的持有者。
- 如果以后线程已持有锁,则进行重入,设置已获取锁的次数,最大次数为 Integer.MAX_VALUE。
- 以上条件都不合乎,则无奈获取锁。
非偏心锁
java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);}
java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 不论以后线程在同步队列中是否期待最久,都来 CAS 抢夺锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果已取得锁,则重入。逻辑与偏心锁统一 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}
代码流程:
- 如果锁是可用状态,(不论同步队列中有没有节点在阻塞期待中)间接尝试获取锁,胜利则设为锁的持有者。
- 如果以后线程已持有锁,则进行重入,设置已获取锁的次数,最大次数为 Integer.MAX_VALUE。
- 以上条件都不合乎,则无奈获取锁。
能够看到,偏心锁比非偏心锁多执行了 hasQueuedPredecessors 办法,用于判断同步队列中是否具备比以后线程期待锁工夫还长的节点。
在 AQS 中的同步队列中,头节点是一个 dummy node,因而等待时间最长的节点是头节点的下一个节点,若该节点存在且不是以后线程,则 hasQueuedPredecessors 返回 true。阐明以后节点不是同步队列中等待时间最长的节点,无奈获取锁。
java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && // 头节点的下一个节点,不是以后线程的节点,阐明以后线程期待锁工夫不是最长的 ((s = h.next) == null || s.thread != Thread.currentThread());}
3.2 Lock#lockInterruptibly
如果以后线程未被中断,则获取锁。
java.util.concurrent.locks.ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1);}
关注获取锁过程中,对异样的解决。
- 获取锁之前,以后线程曾经中断,则间接抛出中断异样。
- 在同步队列中期待锁的过程中,如果被中断唤醒,则放弃期待锁,间接抛出异样。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 获取锁之前,以后线程曾经中断,则间接抛出中断异样 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly
/** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程在阻塞期待锁的过程中,被中断唤醒,则放弃期待锁,间接抛出异样 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
3.3 Lock#tryLock
仅在调用时锁未被另一个线程放弃的状况下,才获取该锁。
与非偏心锁的 NonfairSync#tryAcquire 办法一样,都是调用了 Sync#nonfairTryAcquire 办法。
- ReentrantLock#lock -> NonfairSync#lock -> AQS#acquire -> NonfairSync#tryAcquire -> Sync#nonfairTryAcquire
- ReentrantLock#tryLock -> Sync#nonfairTryAcquire
留神,Sync#nonfairTryAcquire 办法不存在任何和队列相干的操作。
在锁没有被其余线程持有的状况下,不论应用的是偏心锁还是非偏心锁,多个线程都能够调用该办法来 CAS 抢夺锁,抢夺失败了不必进入同步队列,间接返回。
应用偏心锁的状况下,在上一个持有锁的线程开释锁(锁状态 state == 0)之后,同步队列中的线程被唤醒但尚未获取锁之前,此时以后线程执行 tryLock 可能会胜利取得锁,这实际上是对偏心锁的公平性的一种毁坏。
在某些状况下,突破公平性的行为可能是有用的。如果心愿恪守此锁的偏心设置,则应用 tryLock(0, TimeUnit.SECONDS)
,它简直是等效的(也检测中断)。
java.util.concurrent.locks.ReentrantLock#tryLock()
public boolean tryLock() { return sync.nonfairTryAcquire(1);}
java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 不论同步队列中是否具备期待很久的节点,以后线程间接 CAS 抢夺锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果已取得锁,则重入。逻辑与偏心锁统一 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}
3.4 Lock#tryLock(time, unit)
如果锁在给定等待时间内没有被另一个线程放弃,且以后线程未被中断,则获取该锁。
java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout));}
在 JDK 中,AQS 中的 tryAcquireNanos 办法只会被 ReentrantLock#tryLock(time, unit) 和 ReentrantReadWriteLock.WriteLock#tryLock(time, unit) 这两个办法调用到。
阐明只有独占模式能力调用。
代码流程:
- 获取锁之前,如果以后线程已中断,则抛出中断异样。
- tryAcquire:以后线程尝试获取锁,获取胜利间接返回,否则进入下一个步。
- doAcquireNanos:以后线程进入同步队列中进行期待,直到取得锁、产生中断、期待超时。
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);}
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireNanos
/** * Acquires in exclusive timed mode. * * @param arg the acquire argument * @param nanosTimeout max wait time * @return {@code true} if acquired */private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) // 直到超时都没有取得锁,则返回false return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 能够进入阻塞的状况下,剩余时间大于阈值,则阻塞,否则自旋 if (Thread.interrupted()) throw new InterruptedException(); // 线程在阻塞期待锁的过程中,被中断唤醒,则放弃期待锁,间接抛出异样 } } finally { if (failed) cancelAcquire(node); }}
3.5 Lock#unlock
尝试开释锁。
java.util.concurrent.locks.ReentrantLock#unlock
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */public void unlock() { sync.release(1);}
3.5.1 AQS#release
尝试开释锁,若开释胜利,且同步队列中具备期待中的节点,则尝试唤醒后继节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) { if (tryRelease(arg)) { // 开释锁资源 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤醒head的后继节点 return true; } return false;}
3.5.2 Sync#tryRelease
在 ReentrantLock 中,不论是偏心锁还是非偏心锁,均应用雷同的 Sync#tryRelease 办法。
- 如果以后线程不是持有锁的线程,抛出 IllegalMonitorStateException。
- 因为锁是可重入的,必须把持有的锁全副开释(计数归零)才表明以后线程不再持有锁。
java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 计算开释之后的state if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 锁已全副开释(获取锁的次数必须等于开释次数),返回true,表明能够唤醒下一个期待线程 setExclusiveOwnerThread(null); // 设置独占锁持有线程为null } setState(c); return free;}
3.4 Lock#newCondition
创立 AQS 中的 ConditionObject 对象。是能够与 Lock 一起应用的 Condition 接口实例。
Condition 实例反对与 Object 的监视器办法(wait、notify 和 notifyAll)雷同的用法(await、signal 和 signalAll)。
- 在调用 Condition#await 或 Condition#signal 办法时,如果没有持有锁,则将抛出 IllegalMonitorStateException。
- 在调用 Condition#await 办法时,将开释锁并进入阻塞,当被唤醒时从新获取锁,并复原调用 Condition#await 时锁的持有计数。
- 如果线程在期待时被中断,则期待将终止,待从新获取锁胜利之后,再响应中断(抛异样或从新中断)。
- 期待线程按 FIFO 程序收到信号。
- 期待办法返回的线程从新获取锁的程序,与线程最后获取锁的程序雷同(对于非偏心锁,是按获取锁的程序;对于偏心锁,等同于按期待锁的工夫排序)。
java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() { return new ConditionObject();}
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#ConditionObject
public ConditionObject() {}
相干浏览:
浏览 JDK 源码:AQS 中的独占模式
浏览 JDK 源码:AQS 中的共享模式
浏览 JDK 源码:AQS 对 Condition 的实现
作者:Sumkor
链接:https://segmentfault.com/a/11...