关于java:Java并发基石篇中

43次阅读

共计 14851 个字符,预计需要花费 38 分钟才能阅读完成。

Synchronized 实现机制

synchronized 是 Java 并发同步开发的根本技术,是 Java 语言层面提供的线程间同步伎俩。咱们编写如下一段代码:

public class SyncTest {private static final Object lock = new Object();

    public static void main(String[] args) {
        int a = 0;
        synchronized (lock) {a++;}
        System.out.println("Result:" + a);
    }
}

针对其中同步局部咱们会看到如下字节码:

monitorenter
iinc 1 by 1
aload_2
monitorexit

这其实是 javac 在编译时将 synchronized 同步块的前后插入 montor 进入和退出的字节码指令,因而,咱们想摸索 synchronized 的实现机制,就须要摸索 monitorenter 和 monitorexit 指令的执行过程。

咱们先看一下 monitorenter 的代码实现:

void TemplateTable::monitorenter() {
    ...
      // store object     __ movptr(Address(rmon, BasicObjectLock::obj_offset_in_bytes()), rax);
    // 跳转执行 lock_object 函数     __ lock_object(rmon);
    ...
    }

这里咱们仍然只给出重点代码局部,代码比拟长,后面有很多指令时初始化执行环境的,最初重点会跳转 lock_object 函数,同样这个函数也是有不同 CPU 平台实现的,咱们还是看 X86 平台的:

// Lock object // // Args: //      rdx, c_rarg1: BasicObjectLock to be used for locking // // Kills: //      rax, rbx void InterpreterMacroAssembler::lock_object(Register lock_reg) {if (UseHeavyMonitors) {
        call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
    } else {
        // 执行锁优化的逻辑局部,例如:锁粗化,锁打消等等         // 如果所有优化措施都执行了,还是须要进入 monitor,就执行如下,其实和下面那个 if 分支是一样的         // Call the runtime routine for slow case         call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
    }

}

这里咱们无论如何最终都是执行 InterpreterRuntime::monitorenter 函数,这个函数不仅仅是模板执行器会调用,解释执行器也会执行这个,所以定义在 InterpreterRuntime 类下:

