最近参加了一个不错的开源我的项目,是一个基于 BitCask 模型实现的 KV 存储引擎。我的项目地址:CouloyDB。大家感觉不错的话能够来一个小小的 star。
因为性能上想向 Redis 看齐,所以打算实现过期主动删除的性能。我采取了 小顶堆 来实现过期删除,并且在 Get 的时候,也会进行惰性删除。
工夫堆的实现
堆中的每个元素都是一个 Job
:
type Job struct {
Key string
Expiration time.Time
}
其中记录了每个 Key
和它的过期工夫Expiration
。
堆的实现定义如下:
type h struct {heap []*Job
index map[string]int
}
index
存储的是 Key
对应的 Job
在数组中的切片,用以疾速获取 Job
而无需遍历切片。
在 Go 中能够通过实现 heap.go
中的接口来实现堆:
type Interface interface {
sort.Interface
Push(x any) // add x as element Len()
Pop() any // remove and return element Len() - 1.
}
具体实现能够查看 CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub,这里就不给出具体代码了,重点留神实现的时候须要在接口办法里同样对 index 进行更新。
实现了堆的接口之后,h
构造体就能够应用堆的一些办法来操作了。额定对 h 封装了一层:
type TimeHeap struct {heap h}
TimeHeap
实现了如下办法:
- Push.
- Pop.
- Get.
- Remove.
- Peek.
- IsEmpty.
具体实现同样能够查看 CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub。
利用工夫堆实现 TTL 机制
具体实现能够查看 CouloyDB/ttl.go at master · Kirov7/CouloyDB · GitHub,这里就重点说一下 exec
办法:
func (ttl *ttl) exec() {now := time.Now()
duration := MaxDuration
ttl.mu.Lock()
job := ttl.timeHeap.Peek()
ttl.mu.Unlock()
if job != nil {if job.Expiration.After(now) {duration = job.Expiration.Sub(now)
} else {duration = 0}
}
if duration > 0 {timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-ttl.eventCh:
return
case <-timer.C:
}
}
ttl.mu.Lock()
job = ttl.timeHeap.Pop()
ttl.mu.Unlock()
if job == nil {return}
go ttl.deleter(job.Key)
}
start
办法的执行流程如下:
- 从堆顶取出一个 Job;
- 判断这个 Key 是否曾经过期了,并计算残余的存活工夫;
- 如果这个 Key 没有过期,那么启动一个 Timer 来期待这个 Key 过期,Timer 触发之后,会将这个 Job 从堆中 Pop 进去;如果曾经过期了那么间接 Pop 进去。
- 开启一个协程来异步删除 Key。
这个流程中有一个很值得注意的中央是:
select {
case <-ttl.eventCh:
return
case <-timer.C:
}
一旦有堆中元素被插入或删除都须要间接返回,跳到下一次 exec
的调用。这是因为有可能新插入的元素会在以后堆顶元素之前过期,应该要先删除。或是以后堆顶元素曾经被删除了,再次 Pop 的话就会呈现谬误了。
调用接口
在 Get
办法中,如果 Key 存在就要去工夫堆中检查一下是否过期了,如果过期了就间接返回谬误。
if db.ttl.isExpired(string(key)) {
// if the key is expired, just return and don't delete the key now
return nil, public.ErrKeyNotFound
}
而后我将原先的 Put
办法又从新封装了一下:
func (db *DB) PutWithExpiration(key, value []byte, duration time.Duration) error {return db.put(key, value, duration)
}
func (db *DB) Put(key, value []byte) error {return db.put(key, value, 0)
}
在 put
办法中能够依据 duration
是不是 0 来进行相应的操作。
如果 duration
不为 0,那么就计算出过期工夫,并向工夫堆中增加 / 更新一个 Job
,但duration
如果为 0,有可能是要将之间设置了过期工夫的 Key 给勾销过期工夫,所以要进行一次删除。
var expiration int64
if duration != 0 {expiration = time.Now().Add(duration).UnixNano()
db.ttl.add(ds.NewJob(string(key), time.Unix(0, expiration)))
} else {
// If it is a key without an expiration time set
// you may need to remove the previously set expiration time
db.ttl.del(string(key))
}
接着在 Del
办法中,只须要简略的从工夫堆删除对应的 Job
即可。
db.ttl.del(string(key))
测试
测试了失常的过期删除、数据库重启之后的过期删除(重构工夫堆)、勾销过期工夫、对同一个 Key 再次 Put 来勾销过期工夫均无问题。具体实现能够查看 CouloyDB/ttl_test.go at master · Kirov7/CouloyDB · GitHub。
参加我的项目
目前我的项目还处于刚起步的状态,有很多性能还没有实现也有性能优化方面的事件要做,想要参加我的项目的敌人能够间接提 issue 或 pr,有什么问题也能够间接分割我,掘金私信或分割邮箱 bigboss2063@outlook.com。