最近参加了一个不错的开源我的项目,是一个基于 BitCask 模型实现的 KV 存储引擎。我的项目地址:CouloyDB。大家感觉不错的话能够来一个小小的 star

因为性能上想向 Redis 看齐,所以打算实现过期主动删除的性能。我采取了小顶堆来实现过期删除,并且在 Get 的时候,也会进行惰性删除。

工夫堆的实现

堆中的每个元素都是一个 Job

type Job struct {    Key        string    Expiration time.Time}

其中记录了每个Key和它的过期工夫Expiration

堆的实现定义如下:

type h struct {    heap  []*Job    index map[string]int}

index存储的是Key对应的Job在数组中的切片,用以疾速获取Job而无需遍历切片。

在 Go 中能够通过实现heap.go中的接口来实现堆:

type Interface interface {    sort.Interface    Push(x any) // add x as element Len()    Pop() any   // remove and return element Len() - 1.}

具体实现能够查看CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub,这里就不给出具体代码了,重点留神实现的时候须要在接口办法里同样对 index 进行更新。

实现了堆的接口之后,h构造体就能够应用堆的一些办法来操作了。额定对 h 封装了一层:

type TimeHeap struct {    heap h}

TimeHeap实现了如下办法:

  • Push.
  • Pop.
  • Get.
  • Remove.
  • Peek.
  • IsEmpty.

具体实现同样能够查看CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub。

利用工夫堆实现 TTL 机制

具体实现能够查看CouloyDB/ttl.go at master · Kirov7/CouloyDB · GitHub,这里就重点说一下exec办法:

func (ttl *ttl) exec() {    now := time.Now()    duration := MaxDuration    ttl.mu.Lock()    job := ttl.timeHeap.Peek()    ttl.mu.Unlock()    if job != nil {        if job.Expiration.After(now) {            duration = job.Expiration.Sub(now)        } else {            duration = 0        }    }    if duration > 0 {        timer := time.NewTimer(duration)        defer timer.Stop()        select {        case <-ttl.eventCh:            return        case <-timer.C:        }    }    ttl.mu.Lock()    job = ttl.timeHeap.Pop()    ttl.mu.Unlock()    if job == nil {        return    }    go ttl.deleter(job.Key)}

start办法的执行流程如下:

  1. 从堆顶取出一个 Job;
  2. 判断这个 Key 是否曾经过期了,并计算残余的存活工夫;
  3. 如果这个 Key 没有过期,那么启动一个 Timer 来期待这个 Key 过期,Timer 触发之后,会将这个 Job 从堆中 Pop 进去;如果曾经过期了那么间接 Pop 进去。
  4. 开启一个协程来异步删除 Key。

这个流程中有一个很值得注意的中央是:

select {case <-ttl.eventCh:    returncase <-timer.C:}

一旦有堆中元素被插入或删除都须要间接返回,跳到下一次exec的调用。这是因为有可能新插入的元素会在以后堆顶元素之前过期,应该要先删除。或是以后堆顶元素曾经被删除了,再次 Pop 的话就会呈现谬误了。

调用接口

Get办法中,如果 Key 存在就要去工夫堆中检查一下是否过期了,如果过期了就间接返回谬误。

if db.ttl.isExpired(string(key)) {    // if the key is expired, just return and don't delete the key now    return nil, public.ErrKeyNotFound}

而后我将原先的Put办法又从新封装了一下:

func (db *DB) PutWithExpiration(key, value []byte, duration time.Duration) error {    return db.put(key, value, duration)}func (db *DB) Put(key, value []byte) error {    return db.put(key, value, 0)}

put办法中能够依据duration是不是 0 来进行相应的操作。

如果duration不为 0,那么就计算出过期工夫,并向工夫堆中增加/更新一个Job,但duration如果为 0,有可能是要将之间设置了过期工夫的 Key 给勾销过期工夫,所以要进行一次删除。

var expiration int64if duration != 0 {    expiration = time.Now().Add(duration).UnixNano()    db.ttl.add(ds.NewJob(string(key), time.Unix(0, expiration)))} else {    // If it is a key without an expiration time set    // you may need to remove the previously set expiration time    db.ttl.del(string(key))}

接着在Del办法中,只须要简略的从工夫堆删除对应的Job即可。

db.ttl.del(string(key))

测试

测试了失常的过期删除、数据库重启之后的过期删除(重构工夫堆)、勾销过期工夫、对同一个 Key 再次 Put 来勾销过期工夫均无问题。具体实现能够查看CouloyDB/ttl_test.go at master · Kirov7/CouloyDB · GitHub。

参加我的项目

目前我的项目还处于刚起步的状态,有很多性能还没有实现也有性能优化方面的事件要做,想要参加我的项目的敌人能够间接提 issue 或 pr,有什么问题也能够间接分割我,掘金私信或分割邮箱 bigboss2063@outlook.com。