// Synchronization // // The interpreter's synchronization code is factored out so that it can // be shared by method invocation and synchronized blocks. //%note synchronization_3 //%note monitor_1 IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  Handle h_obj(thread, elem->obj());
  if (UseBiasedLocking) {// Retry fast entry if bias is revoked to avoid unnecessary inflation     ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
IRT_END

下面的代码,在原始的代码根底上有删减,保留了外围要害逻辑。即依据 UseBiasedLocking 这个变量别离执行 fast_enter 或者 slow_enter 的逻辑。

同步锁优化解决

同步锁优化解决即 fast_enter 执行解决,上面是 fast_enter 函数的定义:

//  Fast Monitor Enter/Exit // This the fast monitor enter. The interpreter and compiler use // some assembly copies of this code. Make sure update those code // if the following function is changed. The implementation is // extremely sensitive to race condition. Be careful. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock,
                                    bool attempt_rebias, TRAPS) {if (UseBiasedLocking) {if (!SafepointSynchronize::is_at_safepoint()) {BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {return;}
    } else {assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
  }

  slow_enter(obj, lock, THREAD);
}

这里开始还是要判断 UseBiasedLocking,如果是 true 的话,就针对开始执行优化逻辑,否则还是会 fall back 到 slow_enter 的,是不是感觉判断 UseBiasedLocking 有点啰嗦?其实不是的,因为这个函数在很多中央都会调用,因而判断是必须的。为了不便接下来的代码剖析,上面我要放出 OpenJDK 官网 wiki 中针对锁优化的原理图:

在解释原理图之前,须要介绍一下 Java 对象的内存布局,因为下面图中的实现原理就是充分利用 java 对象的头实现的。Java 对象在内存的构造根本分为:对象头和对象体,其中对象头存储对象特色信息,对象体寄存对象数据局部。
在 OpenJDK 工程中,有一个子工程叫 jol,全名:java object layout,简略易懂,就是 java 对象布局的意思。这是一个工具库,通过这个库能够获取 JVM 中对象布局信息,上面咱们看一下一个简略的例子(这也是官网给的例子):

public class JOLTest {public static void main(String[] args) {System.out.println(VM.current().details());
        System.out.println(ClassLayout.parseClass(A.class).toPrintable());
    }

    public static class A {boolean f;}
}

这里通过 JOL 的接口来获取类 A 的对象内存布局,执行之后输入如下内容:

# Running 64-bit HotSpot VM. # Using compressed oop with 3-bit shift. # Using compressed klass with 3-bit shift. # WARNING | Compressed references base/shifts are guessed by the experiment! # WARNING | Therefore, computed addresses are just guesses, and ARE NOT RELIABLE. # WARNING | Make sure to attach Serviceability Agent to get the reliable addresses. # Objects are 8 bytes aligned. # Field sizes by type: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes] # Array element sizes: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes] 
JOLTest$A object internals:
 OFFSET  SIZE      TYPE DESCRIPTION                               VALUE
      0    12           (object header)                           N/A
     12     1   boolean A.f                                       N/A
     13     3           (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 3 bytes external = 3 bytes total

这里咱们看到输入了很多信息,下面咱们类 A 的对象布局如下:12byte 的对象头 +1byte 的对象体 +3byte 的填充局部。

从 JVM 的代码咱们能够看到一个对象的头部定义:

volatile markOop _mark;
  union _metadata {
    Klass*      _klass;
    narrowKlass _compressed_klass;
  } _metadata;

能够看到分为两局部:第一局部就是 mark 局部,官网称之为 mark word,第二个是 klass 的类型指针,指向这个对象的类对象。这里的 mark word 长度是一个零碎字宽,在 64bit 零碎上就是 8 个字节,从下面的日志咱们能够看到虚拟机默认应用了 compressed klass,因而第二局部的 union 就是 narrowKlass 类型的,如果咱们持续看下 narrowKlass 的定义就晓得这是个 32bit 的 unsigned int 类型,因而将占用 4 个字节,所以对象的头部长度整体为 12 字节。

Mark word 用于存储对象本身运行时的数据,如 hash code、GC 分代年龄等等信息,他是实现偏差锁的要害。而且思考到虚拟机的空间效率,mark word 被设计成一个非固定数据结构以便在极小的空间内存存储尽量多的信息,他会依据对象的状态复用本人的存储空间。因而,mark word 内存布局定义在 32bit 和 64bit 零碎中对象的布局不同:

//  32 bits:
//  --------
//             hash:25 ------------>| age:4    biased_lock:1 lock:2 (normal object)
//             JavaThread*:23 epoch:2 age:4    biased_lock:1 lock:2 (biased object)
//             size:32 ------------------------------------------>| (CMS free block)
//             PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object)
//
//  64 bits:
//  --------
//  unused:25 hash:31 -->| unused:1   age:4    biased_lock:1 lock:2 (normal object)
//  JavaThread*:54 epoch:2 unused:1   age:4    biased_lock:1 lock:2 (biased object)
//  PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object)
//  size:64 ----------------------------------------------------->| (CMS free block)
//
//  unused:25 hash:31 -->| cms_free:1 age:4    biased_lock:1 lock:2 (COOPs && normal object)
//  JavaThread*:54 epoch:2 cms_free:1 age:4    biased_lock:1 lock:2 (COOPs && biased object)
//  narrowOop:32 unused:24 cms_free:1 unused:4 promo_bits:3 ----->| (COOPs && CMS promoted object)
//  unused:21 size:35 -->| cms_free:1 unused:7 ------------------>| (COOPs && CMS free block)

咱们次要关注其中的 normal object 和 biased object 两局部的头定义

biased_lock lock 状态
1 01 可偏置、但未锁且未偏置
0 01 已解锁、不可偏置
00 轻量级锁定
01 重量级锁定

偏置锁即这个锁首先假如本人被偏差的线程所持有。在单个线程间断持有锁时,偏差锁就起作用了。如果一个线程连续不断地获取锁,那么获取的过程中如果没有产生竞态,那么能够跳过沉重的同步过程,间接就取得锁执行,这样能够大大提高性能。偏差锁是 JDK1.6 中引入的一项锁优化伎俩,它的目标就是打消数据在无争用的状况下的同步操作,进一步提高运行性能。这里也波及了轻量级锁,轻量级锁也是 JDK1.6 引入的一个锁优化机制,所谓轻量级是绝对于应用操作系统互斥机制来实现传统锁而言的,在这个角度上,传统的形式及时重量级锁,乐观锁,会导致线程的状态切换,而线程状态的切换是一个相当重量级的操作。

inflate 成为重锁

先看一下 slow_enter 函数:

// Interpreter/Compiler Slow Case // This routine is used to handle interpreter/compiler slow case // We don't need to use fast path here, because it must have been // failed in the interpreter/compiler code. void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");

  if (mark->is_neutral()) {// Anticipate successful CAS -- the ST of the displaced mark must     // be visible <= the ST performed by the CAS.     lock->set_displaced_header(mark);
    if (mark == obj()->cas_set_mark((markOop) lock, mark)) {TEVENT(slow_enter: release stacklock);
      return;
    }
    // Fall through to inflate() ...} else if (mark->has_locker() &&
             THREAD->is_lock_owned((address)mark->locker())) {assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);
    return;
  }

  // The object header will never be displaced to this lock,   // so it does not matter what the value is, except that it   // must be non-zero to avoid looking like a re-entrant lock,   // and must not look locked either.   lock->set_displaced_header(markOopDesc::unused_mark());
  ObjectSynchronizer::inflate(THREAD,
                              obj(),
                              inflate_cause_monitor_enter)->enter(THREAD);
}

