关于java:阅读-JDK-源码可重入锁-ReentrantLock

39次阅读

共计 12261 个字符,预计需要花费 31 分钟才能阅读完成。

前几篇文章介绍了 AQS(AbstractQueuedSynchronizer)中的独占模式和对 Condition 的实现,这一篇文章来聊聊基于 AQS 框架实现的锁工具:ReentrantLock。

ReentrantLock 是一个可重入的互斥锁,也被称为独占锁,具备与 synchronized 雷同的一些根本行为和语义,但性能更弱小。

本文基于 jdk1.8.0_91

1. 继承体系

public class ReentrantLock implements Lock, java.io.Serializable

ReentrantLock 是 Lock 接口的实现,办法对照如下:

Lock 接口                             ReentrantLock 实现

lock()                                     sync.lock()
lockInterruptibly()                     sync.acquireInterruptibly(1)
tryLock()                             sync.nonfairTryAcquire(1)
tryLock(long time, TimeUnit unit)     sync.tryAcquireNanos(1, unit.toNanos(timeout))
unlock()                             sync.release(1)
newCondition()                             sync.newCondition()

可知 ReentrantLock 对 Lock 的实现都是调用外部类 Sync 来做的。

1.1 AQS

Sync 继承了 AQS,也就是说 ReentrantLock 的大部分实现都曾经由 AQS 实现了。
AQS 提供了一系列模板办法,对于互斥锁来说,须要实现其中的 tryAcquire、tryRelease、isHeldExclusively 办法。

java.util.concurrent.locks.AbstractQueuedSynchronizer

// 独占获取(资源数)protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

// 独占开释(资源数)protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}

// 共享获取(资源数)protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}

// 共享获取(资源数)protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}

// 是否排它状态
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();
}    

1.2 Sync

Sync 重写了 AQS 的 tryRelease 和 isHeldExclusively 办法,而 AQS 的 tryAcquire 办法交由 Sync 的子类来实现。
Sync 中还具备了一个形象的 lock 办法,强制子类实现。

java.util.concurrent.locks.ReentrantLock.Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    protected final boolean tryRelease(int releases) {...}

    protected final boolean isHeldExclusively() {...}
}

1.3 FailSync/NonfairSync

Sync 具备两个子类 FailSync 和 NonfairSync,对应的是 ReentrantLock 的两种实现:偏心锁(fair lock)、非偏心锁(non-fair lock)。

Sync 中蕴含了一个形象办法 lock 须要子类来实现,设计该形象办法的目标是,给非偏心模式加锁提供入口。
因为偏心锁和非偏心锁的区别,次要体现在获取锁的机制不同:

  1. 偏心模式,先发动申请的线程先获取锁,后续线程严格排队顺次期待获取锁。
  2. 非偏心模式,线程首次发动锁申请时,只有锁是可用状态,线程就能够尝试获取锁。如果锁不可用,以后线程只能进入同步队列,以偏心模式排队期待获取锁。

也就是说,非偏心锁只有在以后线程未进入同步队列之前,才能够去抢夺锁,一旦进入同步队列,只能按排队程序期待锁。

2. 构造方法

/** Synchronizer providing all implementation mechanics */
// 提供所有实现机制的同步器
private final Sync sync;

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

为什么默认创立的是非偏心锁?

因为偏心模式下,队列头部的线程从阻塞中被唤醒到真正运行,波及到线程调度和 CPU 上下文的切换,比拟耗时,这个过程中锁处于闲暇状态,浪费资源。
而非偏心模式下,多个线程之间采纳 CAS 来抢夺锁,这两头没有时间延迟,可能进步吞吐量。

3. Lock 接口实现

// 获取锁。void lock() 

// 如果以后线程未被中断,则获取锁。void lockInterruptibly() 

// 仅在调用时锁未被另一个线程放弃的状况下,才获取该锁。boolean tryLock() 

// 如果锁在给定等待时间内没有被另一个线程放弃,且以后线程未被中断,则获取该锁。boolean tryLock(long timeout, TimeUnit unit) 

// 试图开释此锁。void unlock() 

// 返回用来与此 Lock 实例一起应用的 Condition 实例。Condition newCondition() 

3.1 Lock#lock

获取锁:

  1. 能够取得锁:如果没有其余线程取得锁,则立刻返回,设置持有锁的数量为 1(tryAcquire)。
  2. 曾经取得锁:如果以后线程曾经持有锁,则立刻返回,设置持有锁的数量加 1(tryAcquire)。
  3. 期待取得锁:如果其余线程持有锁,则以后线程会进入阻塞直到取得锁胜利(acquireQueued)。

java.util.concurrent.locks.ReentrantLock#lock

/**
 * Acquires the lock.
 *
 * <p>Acquires the lock if it is not held by another thread and returns
 * immediately, setting the lock hold count to one.
 *
 * <p>If the current thread already holds the lock then the hold
 * count is incremented by one and the method returns immediately.
 *
 * <p>If the lock is held by another thread then the
 * current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the lock has been acquired,
 * at which time the lock hold count is set to one.
 */
