共计 11361 个字符,预计需要花费 29 分钟才能阅读完成。
前言
最近在钻研学习 leveldb 的源码,并且尝试用 Rust 进行重写 leveldb-rs,leveldb 中 memdb 模块是应用 skiplist 作为一个 kv 的内存存储,相干代码实现十分丑陋,所以有了这篇文章。leveldb 通过应用 Arena 模式来实现 skiplist。简略来说,就是利用线性数组来模仿节点之间的关系,能够无效防止循环援用。
- c++ 版本的 leveldb 尽管也是应用的 arena 模式,然而节点数据内存的申请和拜访进行了封装,skiplist 的构造定义和实现跟传统意义上的 skiplist 的代码实现十分类似,如果如果大家之前理解过 skiplist 的话,c++ 版本的代码是非常容易看懂的。
- golang 版本 leveldb 不足 arena 的封装,间接操作 slice,如果对 arena 模式不相熟的话,了解起来就比拟麻烦。从软件工程角度上开,golang 版本的 memdb 的代码写的不太好,能够进一步优化的和重构 arena 的操作。
在本文中将会解说上面内容:
- 比照 c ++ 和 golang 版本中查问、插入、删除的实现
- 剖析 golang 版本中能够优化的中央
而后在下一篇文章中将会介绍
- 基于 golang 版本应用 rust 重写 memdb(arena 版本)
- 应用 rust 重写一个非 arena 版本的 memdb,也就是经典的链表构造实现形式
类型申明
首先咱们来比照 C ++ 和 Golang 的代码中的 skiplist 定义:
C++
https://github.com/google/lev…
这里次要列出要害的成员变量,具体的能够去看源码:
template <typename Key, class Comparator>
class SkipList {
...
// Immutable after construction
Comparator const compare_;
Arena* const arena_; // Arena used for allocations of nodes
Node* const head_;
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
std::atomic<int> max_height_; // Height of the entire list
// Read/written only by Insert().
Random rnd_;
};
- Comparator const compare_; 用来在遍历 skiplist 进行节点 key 的比拟
- Arena* const arena_; 应用 Arena 模式的内存治理
- Node* const head_; 首节点
- std::atomic max_height_; skiplist 的层高,在插入的时候可能会变动
- Random rnd_; 随机数生成器,用于在每次插入的时候生成新节点的层高
Golang
https://github.com/syndtr/gol…
type DB struct {
cmp comparer.BasicComparer
rnd *rand.Rand
mu sync.RWMutex
kvData []byte
nodeData []int
prevNode [tMaxHeight]int
maxHeight int
n int
kvSize int
}
- cmp comparer.BasicComparer : 用来在遍历 skiplist 进行节点 key 的比拟
- rnd *rand.Rand:随机数生成器,用于在每次插入的时候生成新节点的层高
- kvData []byte: key 和 value 理论数据寄存的中央
- nodeData[]int: 存储各个节点的信息
- prevNode [tMaxHeight]int:用于在遍历 skiplist 的时候,保留每一层的前一个节点
- maxHeight int: skiplist 的层高,在插入的时候可能会变动
- n int: 节点的总个数
- kvsize: skiplist 中存储 key 和 value 的总字节数
golang 版本外面最难了解的就是 nodeData, 只有了解了 nodeData 的数据布局,前面代码就容易了解了。
- kvData 中存储的是 key,value 的实在的字节数据
- kvNode 中存储的是 skiplist 中的全副节点,然而节点不存储 key 和 value 的理论数据而是在 Kvdata 中的偏移以及 key 的长度,value 的长度,在比拟的时候再依据偏移和长度到 KvData 中读取。另外 KvNode 中还存储了以后节点的层高,以及每一层的下一个节点在 KvNode 中的偏移量,在查问的时候,就能够依据偏移量跳到 KvNode 中下一个节点的地位,在从外面读取信息。
查问大于等于特定 Key
首先看 skiplist 中的查问,leveldb 中查问的实现是最要害的,插入和删除也都是基于查问实现,咱们先来简略回顾下查问的过程:
- 首先依据跳表的高度选取最高层的头节点;
- 若跳表中的节点内容小于查找节点的内容,则取该层的下一个节点持续比拟;
- 若跳表中的节点内容等于查找节点的内容,则间接返回;
- 若跳表中的节点内容大于查找节点的内容,且层高不为 0,则升高层高,且从前一个节点开始,从新查找低一层中的节点信息;若层高为 0,则返回以后节点,该节点的 key 大于所查找节点的 key。
咱们举例来说,如果要在上面的 skiplist 中查问 key 为 17 节点
- 从最右边的 head 节点开始,以后层高是 4;
- head 节点在第 4 层的 next 节点的 key 是 6,因为 17 大于 6,所以在以后节点的左边,就沿着以后层的链表走到下一节点,也就是 key 是 6 节点。
- 6 节点 在第 4 层的 next 节点是 NIL,也就是前面没有节点了,那么就须要在以后节点往上层走,走到第 3 层。
- 6 节点 在第 3 层的 next 节点的 key 是 25,因为 17 小于 25,那么就须要在以后节点往上层走,走到第 2 层。
- 6 节点 在第 2 层的 next 节点的 key 是 9,因为 17 大于 9,那么就沿着以后层的链表走到下一节点,也就是 key 是 9 的节点。
- 9 节点 在第 2 层的 nex 节点的 key 是 25,因为 17 小于 25,那么就须要在以后节点往上层走,走到第 1 层。
- 8 节点 在第 1 层的 next 节点的 key 是 12,因为 17 大于 12,那么就沿着以后层的链表走到下一节点,也就是 key 是 12 的节点。
- 12 节点 在第 1 层的 next 节点的 key 是 19,因为 17 小于 19,原本应该要持续走到下一层,然而因为以后曾经是最初一层了,所以间接返回 12 的 next 节点,也就是 19 节点.
C++
https://github.com/google/lev…
在 skiplist 中查问大于等于 key 的最小节点的办法如下
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_; // head 节点
int level = GetMaxHeight() - 1;// 以后层高
while (true) {Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {// 如果以后层中 x 的下一个节点的 key 小于 key
x = next; // 持续在以后层的 list 往后搜寻
} else {if (prev != nullptr) prev[level] = x; // 如果要记录遍历过程中的 pre 节点,就记录
if (level == 0) { // 搜寻到底了就返回
return next;
} else {
// 如果以后层中 x 下一个节点的 key 大于 key,往下一层进行搜寻
level--;
}
}
}
}
Go
https://github.com/syndtr/gol…
在 skiplist 中查问大于等于 key 的最小节点的办法如下
// Must hold RW-lock if prev == true, as it use shared prevNode slice.
func (p *DB) findGE(key []byte, prev bool) (int, bool) {
node := 0 // head 节点
h := p.maxHeight - 1 // 以后层高
for {next := p.nodeData[node+nNext+h]
cmp := 1
if next != 0 {o := p.nodeData[next]
cmp = p.cmp.Compare(p.kvData[o:o+p.nodeData[next+nKey]], key)
}
// 如果以后层中 node 的下一个节点的 key 小于 key,持续在以后层的 list 往后搜寻
if cmp < 0 {
// Keep searching in this list
node = next
} else {
if prev { // 对于插入或删除而进行的搜寻,即便遇到雷同的也要持续往下一层比拟
p.prevNode[h] = node
} else if cmp == 0 {return next, true}
if h == 0 {return next, cmp == 0}
// 如果以后层中 node 的下一个节点的 key 大于 key,以后 node 往下一层进行搜寻
h--
}
}
}
node + nNext + i
咱们能够当作成 C ++ 代码中链表构造 skiplist 中第 node 个节点在第 i 层,而后p.nodeData[node+nNext+i]
当成node->Next(i)
p.nodeData[next+nKey]
获取 key 的长度
查问小于等于
GE 搜寻,和 LT 搜寻十分相似,其中要害的差异在于
- GE 搜寻返回的是 next 节点,LT 返回的是以后 node
- GE 搜寻过程中如果 next 和以后 Key 雷同就返回,LT 如果 next 和以后 key 雷同,进入下一层,这样就将搜寻区间限度在小于 key 的范畴了。
C++
https://github.com/google/lev… 在 skiplist 中查问小于 key 的最大节点的办法如下
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_; // head 节点
int level = GetMaxHeight() - 1; // 以后层高
while (true) {assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) { // 如果 next 节点为空或 next 节点大于等于申请 key
if (level == 0) { // 以后是最初一层了,返回以后节点
return x;
} else {level--; // 走到下一层}
} else {x = next; // 如果 next 节点小于申请 key,那么沿着以后层走到 next 节点}
}
}
Golang
https://github.com/syndtr/gol… 在 skiplist 中查问小于 key 的最大节点的办法如下
func (p *DB) findLT(key []byte) int {
node := 0 // head 节点
h := p.maxHeight - 1 // 以后层高
for {next := p.nodeData[node+nNext+h] // 求以后节点的下一个节点
o := p.nodeData[next] // next 节点在 nodeData 中的数据偏移
if next == 0 || p.cmp.Compare(p.kvData[o:o+p.nodeData[next+nKey]], key) >= 0 {// 如果 next 节点为空或 next 节点大于等于申请 key
if h == 0 {// 以后是最初一层了,返回以后节点
break
}
h-- // 走到下一层
} else {node = next // 如果 next 节点小于申请 key,那么沿着以后层走到 next 节点}
}
return node
}
node + nNext + i
咱们能够当作成 C ++ 代码中链表构造 skiplist 中第 node 个节点在第 i 层,而后p.nodeData[node+nNext+i]
当成node->Next(i)
,p.nodeData[next+nKey]
获取 key 的长度,从而p.kvData[o:o+p.nodeData[next+nKey]]
获取 key 对应的实在数据
这里获取 key 的操作能够封装为一个独自的办法,进步代码的可读性
查问最初一个节点
从高层到底层,判断 next 是否是空(即以后层 list 曾经是否到最初一个节点了),
- 如果不为空,就持续跳到下一个节点
- 如果为空,挪动到下一层
C++
https://github.com/google/lev…
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {Node* next = x->Next(level);
if (next == nullptr) { // 如果 next 节点是空,就移到下一层
if (level == 0) {return x;} else {
// Switch to next list
level--;
}
} else { // 如果以后 next 不是空,移到 next
x = next;
}
}
}
Golang
https://github.com/syndtr/gol…
func (p *DB) findLast() int {
node := 0
h := p.maxHeight - 1
for {next := p.nodeData[node+nNext+h] // 获取 next
if next == 0 { // next 是空,就走到下一层
if h == 0 {break}
h--
} else {node = next // 挪动到 next}
}
return node
}
- node + nNext + i 咱们能够当作成 C ++ 代码中链表构造 skiplist 中第 node 个节点在第 i 层,而后 p.nodeData[node+nNext+i] 当成 node->Next(i),
插入
这里借用 https://www.bookstack.cn/read… 的示例图
- 在查找的过程中,一直记录每一层的后任节点,如图中红色圆圈所示意的;
- 为新插入的节点随机产生层高(随机产生层高的算法较为简单,依赖最高层数和概率值 p,可见下文中的代码实现);
- 在适合的地位插入新节点(例如图中节点 12 与节点 19 之间),并根据查找时记录的后任节点信息,在每一层中,以链表插入的形式,将该节点插入到每一层的链接中。
C++
https://github.com/google/lev…
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];// 用于记录遍历过程中每一层的前一个节点
Node* x = FindGreaterOrEqual(key, prev); // 找到插入点
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
// 为要插入的点生成一个随机层高
int height = RandomHeight();
if (height > GetMaxHeight()) {
// 如果新生成的层高比以后 skiplist 的最大层高还要高,// 那么 head 节点之前在 (GetMaxHeight(), heigth] 是没有 next 节点的,所以要弥补下来
for (int i = GetMaxHeight(); i < height; i++) {prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
// 将新节点所穿过的每一层链表,进行插入
// 也就是将后面记录的每一个前向节点 preNode, x->next 指向 preNode->next, 而后 preNode->next 指向 x
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
Golang
https://github.com/syndtr/gol…
// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map.
//
// It is safe to modify the contents of the arguments after Put returns.
func (p *DB) Put(key []byte, value []byte) error {p.mu.Lock()
defer p.mu.Unlock()
// 找到插入地位
if node, exact := p.findGE(key, true); exact {
// 如果 key 曾经存在了,kvOffset := len(p.kvData) // 因为上面 key 和 value 都是追加到 kvData 前面,所以这里记录 kvData 的长度,就是新节点的 offset
p.kvData = append(p.kvData, key...)// 追加 key 的数据
p.kvData = append(p.kvData, value...)// 追加 value 的数据
p.nodeData[node] = kvOffset // 更新 offset
m := p.nodeData[node+nVal] // 之前 value 的长度
p.nodeData[node+nVal] = len(value) // 更新 value 的长度, 因为 key 不变,所以 key 的长度不须要更新
p.kvSize += len(value) - m // 更新数据总长度
return nil
}
// 插入新节点,获取以后节点的层高
h := p.randHeight()
if h > p.maxHeight {
// 如果新生成的层高比以后 skiplist 的最大层高还要高,// 那么 head 节点之前在 (p.maxHeight, h] 是没有 next 节点的,所以要弥补下来
for i := p.maxHeight; i < h; i++ {p.prevNode[i] = 0
}
p.maxHeight = h
}
kvOffset := len(p.kvData) // 记录以后 kvData 的长度作为新节点的 offset
p.kvData = append(p.kvData, key...) // 追加 key 数据
p.kvData = append(p.kvData, value...)// 追加 value 数据
// Node
node := len(p.nodeData)
p.nodeData = append(p.nodeData, kvOffset, len(key), len(value), h)// 追加节点信息
for i, n := range p.prevNode[:h] {
m := n + nNext + i
p.nodeData = append(p.nodeData, p.nodeData[m]) // 以后节点每一层的 next 指向前向节点每一层的 next
p.nodeData[m] = node // 前向节点的 next 指向以后节点
}
p.kvSize += len(key) + len(value) // 总长度
p.n++
return nil
}
其中这一段比拟重要,https://github.com/syndtr/gol… 是执行插入的外围代码
// Node
node := len(p.nodeData)
p.nodeData = append(p.nodeData, kvOffset, len(key), len(value), h)
for i, n := range p.prevNode[:h] {
m := n + nNext + i
p.nodeData = append(p.nodeData, p.nodeData[m])
p.nodeData[m] = node
}
一开始不太容易看懂和解释,咱们把代码略微批改下, 改成这个
// Node
node := len(p.nodeData)
p.nodeData = append(p.nodeData, kvOffset, len(key), len(value), h)
// 增加 h 个 nextNode 的索引
p.nodeData = append(p.nodeData, make([]int,h)...)
for i, n := range p.prevNode[:h] {
m := n + nNext + i
p.nodeData[node+nNext+i] = p.nodeData[m]
p.nodeData[m] = node
}
这样就比拟容易解释了:
首先,m := n + nNext + i
咱们能够当作成 C ++ 代码中链表构造 skiplist 中第 n 个节点在第 i 层,而后p.nodeData[n+nNext+i]
当成 n->Next(i)
,p.nodeData[node+nNext+i]
当成 node->Next(i)
,那么下面循环中的局部就变成了
for i, n := range p.prevNode[:h] {
m := n + nNext + i
node.Next(i) = n->Next(i)// 以后节点指向 pre 节点的下一个节点
n->Next(i) = node // pre 节点的下一个节点变成以后节点
}
删除
删除只有 goleveldb 提供了这个办法,也比较简单
Golang
https://github.com/syndtr/gol…
func (p *DB) Delete(key []byte) error {p.mu.Lock()
defer p.mu.Unlock()
node, exact := p.findGE(key, true)// 查找删除点
if !exact {return ErrNotFound}
h := p.nodeData[node+nHeight]
for i, n := range p.prevNode[:h] {
m := n + nNext + i
p.nodeData[m] = p.nodeData[p.nodeData[m]+nNext+i] // 每一层前向节点的 next 指向 以后节点的 next
}
p.kvSize -= p.nodeData[node+nKey] + p.nodeData[node+nVal]
p.n--
return nil
}
- n + nNext + i 咱们能够当作成 C ++ 代码中链表构造 skiplist 中第 node 个节点在第 i 层,而后 p.nodeData[node+nNext+i] 当成 node->Next(i),
- p.nodeData[m] = p.nodeData[p.nodeData[m]+nNext+i] 就能够了解为 node->Next(i) = node->Next(i)->next(i),实现删除
Golang 版本优化
在钻研 goleveldb 的时候,发现了一些能够优化的中央,如下:
删除优化
在删除节点的时候,goleveldb 中遍历每一层的前向节点 preNode,而后让 preNode.next = preNode.next.next 链表删除 , 其实 前向节点的 next 节点就是以后节点,所以能够改为 preNode.next = cur.next 也就是
p.nodeData[m] = p.nodeData[node+nNext+i]
插入优化
插入节点的时候,不论 key 是不是曾经存在了,都是间接在 kvData 中追加数据,其实这里能够进一步优化,如果新的 value 长度等于原先的 value,或者新的 value 的长度小于原先的 value 的长度,都能够复用之前的内存空间。
原 goleveldb 的性能测试代码中插入的 value 都是 nil,所以这里咱们新写一个测试函数 性能测试代码
func BenchmarkPutRandomKV(b *testing.B) {buf := make([][4]byte, b.N)
value := make([][]byte, b.N)
for i := range buf {tmp := uint32(rand.Int()) % 100000
binary.LittleEndian.PutUint32(buf[i][:], tmp)
value[i] = make([]byte, tmp)
}
b.ResetTimer()
p := New(comparer.DefaultComparer, 0)
for i := range buf {p.Put(buf[i][:], value[i][:])
}
}
优化之前
goos: linux
goarch: amd64
pkg: github.com/syndtr/goleveldb/leveldb/memdb
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkPutRandomKV-8 20180 53760 ns/op 273344 B/op 0 allocs/op
PASS
ok github.com/syndtr/goleveldb/leveldb/memdb 2.000s
优化之后
goos: linux
goarch: amd64
pkg: github.com/syndtr/goleveldb/leveldb/memdb
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkPutRandomKV-8 29634 51464 ns/op 232687 B/op 0 allocs/op
PASS
ok github.com/syndtr/goleveldb/leveldb/memdb 2.264s
这个性能的晋升次要跟 key 反复的概率,以及新插入的 value 小于之前 value 的概率无关。
进一步思考
goleveldb 对于删除节点在 KvData 中占用的空间是始终占用没有失去复用的,这里也能够想方法进行复用,就留给读者思考了。
序幕
本文到这里基本上把 leveldb 中 memdb 的外围代码讲完了,如果有 Rust 背景的同学,能够期待下篇用 Rust 重写 skiplist 局部。
参考资料
- 跳跃表 https://www.bookstack.cn/read…
- 跳跃链表 https://www.cnblogs.com/s-lis…