• 对于Queue
template <typename T, typename Allocator = AlignedAllocator<Slot<T>>>class Queue {private:static_assert(std::is_nothrow_copy_assignable<T>::value || std::is_nothrow_move_assignable<T>::value, "T must be nothrow copy or move assignable");static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible");

第一个模板参数是队列存储的对象类型,第二个模板参数为内存分配器,默认应用AlignedAllocator,即上文定义的内存分配器。

要求T类型的拷贝赋值,挪动赋值函数和析构函数都要是noexcept的。

public:explicit Queue(const size_t capacity, const Allocator &allocator = Allocator()) : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) {  if (capacity_ < 1) {    throw std::invalid_argument("capacity < 1");  }  // Allocate one extra slot to prevent false sharing on the last slot  slots_ = allocator_.allocate(capacity_ + 1);    // Allocators are not required to honor alignment for over-aligned types (see http://eel.is/c++draft/allocator.requirements#10) so we verify alignment here  if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) {    allocator_.deallocate(slots_, capacity_ + 1);    throw std::bad_alloc();  }  for (size_t i = 0; i < capacity_; ++i) {    new (&slots_[i]) Slot<T>();  }  static_assert( alignof(Slot<T>) == hardwareInterferenceSize, "Slot must be aligned to cache line boundary to prevent false sharing");  static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0, "Slot size must be a multiple of cache line size to prevent false sharing between adjacent slots");  static_assert(sizeof(Queue) % hardwareInterferenceSize == 0, "Queue size must be a multiple of cache line size to prevent false sharing between adjacent queues");  static_assert( offsetof(Queue, tail_) - offsetof(Queue, head_) == static_cast<std::ptrdiff_t>(hardwareInterferenceSize), "head and tail must be a cache line apart to prevent false sharing");}

下面代码是Queue的构造函数,为什么要多申请一个Slot防止最初一个Slot的伪共享(不懂)?
而后查看了调配的内存起始地址是不是以Slot<T>的对齐数对齐,因为分配器并不被要求对over-aligned(适度对齐)的类型进行对齐,如果不满足对齐要求,则防止伪共享的要求达不到,开释内存抛出异样。
接着在capacity_个内存块上结构Slot<T>对象,最初进行一系列的动态断言,确保各个对象的内存散布与大小合乎设计要求。

~Queue() noexcept {  for (size_t i = 0; i < capacity_; ++i) {    slots_[i].~Slot();  }  allocator_.deallocate(slots_, capacity_ + 1);}// non-copyable and non-movableQueue(const Queue &) = delete;Queue &operator=(const Queue &) = delete;

析构函数没有什么意外之处.

template <typename... Args> void emplace(Args &&... args) noexcept {static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&...");  auto const head = head_.fetch_add(1);  auto &slot = slots_[idx(head)];  while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire))    ;  slot.construct(std::forward<Args>(args)...);  slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);}

向队列首部插入一个元素,这个函数波及到MPMCQueue外围的设计思路,因而对源码的剖析先暂停,钻研下MPMCQueue入队出队的操作实现。

MPMCQueue类应用head_和tail_两个数据成员作为队列的首元素和尾元素的索引标识,然而这两个数据不会有减小的操作,而是始终fetch_add(1),取元素的时候应用idx(head)取得真正的索引值,这里idx辅助函数就是head % capacity_,而turn函数的实现为head / capacity_,能够这么了解,turn的返回值代表了head遍历以后队列的趟数,假如capacity_ = 5,则:

head = 0,turn(head) = 0,以后head遍历了队列0趟。
head = 1, turn(head) = 0, head后退了一个单位,但还是0趟。
...
head = 5,turn(head) = 1, head又指向了队列的第一个Slot(因为idx(head) = 0),而曾经是第1趟遍历队列了。

因而,emplace函数中首先递增head_,这样就通过idx(head)取得了队首Slot的前一个Slot索引,也就是本次结构T类型对象的Slot的地位,并通过auto& slot = slots_[idx(head)]取得该Slot的援用。
接下来是一个while循环,通过一直比拟turn(head) * 2 和slot.turn的值,相等的时候认为该Slot是空的,否则在这里有限循环,期待slot.turn的值扭转。之后就调用construct函数在Slot对象中结构T类型对象,并给slot.turn赋值为turn(head) * 2 + 1。

暂且疏忽原子操作的内存一致性选项(之后剖析),能够剖析每个Slot对象turn的值代表了该Slot对象中是否存在T类型对象,当slot.turn = turn(head) 2时不存在,当slot.turn = turn(head) 2 + 1时存在。
head_第0趟遍历到该Slot对象的时候,slot.turn = 0, while判断胜利,结构对象,slot.turn被赋值为1。假如始终没有pop操作而一直插入数据,head_一直减少直到又找到了这个Slot对象(这个时候队列曾经满了),这时候head_的趟数变为1,所以while判断(1 * 2 != 1)失败,示意这个Slot对象中曾经含有T类型对象,不能插入。剖析到这里能够晓得,pop函数中也在一直批改slot.turn值,当tail_第0趟遍历队列的时候,会把slot.turn从1变为2,这时emplace操作的while判断就会胜利,即能够插入T类型对象。
因而对于每个slot.turn其实在一直经验如下过程:
slot.turn = 0 // init.

//emplace
wait slot.turn == 0 :
  slot.turn = 1
  construct object.

//pop
wait slot.turn == 1 :
  slot.turn = 2
  destruct object.

//emplace
wait slot.turn == 2 :
 slot.turn = 3
 construct object
...
当slot.turn为奇数的时候Slot中存在对象,当slot.turn为偶数的时候Slot中不存在对象,这时候咱们回顾下Slot的析构函数:

~Slot() noexcept {  if (turn & 1) {    destroy();  }}

当turn为奇数的时候turn & 1的后果为真,调用destroy函数。

为了验证这个猜测上面看下pop函数的代码:

void pop(T &v) noexcept {  auto const tail = tail_.fetch_add(1);  auto &slot = slots_[idx(tail)];  while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))    ;  v = slot.move();  slot.destroy();  slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);}

能够看到,pop函数中在期待slot.turn变为turn(tail) * 2 + 1,而后move出对象,并批改slot.turn为turn(tail) * 2 + 2。

Queue类中还有try_emplace,push,try_push,try_pop接口函数,外围逻辑与emplace和pop大同小异,预计和原子操作的内存一致性选项一起剖析吧。