不晓得你有没有想过,那些去申请锁的线程都怎么了?有些可能申请到了锁,马上就能执行业务代码。然而如果有一个锁被很多个线程须要,那么这些线程是如何被解决的呢?

明天咱们走进synchronized 重量级锁,看看那些没有申请到锁的线程都怎么了。

ps: 如果你不想看剖析后果,能够拉到最初,开端有一张总结图,一图胜千言

之前文章剖析过synchroinzed中锁的优化,然而如果存在大量竞争的状况下,那么最终还是都会变成重量级锁。所以咱们这里开始间接剖析重量级锁的代码。

申请锁

在ObjectMonitor::enter函数中,有很多判断和优化执行的逻辑,然而外围还是通过EnterI函数理论进入队列将将以后线程阻塞

void ObjectMonitor::EnterI(TRAPS) {  Thread * const Self = THREAD;  // CAS尝试将以后线程设置为持有锁的线程  if (TryLock (Self) > 0) {    assert(_succ != Self, "invariant");    assert(_owner == Self, "invariant");    assert(_Responsible != Self, "invariant");    return;  }  // 通过自旋形式调用tryLock再次尝试,操作系统认为会有一些奥妙影响  if (TrySpin(Self) > 0) {    assert(_owner == Self, "invariant");    assert(_succ != Self, "invariant");    assert(_Responsible != Self, "invariant");    return;  }  ...  // 将以后线程构建成ObjectWaiter  ObjectWaiter node(Self);  Self->_ParkEvent->reset();  node._prev   = (ObjectWaiter *) 0xBAD;  node.TState  = ObjectWaiter::TS_CXQ;  ObjectWaiter * nxt;  for (;;) {    // 通过CAS形式将ObjectWaiter对象插入CXQ队列头部中    node._next = nxt = _cxq;    if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;    // 因为cxq扭转,导致CAS失败,这里进行tryLock重试    if (TryLock (Self) > 0) {      assert(_succ != Self, "invariant");      assert(_owner == Self, "invariant");      assert(_Responsible != Self, "invariant");      return;    }  }  // 阻塞以后线程  for (;;) {    if (TryLock(Self) > 0) break;    assert(_owner != Self, "invariant");    // park self    if (_Responsible == Self) {      Self->_ParkEvent->park((jlong) recheckInterval);      recheckInterval *= 8;      if (recheckInterval > MAX_RECHECK_INTERVAL) {        recheckInterval = MAX_RECHECK_INTERVAL;      }    } else {      Self->_ParkEvent->park();    }    ...        if (TryLock(Self) > 0) break;    ++nWakeups;    if (TrySpin(Self) > 0) break;    ...  }  ...  // Self曾经获取到锁了,须要将它从CXQ或者EntryList中移除  UnlinkAfterAcquire(Self, &node);  ...}
  1. 在入队之前,会调用tryLock尝试通过CAS操作将_owner(以后ObjectMonitor对象锁持有的线程指针)字段设置为Self(指向以后执行的线程),如果设置胜利,示意以后线程取得了锁,否则没有。
int ObjectMonitor::TryLock(Thread * Self) {  void * own = _owner;  if (own != NULL) return 0;  if (Atomic::replace_if_null(Self, &_owner)) {    return 1;  }  return -1;}
  1. 如果tryLock没有胜利,又会再次调用tryLock(trySpin中调用了tryLock)去尝试获取锁,因为这样能够通知操作系统我迫切需要这个资源,心愿能尽量调配给我。不过这种亲和力并不是肯定能失去保障的协定,只是一种踊跃的操作。
  2. 通过 ObjectWaiter对象将以后线程包裹起来,入到 CXQ 队列的头部
  3. 阻塞以后线程(通过pthread_cond_wait)
  4. 当线程被唤醒而获取了锁,调用UnlinkAfterAcquire办法将ObjectWaiter从CXQ或者EntryList中移除

外围数据结构

ObjectMonitor对象中保留了 sychronized 阻塞的线程的队列,以及实现了不同的队列调度策略,因而咱们有必须先来意识下这个对象的一些重要属性

class ObjectMonitor {  // mark word  volatile markOop _header;  // 指向领有线程或BasicLock的指针                   void * volatile _owner;   // monitor的先前所有者的线程ID  volatile jlong _previous_owner_tid;  // 重入次数,第一次为0  volatile intptr_t _recursions;  // 下一个被唤醒的线程  Thread * volatile _succ;  // 线程在进入或者从新进入时被阻塞的列表,由ObjectWaiter组成,相当于对线程的一个封装对象  ObjectWaiter * volatile _EntryList;  // CXQ队列存储的是enter的时候因为锁曾经被别的线程阻塞而进不来的线程  ObjectWaiter * volatile _cxq;  // 处于wait状态(调用了wait())的线程,会被退出到waitSet  ObjectWaiter * volatile _WaitSet;  // 省略其余属性以及办法}class ObjectWaiter : public StackObj { public:  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };  // 后一个节点  ObjectWaiter * volatile _next;  // 前一个节点  ObjectWaiter * volatile _prev;  // 线程  Thread*       _thread;  // 线程状态  volatile TStates TState; public:  ObjectWaiter(Thread* thread);};

看到ObjectWaiter中的_next和_prev你就会明确,这是应用了双向队列实现期待队列的的,然而实际上咱们下面的入队操作并没有造成双向列表,造成双向列表是在exit锁的时候。

wait

Java Object 类提供了一个基于 native 实现的 wait 和 notify 线程间通信的形式,JDK中wait/notify/notifyAll全副是通过native实现的,当然到了JVM,它的实现还是在 src/hotspot/share/runtime/objectMonitor.cpp 中。

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {   Thread * const Self = THREAD;  JavaThread *jt = (JavaThread *)THREAD;  ...  // 如果线程被中断,须要抛出异样  if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {    THROW(vmSymbols::java_lang_InterruptedException());    return;  }    jt->set_current_waiting_monitor(this);  // 结构 ObjectWaiter节点  ObjectWaiter node(Self);  node.TState = ObjectWaiter::TS_WAIT;  ...  // 将ObjectWaiter退出WaitSet的尾部  AddWaiter(&node);  // 让出锁  exit(true, Self);                       ...  // 调研park(),阻塞以后线程  if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {        // Intentionally empty  } else if (node._notified == 0) {    if (millis <= 0) {      Self->_ParkEvent->park();    } else {      ret = Self->_ParkEvent->park(millis);    }  }  ...}// 将node插入双向列表_WaitSet的尾部inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {  if (_WaitSet == NULL) {    _WaitSet = node;    node->_prev = node;    node->_next = node;  } else {    ObjectWaiter* head = _WaitSet;    ObjectWaiter* tail = head->_prev;    tail->_next = node;    head->_prev = node;    node->_next = head;    node->_prev = tail;  }

下面我把wait的次要办法逻辑列出来了,次要会执行以下步骤

  1. 首先判断以后线程是否被中断,如果被中断了须要抛出InterruptedException
  2. 如果没有被中断,则会应用以后线程结构ObjectWaiter节点,将其插入双向链表WaitSet的尾部
  3. 调用exit,让出锁(让出锁的逻辑会在前面剖析)
  4. 调用park(实际上是调用pthread_cond_wait)阻塞以后线程

notify

同样的notify的逻辑也是在ObjectMonitory.cpp中

void ObjectMonitor::notify(TRAPS) {  CHECK_OWNER();  // waitSet为空,间接返回  if (_WaitSet == NULL) {    TEVENT(Empty-Notify);    return;  }  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);    // 唤醒某个线程  INotify(THREAD);  OM_PERFDATA_OP(Notifications, inc(1));}

在notify中首先会判断waitSet是否为空,如果为空,示意没有线程在期待,则间接返回。否则则调用INotify办法。

notifyAll办法实际上是循环调用INotify
void ObjectMonitor::INotify(Thread * Self) {  // notify之前须要获取一个锁,保障并发平安  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");  // 移除并返回WaitSet中的第一个元素,比方之前waitSet中是1 <--> 2 <--> 3,当初是返回1,而后waitSet变成 2<-->3  ObjectWaiter * iterator = DequeueWaiter();  if (iterator != NULL) {       // Disposition - what might we do with iterator ?    // a.  add it directly to the EntryList - either tail (policy == 1)    //     or head (policy == 0).    // b.  push it onto the front of the _cxq (policy == 2).    // For now we use (b).    // 设置线程状态    iterator->TState = ObjectWaiter::TS_ENTER;    iterator->_notified = 1;    iterator->_notifier_tid = JFR_THREAD_ID(Self);    ObjectWaiter * list = _EntryList;    if (list != NULL) {      assert(list->_prev == NULL, "invariant");      assert(list->TState == ObjectWaiter::TS_ENTER, "invariant");      assert(list != iterator, "invariant");    }    // prepend to cxq    if (list == NULL) {      iterator->_next = iterator->_prev = NULL;      _EntryList = iterator;    } else {      iterator->TState = ObjectWaiter::TS_CXQ;      for (;;) {        // 将须要唤醒的node放到CXQ的头部        ObjectWaiter * front = _cxq;        iterator->_next = front;        if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {          break;        }      }    }    iterator->wait_reenter_begin(this);  }  // notify执行实现之后开释waitSet锁,留神这里并不是开释线程持有的锁  Thread::SpinRelease(&_WaitSetLock);}

notify的逻辑比较简单,就是将WaitSet的头节点从队列中移除,如果EntryList为空,则将出队节点放入到EntryList中,如果EntryList不为空,则将节点插入到CXQ列表的头节点。

须要留神的是,notify并没有开释锁,开释锁的逻辑是在exit中

exit

当一个线程取得对象锁胜利之后,就能够执行自定义的同步代码块了。执行实现之后会执行到 ObjectMonitor 的 exit 函数中,开释以后对象锁,不便下一个线程来获取这个锁。

void ObjectMonitor::exit(bool not_suspended, TRAPS) {  Thread * const Self = THREAD;  if (THREAD != _owner) {    // 锁的持有者是以后线程    if (THREAD->is_lock_owned((address) _owner)) {      assert(_recursions == 0, "invariant");      _owner = THREAD;      _recursions = 0;    } else {      assert(false, "Non-balanced monitor enter/exit! Likely JNI locking");      return;    }  }  // 重入次数减去1  if (_recursions != 0) {    _recursions--;        // this is simple recursive enter    return;  }  for (;;) {    ...    w = _EntryList;    // 如果entryList不为空,则将    if (w != NULL) {      assert(w->TState == ObjectWaiter::TS_ENTER, "invariant");      // 执行unpark,让出锁      ExitEpilog(Self, w);      return;    }    w = _cxq;    ...    _EntryList = w;    ObjectWaiter * q = NULL;    ObjectWaiter * p;    // 这里将_cxq或者说_EntryList从单向链表变成了一个双向链表    for (p = w; p != NULL; p = p->_next) {      guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");      p->TState = ObjectWaiter::TS_ENTER;      p->_prev = q;      q = p;    }    w = _EntryList;    if (w != NULL) {      guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");      // 执行unpark,让出锁      ExitEpilog(Self, w);      return;    }    ...  }  ...}void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {  // Exit protocol:  // 1. ST _succ = wakee  // 2. membar #loadstore|#storestore;  // 2. ST _owner = NULL  // 3. unpark(wakee)  _succ = Wakee->_thread;  ParkEvent * Trigger = Wakee->_event;  Wakee  = NULL;  // Drop the lock  OrderAccess::release_store(&_owner, (void*)NULL);  OrderAccess::fence();  ...    // 开释锁  Trigger->unpark();}

exit的逻辑还是比较简单的

  1. 如果以后是以后线程要让出锁,那么则查看其重入次数是否为0,不为0则将重入次数减去1,而后间接退出。
  2. 如果EntryList不为空,则将EntryList的头元素中的线程唤醒
  3. 将cxq指针赋值给EntryList,而后通过循环将cxq链表变成双向链表,而后调用ExitEpilog将CXQ链表的头结点唤醒(理论是通过pthread_cond_signal)

从这里之后,EntryList和CXQ就是同一个了,因为将CXQ赋值给了EntryList了。

须要留神的是这里唤醒的线程会继续执行文章结尾的EnterI办法,此时会将ObjectWaiter从EntryList或者CXQ中移除。

实战演示

下面的源码均是基于JDK12,JDK8中的代码对于exit和notify都还有其余策略(抉择哪个线程),而从JDK9开始就只保留了默认策略了。

所以上面的Java代码的运行后果无论是在jdk8还是jdk12,失去的后果都是一样的。

Object lock = new Object();Thread t1 = new Thread(() -> {  System.out.println("Thread 1 start!!!!!!");  synchronized (lock) {    try {      lock.wait();    } catch (Exception e) {    }    System.out.println("Thread 1 end!!!!!!");  }});Thread t2 = new Thread(() -> {  System.out.println("Thread 2 start!!!!!!");  synchronized (lock) {      try {        lock.wait();      } catch (Exception e) {      }      System.out.println("Thread 2 end!!!!!!");  }});Thread t3 = new Thread(() -> {  System.out.println("Thread 3 start!!!!!!");  synchronized (lock) {      try {        lock.wait();      } catch (Exception e) {      }      System.out.println("Thread 3 end!!!!!!");  }});Thread t4 = new Thread(() -> {  System.out.println("Thread 4 start!!!!!!");  synchronized (lock) {    try {      System.in.read();    } catch (Exception e) {    }    lock.notify();    lock.notify();    lock.notify();    System.out.println("Thread 4 end!!!!!!");  }});Thread t5 = new Thread(() -> {  System.out.println("Thread 5 start!!!!!!");  synchronized (lock) {      System.out.println("Thread 5 end!!!!!!");  }});Thread t6 = new Thread(() -> {  System.out.println("Thread 6 start!!!!!!");  synchronized (lock) {      System.out.println("Thread 6 end!!!!!!");  }});Thread t7 = new Thread(() -> {  System.out.println("Thread 7 start!!!!!!");  synchronized (lock) {      System.out.println("Thread 7 end!!!!!!");  }});t1.start();sleep_1_second();t2.start();sleep_1_second();t3.start();sleep_1_second();t4.start();sleep_1_second();t5.start();sleep_1_second();t6.start();sleep_1_second();t7.start();

下面的代码很简略,咱们来剖析一下。

线程1,2,3都调用了wait,所以会阻塞,而后WaitSet的链表构造如下:

线程4获取了锁,在期待一个输出

线程5,6,7也在期待锁,所以他们也会把阻塞,所以CXQ链表构造如下:

当线程4输出任意内容,并回车完结后(调用了其中的3个notify办法,但还未开释锁)

线程4让出锁之后,因为EntryList不为空,所以会先唤醒EntryList中的线程1,而后接下来会唤醒CXQ队列中的线程(前面你能够认为CXQ就是EntryList)

所以最终线程执行程序为 4 1 3 2 7 6 5,咱们的输入后果也能验证咱们的论断

Thread 1 start!!!!!!Thread 2 start!!!!!!Thread 3 start!!!!!!Thread 4 start!!!!!!Thread 5 start!!!!!!Thread 6 start!!!!!!Thread 7 start!!!!!!think123Thread 4 end!!!!!!Thread 1 end!!!!!!Thread 3 end!!!!!!Thread 2 end!!!!!!Thread 7 end!!!!!!Thread 6 end!!!!!!Thread 5 end!!!!!!

一图胜千言