最近参加了一个不错的开源我的项目,是一个基于 BitCask 模型实现的 KV 存储引擎。我的项目地址:CouloyDB。大家感觉不错的话能够来一个小小的 star。
因为性能上想向 Redis 看齐,所以打算实现过期主动删除的性能。我采取了小顶堆来实现过期删除,并且在 Get 的时候,也会进行惰性删除。
工夫堆的实现
堆中的每个元素都是一个 Job
:
type Job struct { Key string Expiration time.Time}
其中记录了每个Key
和它的过期工夫Expiration
。
堆的实现定义如下:
type h struct { heap []*Job index map[string]int}
index
存储的是Key
对应的Job
在数组中的切片,用以疾速获取Job
而无需遍历切片。
在 Go 中能够通过实现heap.go
中的接口来实现堆:
type Interface interface { sort.Interface Push(x any) // add x as element Len() Pop() any // remove and return element Len() - 1.}
具体实现能够查看CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub,这里就不给出具体代码了,重点留神实现的时候须要在接口办法里同样对 index 进行更新。
实现了堆的接口之后,h
构造体就能够应用堆的一些办法来操作了。额定对 h 封装了一层:
type TimeHeap struct { heap h}
TimeHeap
实现了如下办法:
- Push.
- Pop.
- Get.
- Remove.
- Peek.
- IsEmpty.
具体实现同样能够查看CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub。
利用工夫堆实现 TTL 机制
具体实现能够查看CouloyDB/ttl.go at master · Kirov7/CouloyDB · GitHub,这里就重点说一下exec
办法:
func (ttl *ttl) exec() { now := time.Now() duration := MaxDuration ttl.mu.Lock() job := ttl.timeHeap.Peek() ttl.mu.Unlock() if job != nil { if job.Expiration.After(now) { duration = job.Expiration.Sub(now) } else { duration = 0 } } if duration > 0 { timer := time.NewTimer(duration) defer timer.Stop() select { case <-ttl.eventCh: return case <-timer.C: } } ttl.mu.Lock() job = ttl.timeHeap.Pop() ttl.mu.Unlock() if job == nil { return } go ttl.deleter(job.Key)}
start
办法的执行流程如下:
- 从堆顶取出一个 Job;
- 判断这个 Key 是否曾经过期了,并计算残余的存活工夫;
- 如果这个 Key 没有过期,那么启动一个 Timer 来期待这个 Key 过期,Timer 触发之后,会将这个 Job 从堆中 Pop 进去;如果曾经过期了那么间接 Pop 进去。
- 开启一个协程来异步删除 Key。
这个流程中有一个很值得注意的中央是:
select {case <-ttl.eventCh: returncase <-timer.C:}
一旦有堆中元素被插入或删除都须要间接返回,跳到下一次exec
的调用。这是因为有可能新插入的元素会在以后堆顶元素之前过期,应该要先删除。或是以后堆顶元素曾经被删除了,再次 Pop 的话就会呈现谬误了。
调用接口
在Get
办法中,如果 Key 存在就要去工夫堆中检查一下是否过期了,如果过期了就间接返回谬误。
if db.ttl.isExpired(string(key)) { // if the key is expired, just return and don't delete the key now return nil, public.ErrKeyNotFound}
而后我将原先的Put
办法又从新封装了一下:
func (db *DB) PutWithExpiration(key, value []byte, duration time.Duration) error { return db.put(key, value, duration)}func (db *DB) Put(key, value []byte) error { return db.put(key, value, 0)}
在put
办法中能够依据duration
是不是 0 来进行相应的操作。
如果duration
不为 0,那么就计算出过期工夫,并向工夫堆中增加/更新一个Job
,但duration
如果为 0,有可能是要将之间设置了过期工夫的 Key 给勾销过期工夫,所以要进行一次删除。
var expiration int64if duration != 0 { expiration = time.Now().Add(duration).UnixNano() db.ttl.add(ds.NewJob(string(key), time.Unix(0, expiration)))} else { // If it is a key without an expiration time set // you may need to remove the previously set expiration time db.ttl.del(string(key))}
接着在Del
办法中,只须要简略的从工夫堆删除对应的Job
即可。
db.ttl.del(string(key))
测试
测试了失常的过期删除、数据库重启之后的过期删除(重构工夫堆)、勾销过期工夫、对同一个 Key 再次 Put 来勾销过期工夫均无问题。具体实现能够查看CouloyDB/ttl_test.go at master · Kirov7/CouloyDB · GitHub。
参加我的项目
目前我的项目还处于刚起步的状态,有很多性能还没有实现也有性能优化方面的事件要做,想要参加我的项目的敌人能够间接提 issue 或 pr,有什么问题也能够间接分割我,掘金私信或分割邮箱 bigboss2063@outlook.com。