这里的执行逻辑比拟简洁,次要执行下面 OpenJDK wiki 中的锁优化逻辑。首先会判断对象锁是否为中立的(neutral):

bool is_neutral()  const {// 这里的 biased_lock_mask_in_place 是 7   // unlocked_value 值是 1   return (mask_bits(value(), biased_lock_mask_in_place) == unlocked_value); 
}

它的判断规范是将 mark word 中最初 7 个 bit 进行掩码运算,将失去的值和 1 进行比拟,如果等于 1 就示意对象时中立的,也就是没有被任何线程锁定,否则就算失败。至于为什么是最初 7 个 bit,是因为无论是一般对象还是可偏置的对象,最初 7 个 bit 的格局是固定的(其余几种模式的对象格局不同)。
再回到下面的 slow_enter 函数,如果判断为中立的,也就是没有锁定的话,会将以后的 mark word,存储到 lock 指针指向的对象中,这里的 lock 指针指向的就是下面提到的 lock record。而后进行一个十分重要的操作,就是通过院子 cas 操作将这个 lock 指针装置到对象 mark word 中,如果装置胜利就示意以后线程取得了这个对象锁,能够间接返回执行同步代码块,否则就会 fall back 到收缩锁中。

下面是判断对象是否为中立的逻辑,如果当线程进来发现以后的对象锁曾经被另一个线程锁定了。这个时候就会执行到 else 逻辑中:

if (mark->has_locker() &&
             THREAD->is_lock_owned((address)mark->locker())) {assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);
    return;
}

如果发现以后对象曾经锁定,须要判断下是不是以后线程本人锁定了,因为在 sysnchronized 中可能再一次 synchronized,这种状况下间接返回即可。

如果下面的两个判断都失败了,也就是对象被锁定,并且锁定线程不是以后线程,这个时候须要执行下面 OpenJDK wiki 中的 inflate 收缩逻辑。所谓收缩,就是依据以后锁对象,生成一个 ObjectMonitor 对象,这个对象中保留了 sychronized 阻塞的队列,以及实现了不同的队列调度策略,上面咱们重点看一下 ObjectMonitor 中的 enter 逻辑

ObjectMonitor enter

