不晓得你有没有想过,那些去申请锁的线程都怎么了?有些可能申请到了锁,马上就能执行业务代码。然而如果有一个锁被很多个线程须要,那么这些线程是如何被解决的呢?
明天咱们走进 synchronized 重量级锁,看看那些没有申请到锁的线程都怎么了。
ps: 如果你不想看剖析后果, 能够拉到最初,开端有一张总结图, 一图胜千言
之前文章剖析过 synchroinzed 中锁的优化,然而如果存在大量竞争的状况下,那么最终还是都会变成重量级锁。所以咱们这里开始间接剖析重量级锁的代码。
申请锁
在 ObjectMonitor::enter 函数中,有很多判断和优化执行的逻辑,然而外围还是通过 EnterI 函数理论进入队列将将以后线程阻塞
void ObjectMonitor::EnterI(TRAPS) {
Thread * const Self = THREAD;
// CAS 尝试将以后线程设置为持有锁的线程
if (TryLock (Self) > 0) {assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
// 通过自旋形式调用 tryLock 再次尝试,操作系统认为会有一些奥妙影响
if (TrySpin(Self) > 0) {assert(_owner == Self, "invariant");
assert(_succ != Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
...
// 将以后线程构建成 ObjectWaiter
ObjectWaiter node(Self);
Self->_ParkEvent->reset();
node._prev = (ObjectWaiter *) 0xBAD;
node.TState = ObjectWaiter::TS_CXQ;
ObjectWaiter * nxt;
for (;;) {
// 通过 CAS 形式将 ObjectWaiter 对象插入 CXQ 队列头部中
node._next = nxt = _cxq;
if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;
// 因为 cxq 扭转,导致 CAS 失败,这里进行 tryLock 重试
if (TryLock (Self) > 0) {assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
}
// 阻塞以后线程
for (;;) {if (TryLock(Self) > 0) break;
assert(_owner != Self, "invariant");
// park self
if (_Responsible == Self) {Self->_ParkEvent->park((jlong) recheckInterval);
recheckInterval *= 8;
if (recheckInterval > MAX_RECHECK_INTERVAL) {recheckInterval = MAX_RECHECK_INTERVAL;}
} else {Self->_ParkEvent->park();
}
...
if (TryLock(Self) > 0) break;
++nWakeups;
if (TrySpin(Self) > 0) break;
...
}
...
// Self 曾经获取到锁了,须要将它从 CXQ 或者 EntryList 中移除
UnlinkAfterAcquire(Self, &node);
...
}
- 在入队之前, 会调用 tryLock 尝试通过 CAS 操作将_owner(以后 ObjectMonitor 对象锁持有的线程指针)字段设置为 Self(指向以后执行的线程), 如果设置胜利,示意以后线程取得了锁,否则没有。
int ObjectMonitor::TryLock(Thread * Self) {
void * own = _owner;
if (own != NULL) return 0;
if (Atomic::replace_if_null(Self, &_owner)) {return 1;}
return -1;
}
- 如果 tryLock 没有胜利,又会再次调用 tryLock(trySpin 中调用了 tryLock)去尝试获取锁,因为这样能够通知操作系统我迫切需要这个资源,心愿能尽量调配给我。不过这种亲和力并不是肯定能失去保障的协定,只是一种踊跃的操作。
- 通过 ObjectWaiter 对象将以后线程包裹起来,入到 CXQ 队列的头部
- 阻塞以后线程(通过 pthread_cond_wait)
- 当线程被唤醒而获取了锁,调用 UnlinkAfterAcquire 办法将 ObjectWaiter 从 CXQ 或者 EntryList 中移除
外围数据结构
ObjectMonitor 对象中保留了 sychronized 阻塞的线程的队列,以及实现了不同的队列调度策略,因而咱们有必须先来意识下这个对象的一些重要属性
class ObjectMonitor {
// mark word
volatile markOop _header;
// 指向领有线程或 BasicLock 的指针
void * volatile _owner;
// monitor 的先前所有者的线程 ID
volatile jlong _previous_owner_tid;
// 重入次数,第一次为 0
volatile intptr_t _recursions;
// 下一个被唤醒的线程
Thread * volatile _succ;
// 线程在进入或者从新进入时被阻塞的列表, 由 ObjectWaiter 组成, 相当于对线程的一个封装对象
ObjectWaiter * volatile _EntryList;
// CXQ 队列存储的是 enter 的时候因为锁曾经被别的线程阻塞而进不来的线程
ObjectWaiter * volatile _cxq;
// 处于 wait 状态 (调用了 wait()) 的线程,会被退出到 waitSet
ObjectWaiter * volatile _WaitSet;
// 省略其余属性以及办法
}
class ObjectWaiter : public StackObj {
public:
enum TStates {TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ};
// 后一个节点
ObjectWaiter * volatile _next;
// 前一个节点
ObjectWaiter * volatile _prev;
// 线程
Thread* _thread;
// 线程状态
volatile TStates TState;
public:
ObjectWaiter(Thread* thread);
};
看到 ObjectWaiter 中的_next 和_prev 你就会明确,这是应用了双向队列实现期待队列的的,然而 实际上咱们下面的入队操作并没有造成双向列表,造成双向列表是在 exit 锁的时候。
wait
Java Object 类提供了一个基于 native 实现的 wait 和 notify 线程间通信的形式,JDK 中 wait/notify/notifyAll 全副是通过 native 实现的,当然到了 JVM,它的实现还是在 src/hotspot/share/runtime/objectMonitor.cpp
中。
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD;
JavaThread *jt = (JavaThread *)THREAD;
...
// 如果线程被中断,须要抛出异样
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {THROW(vmSymbols::java_lang_InterruptedException());
return;
}
jt->set_current_waiting_monitor(this);
// 结构 ObjectWaiter 节点
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT;
...
// 将 ObjectWaiter 退出 WaitSet 的尾部
AddWaiter(&node);
// 让出锁
exit(true, Self);
...
// 调研 park(),阻塞以后线程
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {// Intentionally empty} else if (node._notified == 0) {if (millis <= 0) {Self->_ParkEvent->park();
} else {ret = Self->_ParkEvent->park(millis);
}
}
...
}
// 将 node 插入双向列表_WaitSet 的尾部
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet;
ObjectWaiter* tail = head->_prev;
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
下面我把 wait 的次要办法逻辑列出来了,次要会执行以下步骤
- 首先判断以后线程是否被中断,如果被中断了须要抛出 InterruptedException
- 如果没有被中断,则会应用以后线程结构 ObjectWaiter 节点,将其插入双向链表 WaitSet 的尾部
- 调用 exit, 让出锁(让出锁的逻辑会在前面剖析)
- 调用 park(实际上是调用 pthread_cond_wait)阻塞以后线程
notify
同样的 notify 的逻辑也是在 ObjectMonitory.cpp 中
void ObjectMonitor::notify(TRAPS) {CHECK_OWNER();
// waitSet 为空,间接返回
if (_WaitSet == NULL) {TEVENT(Empty-Notify);
return;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
// 唤醒某个线程
INotify(THREAD);
OM_PERFDATA_OP(Notifications, inc(1));
}
在 notify 中首先会判断 waitSet 是否为空,如果为空,示意没有线程在期待,则间接返回。否则则调用 INotify 办法。
notifyAll 办法实际上是循环调用 INotify
void ObjectMonitor::INotify(Thread * Self) {
// notify 之前须要获取一个锁,保障并发平安
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
// 移除并返回 WaitSet 中的第一个元素, 比方之前 waitSet 中是 1 <--> 2 <--> 3, 当初是返回 1,而后 waitSet 变成 2<-->3
ObjectWaiter * iterator = DequeueWaiter();
if (iterator != NULL) {
// Disposition - what might we do with iterator ?
// a. add it directly to the EntryList - either tail (policy == 1)
// or head (policy == 0).
// b. push it onto the front of the _cxq (policy == 2).
// For now we use (b).
// 设置线程状态
iterator->TState = ObjectWaiter::TS_ENTER;
iterator->_notified = 1;
iterator->_notifier_tid = JFR_THREAD_ID(Self);
ObjectWaiter * list = _EntryList;
if (list != NULL) {assert(list->_prev == NULL, "invariant");
assert(list->TState == ObjectWaiter::TS_ENTER, "invariant");
assert(list != iterator, "invariant");
}
// prepend to cxq
if (list == NULL) {
iterator->_next = iterator->_prev = NULL;
_EntryList = iterator;
} else {
iterator->TState = ObjectWaiter::TS_CXQ;
for (;;) {
// 将须要唤醒的 node 放到 CXQ 的头部
ObjectWaiter * front = _cxq;
iterator->_next = front;
if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {break;}
}
}
iterator->wait_reenter_begin(this);
}
// notify 执行实现之后开释 waitSet 锁, 留神这里并不是开释线程持有的锁
Thread::SpinRelease(&_WaitSetLock);
}
notify 的逻辑比较简单,就是将 WaitSet 的头节点从队列中移除,如果 EntryList 为空,则将出队节点放入到 EntryList 中,如果 EntryList 不为空,则将节点插入到 CXQ 列表的头节点。
须要留神的是,notify 并没有开释锁,开释锁的逻辑是在 exit 中
exit
当一个线程取得对象锁胜利之后,就能够执行自定义的同步代码块了。执行实现之后会执行到 ObjectMonitor 的 exit 函数中,开释以后对象锁,不便下一个线程来获取这个锁。
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * const Self = THREAD;
if (THREAD != _owner) {
// 锁的持有者是以后线程
if (THREAD->is_lock_owned((address) _owner)) {assert(_recursions == 0, "invariant");
_owner = THREAD;
_recursions = 0;
} else {assert(false, "Non-balanced monitor enter/exit! Likely JNI locking");
return;
}
}
// 重入次数减去 1
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
return;
}
for (;;) {
...
w = _EntryList;
// 如果 entryList 不为空,则将
if (w != NULL) {assert(w->TState == ObjectWaiter::TS_ENTER, "invariant");
// 执行 unpark, 让出锁
ExitEpilog(Self, w);
return;
}
w = _cxq;
...
_EntryList = w;
ObjectWaiter * q = NULL;
ObjectWaiter * p;
// 这里将_cxq 或者说_EntryList 从单向链表变成了一个双向链表
for (p = w; p != NULL; p = p->_next) {guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
p->TState = ObjectWaiter::TS_ENTER;
p->_prev = q;
q = p;
}
w = _EntryList;
if (w != NULL) {guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");
// 执行 unpark, 让出锁
ExitEpilog(Self, w);
return;
}
...
}
...
}
void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Wakee->_thread;
ParkEvent * Trigger = Wakee->_event;
Wakee = NULL;
// Drop the lock
OrderAccess::release_store(&_owner, (void*)NULL);
OrderAccess::fence();
...
// 开释锁
Trigger->unpark();}
exit 的逻辑还是比较简单的
- 如果以后是以后线程要让出锁,那么则查看其重入次数是否为 0,不为 0 则将重入次数减去 1,而后间接退出。
- 如果 EntryList 不为空,则将 EntryList 的头元素中的线程唤醒
- 将 cxq 指针赋值给 EntryList,而后通过循环将 cxq 链表变成双向链表,而后调用 ExitEpilog 将 CXQ 链表的头结点唤醒(理论是通过 pthread_cond_signal)
从这里之后,EntryList 和 CXQ 就是同一个了,因为将 CXQ 赋值给了 EntryList 了。
须要留神的是这里唤醒的线程会继续执行文章结尾的 EnterI 办法,此时会将 ObjectWaiter 从 EntryList 或者 CXQ 中移除。
实战演示
下面的源码均是基于 JDK12,JDK8 中的代码对于 exit 和 notify 都还有其余策略(抉择哪个线程),而从 JDK9 开始就只保留了默认策略了。
所以上面的 Java 代码的运行后果无论是在 jdk8 还是 jdk12, 失去的后果都是一样的。
Object lock = new Object();
Thread t1 = new Thread(() -> {System.out.println("Thread 1 start!!!!!!");
synchronized (lock) {
try {lock.wait();
} catch (Exception e) { }
System.out.println("Thread 1 end!!!!!!");
}
});
Thread t2 = new Thread(() -> {System.out.println("Thread 2 start!!!!!!");
synchronized (lock) {
try {lock.wait();
} catch (Exception e) { }
System.out.println("Thread 2 end!!!!!!");
}
});
Thread t3 = new Thread(() -> {System.out.println("Thread 3 start!!!!!!");
synchronized (lock) {
try {lock.wait();
} catch (Exception e) { }
System.out.println("Thread 3 end!!!!!!");
}
});
Thread t4 = new Thread(() -> {System.out.println("Thread 4 start!!!!!!");
synchronized (lock) {
try {System.in.read();
} catch (Exception e) { }
lock.notify();
lock.notify();
lock.notify();
System.out.println("Thread 4 end!!!!!!");
}
});
Thread t5 = new Thread(() -> {System.out.println("Thread 5 start!!!!!!");
synchronized (lock) {System.out.println("Thread 5 end!!!!!!");
}
});
Thread t6 = new Thread(() -> {System.out.println("Thread 6 start!!!!!!");
synchronized (lock) {System.out.println("Thread 6 end!!!!!!");
}
});
Thread t7 = new Thread(() -> {System.out.println("Thread 7 start!!!!!!");
synchronized (lock) {System.out.println("Thread 7 end!!!!!!");
}
});
t1.start();
sleep_1_second();
t2.start();
sleep_1_second();
t3.start();
sleep_1_second();
t4.start();
sleep_1_second();
t5.start();
sleep_1_second();
t6.start();
sleep_1_second();
t7.start();
下面的代码很简略,咱们来剖析一下。
线程 1,2,3 都调用了 wait, 所以会阻塞,而后 WaitSet 的链表构造如下:
线程 4 获取了锁,在期待一个输出
线程 5,6,7 也在期待锁,所以他们也会把阻塞,所以 CXQ 链表构造如下:
当线程 4 输出任意内容,并回车完结后(调用了其中的 3 个 notify 办法,但还未开释锁)
线程 4 让出锁之后,因为 EntryList 不为空,所以会先唤醒 EntryList 中的线程 1, 而后接下来会唤醒 CXQ 队列中的线程(前面你能够认为 CXQ 就是 EntryList)
所以最终线程执行程序为 4 1 3 2 7 6 5
, 咱们的输入后果也能验证咱们的论断
Thread 1 start!!!!!!
Thread 2 start!!!!!!
Thread 3 start!!!!!!
Thread 4 start!!!!!!
Thread 5 start!!!!!!
Thread 6 start!!!!!!
Thread 7 start!!!!!!
think123
Thread 4 end!!!!!!
Thread 1 end!!!!!!
Thread 3 end!!!!!!
Thread 2 end!!!!!!
Thread 7 end!!!!!!
Thread 6 end!!!!!!
Thread 5 end!!!!!!
一图胜千言