Synchronized实现机制
synchronized是Java并发同步开发的根本技术,是Java语言层面提供的线程间同步伎俩。咱们编写如下一段代码:
public class SyncTest {
private static final Object lock = new Object();
public static void main(String[] args) {
int a = 0;
synchronized (lock) {
a++;
}
System.out.println("Result: " + a);
}
}
针对其中同步局部咱们会看到如下字节码:
monitorenter
iinc 1 by 1
aload_2
monitorexit
这其实是javac在编译时将synchronized同步块的前后插入montor进入和退出的字节码指令,因而,咱们想摸索synchronized的实现机制,就须要摸索monitorenter和monitorexit指令的执行过程。
咱们先看一下monitorenter的代码实现:
void TemplateTable::monitorenter() {
...
// store object __ movptr(Address(rmon, BasicObjectLock::obj_offset_in_bytes()), rax);
// 跳转执行 lock_object 函数 __ lock_object(rmon);
...
}
这里咱们仍然只给出重点代码局部,代码比拟长,后面有很多指令时初始化执行环境的,最初重点会跳转lock_object函数,同样这个函数也是有不同CPU平台实现的,咱们还是看X86平台的:
// Lock object // // Args: // rdx, c_rarg1: BasicObjectLock to be used for locking // // Kills: // rax, rbx void InterpreterMacroAssembler::lock_object(Register lock_reg) {
if (UseHeavyMonitors) {
call_VM(noreg,
CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
lock_reg);
} else {
// 执行锁优化的逻辑局部,例如:锁粗化,锁打消等等 // 如果所有优化措施都执行了,还是须要进入 monitor,就执行如下,其实和下面那个 if 分支是一样的 // Call the runtime routine for slow case call_VM(noreg,
CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
lock_reg);
}
}
这里咱们无论如何最终都是执行InterpreterRuntime::monitorenter函数,这个函数不仅仅是模板执行器会调用,解释执行器也会执行这个,所以定义在InterpreterRuntime类下:
// Synchronization // // The interpreter's synchronization code is factored out so that it can // be shared by method invocation and synchronized blocks. //%note synchronization_3 //%note monitor_1 IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
Handle h_obj(thread, elem->obj());
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
IRT_END
下面的代码,在原始的代码根底上有删减,保留了外围要害逻辑。即依据UseBiasedLocking这个变量别离执行fast_enter或者slow_enter的逻辑。
同步锁优化解决
同步锁优化解决即fast_enter执行解决,上面是fast_enter函数的定义:
// Fast Monitor Enter/Exit // This the fast monitor enter. The interpreter and compiler use // some assembly copies of this code. Make sure update those code // if the following function is changed. The implementation is // extremely sensitive to race condition. Be careful. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock,
bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter(obj, lock, THREAD);
}
这里开始还是要判断UseBiasedLocking,如果是true的话,就针对开始执行优化逻辑,否则还是会fall back到slow_enter的,是不是感觉判断UseBiasedLocking有点啰嗦?其实不是的,因为这个函数在很多中央都会调用,因而判断是必须的。为了不便接下来的代码剖析,上面我要放出OpenJDK官网wiki中针对锁优化的原理图:
在解释原理图之前,须要介绍一下Java对象的内存布局,因为下面图中的实现原理就是充分利用java对象的头实现的。Java对象在内存的构造根本分为:对象头和对象体,其中对象头存储对象特色信息,对象体寄存对象数据局部。
在OpenJDK工程中,有一个子工程叫jol,全名:java object layout,简略易懂,就是java对象布局的意思。这是一个工具库,通过这个库能够获取JVM中对象布局信息,上面咱们看一下一个简略的例子(这也是官网给的例子):
public class JOLTest {
public static void main(String[] args) {
System.out.println(VM.current().details());
System.out.println(ClassLayout.parseClass(A.class).toPrintable());
}
public static class A {
boolean f;
}
}
这里通过JOL的接口来获取类A的对象内存布局,执行之后输入如下内容:
# Running 64-bit HotSpot VM. # Using compressed oop with 3-bit shift. # Using compressed klass with 3-bit shift. # WARNING | Compressed references base/shifts are guessed by the experiment! # WARNING | Therefore, computed addresses are just guesses, and ARE NOT RELIABLE. # WARNING | Make sure to attach Serviceability Agent to get the reliable addresses. # Objects are 8 bytes aligned. # Field sizes by type: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes] # Array element sizes: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes]
JOLTest$A object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 12 (object header) N/A
12 1 boolean A.f N/A
13 3 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 3 bytes external = 3 bytes total
这里咱们看到输入了很多信息,下面咱们类A的对象布局如下:12byte的对象头+1byte的对象体+3byte的填充局部。
从JVM的代码咱们能够看到一个对象的头部定义:
volatile markOop _mark;
union _metadata {
Klass* _klass;
narrowKlass _compressed_klass;
} _metadata;
能够看到分为两局部:第一局部就是mark局部,官网称之为mark word,第二个是klass的类型指针,指向这个对象的类对象。这里的mark word长度是一个零碎字宽,在64bit零碎上就是8个字节,从下面的日志咱们能够看到虚拟机默认应用了compressed klass,因而第二局部的union就是narrowKlass类型的,如果咱们持续看下narrowKlass的定义就晓得这是个32bit的unsigned int类型,因而将占用4个字节,所以对象的头部长度整体为12字节。
Mark word用于存储对象本身运行时的数据,如hash code、GC分代年龄等等信息,他是实现偏差锁的要害。而且思考到虚拟机的空间效率,mark word被设计成一个非固定数据结构以便在极小的空间内存存储尽量多的信息,他会依据对象的状态复用本人的存储空间。因而,mark word内存布局定义在32bit和64bit零碎中对象的布局不同:
// 32 bits:
// --------
// hash:25 ------------>| age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:23 epoch:2 age:4 biased_lock:1 lock:2 (biased object)
// size:32 ------------------------------------------>| (CMS free block)
// PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object)
//
// 64 bits:
// --------
// unused:25 hash:31 -->| unused:1 age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:54 epoch:2 unused:1 age:4 biased_lock:1 lock:2 (biased object)
// PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object)
// size:64 ----------------------------------------------------->| (CMS free block)
//
// unused:25 hash:31 -->| cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && normal object)
// JavaThread*:54 epoch:2 cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && biased object)
// narrowOop:32 unused:24 cms_free:1 unused:4 promo_bits:3 ----->| (COOPs && CMS promoted object)
// unused:21 size:35 -->| cms_free:1 unused:7 ------------------>| (COOPs && CMS free block)
咱们次要关注其中的normal object和biased object两局部的头定义
biased_lock | lock | 状态 |
---|---|---|
1 | 01 | 可偏置、但未锁且未偏置 |
0 | 01 | 已解锁、不可偏置 |
— | 00 | 轻量级锁定 |
— | 01 | 重量级锁定 |
偏置锁即这个锁首先假如本人被偏差的线程所持有。在单个线程间断持有锁时,偏差锁就起作用了。如果一个线程连续不断地获取锁,那么获取的过程中如果没有产生竞态,那么能够跳过沉重的同步过程,间接就取得锁执行,这样能够大大提高性能。偏差锁是JDK1.6中引入的一项锁优化伎俩,它的目标就是打消数据在无争用的状况下的同步操作,进一步提高运行性能。这里也波及了轻量级锁,轻量级锁也是JDK1.6引入的一个锁优化机制,所谓轻量级是绝对于应用操作系统互斥机制来实现传统锁而言的,在这个角度上,传统的形式及时重量级锁,乐观锁,会导致线程的状态切换,而线程状态的切换是一个相当重量级的操作。
inflate成为重锁
先看一下slow_enter函数:
// Interpreter/Compiler Slow Case // This routine is used to handle interpreter/compiler slow case // We don't need to use fast path here, because it must have been // failed in the interpreter/compiler code. void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
if (mark->is_neutral()) {
// Anticipate successful CAS -- the ST of the displaced mark must // be visible <= the ST performed by the CAS. lock->set_displaced_header(mark);
if (mark == obj()->cas_set_mark((markOop) lock, mark)) {
TEVENT(slow_enter: release stacklock);
return;
}
// Fall through to inflate() ... } else if (mark->has_locker() &&
THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
// The object header will never be displaced to this lock, // so it does not matter what the value is, except that it // must be non-zero to avoid looking like a re-entrant lock, // and must not look locked either. lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD,
obj(),
inflate_cause_monitor_enter)->enter(THREAD);
}
这里的执行逻辑比拟简洁,次要执行下面OpenJDK wiki中的锁优化逻辑。首先会判断对象锁是否为中立的(neutral):
bool is_neutral() const {
// 这里的 biased_lock_mask_in_place 是 7 // unlocked_value 值是 1 return (mask_bits(value(), biased_lock_mask_in_place) == unlocked_value);
}
它的判断规范是将mark word中最初7个bit进行掩码运算,将失去的值和1进行比拟,如果等于1就示意对象时中立的,也就是没有被任何线程锁定,否则就算失败。至于为什么是最初7个bit,是因为无论是一般对象还是可偏置的对象,最初7个bit的格局是固定的(其余几种模式的对象格局不同)。
再回到下面的slow_enter函数,如果判断为中立的,也就是没有锁定的话,会将以后的mark word,存储到lock指针指向的对象中,这里的lock指针指向的就是下面提到的lock record。而后进行一个十分重要的操作,就是通过院子cas操作将这个lock指针装置到对象mark word中,如果装置胜利就示意以后线程取得了这个对象锁,能够间接返回执行同步代码块,否则就会fall back到收缩锁中。
下面是判断对象是否为中立的逻辑,如果当线程进来发现以后的对象锁曾经被另一个线程锁定了。这个时候就会执行到else逻辑中:
if (mark->has_locker() &&
THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
如果发现以后对象曾经锁定,须要判断下是不是以后线程本人锁定了,因为在sysnchronized中可能再一次synchronized,这种状况下间接返回即可。
如果下面的两个判断都失败了,也就是对象被锁定,并且锁定线程不是以后线程,这个时候须要执行下面OpenJDK wiki中的inflate收缩逻辑。所谓收缩,就是依据以后锁对象,生成一个ObjectMonitor对象,这个对象中保留了sychronized阻塞的队列,以及实现了不同的队列调度策略,上面咱们重点看一下ObjectMonitor中的enter逻辑
ObjectMonitor enter
在enter函数中,有很多判断和优化执行的逻辑,然而外围和通过Enterl函数理论进入队列将以后线程阻塞:
void ObjectMonitor::EnterI(TRAPS) {
...
// Try the lock - TATAS if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
...
// We try one round of spinning *before* enqueueing Self. // // If the _owner is ready but OFFPROC we could use a YieldTo() // operation to donate the remainder of this thread's quantum // to the owner. This has subtle but beneficial affinity // effects.
if (TrySpin (Self) > 0) {
assert(_owner == Self, "invariant");
assert(_succ != Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
...
ObjectWaiter node(Self);
// Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt;
for (;;) {
node._next = nxt = _cxq;
if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;
// Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
}
...
for (;;) {
if (TryLock(Self) > 0) break;
...
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::replace_if_null(Self, &_Responsible);
}
// park self if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT(Inflated enter - park TIMED);
Self->_ParkEvent->park((jlong) recheckInterval);
// Increase the recheckInterval, but clamp the value. recheckInterval *= 8;
if (recheckInterval > MAX_RECHECK_INTERVAL) {
recheckInterval = MAX_RECHECK_INTERVAL;
}
} else {
TEVENT(Inflated enter - park UNTIMED);
Self->_ParkEvent->park();
}
if (TryLock(Self) > 0) break;
...
}
...
if (_Responsible == Self) {
_Responsible = NULL;
}
// 善后处理,比方将以后线程从期待队列 CXQ 中移除 ...
}
照例只保留了重要代码。咱们先看TryLock办法:
int ObjectMonitor::TryLock(Thread * Self) {
void * own = _owner;
if (own != NULL) return 0;
if (Atomic::replace_if_null(Self, &_owner)) {
// Either guarantee _recursions == 0 or set _recursions = 0. assert(_recursions == 0, "invariant");
assert(_owner == Self, "invariant");
return 1;
}
// The lock had been free momentarily, but we lost the race to the lock. // Interference -- the CAS failed. // We can either return -1 or retry. // Retry doesn't make as much sense because the lock was just acquired. return -1;
}
这里逻辑很简略,次要是尝试通过cas操作将_owner字段设置为Self,其中_owner示意以后ObjectMonitor对象锁持有的线程指针,Self指向以后执行的线程。如果设置上了,示意以后线程取得了锁,否则没有取得。
在下面的Enterl函数中,咱们看到TryLock前后间断执行了两次,而且代码判断逻辑一样,为什么要这样?这其实是为了在入队阻塞线程之前的最初查看,避免线程无谓的进行状态切换。然而为什么执行两次?其实第二次执行的正文曾经阐明了,这么做有一些奥妙的亲和力影响,即如果在过来一段时间内,某个线程尝试获取某个资源始终失败,那么零碎在前面会偏向于将资源分配给这个线程。
如果两次TryLock之后依然失败,那么只能乖乖入队阻塞了,在入队之前须要创立一个ObjectWaiter对象,这个对象将以后线程的对象(留神是JavaThread对象)包裹起来,咱们看一下ObjectWaiter的定义:
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };
enum Sorted { PREPEND, APPEND, SORTED };
ObjectWaiter * volatile _next;
ObjectWaiter * volatile _prev;
Thread* _thread;
jlong _notifier_tid;
ParkEvent * _event;
volatile int _notified;
volatile TStates TState;
Sorted _Sorted; // List placement disposition bool _active; // Contention monitoring is enabled public:
ObjectWaiter(Thread* thread);
void wait_reenter_begin(ObjectMonitor *mon);
void wait_reenter_end(ObjectMonitor *mon);
};
_next和_prev代表这是一个双向队列实现期待队列(然而实际上,入队操作并没有造成双向链表,真正造成双向链表是在exit的时候)。node节点创立结束之后会执行如下入队操作
// Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt;
for (;;) {
node._next = nxt = _cxq;
if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;
// Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
}
正文中阐明,咱们是要将以后节点放到CXQ队列的头部,将节点的next指针通过cas操作指向_cxq指针就实现了入队操作。如果入队胜利,则退出以后循环,否则再次尝试lock,因为在高并发状态下,cas锁定可能会出错失败。
如果下面的循环退出了,就示意以后线程的node节点曾经顺利进入CXQ队列了,那么接下来须要进入另一个循环:
for (;;) {
if (TryLock(Self) > 0) break;
...
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::replace_if_null(Self, &_Responsible);
}
// park self if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT(Inflated enter - park TIMED);
Self->_ParkEvent->park((jlong) recheckInterval);
// Increase the recheckInterval, but clamp the value. recheckInterval *= 8;
if (recheckInterval > MAX_RECHECK_INTERVAL) {
recheckInterval = MAX_RECHECK_INTERVAL;
}
} else {
TEVENT(Inflated enter - park UNTIMED);
Self->_ParkEvent->park();
}
if (TryLock(Self) > 0) break;
...
}
这个循环的逻辑比较简单:
- 尝试获取锁
- park以后锁
- 再次尝试获取锁
重点在于第2步,咱们晓得synchronzed如果获取对象锁失败的话,会导致以后线程被阻塞,那么这个阻塞操作就是在这里实现的,这里须要留神的是,这里须要判断一下_Responible指针,如果这个指针为null,示意之前对象锁还没有期待线程,也就是说以后线程是第一个期待线程,这时候通过cas操作将_Responsible指向Self,示意以后线程是这个对象锁的期待线程。接下来,如果以后线程是期待线程,那么会执行一个简略的退却算法,进行一个短时间的阻塞期待。这个算法很简略,第一次期待1ms,第二次期待8ms,第三次期待64ms,以此类推,晓得期待时长的下限:MAX_RECHECK_INTERVAL,也就是说在synchronize在一个对象锁上的线程,如果他是第一个期待线程的话,那么他会不停的休眠,查看锁。反之,如果以后线程不是第一个期待线程,那么只能执行无限期的休眠,始终期待对象锁的exit函数执行唤醒才行。
ObjectMonitor exit
当一个线程取得对象锁胜利后,就能够执行自定义的同步代码块了。执行实现之后会执行到ObjectMonitor的exit函数中,开释以后对象锁,不便下一个线程来获取这个对象锁,上面咱们逐渐剖析exit的实现过程。
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
for (;;) {
...
ObjectWaiter * w = NULL;
int QMode = Knob_QMode;
if (QMode == 2 && _cxq != NULL) {
...
}
if (QMode == 3 && _cxq != NULL) {
...
}
if (QMode == 4 && _cxq != NULL) {
...
}
...
ExitEpilog(Self, w);
return;
}
}
exit函数的执行逻辑有两步:
- 依据Knob_QMode的值和_cxq是否为空执行不同策略
- 依据肯定策略唤醒期待队列的下一个线程
出队策略0——默认策略
在exit函数中首先是依据Knob_QMode的值执行不同执行不同逻辑,而Knob_QMode的默认值为0,它的作用次要用来指定在exit的时候EntryList和CXQ队列之间的唤醒关系,也就是说,当EntryList和CXQ中都有期待的线程时,因为exit之后只能有一个线程失去锁,这个时候抉择唤醒哪个队列中的线程是一个值得思考的事。而这里的默认策略就是0。
出队策略0代表CXQ队列后进先出,行将cxq指针赋予_EntryList,而后通过一个循环将本来单项链表的CXQ链表变成双向链表,不便前面针对CXQ链表进行查问,这时候,_EntryList就是CXQ。而后交由ExitEpilog唤醒
void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
assert(_owner == Self, "invariant");
// Exit protocol: // 1. ST _succ = wakee // 2. membar #loadstore|#storestore; // 2. ST _owner = NULL // 3. unpark(wakee)
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL;
ParkEvent * Trigger = Wakee->_event;
// Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again. // The thread associated with Wakee may have grabbed the lock and "Wakee" may be // out-of-scope (non-extant). Wakee = NULL;
// Drop the lock OrderAccess::release_store(&_owner, (void*)NULL);
OrderAccess::fence(); // ST _owner vs LD in unpark()
if (SafepointMechanism::poll(Self)) {
TEVENT(unpark before SAFEPOINT);
}
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
Trigger->unpark();
// Maintain stats and report events to JVMTI OM_PERFDATA_OP(Parks, inc());
}
即通过park event将期待的线程唤醒,而后执行unpark函数
void os::PlatformEvent::unpark() {
if (Atomic::xchg(1, &_event) >= 0) return;
int status = pthread_mutex_lock(_mutex);
int anyWaiters = _nParked;
status = pthread_mutex_unlock(_mutex);
if (anyWaiters != 0) {
status = pthread_cond_signal(_cond);
assert_status(status == 0, status, "cond_signal");
}
}
这里仍然是通过pthread的condition signal唤醒线程,后面线程休眠是通过condition wait实现的。
出队策略1
出队策略1即Knob_QMnode的值批改为1,这种模式下是先进先出,即FIFO队列行为。这种模式下的解决是先将CXQ队列reverse一下,而后再讲新的队头也就是原来的队尾赋值给_EntryList。而后按_EntryList进行唤醒。
出队策略2
出队策略2跟出队策略0类似,然而他是优先执行CXQ队列的操作,再执行_EntryList队列的操作。即优先按CXQ进行唤醒。
出队策略3和出队策略4
出队策略3和出队策略4都是简略的链接。出队策略3是将CXQ放在_EntryList之后,而出队策略4是将_EntryList放在CXQ之前。而后按新~~~~_EntryList进行唤醒。
发表回复