在 enter 函数中,有很多判断和优化执行的逻辑,然而外围和通过 Enterl 函数理论进入队列将以后线程阻塞:

void ObjectMonitor::EnterI(TRAPS) {
  ...
  // Try the lock - TATAS   if (TryLock (Self) > 0) {assert(_succ != Self, "invariant");
    assert(_owner == Self, "invariant");
    assert(_Responsible != Self, "invariant");
    return;
  }
  ...
  // We try one round of spinning *before* enqueueing Self.   //   // If the _owner is ready but OFFPROC we could use a YieldTo()   // operation to donate the remainder of this thread's quantum   // to the owner.  This has subtle but beneficial affinity   // effects. 
  if (TrySpin (Self) > 0) {assert(_owner == Self, "invariant");
    assert(_succ != Self, "invariant");
    assert(_Responsible != Self, "invariant");
    return;
  }
  ...
  ObjectWaiter node(Self);
  // Push "Self" onto the front of the _cxq.   // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.   // Note that spinning tends to reduce the rate at which threads   // enqueue and dequeue on EntryList|cxq.   ObjectWaiter * nxt;
  for (;;) {
    node._next = nxt = _cxq;
    if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;

    // Interference - the CAS failed because _cxq changed.  Just retry.     // As an optional optimization we retry the lock.     if (TryLock (Self) > 0) {assert(_succ != Self, "invariant");
      assert(_owner == Self, "invariant");
      assert(_Responsible != Self, "invariant");
      return;
    }
  }
  ...
  for (;;) {if (TryLock(Self) > 0) break;
    ...
    if ((SyncFlags & 2) && _Responsible == NULL) {Atomic::replace_if_null(Self, &_Responsible);
    }
    // park self     if (_Responsible == Self || (SyncFlags & 1)) {TEVENT(Inflated enter - park TIMED);
      Self->_ParkEvent->park((jlong) recheckInterval);
      // Increase the recheckInterval, but clamp the value.       recheckInterval *= 8;
      if (recheckInterval > MAX_RECHECK_INTERVAL) {recheckInterval = MAX_RECHECK_INTERVAL;}
    } else {TEVENT(Inflated enter - park UNTIMED);
      Self->_ParkEvent->park();}

    if (TryLock(Self) > 0) break;
    ...
  }
  ...
  if (_Responsible == Self) {_Responsible = NULL;}
  // 善后处理,比方将以后线程从期待队列 CXQ 中移除   ...
}

照例只保留了重要代码。咱们先看 TryLock 办法:

int ObjectMonitor::TryLock(Thread * Self) {
  void * own = _owner;
  if (own != NULL) return 0;
  if (Atomic::replace_if_null(Self, &_owner)) {// Either guarantee _recursions == 0 or set _recursions = 0.     assert(_recursions == 0, "invariant");
    assert(_owner == Self, "invariant");
    return 1;
  }
  // The lock had been free momentarily, but we lost the race to the lock.   // Interference -- the CAS failed.   // We can either return -1 or retry.   // Retry doesn't make as much sense because the lock was just acquired.   return -1;
}

这里逻辑很简略,次要是尝试通过 cas 操作将_owner 字段设置为 Self,其中_owner 示意以后 ObjectMonitor 对象锁持有的线程指针,Self 指向以后执行的线程。如果设置上了,示意以后线程取得了锁,否则没有取得。

在下面的 Enterl 函数中,咱们看到 TryLock 前后间断执行了两次,而且代码判断逻辑一样,为什么要这样?这其实是为了在入队阻塞线程之前的最初查看,避免线程无谓的进行状态切换。然而为什么执行两次?其实第二次执行的正文曾经阐明了,这么做有一些奥妙的亲和力影响,即如果在过来一段时间内,某个线程尝试获取某个资源始终失败,那么零碎在前面会偏向于将资源分配给这个线程。

如果两次 TryLock 之后依然失败,那么只能乖乖入队阻塞了,在入队之前须要创立一个 ObjectWaiter 对象,这个对象将以后线程的对象(留神是 JavaThread 对象)包裹起来,咱们看一下 ObjectWaiter 的定义:

