Introduction: Anyone who writes Java knows that Java locks fall into two broad categories: synchronized locks, and the locks in the concurrent package (JUC locks). synchronized is a capability provided at the Java language level and is not covered here; this article focuses on the ReentrantLock in JUC.
Author | Jiang Chong
Source | Alibaba Tech official account
I. The JDK Layer
1 AbstractQueuedSynchronizer
ReentrantLock's APIs such as lock() and unlock() actually rely on an internal Synchronizer (note: not synchronized) for their implementation. The Synchronizer comes in two variants, FairSync and NonfairSync, which, as the names suggest, implement fair and non-fair locking.
When ReentrantLock's lock method is called, it simply hands off to the Synchronizer's lock() method:
Excerpted from java.util.concurrent.locks.ReentrantLock.java:
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {......}
public void lock() {
    sync.lock();
}
So what is this sync? We can see that Sync extends AbstractQueuedSynchronizer (AQS), the cornerstone of the concurrent package. AQS itself implements no synchronization interface (such as lock, unlock, or countDown); instead it defines a framework for controlling access to a concurrent resource (using the template method design pattern). It defines acquire and release for exclusively acquiring and releasing the resource, and acquireShared and releaseShared for acquiring and releasing it in shared mode. For example, acquire/release are used to implement ReentrantLock, while acquireShared/releaseShared are used to implement CountDownLatch and Semaphore. The skeleton of acquire looks like this:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
The overall logic: first perform one tryAcquire. If it succeeds, there is nothing more to do and the caller continues with its own code. If it fails, addWaiter and acquireQueued are executed. tryAcquire() must be implemented by subclasses according to their own synchronization semantics, while acquireQueued() and addWaiter() are already implemented by AQS. addWaiter appends the current thread to the tail of AQS's internal sync queue, and acquireQueued blocks the current thread when tryAcquire() fails.
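To make the template-method split concrete, here is a minimal, hypothetical mutex built on AQS (illustration only, not code from ReentrantLock): the subclass supplies only tryAcquire/tryRelease, while AQS's acquire/release contribute the queueing (addWaiter) and blocking (acquireQueued) described above.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal, non-reentrant mutex built on AQS. We only define how
// to grab and give back the state; AQS handles queuing and parking threads.
public class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS state 0 -> 1 means we took the lock
            return compareAndSetState(0, 1);
        }
        @Override
        protected boolean tryRelease(int arg) {
            setState(0); // free the lock; AQS then unparks a queued waiter
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }   // AQS template method
    public void unlock() { sync.release(1); }   // AQS template method

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex m = new SimpleMutex();
        m.lock();
        Thread t = new Thread(() -> {
            m.lock();  // tryAcquire fails, thread is queued and parked
            System.out.println("worker acquired");
            m.unlock();
        });
        t.start();
        Thread.sleep(100);
        m.unlock();    // wakes the queued worker
        t.join();
        System.out.println("done");
    }
}
```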
The code of addWaiter is as follows:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
    // Create a node bound to the current thread and the given mode (exclusive or shared)
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // A non-null tail means the sync queue has already been initialized
    if (pred != null) {
        // Set the new node's predecessor to the current tail
        node.prev = pred;
        // CAS the new node in as the tail
        if (compareAndSetTail(pred, node)) {
            // Point the old tail's next at the new tail,
            // so the sync queue is a doubly linked list.
            pred.next = node;
            return node;
        }
    }
    // A null tail means the queue has not been initialized yet:
    // enq() initializes the head node and then appends the new node.
    enq(node);
    return node;
}
The code of enq(node) is as follows:
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // If tail is null, create a head node and point both tail and head at it.
            // This head node is a "sentinel" (dummy) node, not bound to any thread.
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // The second loop iteration takes this branch
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
After addWaiter completes, the sync queue has the structure shown below:
The code of acquireQueued is as follows:
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the current node's predecessor
            final Node p = node.predecessor();
            // If the predecessor is the head node, this thread is first in line: try to acquire the lock
            if (p == head && tryAcquire(arg)) {
                // On success, make the current node the new head (dummy) node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
The logic of acquireQueued is:
Check whether this node is the first queued node in the sync queue; if so, attempt to acquire the lock, and on success the node becomes the new head node, as illustrated below:
If it is not the first queued node, or tryAcquire fails, call shouldParkAfterFailedAcquire, whose main job is to use CAS to change the predecessor's status from INITIAL to SIGNAL, indicating that the current thread will block and wait to be woken by a SIGNAL. If that CAS fails, the infinite loop in acquireQueued keeps retrying until it succeeds, and then parkAndCheckInterrupt is called. parkAndCheckInterrupt suspends the current thread and waits to be woken. Its implementation relies on capabilities of the layers below; that is the focus of this article, and the following sections work through it layer by layer.
2 ReentrantLock
Now let's look at how ReentrantLock implements its semantics on top of AbstractQueuedSynchronizer.
ReentrantLock internally uses FairSync and NonfairSync, both subclasses of AQS. The main code of FairSync, for example, is:
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire. Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
The most important field in AQS is state; locks and synchronizers are all implemented by manipulating this field. One reason AQS can back so many different locks and synchronizers is that each one may define the meaning of the synchronization state as it needs and override the corresponding tryAcquire/tryRelease (or tryAcquireShared/tryReleaseShared) methods to operate on it.
The logic of ReentrantLock's FairSync.tryAcquire:
- If state (private volatile int state) is 0, nobody holds the lock. Because this is a fair lock, the thread must also check that it has no queued predecessors before attempting to CAS the state to 1; on success it owns the lock. compareAndSetState is implemented via CAS. CAS is atomic and state is volatile, so updates to state are thread-safe.
- If state is not 0, check whether the current thread is the lock's owner. If so, increment state; if the count overflows, an Error is thrown. If it does not overflow, return true, meaning the lock was acquired (reentrantly).
- If neither case applies, return false: acquisition failed.
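The state bookkeeping described above can be observed directly. A small illustrative snippet (not from the original article) using a fair ReentrantLock: each reentrant lock() bumps the hold count (the AQS state), and each unlock() decrements it.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true); // fair lock -> FairSync

        lock.lock();
        lock.lock(); // reentrant acquire: AQS state goes 0 -> 1 -> 2
        System.out.println("holds=" + lock.getHoldCount()); // prints holds=2

        lock.unlock();
        System.out.println("holds=" + lock.getHoldCount()); // prints holds=1

        lock.unlock();
        System.out.println("locked=" + lock.isLocked()); // prints locked=false
    }
}
```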
That covers the Java-level implementation. To summarize, the overall framework is shown below:
The implementation of unlock is omitted for space. The rest of this article analyzes how the lock path suspends the current thread, that is, how the unsafe.park() in the figure above is implemented.
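Before descending into the JVM, it is worth observing the permit semantics of park at the Java level through LockSupport, the public wrapper over Unsafe.park/unpark. A small illustrative snippet (not from the original article): a stored permit makes the next park() return immediately, and without a permit, park() blocks until another thread calls unpark().

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) {
        // 1) A permit granted in advance makes the next park() a no-op.
        LockSupport.unpark(Thread.currentThread()); // store the (single) permit
        LockSupport.park();                          // consumes it, returns immediately
        System.out.println("park returned immediately");

        // 2) Without a permit, park() blocks until some other thread unparks us.
        Thread main = Thread.currentThread();
        new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) {}
            LockSupport.unpark(main); // wake the parked main thread
        }).start();
        LockSupport.park(); // blocks until the helper's unpark arrives
        System.out.println("woken by unpark");
    }
}
```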
II. The JVM Layer
Unsafe.park and Unsafe.unpark are native methods of the sun.misc.Unsafe class:
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
These two methods are implemented in the JVM source file hotspot/src/share/vm/prims/unsafe.cpp:
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    const oop obj = thread->current_park_blocker();
    if (time == 0) {
      post_thread_park_event(&event, obj, min_jlong, min_jlong);
    } else {
      if (isAbsolute != 0) {
        post_thread_park_event(&event, obj, min_jlong, time);
      } else {
        post_thread_park_event(&event, obj, time, min_jlong);
      }
    }
  }
UNSAFE_END
The core logic is thread->parker()->park(isAbsolute != 0, time): fetch the Java thread's Parker object and invoke its park method. Every Java thread has a Parker instance; the Parker class is defined as follows:
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
enum {
REL_INDEX = 0,
ABS_INDEX = 1
};
int _cur_index; // which cond is in use: -1, 0, 1
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [2] ; // one for relative times and one for abs.
public: // TODO-FIXME: make dtor private
~PlatformParker() { guarantee (0, "invariant") ; }
public:
PlatformParker() {
int status;
status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
assert_status(status == 0, status, "cond_init rel");
status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
assert_status(status == 0, status, "cond_init abs");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_cur_index = -1; // mark as unused
}
};
The park method:
void Parker::park(bool isAbsolute, jlong time) {
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
  if (Thread::is_interrupted(thread, false)) {
    return;
  }
// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
}
  // Enter a safepoint region and mark the thread as blocked
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    // If the thread has been interrupted, or locking the mutex fails
    // (e.g. it is held by another thread), return immediately
    return;
  }
  // Reaching here means pthread_mutex_trylock(_mutex) succeeded
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
OrderAccess::fence();
return;
}
#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_cur_index = -1;
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
}
}
The idea behind park: Parker has a key field, _counter, which records the so-called "permit". When _counter is greater than 0, a permit is available; setting _counter back to 0 counts as consuming the permit, after which the thread may continue running. If _counter is not greater than 0, the thread waits for that condition to become true.
Let's walk through the implementation of park in detail:
- When park is called, it first tries to take the permit directly: if _counter > 0, it sets _counter to 0 and returns.
- If that fails, the thread's state is set to _thread_in_vm and then _thread_blocked. _thread_in_vm means the thread is currently executing inside the JVM; _thread_blocked means it is blocked.
- After acquiring the mutex, park checks whether _counter is > 0 again; if so, it sets _counter to 0, unlocks the mutex, and returns.
- If _counter is still not greater than 0, park checks whether the wait time equals 0 and then calls the appropriate pthread_cond_wait-family function to wait. When the wait returns (i.e., some thread performed an unpark, which notifies via pthread_cond_signal), park sets _counter to 0, unlocks the mutex, and returns.
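The _counter / mutex / condition-variable protocol above can be mirrored at the Java level. The following is a hypothetical analogue of Parker (not the HotSpot code; MiniParker is an invented name), with ReentrantLock and Condition standing in for pthread_mutex_t and pthread_cond_t:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical Java analogue of HotSpot's Parker: a single permit (_counter),
// a mutex, and a condition variable.
public class MiniParker {
    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition cond = mutex.newCondition();
    private int counter = 0; // 0 or 1, like Parker::_counter

    public void park() throws InterruptedException {
        mutex.lock();
        try {
            if (counter > 0) {       // permit already available: consume and return
                counter = 0;
                return;
            }
            while (counter == 0)     // wait until unpark() grants the permit
                cond.await();        // releases the mutex while waiting
            counter = 0;
        } finally {
            mutex.unlock();
        }
    }

    public void unpark() {
        mutex.lock();
        try {
            counter = 1;             // grant the (single) permit
            cond.signal();           // wake a parked thread, like pthread_cond_signal
        } finally {
            mutex.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniParker p = new MiniParker();
        Thread t = new Thread(() -> {
            try {
                p.park();            // blocks until main calls unpark()
                System.out.println("unparked");
            } catch (InterruptedException ignored) {}
        });
        t.start();
        Thread.sleep(100);
        p.unpark();
        t.join();
        System.out.println("done");
    }
}
```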
So, in essence, LockSupport.park is implemented on top of the pthread library's condition variable, pthread_cond_t. Next, let's look at how pthread_cond_t itself is implemented.
III. The glibc Layer
A typical use of pthread_cond_t looks like this:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* initialize the mutex */
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;    /* initialize the condition variable */

void *thread1(void *);
void *thread2(void *);

int i = 1;

int main(void)
{
    pthread_t t_a;
    pthread_t t_b;
    pthread_create(&t_a, NULL, thread1, (void *)NULL); /* create thread t_a */
    pthread_create(&t_b, NULL, thread2, (void *)NULL); /* create thread t_b */
    pthread_join(t_a, NULL); /* wait for thread t_a to finish */
    pthread_join(t_b, NULL); /* wait for thread t_b to finish */
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    exit(0);
}

void *thread1(void *junk)
{
    for (i = 1; i <= 9; i++)
    {
        pthread_mutex_lock(&mutex);      /* lock the mutex */
        if (i % 3 == 0)
            pthread_cond_signal(&cond);  /* condition changed: signal thread t_b */
        else
            printf("thread1:%d\n", i);
        pthread_mutex_unlock(&mutex);    /* unlock the mutex */
        printf("Up Unlock Mutex\n");
        sleep(1);
    }
    return NULL;
}

void *thread2(void *junk)
{
    while (i < 9)
    {
        pthread_mutex_lock(&mutex);
        if (i % 3 != 0)
            pthread_cond_wait(&cond, &mutex); /* wait */
        printf("thread2:%d\n", i);
        pthread_mutex_unlock(&mutex);
        printf("Down Unlock Mutex\n");
        sleep(1);
    }
    return NULL;
}
The key point: both pthread_cond_wait and pthread_cond_signal must be preceded by pthread_mutex_lock. Without this protection, a race condition can occur and a signal can be missed. pthread_cond_wait() automatically releases the mutex as soon as it enters the wait; when another thread wakes it via pthread_cond_signal or pthread_cond_broadcast and pthread_cond_wait() returns, the thread automatically reacquires the mutex.
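Java's Condition, the JUC counterpart of pthread_cond_t, enforces the same discipline: await/signal must be called with the lock held (otherwise an IllegalMonitorStateException is thrown), and await releases the lock while waiting. A small illustrative snippet, not from the original article:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDiscipline {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();

        // Calling signal() without holding the lock is rejected,
        // just as pthread requires the mutex to be held.
        try {
            cond.signal();
        } catch (IllegalMonitorStateException e) {
            System.out.println("signal without lock: rejected");
        }

        // The correct pattern: lock, then signal (or await), then unlock.
        lock.lock();
        try {
            cond.signal(); // legal now; a no-op here since nobody is waiting
            System.out.println("signal with lock: ok");
        } finally {
            lock.unlock();
        }
    }
}
```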
The whole process is illustrated in the figure below:
1 pthread_mutex_lock
On Linux, for example, a mechanism called futex (short for "fast userspace mutex") is used.
In this scheme, an atomic increment-and-test operation is performed on the mutex variable in user space.
If the result of that operation indicates there is no contention on the lock, the call to pthread_mutex_lock returns without any context switch into the kernel, so acquiring a mutex can be very fast.
Only when contention is detected does a system call (named futex) occur, with a context switch into the kernel that puts the calling thread to sleep until the mutex is released.
There are many more details, especially around robust and/or priority-inheritance mutexes, but that is the essence of it.
nptl/pthread_mutex_lock.c
int
PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex)
{
/* See concurrency notes regarding mutex type which is loaded from __kind
in struct __pthread_mutex_s in sysdeps/nptl/bits/thread-shared-types.h. */
unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);
LIBC_PROBE (mutex_entry, 1, mutex);
if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP
| PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
return __pthread_mutex_lock_full (mutex);
if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
    {
      FORCE_ELISION (mutex, goto elision);
simple:
/* Normal mutex. */
LLL_MUTEX_LOCK_OPTIMIZED (mutex);
assert (mutex->__data.__owner == 0);
}
#if ENABLE_ELISION_SUPPORT
else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))
    {
    elision: __attribute__((unused))
/* This case can never happen on a system without elision,
as the mutex type initialization functions will not
allow to set the elision flags. */
/* Don't record owner or users for elision case. This is a
tail call. */
return LLL_MUTEX_LOCK_ELISION (mutex);
}
#endif
else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
== PTHREAD_MUTEX_RECURSIVE_NP, 1))
{
/* Recursive mutex. */
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
/* Check whether we already hold the mutex. */
if (mutex->__data.__owner == id)
{
/* Just bump the counter. */
if (__glibc_unlikely (mutex->__data.__count + 1 == 0))
/* Overflow of the counter. */
return EAGAIN;
++mutex->__data.__count;
return 0;
}
/* We have to get the mutex. */
LLL_MUTEX_LOCK_OPTIMIZED (mutex);
assert (mutex->__data.__owner == 0);
mutex->__data.__count = 1;
}
else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
== PTHREAD_MUTEX_ADAPTIVE_NP, 1))
    {
      if (LLL_MUTEX_TRYLOCK (mutex) != 0)
{
int cnt = 0;
int max_cnt = MIN (max_adaptive_count (),
mutex->__data.__spins * 2 + 10);
do
        {
          if (cnt++ >= max_cnt)
            {
              LLL_MUTEX_LOCK (mutex);
              break;
            }
          atomic_spin_nop ();
        }
while (LLL_MUTEX_TRYLOCK (mutex) != 0);
mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
}
assert (mutex->__data.__owner == 0);
}
else
    {
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);
/* Check whether we already hold the mutex. */
if (__glibc_unlikely (mutex->__data.__owner == id))
return EDEADLK;
goto simple;
}
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
/* Record the ownership. */
mutex->__data.__owner = id;
#ifndef NO_INCR
++mutex->__data.__nusers;
#endif
LIBC_PROBE (mutex_acquired, 1, mutex);
return 0;
}
pthread_mutex_t is defined as follows:
typedef union
{
struct __pthread_mutex_s
{
int __lock;
unsigned int __count;
int __owner;
unsigned int __nusers;
int __kind;
int __spins;
__pthread_list_t __list;
} __data;
......
} pthread_mutex_t;
The __kind field indicates the type of the lock, with the following values:
/* Mutex types. */
enum
{
PTHREAD_MUTEX_TIMED_NP,
PTHREAD_MUTEX_RECURSIVE_NP,
PTHREAD_MUTEX_ERRORCHECK_NP,
PTHREAD_MUTEX_ADAPTIVE_NP
#if defined __USE_UNIX98 || defined __USE_XOPEN2K8
,
PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP,
PTHREAD_MUTEX_RECURSIVE = PTHREAD_MUTEX_RECURSIVE_NP,
PTHREAD_MUTEX_ERRORCHECK = PTHREAD_MUTEX_ERRORCHECK_NP,
PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL
#endif
#ifdef __USE_GNU
/* For compatibility. */
, PTHREAD_MUTEX_FAST_NP = PTHREAD_MUTEX_TIMED_NP
#endif
};
Where:
- PTHREAD_MUTEX_TIMED_NP: the default, an ordinary mutex.
- PTHREAD_MUTEX_RECURSIVE_NP: a recursive (reentrant) mutex, which allows the same thread to lock it successfully multiple times, to be released by the same number of unlocks.
- PTHREAD_MUTEX_ERRORCHECK_NP: an error-checking mutex; if the same thread requests the same lock again, it returns EDEADLK, otherwise it behaves like PTHREAD_MUTEX_TIMED_NP.
- PTHREAD_MUTEX_ADAPTIVE_NP: an adaptive mutex, a hybrid of a spinlock and an ordinary mutex.
The mutex here uses the default PTHREAD_MUTEX_TIMED_NP, so execution reaches LLL_MUTEX_LOCK_OPTIMIZED, which is a macro:
# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)
static inline void
lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
/* The single-threaded optimization is only valid for private
mutexes. For process-shared mutexes, the mutex could be in a
shared mapping, so synchronization with another process is needed
even without any threads. If the lock is already marked as
acquired, POSIX requires that pthread_mutex_lock deadlocks for
normal mutexes, so skip the optimization in that case as
well. */
int private = PTHREAD_MUTEX_PSHARED (mutex);
if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
mutex->__data.__lock = 1;
else
lll_lock (mutex->__data.__lock, private);
}
Since the single-threaded fast path does not apply here, execution goes through lll_lock, which is also a macro:
#define lll_lock(futex, private) \
__lll_lock (&(futex), private)
Note that futex makes its appearance here; the remainder of this article largely revolves around it.
#define __lll_lock(futex, private)                                      \
((void) \
({ \
int *__futex = (futex); \
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))
Here, atomic_compare_and_exchange_bool_acq tries, with an atomic operation, to change __futex (that is, mutex->__data.__lock) from 0 to 1. If that succeeds, the macro returns directly; if it fails, __lll_lock_wait is called, whose code is:
void
__lll_lock_wait (int *futex, int private)
{
  if (atomic_load_relaxed (futex) == 2)
goto futex;
while (atomic_exchange_acquire (futex, 2) != 0)
{
futex:
LIBC_PROBE (lll_lock_wait, 1, futex);
futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2. */
}
}
First, some background: pthread defines three lock states for the futex word:
- 0: the lock is currently free; it can be taken with a fast user-space operation, without entering the kernel.
- 1: a thread holds the lock; if another thread now wants it, that thread must mark the futex as "contended" and then enter the kernel through the futex system call to suspend itself.
- 2: the lock is contended; other threads are about to be, or already are, queued in the kernel's futex machinery waiting for the lock.
So after a failed lock attempt lands in __lll_lock_wait, it first checks whether the futex equals 2. If so, everyone is already queuing, so this thread simply joins the queue (jumping straight to futex_wait). If not, this thread is the first contender: it sets the futex to 2 to tell later arrivals to queue, and then, leading by example, queues first itself.
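The 0/1/2 protocol can be sketched in Java, with an AtomicInteger standing in for the futex word and LockSupport standing in for the kernel's futex wait/wake. This is a hypothetical illustration only (a real futex sleeps in the kernel, and FutexStyleLock is an invented name), but it follows the same transitions as __lll_lock/__lll_lock_wait:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical futex-style lock: 0 = free, 1 = locked, 2 = locked with waiters.
// LockSupport.park/unpark plays the role of futex_wait/futex_wake.
public class FutexStyleLock {
    private final AtomicInteger state = new AtomicInteger(0);
    private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();

    public void lock() {
        if (state.compareAndSet(0, 1))
            return; // uncontended fast path: no "kernel" involvement
        // Slow path: register as a waiter, mark the lock contended (2),
        // and sleep until an unlock hands the lock over.
        waiters.add(Thread.currentThread());
        try {
            // getAndSet(2) == 0 means the lock was free and we now own it.
            while (state.getAndSet(2) != 0)
                LockSupport.park(); // "futex_wait"
        } finally {
            waiters.remove(Thread.currentThread());
        }
    }

    public void unlock() {
        if (state.getAndSet(0) == 2) { // 2 means there may be sleepers: wake one
            Thread t = waiters.peek();
            if (t != null)
                LockSupport.unpark(t); // "futex_wake"
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FutexStyleLock lock = new FutexStyleLock();
        int[] counter = {0};
        Runnable task = () -> {
            for (int i = 0; i < 10000; i++) {
                lock.lock();
                counter[0]++;
                lock.unlock();
            }
        };
        Thread a = new Thread(task), b = new Thread(task);
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(counter[0]);
    }
}
```

Like the glibc code, unlock only issues a wake when the state was 2, so the fully uncontended case never touches the wait queue at all.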
futex_wait essentially just issues the futex system call. In Section IV we analyze this system call in detail.
2 pthread_cond_wait
In essence, this also ends up in the futex system call; the details are omitted for space.
IV. The Kernel Layer
Why does futex exist, what problem does it solve, and when does execution enter the kernel?
Briefly, futex's approach is: in the uncontended case, everything happens in user space with no system call; only when contention arises does execution enter the kernel to do the corresponding work (wait or wake up). futex is therefore a hybrid user-mode/kernel-mode synchronization mechanism that needs the two modes to cooperate. The futex variable lives in user space rather than being a kernel object, and the futex code is likewise split into a user-mode part and a kernel-mode part: the uncontended path runs in user mode, and contention is handled in kernel mode via the sys_futex system call.
The user-mode part was covered in the previous sections; this section focuses on the kernel-side implementation of futex.
futex is designed around three basic data structures: futex_hash_bucket, futex_key, and futex_q.
struct futex_hash_bucket {
atomic_t waiters;
spinlock_t lock;
struct plist_head chain;
} ____cacheline_aligned_in_smp;
struct futex_q {
struct plist_node list;
struct task_struct *task;
spinlock_t *lock_ptr;
union futex_key key; // the key that uniquely identifies uaddr
struct futex_pi_state *pi_state;
struct rt_mutex_waiter *rt_waiter;
union futex_key *requeue_pi_key;
u32 bitset;
};
union futex_key {
struct {
unsigned long pgoff;
struct inode *inode;
int offset;
} shared;
struct {
unsigned long address;
struct mm_struct *mm;
int offset;
} private;
struct {
unsigned long word;
void *ptr;
int offset;
} both;
};
There is also a struct __futex_data, shown below:
static struct {
struct futex_hash_bucket *queues;
unsigned long hashsize;
} __futex_data __read_mostly __aligned(2*sizeof(long));
#define futex_queues (__futex_data.queues)
#define futex_hashsize (__futex_data.hashsize)
At futex initialization time (futex_init), the hashsize is determined; for example, with a 24-core CPU, hashsize = 8192. alloc_large_system_hash is then called with this hashsize to allocate the array and initialize the relevant fields of each element, such as plist_head and lock.
static int __init futex_init(void)
{
unsigned int futex_shift;
unsigned long i;
#if CONFIG_BASE_SMALL
futex_hashsize = 16;
#else
futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());
#endif
futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),
futex_hashsize, 0,
futex_hashsize < 256 ? HASH_SMALL : 0,
&futex_shift, NULL,
futex_hashsize, futex_hashsize);
futex_hashsize = 1UL << futex_shift;
futex_detect_cmpxchg();
	for (i = 0; i < futex_hashsize; i++) {
		atomic_set(&futex_queues[i].waiters, 0);
plist_head_init(&futex_queues[i].chain);
spin_lock_init(&futex_queues[i].lock);
}
return 0;
}
The relationships among these data structures are shown below:
With the data structures in mind, the flow is easy to follow. The overall flow of futex_wait is:
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;
if (!bitset)
return -EINVAL;
q.bitset = bitset;
if (abs_time) {
to = &timeout;
hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
CLOCK_REALTIME : CLOCK_MONOTONIC,
HRTIMER_MODE_ABS);
hrtimer_init_sleeper(to, current);
hrtimer_set_expires_range_ns(&to->timer, *abs_time,
current->timer_slack_ns);
}
retry:
/*
* Prepare to wait on uaddr. On success, holds hb lock and increments
* q.key refs.
*/
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)
goto out;
/* queue_me and wait for wakeup, timeout, or a signal. */
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
/* unqueue_me() drops q.key ref */
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current))
goto retry;
ret = -ERESTARTSYS;
if (!abs_time)
goto out;
	restart = &current->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = *abs_time;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
ret = -ERESTART_RESTARTBLOCK;
out:
	if (to) {
		hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
The function futex_wait_setup does two main things: first, it hashes uaddr to locate its futex_hash_bucket and acquires the spinlock on it; second, it checks whether *uaddr still holds the expected value. If the values do not match, it returns immediately and user space continues with trylock.
/**
* futex_wait_setup() - Prepare to wait on a futex
* @uaddr: the futex userspace address
* @val: the expected value
* @flags: futex flags (FLAGS_SHARED, etc.)
* @q: the associated futex_q
* @hb: storage for hash_bucket pointer to be returned to caller
*
* Setup the futex_q and locate the hash_bucket. Get the futex value and
* compare it with the expected value. Handle atomic faults internally.
* Return with the hb lock held and a q.key reference on success, and unlocked
* with no q.key reference on failure.
*
* Return:
* - 0 - uaddr contains val and hb has been locked;
* - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
retry:
	// Initialize the futex_q and store uaddr in the futex_key fields;
	// futex_wake later looks the futex up by this same key.
	ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
	// Hash the key and find the corresponding futex_hash_bucket in the array
*hb = queue_lock(q);
	// Atomically read the value at uaddr into uval
ret = get_futex_value_locked(&uval, uaddr);
	if (ret) {
		queue_unlock(*hb);
ret = get_user(uval, uaddr);
if (ret)
goto out;
if (!(flags & FLAGS_SHARED))
goto retry_private;
put_futex_key(&q->key);
goto retry;
}
	// If the value uaddr points to no longer equals val, another task has
	// modified it; the wait condition no longer holds, so return without blocking.
	if (uval != val) {
		queue_unlock(*hb);
		ret = -EWOULDBLOCK;
	}
out:
if (ret)
put_futex_key(&q->key);
return ret;
}
Then futex_wait_queue_me is called to suspend the current task:
/**
* futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
* @hb: the futex hash bucket, must be locked by the caller
* @q: the futex_q to queue up on
* @timeout: the prepared hrtimer_sleeper, or null for no timeout
*/
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
/*
* The task state is guaranteed to be set before another task can
* wake it. set_current_state() is implemented using smp_store_mb() and
* queue_me() calls spin_unlock() upon completion, both serializing
* access to the hash list and forcing another memory barrier.
*/
set_current_state(TASK_INTERRUPTIBLE);
queue_me(q, hb);
/* Arm the timer */
if (timeout)
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
/*
* If the timer has already expired, current will already be
* flagged for rescheduling. Only call schedule if there
* is no timeout, or if it has yet to expire.
*/
		if (!timeout || timeout->task)
			freezable_schedule();
	}
__set_current_state(TASK_RUNNING);
}
futex_wait_queue_me mainly does a few things:
- Inserts the current task into the wait queue, i.e., hangs the futex_q on the futex_hash_bucket
- Arms the timer, if a timeout was given
- Voluntarily triggers a kernel reschedule
V. Summary
This article walked top-down through the flow of ReentrantLock.lock in Java.
Original link
This article is original content from Alibaba Cloud and may not be reproduced without permission.