共计 4269 个字符,预计需要花费 11 分钟才能阅读完成。
- 对于 Queue
template <typename T, typename Allocator = AlignedAllocator<Slot<T>>>
class Queue {
private:
static_assert(std::is_nothrow_copy_assignable<T>::value || std::is_nothrow_move_assignable<T>::value, "T must be nothrow copy or move assignable");
static_assert(std::is_nothrow_destructible<T>::value, "T must be nothrow destructible");
第一个模板参数是队列存储的对象类型,第二个模板参数为内存分配器,默认应用 AlignedAllocator,即上文定义的内存分配器。
要求 T 类型的拷贝赋值,挪动赋值函数和析构函数都要是 noexcept 的。
public:
explicit Queue(const size_t capacity, const Allocator &allocator = Allocator()) : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) {if (capacity_ < 1) {throw std::invalid_argument("capacity < 1");
}
// Allocate one extra slot to prevent false sharing on the last slot
slots_ = allocator_.allocate(capacity_ + 1);
// Allocators are not required to honor alignment for over-aligned types (see http://eel.is/c++draft/allocator.requirements#10) so we verify alignment here
if (reinterpret_cast<size_t>(slots_) % alignof(Slot<T>) != 0) {allocator_.deallocate(slots_, capacity_ + 1);
throw std::bad_alloc();}
for (size_t i = 0; i < capacity_; ++i) {new (&slots_[i]) Slot<T>();}
static_assert(alignof(Slot<T>) == hardwareInterferenceSize, "Slot must be aligned to cache line boundary to prevent false sharing");
static_assert(sizeof(Slot<T>) % hardwareInterferenceSize == 0, "Slot size must be a multiple of cache line size to prevent false sharing between adjacent slots");
static_assert(sizeof(Queue) % hardwareInterferenceSize == 0, "Queue size must be a multiple of cache line size to prevent false sharing between adjacent queues");
static_assert(offsetof(Queue, tail_) - offsetof(Queue, head_) == static_cast<std::ptrdiff_t>(hardwareInterferenceSize), "head and tail must be a cache line apart to prevent false sharing");
}
下面代码是 Queue 的构造函数,为什么要多申请一个 Slot 防止最初一个 Slot 的伪共享(不懂)?
而后查看了调配的内存起始地址是不是以 Slot<T> 的对齐数对齐,因为分配器并不被要求对 over-aligned(适度对齐)的类型进行对齐,如果不满足对齐要求,则防止伪共享的要求达不到,开释内存抛出异样。
接着在 capacity_个内存块上结构 Slot<T> 对象,最初进行一系列的动态断言,确保各个对象的内存散布与大小合乎设计要求。
~Queue() noexcept {for (size_t i = 0; i < capacity_; ++i) {slots_[i].~Slot();}
allocator_.deallocate(slots_, capacity_ + 1);
}
// non-copyable and non-movable
Queue(const Queue &) = delete;
Queue &operator=(const Queue &) = delete;
析构函数没有什么意外之处.
template <typename... Args> void emplace(Args &&... args) noexcept {static_assert(std::is_nothrow_constructible<T, Args &&...>::value, "T must be nothrow constructible with Args&&...");
auto const head = head_.fetch_add(1);
auto &slot = slots_[idx(head)];
while (turn(head) * 2 != slot.turn.load(std::memory_order_acquire))
;
slot.construct(std::forward<Args>(args)...);
slot.turn.store(turn(head) * 2 + 1, std::memory_order_release);
}
向队列首部插入一个元素,这个函数波及到 MPMCQueue 外围的设计思路,因而对源码的剖析先暂停,钻研下 MPMCQueue 入队出队的操作实现。
MPMCQueue 类应用 head_和 tail_两个数据成员作为队列的首元素和尾元素的索引标识,然而这两个数据不会有减小的操作,而是始终 fetch_add(1),取元素的时候应用 idx(head)取得真正的索引值,这里 idx 辅助函数就是 head % capacity_,而 turn 函数的实现为 head / capacity_,能够这么了解,turn 的返回值代表了 head 遍历以后队列的趟数,假如 capacity_ = 5,则:
head = 0,turn(head) = 0,以后 head 遍历了队列 0 趟。
head = 1, turn(head) = 0, head 后退了一个单位,但还是 0 趟。
…
head = 5,turn(head) = 1, head 又指向了队列的第一个 Slot(因为 idx(head) = 0),而曾经是第 1 趟遍历队列了。
因而,emplace 函数中首先递增 head_,这样就通过 idx(head)取得了队首 Slot 的前一个 Slot 索引,也就是本次结构 T 类型对象的 Slot 的地位,并通过 auto& slot = slots_[idx(head)]取得该 Slot 的援用。
接下来是一个 while 循环,通过一直比拟 turn(head) * 2 和 slot.turn 的值,相等的时候认为该 Slot 是空的,否则在这里有限循环,期待 slot.turn 的值扭转。之后就调用 construct 函数在 Slot 对象中结构 T 类型对象,并给 slot.turn 赋值为 turn(head) * 2 + 1。
暂且疏忽原子操作的内存一致性选项(之后剖析),能够剖析每个 Slot 对象 turn 的值代表了该 Slot 对象中是否存在 T 类型对象,当 slot.turn = turn(head) 2 时不存在,当 slot.turn = turn(head) 2 + 1 时存在。
head_第 0 趟遍历到该 Slot 对象的时候,slot.turn = 0,while 判断胜利,结构对象,slot.turn 被赋值为 1。假如始终没有 pop 操作而一直插入数据,head_一直减少直到又找到了这个 Slot 对象(这个时候队列曾经满了),这时候 head_的趟数变为 1,所以 while 判断 (1 * 2 != 1) 失败,示意这个 Slot 对象中曾经含有 T 类型对象,不能插入。剖析到这里能够晓得,pop 函数中也在一直批改 slot.turn 值,当 tail_第 0 趟遍历队列的时候,会把 slot.turn 从 1 变为 2,这时 emplace 操作的 while 判断就会胜利,即能够插入 T 类型对象。
因而对于每个 slot.turn 其实在一直经验如下过程:
slot.turn = 0 // init.
//emplace
wait slot.turn == 0 :
slot.turn = 1
construct object.
//pop
wait slot.turn == 1 :
slot.turn = 2
destruct object.
//emplace
wait slot.turn == 2 :
slot.turn = 3
construct object
…
当 slot.turn 为奇数的时候 Slot 中存在对象,当 slot.turn 为偶数的时候 Slot 中不存在对象,这时候咱们回顾下 Slot 的析构函数:
~Slot() noexcept {if (turn & 1) {destroy();
}
}
当 turn 为奇数的时候 turn & 1 的后果为真,调用 destroy 函数。
为了验证这个猜测上面看下 pop 函数的代码:
void pop(T &v) noexcept {auto const tail = tail_.fetch_add(1);
auto &slot = slots_[idx(tail)];
while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
;
v = slot.move();
slot.destroy();
slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
}
能够看到,pop 函数中在期待 slot.turn 变为 turn(tail) * 2 + 1,而后 move 出对象,并批改 slot.turn 为 turn(tail) * 2 + 2。
Queue 类中还有 try_emplace,push,try_push,try_pop 接口函数,外围逻辑与 emplace 和 pop 大同小异,预计和原子操作的内存一致性选项一起剖析吧。