前言
leveldb中memdb模块应用skiplist作为一个kv的内存存储,相干代码实现十分丑陋。在上文介绍了上面内容:

  • 比照c++和golang版本中查问、插入、删除的实现
  • 剖析golang版本中能够优化的中央,而后对rust版本进行优化

而后在本文中将会介绍

  • 如何参考goleveldb的版本应用rust重写memdb(arena版本)
  • 应用rust重写一个非arena版本的memdb,也就是经典的链表构造实现形式

arena实现

参考 goleveldb DB 的代码,同时思考到并发平安,所以在Rust实现中,别离定义了db 和Db 两个构造

  • db蕴含所有的成员,非线程平安,提供查问相干办法;
  • Db 对db增加Mutex封装,线程平安,提供外围的插入,删除性能以及更多的查问性能;

具体如下:

db
https://github.com/kingeaster...

struct db<T: Comparer> {    cmp: T,  // 比拟器,用于比拟key    // rnd:rand    // 存储理论key,value的数据    kv_data: Vec<u8>,    // 用于记录key,value在kv_data中的索引 ,每一个节点的格局如下 ,其中 level 示意以后节点的层数,后跟 level 个数字,别离示意以后节点的level个层中每一层的下一个节点在node_data中的地位    // kvOffset1|len(key1)|len(value1)|level|next_node_1|...|next_node_i|...|next_node_h|    node_data: Vec<usize>, // 后面16个字节对应于 head 节点    // 在查问过程中,记录搜寻过程中所经验的节点(理论是节点在node_data的地位),次要用于插入和删除    prev_node: RefCell<[usize; MAX_HEIGHT]>,    // 以后skiplist的层数    max_height: usize,    // skiplist中的节点个数    n: usize,    // 数据总大小    kv_size: usize,}

这个db定义和goleveldb 定义的是十分相似的,没有太多简单的中央。

不过须要留神的是这里db中的prev_node成员变量,用于在查问或删除过程中记录每一层的前向节点,在golveldb中是一个一般的数组prevNode ,在咱们的Rust定义中是一个用RefCell封装的数组 RefCell<[usize; MAX_HEIGHT]> ,起因在于db的搜寻办法有两种应用场景,一种是用于纯正的搜寻查问,那么以后db就是只读的,应用不变借用 &self,如果用于插入或删除,须要往RefCell中插入数据,那么db就变成可变了,须要应用可变借用 &mut self,为了让db放弃 不变借用语义,所以应用RefCell来提供外部可变个性。那么为什么要让db放弃不变借用,间接不论 纯查问或批改查问都应用可变借用不就行了吗?

因为Rust中不变借用是能够共享的,而可变借用是不能够共享的,如果间接只用可变借用&mut self的话,就会限度纯 查问操作的应用场景,即便一个操作只是查问,也要将db申明为mut。

另外一种办法就是prev_node 不作为db的成员变量,而是在查问的时候作为一个额定的函数入参,具体能够参考 节点版本的做法

封装next节点的读取和设置

github 地址

为了进步代码的可读性和可维护性,将 获取node节点在level层的下一个节点在node_data中的地位的操作和 设置下一个节点的操作进行封装:

// 计算node节点在level层的下一个节点在node_data中的地位 ,封装一下,进步代码可读性    fn next_node(&self, node: usize, i: usize) -> usize {        // + NNEXT 示意在node_data 中,从node地位开始,要跳过  kvOffset1|len(key1)|len(value1)|level| 这4个字节,前面再挪动 i 个地位,就达到 next_node_i 了        self.node_data[node + NNEXT + i]    }    fn set_next_node(&mut self, node: usize, i: usize, next: usize) {        self.node_data[node + NNEXT + i] = next;    }

封装读取key或value的数据

github 地址

为了提供代码的可行性和可维护性,将 获取key的实在数据 和获取value的实在数的操作封装

// 依据 node 在 node_data中的地位,求出在kv_data 中的偏移量和长度,从而失去 key

fn get_key_data(&self, node: usize) -> &[u8] {    let offset = self.node_data[node];    &self.kv_data[offset..offset + self.node_data[node + NKEY]]}
    // 依据 node 在 node_data 中的地位,求出在kv_data 中的偏移量和长度,从而失去 value    fn get_value_data(&self, node: usize) -> &[u8] {        let key_offset = self.node_data[node] + self.node_data[node + NKEY];        &self.kv_data[key_offset..key_offset + self.node_data[node + NVAL]]    }

查问大于等于特定key的节点

github 地址

能够看到通过封装,find_great_or_equal的实现形式与skiplist的算法形容更加贴合。

// save_pre 标记 在搜寻过程中是否要记录遍历过的节点    pub fn find_great_or_equal(&self, key: &internalKey, save_pre: bool) -> (usize, bool) {        let mut node = 0;        // 从高层到底层开始搜寻        let mut i = self.max_height - 1;        // println!("max_height {}", i);        loop {            // 下个节点在 node_data 中的地位            let next = self.next_node(node, i);            let mut cmp = Ordering::Greater;            // 以后链表上没有走到尾            if next > 0 {                // 和下个节点next进行key比拟                cmp = self.cmp.compare(self.get_key_data(next), key.data());            }            // 大于下一个节点,持续沿着以后层 向右 跳            if cmp == Ordering::Less {                node = next;            } else {                // 小于等于下一个节点 或 下一个节点是空                // if save_pre {                //     // 对于插入或删除而进行的搜寻,即便遇到雷同的也要持续往下一层比拟,不能立刻返回                //     // 所以这里要先于 cmp == Ordering::Equal 的判断                //     self.prev_node.borrow_mut()[i] = node;                // } else if cmp == Ordering::Equal {                //     // find_great_or_equal 跟 find_less 的一个不同就是这里返回的是 next                //     return (next, true);                // }                // 改成上面的形式可读性更高                if (!save_pre) && cmp == Ordering::Equal {                    return (next, true);                }                if save_pre {// 如果须要放弃前向节点,记录到pre_node中                    self.prev_node.borrow_mut()[i] = node;                }                if i == 0 {                    return (next, cmp == Ordering::Equal);                }                i -= 1;            }        }    }

在下面实现中,对于以后节点小于等于下一个节点的解决,相比golang的写法进行了重构。参考golang的写法如下:

// 小于等于下一个节点 或 下一个节点是空                if save_pre {                     // 对于插入或删除而进行的搜寻,即便遇到雷同的也要持续往下一层比拟,不能立刻返回                     // 所以这里要先于 cmp == Ordering::Equal 的判断                     self.prev_node.borrow_mut()[i] = node;                 } else if cmp == Ordering::Equal {                     // find_great_or_equal 跟 find_less 的一个不同就是这里返回的是 next                     return (next, true);                 }

这里代码的次要含意是:如果只是纯正的查问操作的话,找到匹配的就能够间接返回了;然而如果是为了插入或删除而进行的查问,即便找到了匹配的节点也要往下一层跳,直到最上面的一层才能够返回。了解了代码的意思咱们进行重写
// 改成上面的形式可读性更高

