关于数据库:为-BitCask-存储引擎实现过期删除功能

42次阅读

共计 2574 个字符,预计需要花费 7 分钟才能阅读完成。

最近参加了一个不错的开源我的项目,是一个基于 BitCask 模型实现的 KV 存储引擎。我的项目地址:CouloyDB。大家感觉不错的话能够来一个小小的 star

因为性能上想向 Redis 看齐,所以打算实现过期主动删除的性能。我采取了 小顶堆 来实现过期删除,并且在 Get 的时候,也会进行惰性删除。

工夫堆的实现

堆中的每个元素都是一个 Job

type Job struct {
    Key        string
    Expiration time.Time
}

其中记录了每个 Key 和它的过期工夫Expiration

堆的实现定义如下:

type h struct {heap  []*Job
    index map[string]int
}

index存储的是 Key 对应的 Job 在数组中的切片,用以疾速获取 Job 而无需遍历切片。

在 Go 中能够通过实现 heap.go 中的接口来实现堆:

type Interface interface {
    sort.Interface
    Push(x any) // add x as element Len()
    Pop() any   // remove and return element Len() - 1.
}

具体实现能够查看 CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub,这里就不给出具体代码了,重点留神实现的时候须要在接口办法里同样对 index 进行更新。

实现了堆的接口之后,h构造体就能够应用堆的一些办法来操作了。额定对 h 封装了一层:

type TimeHeap struct {heap h}

TimeHeap实现了如下办法:

  • Push.
  • Pop.
  • Get.
  • Remove.
  • Peek.
  • IsEmpty.

具体实现同样能够查看 CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub。

利用工夫堆实现 TTL 机制

具体实现能够查看 CouloyDB/ttl.go at master · Kirov7/CouloyDB · GitHub,这里就重点说一下 exec 办法:

func (ttl *ttl) exec() {now := time.Now()
    duration := MaxDuration

    ttl.mu.Lock()
    job := ttl.timeHeap.Peek()
    ttl.mu.Unlock()

    if job != nil {if job.Expiration.After(now) {duration = job.Expiration.Sub(now)
        } else {duration = 0}
    }

    if duration > 0 {timer := time.NewTimer(duration)
        defer timer.Stop()

        select {
        case <-ttl.eventCh:
            return
        case <-timer.C:
        }
    }

    ttl.mu.Lock()
    job = ttl.timeHeap.Pop()
    ttl.mu.Unlock()

    if job == nil {return}

    go ttl.deleter(job.Key)
}

start办法的执行流程如下:

  1. 从堆顶取出一个 Job;
  2. 判断这个 Key 是否曾经过期了,并计算残余的存活工夫;
  3. 如果这个 Key 没有过期,那么启动一个 Timer 来期待这个 Key 过期,Timer 触发之后,会将这个 Job 从堆中 Pop 进去;如果曾经过期了那么间接 Pop 进去。
  4. 开启一个协程来异步删除 Key。

这个流程中有一个很值得注意的中央是:

select {
case <-ttl.eventCh:
    return
case <-timer.C:
}

一旦有堆中元素被插入或删除都须要间接返回,跳到下一次 exec 的调用。这是因为有可能新插入的元素会在以后堆顶元素之前过期,应该要先删除。或是以后堆顶元素曾经被删除了,再次 Pop 的话就会呈现谬误了。

调用接口

Get 办法中,如果 Key 存在就要去工夫堆中检查一下是否过期了,如果过期了就间接返回谬误。

if db.ttl.isExpired(string(key)) {
    // if the key is expired, just return and don't delete the key now
    return nil, public.ErrKeyNotFound
}

而后我将原先的 Put 办法又从新封装了一下:

func (db *DB) PutWithExpiration(key, value []byte, duration time.Duration) error {return db.put(key, value, duration)
}

func (db *DB) Put(key, value []byte) error {return db.put(key, value, 0)
}

put 办法中能够依据 duration 是不是 0 来进行相应的操作。

如果 duration 不为 0,那么就计算出过期工夫,并向工夫堆中增加 / 更新一个 Job,但duration 如果为 0,有可能是要将之间设置了过期工夫的 Key 给勾销过期工夫,所以要进行一次删除。

var expiration int64
if duration != 0 {expiration = time.Now().Add(duration).UnixNano()
    db.ttl.add(ds.NewJob(string(key), time.Unix(0, expiration)))
} else {
    // If it is a key without an expiration time set
    // you may need to remove the previously set expiration time
    db.ttl.del(string(key))
}

接着在 Del 办法中,只须要简略的从工夫堆删除对应的 Job 即可。

db.ttl.del(string(key))

测试

测试了失常的过期删除、数据库重启之后的过期删除(重构工夫堆)、勾销过期工夫、对同一个 Key 再次 Put 来勾销过期工夫均无问题。具体实现能够查看 CouloyDB/ttl_test.go at master · Kirov7/CouloyDB · GitHub。

参加我的项目

目前我的项目还处于刚起步的状态,有很多性能还没有实现也有性能优化方面的事件要做,想要参加我的项目的敌人能够间接提 issue 或 pr,有什么问题也能够间接分割我,掘金私信或分割邮箱 bigboss2063@outlook.com。

正文完
 0