RocksDB 是基于 Google LevelDB 研发的高性能 Key-Value 长久化存储引擎,以库组件模式嵌入程序中,为大规模分布式应用在 SSD 上运行提供优化。RocksDB 重点是提供工具反对,具体实现将交给下层利用。

正是这种高度可定制化能力,使得 RocksDB 可涵盖包含工作负载等泛滥场景的利用,明天咱们将逐个为大家介绍:

"为什么须要内存管理器?
为什么不应用现有的内存管理器?
RocksDB 到底是如何实现的?"

01 为什么须要内存管理器?

RocksDB 有很多外围场景须要分配内存的,包含但不限于 Memtable、 Cache、Iterator 等,高效优质的内存分配器就显得尤为重要。一个优良的通用内存分配器须要具备以下个性:

  • 尽量避免内存碎片;并发调配性能好;
  • 额定的空间损耗尽量少;
  • 兼具通用性、兼容性、可移植性且易调试。

内存治理能够分为 3 个档次,自下而上别离是:

  • 操作系统内核的内存治理;
  • glibc 层应用零碎调用保护的内存治理;
  • 应用程序从 glibc 层动静分配内存后,依据应用程序自身的程序个性进行优化, 比方应用援用计数、std::shared_ptr、apache 等内存池形式的内存治理。

目前大部分服务端程序应用 glibc 提供的 malloc/free 系列函数,而 glibc 应用的 ptmalloc2 在性能上远远落后于 Google 的 Tcmalloc 和 Facebook 的 Jemalloc 。 后两者只需应用 LD_PRELOAD 环境变量启动程序即可,甚至并不需要从新编译。

02 为什么不应用现有内存管理器?

RocksDB 应用内存的场景集中在 Memtable、Cache、Iterator 等外围链路上,在保障高性能的同时,还须要实现对内存分配器外部管制(包含监控等)。

以后现有的内存分配器不具备被动上报内存监控细节,所以从长远看,仍旧须要本人实现 RocksDB 专有的内存管理器。内存管理器设计思路如下:

  • 小内存的申请通过 Thread-cache 或者 Per-cpu cache,保障了 CPU 不会频繁的 cache-miss;
  • 若反对 MAP_HUGETLB,则会间接 mmap;
  • 若大页内存则间接从底层的分配器去申请。

03 RocksDB 是如何实现的?

如图所示,RocksDB 的内存管理器是反对并发的,接下来让咱们一起从源码动手,看看具体如何实现的。

Allocator

// Abstract interface for allocating memory in blocks. This memory is freed// When the allocator object is destroyed. See the Arena class for more info.class Allocator { public:  virtual ~Allocator() {}  // 分配内存  virtual char* Allocate(size_t bytes) = 0;                       // 对齐分配内存  virtual char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,                                Logger* logger = nullptr) = 0;    // 块大小  virtual size_t BlockSize() const = 0;                         };

Arena

Arena 负责实现 RocksDB 的内存调配,咱们从中能够看到其针对不同大小的内存调配申请,采取不同的调配策略,从而缩小内存碎片。