class ObjectWaiter : public StackObj {
 public:
  enum TStates {TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ};
  enum Sorted  {PREPEND, APPEND, SORTED};
  ObjectWaiter * volatile _next;
  ObjectWaiter * volatile _prev;
  Thread*       _thread;
  jlong         _notifier_tid;
  ParkEvent *   _event;
  volatile int  _notified;
  volatile TStates TState;
  Sorted        _Sorted;           // List placement disposition   bool          _active;           // Contention monitoring is enabled  public:
  ObjectWaiter(Thread* thread);

  void wait_reenter_begin(ObjectMonitor *mon);
  void wait_reenter_end(ObjectMonitor *mon);
};

_next 和_prev 代表这是一个双向队列实现期待队列(然而实际上,入队操作并没有造成双向链表,真正造成双向链表是在 exit 的时候)。node 节点创立结束之后会执行如下入队操作

// Push "Self" onto the front of the _cxq.   // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.   // Note that spinning tends to reduce the rate at which threads   // enqueue and dequeue on EntryList|cxq.   ObjectWaiter * nxt;
  for (;;) {
    node._next = nxt = _cxq;
    if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;

    // Interference - the CAS failed because _cxq changed.  Just retry.     // As an optional optimization we retry the lock.     if (TryLock (Self) > 0) {assert(_succ != Self, "invariant");
      assert(_owner == Self, "invariant");
      assert(_Responsible != Self, "invariant");
      return;
    }
  }

正文中阐明,咱们是要将以后节点放到 CXQ 队列的头部,将节点的 next 指针通过 cas 操作指向_cxq 指针就实现了入队操作。如果入队胜利,则退出以后循环,否则再次尝试 lock,因为在高并发状态下,cas 锁定可能会出错失败。

如果下面的循环退出了,就示意以后线程的 node 节点曾经顺利进入 CXQ 队列了,那么接下来须要进入另一个循环:

for (;;) {if (TryLock(Self) > 0) break;
    ...
    if ((SyncFlags & 2) && _Responsible == NULL) {Atomic::replace_if_null(Self, &_Responsible);
    }
    // park self     if (_Responsible == Self || (SyncFlags & 1)) {TEVENT(Inflated enter - park TIMED);
      Self->_ParkEvent->park((jlong) recheckInterval);
      // Increase the recheckInterval, but clamp the value.       recheckInterval *= 8;
      if (recheckInterval > MAX_RECHECK_INTERVAL) {recheckInterval = MAX_RECHECK_INTERVAL;}
    } else {TEVENT(Inflated enter - park UNTIMED);
      Self->_ParkEvent->park();}

    if (TryLock(Self) > 0) break;
    ...
}

这个循环的逻辑比较简单:

  1. 尝试获取锁
  2. park 以后锁
  3. 再次尝试获取锁

重点在于第 2 步,咱们晓得 synchronzed 如果获取对象锁失败的话,会导致以后线程被阻塞,那么这个阻塞操作就是在这里实现的,这里须要留神的是,这里须要判断一下_Responible 指针,如果这个指针为 null,示意之前对象锁还没有期待线程,也就是说以后线程是第一个期待线程,这时候通过 cas 操作将_Responsible 指向 Self,示意以后线程是这个对象锁的期待线程。接下来,如果以后线程是期待线程,那么会执行一个简略的退却算法,进行一个短时间的阻塞期待。这个算法很简略,第一次期待 1ms,第二次期待 8ms,第三次期待 64ms,以此类推,晓得期待时长的下限:MAX_RECHECK_INTERVAL,也就是说在 synchronize 在一个对象锁上的线程,如果他是第一个期待线程的话,那么他会不停的休眠,查看锁。反之,如果以后线程不是第一个期待线程,那么只能执行无限期的休眠,始终期待对象锁的 exit 函数执行唤醒才行。

ObjectMonitor exit