                if (!save_pre) && cmp == Ordering::Equal {                    return (next, true);                }                if save_pre {                    self.prev_node.borrow_mut()[i] = node;                }

其它

find_lessthan, find_last 这两个method的Rust实现跟goleveldb统一,就不多讲,大家能够间接点击去看源码。

Db

github Db

pub struct Db<T: Comparer> {    db: sync::RwLock<db<T>>,}

db次要用来提供搜寻办法,非线程平安的, Db执行插入和删除,线程平安。

插入put

github 插入

首先获取写锁,在Rust中,sync::RwLock<T>通过调用write() 办法获取写锁:

let mut db = self.db.write().unwrap();

搜寻key以获取插入的地位

let (node, exist) = db.find_great_or_equal(key, true);

如果key曾经存在,就能够重复使用之前node_data

let (node, exist) = db.find_great_or_equal(key, true);        if exist {            // TODO 优化,如果新value的长度小于等于旧的value,间接笼罩间接的,不必重新分配            // 如果key曾经存在,间接笼罩之前的value            // 数据是追加的形式,所以以后kv_data的长度就是node节点的在 skv_data 上的新的偏移量            let offset = db.kv_data.len();            // 追加 key 和 value 的数据            db.kv_data.append(&mut key.data().to_vec());            let mut v_len = 0;            if let Some(value) = value {                v_len = value.len();                db.kv_data.append(&mut value.to_vec());            }            // 更新node的偏移量            db.node_data[node] = offset;            // value 的长度可能也变动了            // 之前的长度            let v_old = db.node_data[node + NVAL];            // 更新为新的长度            db.node_data[node + NVAL] = v_len;            // 更新数据总大小            db.kv_size += v_len - v_old;            return Ok(());        }

对于新插入的点,先计算调配的层高

let h = db.rand_heigth();

同样如果调配的层高比以后skiplist的最大层高还要高,就要给pre_node弥补缺失的前向节点,也就是把head节点补充进去。

// 解决head节点        if h > db.max_height {            for i in db.max_height..h + 1 {                db.prev_node.borrow_mut()[i] = 0;            }            db.max_height = h;            println!("height {}", h);        }

失去新节点数据在kv_data中的起始偏移,而后将key,value数据追加到kv_data前面.

// 新增节点在 kv_data 中的起始偏移        let kv_offset = db.kv_data.len();        // 追加 key 和 value 的数据        db.kv_data.append(&mut key.data().to_vec());        let mut v_len = 0;        if let Some(value) = value {            v_len = value.len();            db.kv_data.append(&mut value.to_vec());        }

记录以后node_data的长度,也就是新节点在node_data中的起始偏移,而后在node_data中追加新节点,

// 创立新节点,因为是追加形式,所以以后 node_data 的长度 就是新节点在 node_data 的地位        let node = db.node_data.len();        // 增加新节点        db.node_data.push(kv_offset); // 在kv_data中的偏移        db.node_data.push(key.data().len()); // key的长度        db.node_data.push(v_len);// value的长度        db.node_data.push(h); // 以后节点的层高

随后依照goleveldb的写法执行链表插入,goleveldb中是这么写的

// 遍历每层的前向节点        for i, n := range p.prevNode[:h] {            m := n + nNext + i // n节点在i层的下一个节点            p.nodeData = append(p.nodeData, p.nodeData[m]) // 以后节点第n层的下一个节点指向m            p.nodeData[m] = node // n节点在i层的下一个节点指向以后节点node        }

遍历prev_node中保留的前向节点,而后执行插入,也就是将以后节点指向前向节点的下一个节点,而后前向节点的下一个节点指向以后节点:

// 遍历每层的前向节点,iter()只会返回Item,利用enumerate封装能够同时返回下标         for (i,n )in db.prev_node.borrow()[0..h].iter().enumerate(){             let next = db.next_node(*n, i);             db.node_data.push(next);             db.set_next_node(*n, i, node);         }

编译器报错:

error[E0502]: cannot borrow `db` as mutable because it is also borrowed as immutable            --> src/memdb/memdb.rs:343:13             |         341 |         for (i,n )in db.prev_node.borrow()[0..h].iter().enumerate(){             |                      ---------------------             |                      |             |                      immutable borrow occurs here             |                      a temporary with access to the immutable borrow is created here ...         342 |             let next = db.next_node(*n, i);         343 |             db.node_data.push(next);             |             ^^ mutable borrow occurs here         344 |         345 |         }

在for循环的作用域中,同时存在对db的不可变借用和可变借用,那么只能将两者离开;另外为了进步代码可读性,先提前给node_data扩大h长度,用于存储h个下个节点:

db.node_data.resize(node + NNEXT + h, 0);

执行节点插入的代码改写为

// 遍历每一层        for i in 0..h {            let n = db.prev_node.borrow()[i];// 前向节点            let next = db.next_node(n, i); // 前向节点在第i层的下一节点            db.set_next_node(node, i, next); // 以后节点第i层的下一个节点指向next            db.set_next_node(n, i, node); // 前向节点在第i层的下一个节点指向以后节点node        }

更新统计信息

// 更新数据大小和个数        db.kv_size += key.data().len() + v_len;        db.n += 1;

删除

github delete

// 删除    pub fn delete(&mut self, key: &internalKey) -> Option<()> {        let mut db = self.db.write().unwrap();        let (node, exist) = db.find_great_or_equal(key, true);        if !exist {            return None;        }        // 以后节点有几层        let h = db.node_data[node + NHEIGHT];        // 开始删除, 让前一个节点指向前一个节点的下一个节点的下一个节点 pre->next = pre->next->next        for i in 0..h {            let pre = db.prev_node.borrow()[i];            // let pre_next = db.next_node(pre, i);            // if pre_next != node {            //     print!("{}:{}", pre_next, node);            // }            // let next_next = db.next_node(pre_next, i);            // db.set_next_node(pre, i, next_next);            // 因为 前一个节点的下一个节点 pre_node 就是以后节点 node ,所以下面代码能够优化为            let next_next = db.next_node(node, i);            db.set_next_node(pre, i, next_next);        }        db.kv_size -= db.node_data[node + NKEY] + db.node_data[node + NVAL];        db.n -= 1;        Some(())    }

delete的代码中,跟put相似,在遍历pre_node的时候,要通过下标进行拜访,获取每一层的前向节点

let pre = db.prev_node.borrow()[i];

另外因为 前一个节点的下一个节点 pre_node 就是以后节点 node ,所以将节点删除的代码从

// 前向节点的下一个节点             let pre_next = db.next_node(pre, i);             let next_next = db.next_node(pre_next, i);             db.set_next_node(pre, i, next_next);

改为

// 间接应用以后节点            let next_next = db.next_node(node, i);            db.set_next_node(pre, i, next_next);

其它

其它的办法比较简单,大家间接看源码即可

节点指针实现

节点定义

node,RcNode 如下:

type RcNode = Rc<RefCell<node>>;// 每一个节点struct node {    offset: usize,             // 对应kv_data 中的起始地位    key_len: usize,            // key的长度    value_len: usize,          // value的长度    next: Vec<Option<RcNode>>, // 以后节点有 height 层, 第i元素示意第i层的下一个节点}

因为链表中节点要被其它节点援用,要共享所有权,所以要应用Rc,另外因为在操作中要更改next,所以应用RefCell提供外部可变性,type RcNode = Rc<RefCell<node>>;

node的next属性中,每一层的下一个节点可能是空,所以应用Option;

为node实现如下获取理论key,value数据的办法,这里因为返回的是&[u8],所以要应用申明周期`a;

// 依据 node 在 node_data中的地位,求出在kv_data 中的偏移量和长度,从而失去 key    fn get_key_data<'a>(&self, node: &node, kv_data: &'a [u8]) -> &'a [u8] {        &kv_data[node.offset..node.offset + node.key_len]    }    // 依据 node 在 node_data 中的地位,求出在kv_data 中的偏移量和长度,从而失去 value    fn get_value_data<'a>(&self, node: &node, kv_data: &'a [u8]) -> &'a [u8] {        &kv_data[node.offset + node.key_len..node.offset + node.key_len + node.value_len]    }

db_skip

github db_skip

db_skip的定义如下:

struct db_skip<T: Comparer> {    cmp: T, // 比拟器,用于比拟key    kv_data: Vec<u8>, // 存储理论key,value的数据 ,offset从1开始,  offset为0的示意head节点    head: RcNode, // 头部,    // 以后skiplist的层数    max_height: usize,    // skiplist中的节点个数    n: usize,    // 数据总大小    kv_size: usize,}

比拟于agena版本,这里少了pre_node,多了一个head成员用于保留skiplist的首节点。

封装读取key或value的数据

// 依据 node 在 node_data中的地位,求出在kv_data 中的偏移量和长度,从而失去 key    fn get_key_data(&self, node: &node) -> &[u8] {        &self.kv_data[node.offset..node.offset + node.key_len]    }    // 依据 node 在 node_data 中的地位,求出在kv_data 中的偏移量和长度,从而失去 value    fn get_value_data(&self, node: &node) -> &[u8] {        &self.kv_data[node.offset + node.key_len..node.offset + node.key_len + node.value_len]    }

查问大于等于特定key的节点

github find_great_or_equal

pub fn find_great_or_equal(        &self,        key: &internalKey,        pre_node: &mut Option<Vec<RcNode>>, // 留神 不能用 Option<&mut Vec<RcNode>>    ) -> (RcNode, bool) {        let mut node = Rc::clone(&self.head); // 从头节点开始        let mut next_node = Rc::clone(&node);        let mut i = self.max_height - 1;        loop {            // 这里将 cmp 事后设置为 Ordering::Less 是一个十分奇妙的形式, 就能够主动蕴含 下个节点为空(当作是无穷大)的状况了            let mut cmp = Ordering::Less;            if let Some(ref next) = node.borrow().next[i] {                // 下一个节点存在                cmp = self                    .cmp                    .compare(key.data(), self.get_key_data(&node.borrow()));                next_node = Rc::clone(&next);            }            // 大于下一个节点,持续沿着以后层 向右 跳            if cmp == Ordering::Greater {                node = Rc::clone(&next_node);                continue;            }            // 走到这里,阐明: node 小于等于下一个节点 或 下一个节点是空            // 如果不保留前向节点,只是一般的搜寻,找到匹配就间接返回            if (pre_node.is_none()) && cmp == Ordering::Equal {                return (next_node, true);            }            // 如果保留前向节点node            if let Some(ref mut pre) = pre_node {                pre.push(Rc::clone(&node));            }            if i == 0 {                return (next_node, cmp == Ordering::Equal);            }            i -= 1;        }    }

首先把pre_node作为入参传入find_great_or_equal , pre_node: &mut Option<Vec<RcNode>> 因为pre_node要存储每一层的前向节点,是可变的所以&mut ,另外因为pre_node是可选的,所以应用Option。这里留神不能写成 pre_node: Option<&mut Vec<RcNode>>,这样在从Option中提取Vec的时候就会造成pre_node的move,导致编译失败。

接下来应用node.borrow()获取以后节点的不变借用,而后通过next[i]获取第i层的下一节点,因为是Option类型,通过 if let Some(ref next)来在next节点存在的状况下获取next节点的援用。

// 如果下一个节点存储            if let Some(ref next) = node.borrow().next[i] {                // 下一个节点存在,和下个节点进行比拟                cmp = self.cmp.compare(key.data(), self.get_key_data(&next.borrow()));                next_node = Rc::clone(&next);            }

另外留神因为next的申明周期只在 if let {}内,所以要通过 next_node = Rc::clone(&next);记录下来用于下一步迭代。

依据比拟后果,如果大于下一节点,就沿着以后层跳到下一节点

// 大于下一个节点,持续沿着以后层 向右 跳            if cmp == Ordering::Greater {                node = Rc::clone(&next_node);                continue;            }

接下来同理,优先判断如果不保留前向节点且找到匹配节点的状况:

// 走到这里,阐明: node 小于等于下一个节点 或 下一个节点是空            // 如果不保留前向节点,只是一般的搜寻,找到匹配就间接返回            if (pre_node.is_none()) && cmp == Ordering::Equal {                return (next_node, true);            }

如果保留前向节点,利用 if let Some(ref mut pre)=pre_node 从pre_node获取Vec的可变借用,而后将 node 的共享借用放入:

// 如果保留前向节点node            if let Some(ref mut pre) = pre_node {                pre.push(Rc::clone(&node));            }

最初就是如果到底层了,就返回,没有到底层,就跳过下一层:

// 如果到了最初一层,就返回            if i == 0 {                return (next_node, cmp == Ordering::Equal);            }            i -= 1;

其它

其它局部比较简单,间接看源码就能够了。

DBSkip

pub struct DBSkip<T: Comparer> {    db: sync::RwLock<db_skip<T>>,}

put

key曾经存在的解决逻辑如下,跟arena版本解决逻辑差不多,差异就在于更新以后节点node时候,应用borrow_mut()获取以后节点的可变借用进行批改:

// 如果key曾经存在,间接笼罩之前的value            // 数据是追加的形式,所以以后kv_data的长度就是node节点的在 skv_data 上的新的偏移量            let offset = db.kv_data.len();            // 追加 key 和 value 的数据            db.kv_data.append(&mut key.data().to_vec());            let mut v_len = 0;            if let Some(value) = value {                v_len = value.len();                db.kv_data.append(&mut value.to_vec());            }            // 更新node的偏移量            node.borrow_mut().offset = offset;            // value 的长度可能也变动了            // 之前的长度            let v_old = node.borrow().value_len;            // 更新为新的长度            node.borrow_mut().value_len = v_len;            // 更新数据总大小            db.kv_size += v_len - v_old;            return Ok(());

如果是新增的key,先调配层高,依据层高解决pre_node

let mut pre_node = pre_node.unwrap();        let h = db.rand_heigth();        // 解决head节点        if h > db.max_height {            // 补充 高出的局部            for i in db.max_height..h{                pre_node.push(Rc::clone(&db.head));            }            db.max_height = h;            println!("height {}", h);        }

保留新节点数据在kv_data中的起始偏移 ,而后给kv_data追加key,value数据。

// 新增节点在 kv_data 中的起始偏移        let kv_offset = db.kv_data.len();        // 追加 key 和 value 的数据        db.kv_data.append(&mut key.data().to_vec());        let mut v_len = 0;        if let Some(value) = value {            v_len = value.len();            db.kv_data.append(&mut value.to_vec());        }
Vec的append办法pub fn append(&mut self, other: &mut Self) 对应阐明 Moves all the elements of other into Self, leaving other empty.

创立新节点

let node = Rc::new(RefCell::new(node::new(            kv_offset,            key.data().len(),            v_len,            h,        )));

对查问通过的每一层链表执行插入:

// 执行插入        for (i, pre) in pre_node.iter().enumerate() {            // 新节点->next  =  pre->next            if let Some(ref pre_next) = pre.borrow().next[i] {                node.borrow_mut().next[i] = Some(Rc::clone(pre_next));            }            // pre->next = 新节点            pre.borrow_mut().next[i] = Some(Rc::clone(&node));        }

更新统计数据

db.kv_size+=key.data().len()+v_len;       db.n+=1;       Ok(())

删除

看懂put办法,delete办法就简略许多

pub fn delete(&mut self, key: &internalKey) -> Option<()> {        let mut db = self.db.write().unwrap();        let mut pre_node = Some(vec![]);        let (node, exist) = db.find_great_or_equal(key, &mut pre_node);        if !exist{            return None;        }        let pre_node = pre_node.unwrap();        // 执行删除        for (i, pre) in pre_node.iter().enumerate() {            // 前向节点执行以后节点的下一个节点 pre->next = node->next            if let Some(ref node_next) = node.borrow().next[i] { // node_next: 以后节点在第i层的下一跳节点                pre.borrow_mut().next[i] = Some(Rc::clone(node_next));            }        }        db.kv_size-=node.borrow().key_len+node.borrow().value_len;        db.n -=1;        Some(())    }

参考资料
跳跃表 https://www.bookstack.cn/read...
跳跃链表 https://www.cnblogs.com/s-lis...
level-rs残缺我的项目地址 https://github.com/kingeaster...