class Arena : public Allocator {    static const size_t kInlineSize = 2048;    static const size_t kMinBlockSize;  // 4K    static const size_t kMaxBlockSize;  // 2G    private:    char inline_block_[kInlineSize] __attribute__((__aligned__(alignof(max_align_t))));    const size_t kBlockSize;       // 每个 Block 的大小    using Blocks = std::vector<char*>;    Blocks blocks_;                // 调配的新 Block 地址(不够了就从新调配)        struct MmapInfo {        void* addr_;        size_t length_;        MmapInfo(void* addr, size_t length) : addr_(addr), length_(length) {}    };    std::vector<MmapInfo> huge_blocks_;     // 大块应用 mmap    size_t irregular_block_num = 0;         // 不参差的块调配次数(块大于 kBlockSize/4)        char* unaligned_alloc_ptr_ = nullptr;   // 未对齐的一端指针(高地址开始)    char* aligned_alloc_ptr_ = nullptr;     // 对齐一端指针(低地址开始)    size_t alloc_bytes_remaining_ = 0;      // 内存残余        size_t blocks_memory_ = 0;              // 曾经调配的内存大小        #ifdef MAP_HUGETLB    size_t hugetlb_size_ = 0;    #endif  // MAP_HUGETLB    char* AllocateFromHugePage(size_t bytes);    char* AllocateFallback(size_t bytes, bool aligned);    char* AllocateNewBlock(size_t block_bytes);        AllocTracker* tracker_;}

针对 Allocate 和 AllocateAligned,咱们采纳对同一块 Block 的两端进行调配。AllocateAligned 从内存块的低地址开始调配,Allocate 从高地址开始调配。调配流程图如下:

ConcurrentArena

内存分配器不仅须要缩小内存碎片,同样须要保障并发调配的性能,那么 RocksDB 是如何实现 ConcurrentArena 的呢?

从内存治理架构图能够看出,RocksDB 保护了 CoreLocal 内存数组,每个线程从所在 CPU 对应的本地 Shard 上分配内存,若有余再去主内存 Arena 进行调配。咱们从几个外围类开始逐个介绍:

1、ConcurrentArena

class ConcurrentArena : public Allocator { public:  // block_size and huge_page_size are the same as for Arena (and are  // in fact just passed to the constructor of arena_.  The core-local  // shards compute their shard_block_size as a fraction of block_size  // that varies according to the hardware concurrency level.  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,                           AllocTracker* tracker = nullptr,                           size_t huge_page_size = 0);                             char* Allocate(size_t bytes) override {    return AllocateImpl(bytes, false /*force_arena*/,                        [this, bytes]() { return arena_.Allocate(bytes); });  } private:  ...  CoreLocalArray<Shard> shards_;    // 保护了一个 CoreLocal 内存数组  Arena arena_;                     // 主内存  ...};

2、CoreLocalArray

// An array of core-local values. Ideally the value type, T, is cache aligned to// prevent false sharing.template <typename T>class CoreLocalArray { public:  ...  // returns pointer to element for the specified core index. This can be used,  // e.g., for aggregation, or if the client caches core index.  T* AccessAtCore(size_t core_idx) const; private:  std::unique_ptr<T[]> data_;  int size_shift_;};

3、并发调配流程

template <typename Func>  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {    size_t cpu;    // 1:大块则间接从 Arena 上调配,须要加锁    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);    if (bytes > shard_block_size_ / 4 || force_arena ||        ((cpu = tls_cpuid) == 0 &&         !shards_.AccessAtCore(0)->allocated_and_unused_.load(             std::memory_order_relaxed) &&         arena_lock.try_lock())) {      if (!arena_lock.owns_lock()) {        arena_lock.lock();      }      auto rv = func();      Fixup();      return rv;    }    // 2:筛选 CPU 对应的 Shard    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));    if (!s->mutex.try_lock()) {      s = Repick();      s->mutex.lock();    }    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);        // 2.1:若以后 Shard 可用内存不够时,去 Arena 调配存入 Shard    if (avail < bytes) {      // reload      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);      // If the arena's current block is within a factor of 2 of the right      // size, we adjust our request to avoid arena waste.      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);      assert(exact == arena_.AllocatedAndUnused());      if (exact >= bytes && arena_.IsInInlineBlock()) {        // If we haven't exhausted arena's inline block yet, allocate from arena        // directly. This ensures that we'll do the first few small allocations        // without allocating any blocks.        // In particular this prevents empty memtables from using        // disproportionately large amount of memory: a memtable allocates on        // the order of 1 KB of memory when created; we wouldn't want to        // allocate a full arena block (typically a few megabytes) for that,        // especially if there are thousands of empty memtables.        auto rv = func();        Fixup();        return rv;      }      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2                  ? exact                  : shard_block_size_;      s->free_begin_ = arena_.AllocateAligned(avail);      Fixup();    }    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);    // 3:依据是否对齐,判断是从高地址/低地址调配    char* rv;    if ((bytes % sizeof(void*)) == 0) {      // aligned allocation from the beginning      rv = s->free_begin_;      s->free_begin_ += bytes;    } else {      // unaligned from the end      rv = s->free_begin_ + avail - bytes;    }    return rv;  }

总结

  1. 入参 Func 就是下面传入的 Lambda 表达式:[this, bytes]() { returnarena_.Allocate(bytes) ;
  2. 当申请内存块较大时,间接从 Arena 调配且须要加锁;否则间接从以后 CPU 对应的 Shard 调配;
  3. 若以后 Shard 可用内存不够,须要从 Arena 再次申请;4、依据是否对齐,判断从高/低地址调配。