- 对于Queue
template <typename T, typename Allocator = AlignedAllocator<Slot<T>>>class Queue {private:static_assert(std::is_nothrow_copy_assignable<T>::value || std::is_nothrow_move_assignable<T>::value, "T must be nothrow copy or move assignable");static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible");
第一个模板参数是队列存储的对象类型,第二个模板参数为内存分配器,默认应用AlignedAllocator,即上文定义的内存分配器。
要求T类型的拷贝赋值,挪动赋值函数和析构函数都要是noexcept的。
public:explicit Queue(const size_t capacity, const Allocator &allocator = Allocator()) : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) { if (capacity_ < 1) { throw std::invalid_argument("capacity < 1"); } // Allocate one extra slot to prevent false sharing on the last slot slots_ = allocator_.allocate(capacity_ + 1); // Allocators are not required to honor alignment for over-aligned types (see http://eel.is/c++draft/allocator.requirements#10) so we verify alignment here if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) { allocator_.deallocate(slots_, capacity_ + 1); throw std::bad_alloc(); } for (size_t i = 0; i < capacity_; ++i) { new (&slots_[i]) Slot<T>(); } static_assert( alignof(Slot<T>) == hardwareInterferenceSize, "Slot must be aligned to cache line boundary to prevent false sharing"); static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0, "Slot size must be a multiple of cache line size to prevent false sharing between adjacent slots"); static_assert(sizeof(Queue) % hardwareInterferenceSize == 0, "Queue size must be a multiple of cache line size to prevent false sharing between adjacent queues"); static_assert( offsetof(Queue, tail_) - offsetof(Queue, head_) == static_cast<std::ptrdiff_t>(hardwareInterferenceSize), "head and tail must be a cache line apart to prevent false sharing");}
下面代码是Queue的构造函数,为什么要多申请一个Slot防止最初一个Slot的伪共享(不懂)?
而后查看了调配的内存起始地址是不是以Slot<T>的对齐数对齐,因为分配器并不被要求对over-aligned(适度对齐)的类型进行对齐,如果不满足对齐要求,则防止伪共享的要求达不到,开释内存抛出异样。
接着在capacity_个内存块上结构Slot<T>对象,最初进行一系列的动态断言,确保各个对象的内存散布与大小合乎设计要求。
~Queue() noexcept { for (size_t i = 0; i < capacity_; ++i) { slots_[i].~Slot(); } allocator_.deallocate(slots_, capacity_ + 1);}// non-copyable and non-movableQueue(const Queue &) = delete;Queue &operator=(const Queue &) = delete;
析构函数没有什么意外之处.
template <typename... Args> void emplace(Args &&... args) noexcept {static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&..."); auto const head = head_.fetch_add(1); auto &slot = slots_[idx(head)]; while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire)) ; slot.construct(std::forward<Args>(args)...); slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);}
向队列首部插入一个元素,这个函数波及到MPMCQueue外围的设计思路,因而对源码的剖析先暂停,钻研下MPMCQueue入队出队的操作实现。
MPMCQueue类应用head_和tail_两个数据成员作为队列的首元素和尾元素的索引标识,然而这两个数据不会有减小的操作,而是始终fetch_add(1),取元素的时候应用idx(head)取得真正的索引值,这里idx辅助函数就是head % capacity_,而turn函数的实现为head / capacity_,能够这么了解,turn的返回值代表了head遍历以后队列的趟数,假如capacity_ = 5,则:
head = 0,turn(head) = 0,以后head遍历了队列0趟。
head = 1, turn(head) = 0, head后退了一个单位,但还是0趟。
...
head = 5,turn(head) = 1, head又指向了队列的第一个Slot(因为idx(head) = 0),而曾经是第1趟遍历队列了。
因而,emplace函数中首先递增head_,这样就通过idx(head)取得了队首Slot的前一个Slot索引,也就是本次结构T类型对象的Slot的地位,并通过auto& slot = slots_[idx(head)]取得该Slot的援用。
接下来是一个while循环,通过一直比拟turn(head) * 2 和slot.turn的值,相等的时候认为该Slot是空的,否则在这里有限循环,期待slot.turn的值扭转。之后就调用construct函数在Slot对象中结构T类型对象,并给slot.turn赋值为turn(head) * 2 + 1。
暂且疏忽原子操作的内存一致性选项(之后剖析),能够剖析每个Slot对象turn的值代表了该Slot对象中是否存在T类型对象,当slot.turn = turn(head) 2时不存在,当slot.turn = turn(head) 2 + 1时存在。
head_第0趟遍历到该Slot对象的时候,slot.turn = 0, while判断胜利,结构对象,slot.turn被赋值为1。假如始终没有pop操作而一直插入数据,head_一直减少直到又找到了这个Slot对象(这个时候队列曾经满了),这时候head_的趟数变为1,所以while判断(1 * 2 != 1)失败,示意这个Slot对象中曾经含有T类型对象,不能插入。剖析到这里能够晓得,pop函数中也在一直批改slot.turn值,当tail_第0趟遍历队列的时候,会把slot.turn从1变为2,这时emplace操作的while判断就会胜利,即能够插入T类型对象。
因而对于每个slot.turn其实在一直经验如下过程:
slot.turn = 0 // init.
//emplace
wait slot.turn == 0 :
slot.turn = 1
construct object.
//pop
wait slot.turn == 1 :
slot.turn = 2
destruct object.
//emplace
wait slot.turn == 2 :
slot.turn = 3
construct object
...
当slot.turn为奇数的时候Slot中存在对象,当slot.turn为偶数的时候Slot中不存在对象,这时候咱们回顾下Slot的析构函数:
~Slot() noexcept { if (turn & 1) { destroy(); }}
当turn为奇数的时候turn & 1的后果为真,调用destroy函数。
为了验证这个猜测上面看下pop函数的代码:
void pop(T &v) noexcept { auto const tail = tail_.fetch_add(1); auto &slot = slots_[idx(tail)]; while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire)) ; v = slot.move(); slot.destroy(); slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);}
能够看到,pop函数中在期待slot.turn变为turn(tail) * 2 + 1,而后move出对象,并批改slot.turn为turn(tail) * 2 + 2。
Queue类中还有try_emplace,push,try_push,try_pop接口函数,外围逻辑与emplace和pop大同小异,预计和原子操作的内存一致性选项一起剖析吧。