共计 20561 个字符,预计需要花费 52 分钟才能阅读完成。
java.util.concurrent.locks.ReentrantLock
exclusive : adj. (个人或集体) 专用的,专有的,独有的,独占的; 排外的; 不愿接收新成员 (尤指较低社会阶层) 的; 高档的; 豪华的; 高级的
reentrant : 可重入; 可重入的; 重入; 可再入的; 重进入
一切从 Thread 线程开始
独占线程 exclusiveOwnerThread 出场:
package java.util.concurrent.locks;
/**
* A synchronizer that may be exclusively owned by a thread. This
* class provides a basis for creating locks and related synchronizers
* that may entail a notion of ownership. The
* {@code AbstractOwnableSynchronizer} class itself does not manage or
* use this information. However, subclasses and tools may use
* appropriately maintained values to help control and monitor access
* and provide diagnostics.
*
* @since 1.6
* @author Doug Lea
*/
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* Empty constructor for use by subclasses.
*/
protected AbstractOwnableSynchronizer() {}
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}
/**
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread
*/
protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
}
这里的获取当前锁的独占线程的方法是 final 的:
protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
AQS 继承这个类:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {}
快速认识 ReentrantLock
直接上代码:
fun main() {
val time1 = measureTimeMillis {singleThreadSum()
}
val time2 = measureTimeMillis {multiThreadSumNoLock()
}
val time3 = measureTimeMillis {multiThreadSumUseLock()
}
println("time1:$time1")
println("time2:$time2")
println("time3:$time3")
}
fun singleThreadSum() {
var sum: Long = 0
for (i in 1..100000) {sum += i}
for (i in 100001..200000) {sum += i}
println("singleThreadSum: $sum")
}
fun multiThreadSumNoLock() {
var sum: Long = 0
val t1 = Thread {for (i in 1..100000) {sum += i}
}
val t2 = Thread {for (i in 100001..200000) {sum += i}
}
t1.start()
t2.start()
t1.join()
t2.join()
println("multiThreadSumNoLock:$sum")
}
fun multiThreadSumUseLock() {
var sum: Long = 0
val lock = ReentrantLock()
val t1 = Thread {lock.lock()
try {for (i in 1..100000) {sum += i}
} finally {lock.unlock()
}
}
val t2 = Thread {lock.lock()
try {for (i in 100001..200000) {sum += i}
} finally {lock.unlock()
}
}
t1.start()
t2.start()
t1.join()
t2.join()
println("multiThreadSumUseLock:$sum")
}
运行结果(每次会有不同):
singleThreadSum: 20000100000
multiThreadSumNoLock:19496951532
multiThreadSumUseLock:20000100000
time1:2
time2:11
time3:8
其中, lock() 方法背后发生的事情大概如下图:
lock()
如果没有线程使用则立即返回,并设置 state 为 1;
如果当前线程已经占有锁,则 state 加 1;如果其他线程占有锁,则当前线程不可用,等待.
tryLock()
如果锁可用,则获取锁,并立即返回值 true。
如果锁不可用,则此方法将立即返回值 false.
unlock()
尝试释放锁,如果当前线程占有锁则 count 减一,如果 count 为 0 则释放锁。
如果占有线程不是当前线程,则抛异常.
ReentrantLock 是什么?
ReentrantLock, 可重入锁 , 是一个基于 AQS()并发框架的并发控制类.
ReentrantLock 内部实现了 3 个类,分别是:
Sync
NoFairSync
FairSync
其中 Sync 继承自 AQS,实现了释放锁的模板方法 tryRelease(int).
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
Sync() {}
abstract void lock();
final boolean nonfairTryAcquire(int var1) {Thread var2 = Thread.currentThread();
int var3 = this.getState();
if (var3 == 0) {if (this.compareAndSetState(0, var1)) {this.setExclusiveOwnerThread(var2);
return true;
}
} else if (var2 == this.getExclusiveOwnerThread()) {
int var4 = var3 + var1;
if (var4 < 0) {throw new Error("Maximum lock count exceeded");
}
this.setState(var4);
return true;
}
return false;
}
protected final boolean tryRelease(int var1) {int var2 = this.getState() - var1;
if (Thread.currentThread() != this.getExclusiveOwnerThread()) {throw new IllegalMonitorStateException();
} else {
boolean var3 = false;
if (var2 == 0) {
var3 = true;
this.setExclusiveOwnerThread((Thread)null);
}
this.setState(var2);
return var3;
}
}
protected final boolean isHeldExclusively() {return this.getExclusiveOwnerThread() == Thread.currentThread();}
final ConditionObject newCondition() {return new ConditionObject(this);
}
final Thread getOwner() {return this.getState() == 0 ? null : this.getExclusiveOwnerThread();}
final int getHoldCount() {return this.isHeldExclusively() ? this.getState() : 0;}
final boolean isLocked() {return this.getState() != 0;
}
private void readObject(ObjectInputStream var1) throws IOException, ClassNotFoundException {var1.defaultReadObject();
this.setState(0);
}
}
}
而 NoFairSync 和 FairSync 都继承自 Sync,实现各种获取锁的方法 tryAcquire(int)。
FairSync
static final class FairSync extends ReentrantLock.Sync {
private static final long serialVersionUID = -3000897897090466540L;
FairSync() {}
final void lock() {this.acquire(1);
}
protected final boolean tryAcquire(int var1) {Thread var2 = Thread.currentThread();
int var3 = this.getState();
if (var3 == 0) {if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, var1)) {this.setExclusiveOwnerThread(var2);
return true;
}
} else if (var2 == this.getExclusiveOwnerThread()) {
int var4 = var3 + var1;
if (var4 < 0) {throw new Error("Maximum lock count exceeded");
}
this.setState(var4);
return true;
}
return false;
}
}
NonfairSync
static final class NonfairSync extends ReentrantLock.Sync {
private static final long serialVersionUID = 7316153563782823691L;
NonfairSync() {}
final void lock() {if (this.compareAndSetState(0, 1)) {this.setExclusiveOwnerThread(Thread.currentThread());
} else {this.acquire(1);
}
}
protected final boolean tryAcquire(int var1) {return this.nonfairTryAcquire(var1);
}
}
[Ref: https://www.jianshu.com/p/620…]
Lock
First time a thread wants to acquire the lock, it acquires it without any wait as the lock is not owned by any thread yet. Thread acquires and owns it.
Lock below is called the ReentrantLock, which is either open to be acquired or locked and is determined by its state which is either zero or non-zero. It knows the owning thread.
What is Reentrance Mutext?
When a thread acquires a previously unheld lock, the JVM records the owner thread and sets the acquisition count to one. If that same thread acquires the lock again, the count is incremented, and when the owning thread calls unlock, the count is decremented. When the count reaches zero, the lock is released.
Reentrancy means that locks are acquired on a per-thread rather than per-invocation basis. In other words, if a thread is not reentrant, and tries to acquire a lock that it already holds, the request won’t succeed.
A re-entrant lock can follow either fair or non-fair policy, by default it is non-fair. In this article we will discuss both non-fair and fair locks. Below is the class diagram.
Class diagram
ReentrantLock 实现原理
获取锁的背后发生了什么?
When the second thread fails to acquire lock as the lock is already acquired by the first thread then it adds itself to the waiting node linked list at the end of the list.
Try acquiring lock for the queued thread:
After the thread adds itself to the waiting node linked list, it will still try acquiring lock in exclusive uninterruptible mode.
If the thread trying to acquire lock is not the successor to the header node then it won’t be able to acquire the lock. It will still not be parked yet and the predecessor’s status field will be checked to figure out whether the status represents the correct state.
Thread Parking
If the lock to acquire is currently owned by some other thread then the current thread trying to acquire the lock will be parked, that is, disabled for thread scheduling purposes unless the permit is available. If the permit is available then it is consumed and the call returns immediately, otherwise it remains blocked. The blocker (the lock) which is responsible for this thread parking will be set to the thread.
Data Structure
If the lock is already owned by a thread, any other thread trying to acquire the lock will put on a hold and will be waiting in a queue. The thread would be disabled for thread scheduling purpose. The queue here is a double linked list. The lock will know the owning thread, state (0 or 1), head and tail of the linked list. If the node’successor node is waiting for the lock then its wait state property (-1) will reflect that.
Waiting Queue: 双向链表
The thread fails to acquire the lock fails as the lock is already acquired by another thread. The thread is added to a double linked list queue where the header is a dummy node with a mode indicating that it is waiting for a signal and the current thread which failed to acquire the lock becomes the tail node and is the next node to header. Since the current thread fails to get the lock, it is parked as it remains blocked till some other thread unblocks it.
Second thread tries to acquire the lock, it too fails so gets queued. It becomes the new tail and the thread is parked.
unlock() 算法
If the current thread is the holder of this lock then the hold count is decremented. If the hold count is now zero then the lock is released. If the current thread is not the holder of this lock then llegalMonitorStateException is thrown.
Once the lock is released, the lock finds out the node which is in waiting status, removed the waiting status and wakes up the node’s successor so that the node woken up can acquire the lock.
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
The thread to unpark is held in successor, which is normally just the next node. But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
If the node woken up is immediate successor to the head node, it tries to acquire the lock and become the new head. The old head and the variables referring to it are nullified so the gc can claim the memory.
If the owning thread releases the lock, the head’s successor node’s thread will be unparked. The successor node is tail in this case, so we will end up with a single node which is itself head and tail both. The unparked thread will take the ownership of the lock.
Further release of the lock will simply nullify the owning thread and the state of the lock will be set to 0.
公平锁和非公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入 FIFO 队列,队列中的第一个线程才能获得锁。
用一个打水的例子来理解:
公平锁的优点是等待锁的线程不会夯死。缺点是吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大。
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。
非公平锁的优点是可以减少唤起线程的开销(因为可能有的线程可以直接获取到锁,CPU 也就不用唤醒它),所以整体的吞吐效率高。缺点是处于等待队列中的线程可能会夯死(试想恰好每次有新线程来,它恰巧都每次获取到锁,此时还在排队等待获取锁的线程就悲剧了 …..),或者等很久才会获得锁。
小结: 公平锁和非公平锁的差异在于是否按照申请锁的顺序来获取锁,非公平锁可能会出现有多个线程等待时,有一个人品特别的好的线程直接没有等待而直接获取到了锁的情况,他们各有利弊;Reentrantlock 在构造时默认是非公平的,可以通过参数控制。
[Ref:https://www.jianshu.com/p/620…]
Reentrantlock 与 AQS
java.util.concurrent.locks.AbstractQueuedSynchronizer
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers ....
ref: [http://www.cs.rochester.edu/wcms/research/systems/high_performance_synch/]
AQS 是 AbustactQueuedSynchronizer 的简称,它是一个 Java 提高的底层同步工具类,用一个 int 类型的变量表示同步状态,并提供了一系列的 CAS 操作来管理这个同步状态。AQS 的主要作用是为 Java 中的并发同步组件提供统一的底层支持.
同步工具类 Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask 等虽然各自都有不同特征,但是简单看一下源码,每个类内部都包含一个如下的内部子类定义:
abstract static class Sync extends AbstractQueuedSynchronizer
[ref:https://blog.csdn.net/zhangdo…]
AQS 提供了一种实现阻塞锁和一系列依赖 FIFO 等待队列的同步器的框架,如下图所示。
同步队列是 AQS 很重要的组成部分,它是一个双端队列,遵循 FIFO 原则,主要作用是用来存放在锁上阻塞的线程,当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个 Node 节点 add 到同步队列的尾部,队列的头节点是成功获取锁的节点,当头节点线程是否锁时,会唤醒后面的节点并释放当前头节点的引用。
Queue 中节点 Node 的状态机流转机制是 AQS 的核心:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
.......
}
AQS 为一系列同步器依赖于一个单独的原子变量(state)的同步器提供了一个非常有用的基础。子类们必须定义改变 state 变量的 protected 方法,这些方法定义了 state 是如何被获取或释放的。鉴于此,本类中的其他方法执行所有的排队和阻塞机制。子类也可以维护其他的 state 变量,但是为了保证同步,必须原子地操作这些变量。
AbstractQueuedSynchronizer 中对 state 的操作是原子的,且不能被继承。所有的同步机制的实现均依赖于对改变量的原子操作。为了实现不同的同步机制,我们需要创建一个非共有的(non-public internal)扩展了 AQS 类的内部辅助类来实现相应的同步逻辑。AbstractQueuedSynchronizer 并不实现任何同步接口,它提供了一些可以被具体实现类直接调用的一些原子操作方法来重写相应的同步逻辑。AQS 同时提供了互斥模式(exclusive)和共享模式(shared)两种不同的同步逻辑。一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如ReadWriteLock
。接下来将详细介绍 AbstractQueuedSynchronizer 的提供的一些具体实现方法。
state 状态
AbstractQueuedSynchronizer 维护了一个 volatile int 类型的变量,用户表示当前同步状态。volatile 虽然不能保证操作的原子性,但是保证了当前变量 state 的可见性。至于 volatile 的具体语义,可以参考相关文章。state 的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
这三种叫做均是原子操作,其中 compareAndSetState 的实现依赖于 Unsafe 的 compareAndSwapInt()方法。代码实现如下:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {return state;}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {state = newState;}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
自定义资源共享方式
AQS 定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如 ReentrantLock)和 Share(共享,多个线程可同时执行,如 Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队 / 唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。
[Ref:https://www.jianshu.com/p/da9…]
ReentrantLock 重入锁的基本原理是判断上次获取锁的线程是否为当前线程,如果是则可再次进入临界区,如果不是,则阻塞。
由于 ReentrantLock 是基于 AQS 实现的,底层通过操作同步状态来获取锁.,下面看一下非公平锁的实现逻辑:
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 通过 AQS 获取同步状态
int c = getState();
// 同步状态为 0,说明临界区处于无锁状态,if (c == 0) {
// 修改同步状态,即加锁
if (compareAndSetState(0, acquires)) {
// 将当前线程设置为锁的 owner
setExclusiveOwnerThread(current);
return true;
}
}
// 如果临界区处于锁定状态,且上次获取锁的线程为当前线程
else if (current == getExclusiveOwnerThread()) {
// 则递增同步状态
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
读写锁
Java 提供了一个基于 AQS 到读写锁实现 ReentrantReadWriteLock,该读写锁到实现原理是:将同步变量 state 按照高 16 位和低 16 位进行拆分,高 16 位表示读锁,低 16 位表示写锁。
写锁的获取与释放
写锁是一个独占锁,所以我们看一下 ReentrantReadWriteLock 中 tryAcquire(arg)的实现:
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
上述代码的处理流程已经非常清晰:
获取同步状态,并从中分离出低 16 为的写锁状态
如果同步状态不为 0,说明存在读锁或写锁
如果存在读锁(c!=0 && w == 0),则不能获取写锁(保证写对读的可见性)如果当前线程不是上次获取写锁的线程,则不能获取写锁(写锁为独占锁)如果以上判断均通过,则在低 16 为写锁同步状态上利用 CAS 进行修改(增加写锁同步状态,实现可重入)将当前线程设置为写锁的获取线程
写锁的释放过程与独占锁基本相同:
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
在释放的过程中,不断减少读锁同步状态,只为同步状态为 0 时,写锁完全释放。
读锁的获取与释放
读锁是一个共享锁,获取读锁的步骤如下:
获取当前同步状态
计算高 16 为读锁状态 + 1 后的值
如果大于能够获取到的读锁的最大值,则抛出异常
如果存在写锁并且当前线程不是写锁的获取者,则获取读锁失败
如果上述判断都通过,则利用 CAS 重新设置读锁的同步状态
读锁的获取步骤与写锁类似,即不断的释放写锁状态,直到为 0 时,表示没有线程获取读锁。
[ref:https://blog.csdn.net/zhangdo…]
Unsafe 类
我们可以看到在 AbstractQueuedSynchronizer 中用到了 Unsafe 类:
/**
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) {throw new Error(ex); }
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* CAS next field of a node.
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
}
实现线程安全且高效地设置队列中节点 Node 的等待状态 waitStatus .
比如说, ReentrantLock 中非公平锁的实现代码中:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
}
compareAndSetState(0, 1) 方法:
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
Unsafe 类里面的实现方法基本都是 native 方法:
...
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
...
参考资料
https://www.javarticles.com/2012/09/reentrant-lock.html
https://www.javarticles.com/2016/06/java-reentrantlock-interruption-example.html
Kotlin 开发者社区
国内第一 Kotlin 开发者社区公众号,主要分享、交流 Kotlin 编程语言、Spring Boot、Android、React.js/Node.js、函数式编程、编程思想等相关主题。