public void lock() {sync.lock();
}

3.1.1 Sync#lock

在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 Sync#lock 办法。

偏心锁

java.util.concurrent.locks.ReentrantLock.FairSync#lock

final void lock() {acquire(1);
}

非偏心锁

java.util.concurrent.locks.ReentrantLock.NonfairSync#lock

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

偏心锁的 lock 办法间接调用 AQS#acquire(1);
非偏心锁首先通过 CAS 批改 state 值来获取锁,当获取失败时才会调用 AQS#acquire(1) 来获取锁。

3.1.2 AQS#acquire

AQS 中的 acquire 办法中,只有 tryAcquire 办法须要子类来实现。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire

public final void acquire(int arg) {if (!tryAcquire(arg) && 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}     

3.1.3 tryAcquire

在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 AQS#tryAcquire 办法。

偏心锁

java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {if (!hasQueuedPredecessors() &&              // 判断同步队列中是否有等待时间更长的节点
            compareAndSetState(0, acquires)) {       // 进入这里,阐明以后线程期待锁工夫最长,则 CAS 批改 state
            setExclusiveOwnerThread(current);        // 将以后线程设置为持有锁的线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 以后线程已持有锁
        int nextc = c + acquires; // 重入次数
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded"); // 可重入的最大次数 Integer.MAX_VALUE
        setState(nextc);
        return true;
    }
    return false;
}

代码流程:

  1. 如果锁是可用状态,且同步队列中没有比以后线程等待时间还长的节点,则尝试获取锁,胜利则设为锁的持有者。
  2. 如果以后线程已持有锁,则进行重入,设置已获取锁的次数,最大次数为 Integer.MAX_VALUE。
  3. 以上条件都不合乎,则无奈获取锁。

非偏心锁

java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}

