前言
leveldb中memdb模块应用skiplist作为一个kv的内存存储,相干代码实现十分丑陋。在上文介绍了上面内容:
- 比照c++和golang版本中查问、插入、删除的实现
- 剖析golang版本中能够优化的中央,而后对rust版本进行优化
而后在本文中将会介绍
- 如何参考goleveldb的版本应用rust重写memdb(arena版本)
- 应用rust重写一个非arena版本的memdb,也就是经典的链表构造实现形式
arena实现
参考 goleveldb DB 的代码,同时思考到并发平安,所以在Rust实现中,别离定义了db 和Db 两个构造
- db蕴含所有的成员,非线程平安,提供查问相干办法;
- Db 对db增加Mutex封装,线程平安,提供外围的插入,删除性能以及更多的查问性能;
具体如下:
db
https://github.com/kingeaster...
struct db<T: Comparer> { cmp: T, // 比拟器,用于比拟key // rnd:rand // 存储理论key,value的数据 kv_data: Vec<u8>, // 用于记录key,value在kv_data中的索引 ,每一个节点的格局如下 ,其中 level 示意以后节点的层数,后跟 level 个数字,别离示意以后节点的level个层中每一层的下一个节点在node_data中的地位 // kvOffset1|len(key1)|len(value1)|level|next_node_1|...|next_node_i|...|next_node_h| node_data: Vec<usize>, // 后面16个字节对应于 head 节点 // 在查问过程中,记录搜寻过程中所经验的节点(理论是节点在node_data的地位),次要用于插入和删除 prev_node: RefCell<[usize; MAX_HEIGHT]>, // 以后skiplist的层数 max_height: usize, // skiplist中的节点个数 n: usize, // 数据总大小 kv_size: usize,}
这个db定义和goleveldb 定义的是十分相似的,没有太多简单的中央。
不过须要留神的是这里db中的prev_node成员变量,用于在查问或删除过程中记录每一层的前向节点,在golveldb中是一个一般的数组prevNode ,在咱们的Rust定义中是一个用RefCell封装的数组 RefCell<[usize; MAX_HEIGHT]> ,起因在于db的搜寻办法有两种应用场景,一种是用于纯正的搜寻查问,那么以后db就是只读的,应用不变借用 &self,如果用于插入或删除,须要往RefCell中插入数据,那么db就变成可变了,须要应用可变借用 &mut self,为了让db放弃 不变借用语义,所以应用RefCell来提供外部可变个性。那么为什么要让db放弃不变借用,间接不论 纯查问或批改查问都应用可变借用不就行了吗?
因为Rust中不变借用是能够共享的,而可变借用是不能够共享的,如果间接只用可变借用&mut self的话,就会限度纯 查问操作的应用场景,即便一个操作只是查问,也要将db申明为mut。
另外一种办法就是prev_node 不作为db的成员变量,而是在查问的时候作为一个额定的函数入参,具体能够参考 节点版本的做法
封装next节点的读取和设置
github 地址
为了进步代码的可读性和可维护性,将 获取node节点在level层的下一个节点在node_data中的地位的操作和 设置下一个节点的操作进行封装:
// 计算node节点在level层的下一个节点在node_data中的地位 ,封装一下,进步代码可读性 fn next_node(&self, node: usize, i: usize) -> usize { // + NNEXT 示意在node_data 中,从node地位开始,要跳过 kvOffset1|len(key1)|len(value1)|level| 这4个字节,前面再挪动 i 个地位,就达到 next_node_i 了 self.node_data[node + NNEXT + i] } fn set_next_node(&mut self, node: usize, i: usize, next: usize) { self.node_data[node + NNEXT + i] = next; }
封装读取key或value的数据
github 地址
为了提供代码的可行性和可维护性,将 获取key的实在数据 和获取value的实在数的操作封装
// 依据 node 在 node_data中的地位,求出在kv_data 中的偏移量和长度,从而失去 key
fn get_key_data(&self, node: usize) -> &[u8] { let offset = self.node_data[node]; &self.kv_data[offset..offset + self.node_data[node + NKEY]]}
// 依据 node 在 node_data 中的地位,求出在kv_data 中的偏移量和长度,从而失去 value fn get_value_data(&self, node: usize) -> &[u8] { let key_offset = self.node_data[node] + self.node_data[node + NKEY]; &self.kv_data[key_offset..key_offset + self.node_data[node + NVAL]] }
查问大于等于特定key的节点
github 地址
能够看到通过封装,find_great_or_equal的实现形式与skiplist的算法形容更加贴合。
// save_pre 标记 在搜寻过程中是否要记录遍历过的节点 pub fn find_great_or_equal(&self, key: &internalKey, save_pre: bool) -> (usize, bool) { let mut node = 0; // 从高层到底层开始搜寻 let mut i = self.max_height - 1; // println!("max_height {}", i); loop { // 下个节点在 node_data 中的地位 let next = self.next_node(node, i); let mut cmp = Ordering::Greater; // 以后链表上没有走到尾 if next > 0 { // 和下个节点next进行key比拟 cmp = self.cmp.compare(self.get_key_data(next), key.data()); } // 大于下一个节点,持续沿着以后层 向右 跳 if cmp == Ordering::Less { node = next; } else { // 小于等于下一个节点 或 下一个节点是空 // if save_pre { // // 对于插入或删除而进行的搜寻,即便遇到雷同的也要持续往下一层比拟,不能立刻返回 // // 所以这里要先于 cmp == Ordering::Equal 的判断 // self.prev_node.borrow_mut()[i] = node; // } else if cmp == Ordering::Equal { // // find_great_or_equal 跟 find_less 的一个不同就是这里返回的是 next // return (next, true); // } // 改成上面的形式可读性更高 if (!save_pre) && cmp == Ordering::Equal { return (next, true); } if save_pre {// 如果须要放弃前向节点,记录到pre_node中 self.prev_node.borrow_mut()[i] = node; } if i == 0 { return (next, cmp == Ordering::Equal); } i -= 1; } } }
在下面实现中,对于以后节点小于等于下一个节点的解决,相比golang的写法进行了重构。参考golang的写法如下:
// 小于等于下一个节点 或 下一个节点是空 if save_pre { // 对于插入或删除而进行的搜寻,即便遇到雷同的也要持续往下一层比拟,不能立刻返回 // 所以这里要先于 cmp == Ordering::Equal 的判断 self.prev_node.borrow_mut()[i] = node; } else if cmp == Ordering::Equal { // find_great_or_equal 跟 find_less 的一个不同就是这里返回的是 next return (next, true); }
这里代码的次要含意是:如果只是纯正的查问操作的话,找到匹配的就能够间接返回了;然而如果是为了插入或删除而进行的查问,即便找到了匹配的节点也要往下一层跳,直到最上面的一层才能够返回。了解了代码的意思咱们进行重写
// 改成上面的形式可读性更高
if (!save_pre) && cmp == Ordering::Equal { return (next, true); } if save_pre { self.prev_node.borrow_mut()[i] = node; }
其它
find_lessthan, find_last 这两个method的Rust实现跟goleveldb统一,就不多讲,大家能够间接点击去看源码。
Db
github Db
pub struct Db<T: Comparer> { db: sync::RwLock<db<T>>,}
db次要用来提供搜寻办法,非线程平安的, Db执行插入和删除,线程平安。
插入put
github 插入
首先获取写锁,在Rust中,sync::RwLock<T>通过调用write() 办法获取写锁:
let mut db = self.db.write().unwrap();
搜寻key以获取插入的地位
let (node, exist) = db.find_great_or_equal(key, true);
如果key曾经存在,就能够重复使用之前node_data
let (node, exist) = db.find_great_or_equal(key, true); if exist { // TODO 优化,如果新value的长度小于等于旧的value,间接笼罩间接的,不必重新分配 // 如果key曾经存在,间接笼罩之前的value // 数据是追加的形式,所以以后kv_data的长度就是node节点的在 skv_data 上的新的偏移量 let offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); } // 更新node的偏移量 db.node_data[node] = offset; // value 的长度可能也变动了 // 之前的长度 let v_old = db.node_data[node + NVAL]; // 更新为新的长度 db.node_data[node + NVAL] = v_len; // 更新数据总大小 db.kv_size += v_len - v_old; return Ok(()); }
对于新插入的点,先计算调配的层高
let h = db.rand_heigth();
同样如果调配的层高比以后skiplist的最大层高还要高,就要给pre_node弥补缺失的前向节点,也就是把head节点补充进去。
// 解决head节点 if h > db.max_height { for i in db.max_height..h + 1 { db.prev_node.borrow_mut()[i] = 0; } db.max_height = h; println!("height {}", h); }
失去新节点数据在kv_data中的起始偏移,而后将key,value数据追加到kv_data前面.
// 新增节点在 kv_data 中的起始偏移 let kv_offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); }
记录以后node_data的长度,也就是新节点在node_data中的起始偏移,而后在node_data中追加新节点,
// 创立新节点,因为是追加形式,所以以后 node_data 的长度 就是新节点在 node_data 的地位 let node = db.node_data.len(); // 增加新节点 db.node_data.push(kv_offset); // 在kv_data中的偏移 db.node_data.push(key.data().len()); // key的长度 db.node_data.push(v_len);// value的长度 db.node_data.push(h); // 以后节点的层高
随后依照goleveldb的写法执行链表插入,goleveldb中是这么写的
// 遍历每层的前向节点 for i, n := range p.prevNode[:h] { m := n + nNext + i // n节点在i层的下一个节点 p.nodeData = append(p.nodeData, p.nodeData[m]) // 以后节点第n层的下一个节点指向m p.nodeData[m] = node // n节点在i层的下一个节点指向以后节点node }
遍历prev_node中保留的前向节点,而后执行插入,也就是将以后节点指向前向节点的下一个节点,而后前向节点的下一个节点指向以后节点:
// 遍历每层的前向节点,iter()只会返回Item,利用enumerate封装能够同时返回下标 for (i,n )in db.prev_node.borrow()[0..h].iter().enumerate(){ let next = db.next_node(*n, i); db.node_data.push(next); db.set_next_node(*n, i, node); }
编译器报错:
error[E0502]: cannot borrow `db` as mutable because it is also borrowed as immutable --> src/memdb/memdb.rs:343:13 | 341 | for (i,n )in db.prev_node.borrow()[0..h].iter().enumerate(){ | --------------------- | | | immutable borrow occurs here | a temporary with access to the immutable borrow is created here ... 342 | let next = db.next_node(*n, i); 343 | db.node_data.push(next); | ^^ mutable borrow occurs here 344 | 345 | }
在for循环的作用域中,同时存在对db的不可变借用和可变借用,那么只能将两者离开;另外为了进步代码可读性,先提前给node_data扩大h长度,用于存储h个下个节点:
db.node_data.resize(node + NNEXT + h, 0);
执行节点插入的代码改写为
// 遍历每一层 for i in 0..h { let n = db.prev_node.borrow()[i];// 前向节点 let next = db.next_node(n, i); // 前向节点在第i层的下一节点 db.set_next_node(node, i, next); // 以后节点第i层的下一个节点指向next db.set_next_node(n, i, node); // 前向节点在第i层的下一个节点指向以后节点node }
更新统计信息
// 更新数据大小和个数 db.kv_size += key.data().len() + v_len; db.n += 1;
删除
github delete
// 删除 pub fn delete(&mut self, key: &internalKey) -> Option<()> { let mut db = self.db.write().unwrap(); let (node, exist) = db.find_great_or_equal(key, true); if !exist { return None; } // 以后节点有几层 let h = db.node_data[node + NHEIGHT]; // 开始删除, 让前一个节点指向前一个节点的下一个节点的下一个节点 pre->next = pre->next->next for i in 0..h { let pre = db.prev_node.borrow()[i]; // let pre_next = db.next_node(pre, i); // if pre_next != node { // print!("{}:{}", pre_next, node); // } // let next_next = db.next_node(pre_next, i); // db.set_next_node(pre, i, next_next); // 因为 前一个节点的下一个节点 pre_node 就是以后节点 node ,所以下面代码能够优化为 let next_next = db.next_node(node, i); db.set_next_node(pre, i, next_next); } db.kv_size -= db.node_data[node + NKEY] + db.node_data[node + NVAL]; db.n -= 1; Some(()) }
delete的代码中,跟put相似,在遍历pre_node的时候,要通过下标进行拜访,获取每一层的前向节点
let pre = db.prev_node.borrow()[i];
另外因为 前一个节点的下一个节点 pre_node 就是以后节点 node ,所以将节点删除的代码从
// 前向节点的下一个节点 let pre_next = db.next_node(pre, i); let next_next = db.next_node(pre_next, i); db.set_next_node(pre, i, next_next);
改为
// 间接应用以后节点 let next_next = db.next_node(node, i); db.set_next_node(pre, i, next_next);
其它
其它的办法比较简单,大家间接看源码即可
节点指针实现
节点定义
node,RcNode 如下:
type RcNode = Rc<RefCell<node>>;// 每一个节点struct node { offset: usize, // 对应kv_data 中的起始地位 key_len: usize, // key的长度 value_len: usize, // value的长度 next: Vec<Option<RcNode>>, // 以后节点有 height 层, 第i元素示意第i层的下一个节点}
因为链表中节点要被其它节点援用,要共享所有权,所以要应用Rc,另外因为在操作中要更改next,所以应用RefCell提供外部可变性,type RcNode = Rc<RefCell<node>>;
node的next属性中,每一层的下一个节点可能是空,所以应用Option;
为node实现如下获取理论key,value数据的办法,这里因为返回的是&[u8],所以要应用申明周期`a;
// 依据 node 在 node_data中的地位,求出在kv_data 中的偏移量和长度,从而失去 key fn get_key_data<'a>(&self, node: &node, kv_data: &'a [u8]) -> &'a [u8] { &kv_data[node.offset..node.offset + node.key_len] } // 依据 node 在 node_data 中的地位,求出在kv_data 中的偏移量和长度,从而失去 value fn get_value_data<'a>(&self, node: &node, kv_data: &'a [u8]) -> &'a [u8] { &kv_data[node.offset + node.key_len..node.offset + node.key_len + node.value_len] }
db_skip
github db_skip
db_skip的定义如下:
struct db_skip<T: Comparer> { cmp: T, // 比拟器,用于比拟key kv_data: Vec<u8>, // 存储理论key,value的数据 ,offset从1开始, offset为0的示意head节点 head: RcNode, // 头部, // 以后skiplist的层数 max_height: usize, // skiplist中的节点个数 n: usize, // 数据总大小 kv_size: usize,}
比拟于agena版本,这里少了pre_node,多了一个head成员用于保留skiplist的首节点。
封装读取key或value的数据
// 依据 node 在 node_data中的地位,求出在kv_data 中的偏移量和长度,从而失去 key fn get_key_data(&self, node: &node) -> &[u8] { &self.kv_data[node.offset..node.offset + node.key_len] } // 依据 node 在 node_data 中的地位,求出在kv_data 中的偏移量和长度,从而失去 value fn get_value_data(&self, node: &node) -> &[u8] { &self.kv_data[node.offset + node.key_len..node.offset + node.key_len + node.value_len] }
查问大于等于特定key的节点
github find_great_or_equal
pub fn find_great_or_equal( &self, key: &internalKey, pre_node: &mut Option<Vec<RcNode>>, // 留神 不能用 Option<&mut Vec<RcNode>> ) -> (RcNode, bool) { let mut node = Rc::clone(&self.head); // 从头节点开始 let mut next_node = Rc::clone(&node); let mut i = self.max_height - 1; loop { // 这里将 cmp 事后设置为 Ordering::Less 是一个十分奇妙的形式, 就能够主动蕴含 下个节点为空(当作是无穷大)的状况了 let mut cmp = Ordering::Less; if let Some(ref next) = node.borrow().next[i] { // 下一个节点存在 cmp = self .cmp .compare(key.data(), self.get_key_data(&node.borrow())); next_node = Rc::clone(&next); } // 大于下一个节点,持续沿着以后层 向右 跳 if cmp == Ordering::Greater { node = Rc::clone(&next_node); continue; } // 走到这里,阐明: node 小于等于下一个节点 或 下一个节点是空 // 如果不保留前向节点,只是一般的搜寻,找到匹配就间接返回 if (pre_node.is_none()) && cmp == Ordering::Equal { return (next_node, true); } // 如果保留前向节点node if let Some(ref mut pre) = pre_node { pre.push(Rc::clone(&node)); } if i == 0 { return (next_node, cmp == Ordering::Equal); } i -= 1; } }
首先把pre_node作为入参传入find_great_or_equal , pre_node: &mut Option<Vec<RcNode>> 因为pre_node要存储每一层的前向节点,是可变的所以&mut ,另外因为pre_node是可选的,所以应用Option。这里留神不能写成 pre_node: Option<&mut Vec<RcNode>>,这样在从Option中提取Vec的时候就会造成pre_node的move,导致编译失败。
接下来应用node.borrow()获取以后节点的不变借用,而后通过next[i]获取第i层的下一节点,因为是Option类型,通过 if let Some(ref next)来在next节点存在的状况下获取next节点的援用。
// 如果下一个节点存储 if let Some(ref next) = node.borrow().next[i] { // 下一个节点存在,和下个节点进行比拟 cmp = self.cmp.compare(key.data(), self.get_key_data(&next.borrow())); next_node = Rc::clone(&next); }
另外留神因为next的申明周期只在 if let {}内,所以要通过 next_node = Rc::clone(&next);记录下来用于下一步迭代。
依据比拟后果,如果大于下一节点,就沿着以后层跳到下一节点
// 大于下一个节点,持续沿着以后层 向右 跳 if cmp == Ordering::Greater { node = Rc::clone(&next_node); continue; }
接下来同理,优先判断如果不保留前向节点且找到匹配节点的状况:
// 走到这里,阐明: node 小于等于下一个节点 或 下一个节点是空 // 如果不保留前向节点,只是一般的搜寻,找到匹配就间接返回 if (pre_node.is_none()) && cmp == Ordering::Equal { return (next_node, true); }
如果保留前向节点,利用 if let Some(ref mut pre)=pre_node 从pre_node获取Vec的可变借用,而后将 node 的共享借用放入:
// 如果保留前向节点node if let Some(ref mut pre) = pre_node { pre.push(Rc::clone(&node)); }
最初就是如果到底层了,就返回,没有到底层,就跳过下一层:
// 如果到了最初一层,就返回 if i == 0 { return (next_node, cmp == Ordering::Equal); } i -= 1;
其它
其它局部比较简单,间接看源码就能够了。
DBSkip
pub struct DBSkip<T: Comparer> { db: sync::RwLock<db_skip<T>>,}
put
key曾经存在的解决逻辑如下,跟arena版本解决逻辑差不多,差异就在于更新以后节点node时候,应用borrow_mut()获取以后节点的可变借用进行批改:
// 如果key曾经存在,间接笼罩之前的value // 数据是追加的形式,所以以后kv_data的长度就是node节点的在 skv_data 上的新的偏移量 let offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); } // 更新node的偏移量 node.borrow_mut().offset = offset; // value 的长度可能也变动了 // 之前的长度 let v_old = node.borrow().value_len; // 更新为新的长度 node.borrow_mut().value_len = v_len; // 更新数据总大小 db.kv_size += v_len - v_old; return Ok(());
如果是新增的key,先调配层高,依据层高解决pre_node
let mut pre_node = pre_node.unwrap(); let h = db.rand_heigth(); // 解决head节点 if h > db.max_height { // 补充 高出的局部 for i in db.max_height..h{ pre_node.push(Rc::clone(&db.head)); } db.max_height = h; println!("height {}", h); }
保留新节点数据在kv_data中的起始偏移 ,而后给kv_data追加key,value数据。
// 新增节点在 kv_data 中的起始偏移 let kv_offset = db.kv_data.len(); // 追加 key 和 value 的数据 db.kv_data.append(&mut key.data().to_vec()); let mut v_len = 0; if let Some(value) = value { v_len = value.len(); db.kv_data.append(&mut value.to_vec()); }
Vec的append办法pub fn append(&mut self, other: &mut Self) 对应阐明 Moves all the elements of other into Self, leaving other empty.
创立新节点
let node = Rc::new(RefCell::new(node::new( kv_offset, key.data().len(), v_len, h, )));
对查问通过的每一层链表执行插入:
// 执行插入 for (i, pre) in pre_node.iter().enumerate() { // 新节点->next = pre->next if let Some(ref pre_next) = pre.borrow().next[i] { node.borrow_mut().next[i] = Some(Rc::clone(pre_next)); } // pre->next = 新节点 pre.borrow_mut().next[i] = Some(Rc::clone(&node)); }
更新统计数据
db.kv_size+=key.data().len()+v_len; db.n+=1; Ok(())
删除
看懂put办法,delete办法就简略许多
pub fn delete(&mut self, key: &internalKey) -> Option<()> { let mut db = self.db.write().unwrap(); let mut pre_node = Some(vec![]); let (node, exist) = db.find_great_or_equal(key, &mut pre_node); if !exist{ return None; } let pre_node = pre_node.unwrap(); // 执行删除 for (i, pre) in pre_node.iter().enumerate() { // 前向节点执行以后节点的下一个节点 pre->next = node->next if let Some(ref node_next) = node.borrow().next[i] { // node_next: 以后节点在第i层的下一跳节点 pre.borrow_mut().next[i] = Some(Rc::clone(node_next)); } } db.kv_size-=node.borrow().key_len+node.borrow().value_len; db.n -=1; Some(()) }
参考资料
跳跃表 https://www.bookstack.cn/read...
跳跃链表 https://www.cnblogs.com/s-lis...
level-rs残缺我的项目地址 https://github.com/kingeaster...