当一个线程取得对象锁胜利后,就能够执行自定义的同步代码块了。执行实现之后会执行到 ObjectMonitor 的 exit 函数中,开释以后对象锁,不便下一个线程来获取这个对象锁,上面咱们逐渐剖析 exit 的实现过程。

void ObjectMonitor::exit(bool not_suspended, TRAPS) {for (;;) {
    ...
        ObjectWaiter * w = NULL;
    int QMode = Knob_QMode;

    if (QMode == 2 && _cxq != NULL) {...}

    if (QMode == 3 && _cxq != NULL) {...}

    if (QMode == 4 && _cxq != NULL) {...}
    ...
    ExitEpilog(Self, w);
    return;
  }
}

exit 函数的执行逻辑有两步:

  1. 依据 Knob_QMode 的值和_cxq 是否为空执行不同策略
  2. 依据肯定策略唤醒期待队列的下一个线程

出队策略 0——默认策略

在 exit 函数中首先是依据 Knob_QMode 的值执行不同执行不同逻辑,而 Knob_QMode 的默认值为 0,它的作用次要用来指定在 exit 的时候 EntryList 和 CXQ 队列之间的唤醒关系,也就是说,当 EntryList 和 CXQ 中都有期待的线程时,因为 exit 之后只能有一个线程失去锁,这个时候抉择唤醒哪个队列中的线程是一个值得思考的事。而这里的默认策略就是 0。
出队策略 0 代表 CXQ 队列后进先出,行将 cxq 指针赋予_EntryList,而后通过一个循环将本来单项链表的 CXQ 链表变成双向链表,不便前面针对 CXQ 链表进行查问,这时候,_EntryList 就是 CXQ。而后交由 ExitEpilog 唤醒

void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {assert(_owner == Self, "invariant");

  // Exit protocol:   // 1. ST _succ = wakee   // 2. membar #loadstore|#storestore;   // 2. ST _owner = NULL   // 3. unpark(wakee) 
  _succ = Knob_SuccEnabled ? Wakee->_thread : NULL;
  ParkEvent * Trigger = Wakee->_event;

  // Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again.   // The thread associated with Wakee may have grabbed the lock and "Wakee" may be   // out-of-scope (non-extant).   Wakee  = NULL;

  // Drop the lock   OrderAccess::release_store(&_owner, (void*)NULL);
  OrderAccess::fence();                               // ST _owner vs LD in unpark() 
  if (SafepointMechanism::poll(Self)) {TEVENT(unpark before SAFEPOINT);
  }

  DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
  Trigger->unpark();

  // Maintain stats and report events to JVMTI   OM_PERFDATA_OP(Parks, inc());
}

即通过 park event 将期待的线程唤醒,而后执行 unpark 函数

void os::PlatformEvent::unpark() {if (Atomic::xchg(1, &_event) >= 0) return;

  int status = pthread_mutex_lock(_mutex);
  int anyWaiters = _nParked;
  status = pthread_mutex_unlock(_mutex);

  if (anyWaiters != 0) {status = pthread_cond_signal(_cond);
    assert_status(status == 0, status, "cond_signal");
  }
}

这里仍然是通过 pthread 的 condition signal 唤醒线程,后面线程休眠是通过 condition wait 实现的。

出队策略 1

出队策略 1 即 Knob_QMnode 的值批改为 1,这种模式下是先进先出,即 FIFO 队列行为。这种模式下的解决是先将 CXQ 队列 reverse 一下,而后再讲新的队头也就是原来的队尾赋值给_EntryList。而后按_EntryList 进行唤醒。

出队策略 2

出队策略 2 跟出队策略 0 类似,然而他是优先执行 CXQ 队列的操作,再执行_EntryList 队列的操作。即优先按 CXQ 进行唤醒。

出队策略 3 和出队策略 4

出队策略 3 和出队策略 4 都是简略的链接。出队策略 3 是将 CXQ 放在_EntryList 之后,而出队策略 4 是将_EntryList 放在 CXQ 之前。而后按新~~~~_EntryList 进行唤醒。

正文完
 0