java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {if (compareAndSetState(0, acquires)) { // 不论以后线程在同步队列中是否期待最久,都来 CAS 抢夺锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {  // 如果已取得锁,则重入。逻辑与偏心锁统一
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

代码流程:

  1. 如果锁是可用状态,(不论同步队列中有没有节点在阻塞期待中)间接尝试获取锁,胜利则设为锁的持有者。
  2. 如果以后线程已持有锁,则进行重入,设置已获取锁的次数,最大次数为 Integer.MAX_VALUE。
  3. 以上条件都不合乎,则无奈获取锁。

能够看到,偏心锁比非偏心锁多执行了 hasQueuedPredecessors 办法,用于判断同步队列中是否具备比以后线程期待锁工夫还长的节点。

在 AQS 中的同步队列中,头节点是一个 dummy node,因而等待时间最长的节点是头节点的下一个节点,若该节点存在且不是以后线程,则 hasQueuedPredecessors 返回 true。阐明以后节点不是同步队列中等待时间最长的节点,无奈获取锁。

java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        // 头节点的下一个节点,不是以后线程的节点,阐明以后线程期待锁工夫不是最长的
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

3.2 Lock#lockInterruptibly

如果以后线程未被中断,则获取锁。

java.util.concurrent.locks.ReentrantLock#lockInterruptibly

public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}

关注获取锁过程中,对异样的解决。

  1. 获取锁之前,以后线程曾经中断,则间接抛出中断异样。
  2. 在同步队列中期待锁的过程中,如果被中断唤醒,则放弃期待锁,间接抛出异样。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly

public final void acquireInterruptibly(int arg)
        throws InterruptedException {if (Thread.interrupted())
        // 获取锁之前,以后线程曾经中断,则间接抛出中断异样
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg); 
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 线程在阻塞期待锁的过程中,被中断唤醒,则放弃期待锁,间接抛出异样
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

3.3 Lock#tryLock

仅在调用时锁未被另一个线程放弃的状况下,才获取该锁。

与非偏心锁的 NonfairSync#tryAcquire 办法一样,都是调用了 Sync#nonfairTryAcquire 办法。

  • ReentrantLock#lock -> NonfairSync#lock -> AQS#acquire -> NonfairSync#tryAcquire -> Sync#nonfairTryAcquire
  • ReentrantLock#tryLock -> Sync#nonfairTryAcquire

留神,Sync#nonfairTryAcquire 办法不存在任何和队列相干的操作。
在锁没有被其余线程持有的状况下,不论应用的是偏心锁还是非偏心锁,多个线程都能够调用该办法来 CAS 抢夺锁,抢夺失败了不必进入同步队列,间接返回。

应用偏心锁的状况下,在上一个持有锁的线程开释锁(锁状态 state == 0)之后,同步队列中的线程被唤醒但尚未获取锁之前,此时以后线程执行 tryLock 可能会胜利取得锁,这实际上是对偏心锁的公平性的一种毁坏。
在某些状况下,突破公平性的行为可能是有用的。如果心愿恪守此锁的偏心设置,则应用 tryLock(0, TimeUnit.SECONDS) ,它简直是等效的(也检测中断)。

java.util.concurrent.locks.ReentrantLock#tryLock()

public boolean tryLock() {return sync.nonfairTryAcquire(1);
}

java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {if (compareAndSetState(0, acquires)) { // 不论同步队列中是否具备期待很久的节点,以后线程间接 CAS 抢夺锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {  // 如果已取得锁,则重入。逻辑与偏心锁统一
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

3.4 Lock#tryLock(time, unit)

如果锁在给定等待时间内没有被另一个线程放弃,且以后线程未被中断,则获取该锁。

java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

在 JDK 中,AQS 中的 tryAcquireNanos 办法只会被 ReentrantLock#tryLock(time, unit) 和 ReentrantReadWriteLock.WriteLock#tryLock(time, unit) 这两个办法调用到。
阐明只有独占模式能力调用。

代码流程:

  1. 获取锁之前,如果以后线程已中断,则抛出中断异样。
  2. tryAcquire:以后线程尝试获取锁,获取胜利间接返回,否则进入下一个步。
  3. doAcquireNanos:以后线程进入同步队列中进行期待,直到取得锁、产生中断、期待超时。

java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireNanos

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireNanos

/**
 * Acquires in exclusive timed mode.
 *
 * @param arg the acquire argument
 * @param nanosTimeout max wait time
 * @return {@code true} if acquired
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) // 直到超时都没有取得锁,则返回 false
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout); // 能够进入阻塞的状况下,剩余时间大于阈值,则阻塞,否则自旋
            if (Thread.interrupted())
                throw new InterruptedException(); // 线程在阻塞期待锁的过程中,被中断唤醒,则放弃期待锁,间接抛出异样}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

3.5 Lock#unlock

尝试开释锁。

java.util.concurrent.locks.ReentrantLock#unlock

/**
 * Attempts to release this lock.
 *
 * <p>If the current thread is the holder of this lock then the hold
 * count is decremented.  If the hold count is now zero then the lock
 * is released.  If the current thread is not the holder of this
 * lock then {@link IllegalMonitorStateException} is thrown.
 *
 * @throws IllegalMonitorStateException if the current thread does not
 *         hold this lock
 */
public void unlock() {sync.release(1);
}

3.5.1 AQS#release

尝试开释锁,若开释胜利,且同步队列中具备期待中的节点,则尝试唤醒后继节点。

java.util.concurrent.locks.AbstractQueuedSynchronizer#release

public final boolean release(int arg) {if (tryRelease(arg)) { // 开释锁资源
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 唤醒 head 的后继节点
        return true;
    }
    return false;
}

3.5.2 Sync#tryRelease

在 ReentrantLock 中,不论是偏心锁还是非偏心锁,均应用雷同的 Sync#tryRelease 办法。

  1. 如果以后线程不是持有锁的线程,抛出 IllegalMonitorStateException。
  2. 因为锁是可重入的,必须把持有的锁全副开释(计数归零)才表明以后线程不再持有锁。

java.util.concurrent.locks.ReentrantLock.Sync#tryRelease

protected final boolean tryRelease(int releases) {int c = getState() - releases; // 计算开释之后的 state
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true; // 锁已全副开释(获取锁的次数必须等于开释次数),返回 true,表明能够唤醒下一个期待线程
        setExclusiveOwnerThread(null); // 设置独占锁持有线程为 null
    }
    setState(c);
    return free;
}

3.4 Lock#newCondition

创立 AQS 中的 ConditionObject 对象。是能够与 Lock 一起应用的 Condition 接口实例。

Condition 实例反对与 Object 的监视器办法(wait、notify 和 notifyAll)雷同的用法(await、signal 和 signalAll)。

  1. 在调用 Condition#await 或 Condition#signal 办法时,如果没有持有锁,则将抛出 IllegalMonitorStateException。
  2. 在调用 Condition#await 办法时,将开释锁并进入阻塞,当被唤醒时从新获取锁,并复原调用 Condition#await 时锁的持有计数。
  3. 如果线程在期待时被中断,则期待将终止,待从新获取锁胜利之后,再响应中断(抛异样或从新中断)。
  4. 期待线程按 FIFO 程序收到信号。
  5. 期待办法返回的线程从新获取锁的程序,与线程最后获取锁的程序雷同(对于非偏心锁,是按获取锁的程序;对于偏心锁,等同于按期待锁的工夫排序)。

java.util.concurrent.locks.ReentrantLock.Sync#newCondition

final ConditionObject newCondition() {return new ConditionObject();
}

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#ConditionObject

public ConditionObject() {}

相干浏览:
浏览 JDK 源码:AQS 中的独占模式
浏览 JDK 源码:AQS 中的共享模式
浏览 JDK 源码:AQS 对 Condition 的实现

作者:Sumkor
链接:https://segmentfault.com/a/11…

正文完
 0