关于golang:动手实现一个localcache-实现篇

39次阅读

共计 8006 个字符,预计需要花费 21 分钟才能阅读完成。

原文链接:入手实现一个 localcache – 实现篇

前言

哈喽,大家好,我是asong,通过了后面两篇的介绍,咱们曾经根本理解该如何设计一个本地缓存了,本文就是这个系列的终结篇,本人入手实现一个本地缓存,接下来且听我细细道来!!!

本文代码曾经上传到 github:https://github.com/asong2020/go-localcache

当初这一版本算是一个 1.0,后续会持续进行优化和迭代。

第一步:形象接口

第一步很重要,以面向接口编程为准则,咱们先形象进去要裸露给用户的办法,给用户提供简略易懂的办法,因而我形象进去的后果如下:

// ICache abstract interface
type ICache interface {
    // Set value use default expire time. default does not expire.
    Set(key string, value []byte) error
    // Get value if find it. if value already expire will delete.
    Get(key string) ([]byte, error)
    // SetWithTime set value with expire time
    SetWithTime(key string, value []byte, expired time.Duration) error
    // Delete manual removes the key
    Delete(key string) error
    // Len computes number of entries in cache
    Len() int
    // Capacity returns amount of bytes store in the cache.
    Capacity() int
    // Close is used to signal a shutdown of the cache when you are done with it.
    // This allows the cleaning goroutines to exit and ensures references are not
    // kept to the cache preventing GC of the entire cache.
    Close() error
    // Stats returns cache's statistics
    Stats() Stats
    // GetKeyHit returns key hit
    GetKeyHit(key string) int64
}
  • Set(key string, value []byte):应用该办法存储的数据应用默认的过期工夫,如果革除过期的异步工作没有 enable,那么就永不过期,否则默认过期工夫为 10min。
  • Get(key string) ([]byte, error):依据 key 获取对象内容,如果数据过期了会在这一步删除。
  • SetWithTime(key string, value []byte, expired time.Duration):存储对象是应用自定义过期工夫
  • Delete(key string) error:依据 key 删除对应的缓存数据
  • Len() int:获取缓存的对象数量
  • Capacity() int:获取以后缓存的容量
  • Close() error:敞开缓存
  • Stats() Stats:缓存监控数据
  • GetKeyHit(key string) int64:获取 key 的命中率数据

第二步:定义缓存对象

第一步咱们形象好了接口,上面就要定义一个缓存对象实例实现接口,先看定义构造:

type cache struct {
    // hashFunc represents used hash func
    hashFunc HashFunc
    // bucketCount represents the number of segments within a cache instance. value must be a power of two.
    bucketCount uint64
    // bucketMask is bitwise AND applied to the hashVal to find the segment id.
    bucketMask uint64
    // segment is shard
    segments []*segment
    // segment lock
    locks    []sync.RWMutex
    // close cache
    close chan struct{}}
  • hashFunc:分片要用的哈希函数,用户能够自行定义,实现 HashFunc 接口即可,默认应用 fnv 算法。
  • bucketCount:分片的数量,肯定要是偶数,默认分片数为256
  • bucketMask:因为分片数是偶数,所以能够分片时能够应用位运算代替取余晋升性能效率,hashValue % bucketCount == hashValue & bucketCount - 1
  • segments:分片对象,每个分片的对象构造咱们在前面介绍。
  • locks:每个分片的读写锁
  • close:敞开缓存对象时告诉其余 goroutine 暂停

接下来咱们来写 cache 对象的构造函数:

// NewCache constructor cache instance
func NewCache(opts ...Opt) (ICache, error) {
    options := &options{hashFunc: NewDefaultHashFunc(),
        bucketCount: defaultBucketCount,
        maxBytes: defaultMaxBytes,
        cleanTime: defaultCleanTIme,
        statsEnabled: defaultStatsEnabled,
        cleanupEnabled: defaultCleanupEnabled,
    }
    for _, each := range opts{each(options)
    }

    if !isPowerOfTwo(options.bucketCount){return nil, errShardCount}

  if options.maxBytes <= 0 {return nil, ErrBytes}
  
    segments := make([]*segment, options.bucketCount)
    locks := make([]sync.RWMutex, options.bucketCount)

    maxSegmentBytes := (options.maxBytes + options.bucketCount - 1) / options.bucketCount
    for index := range segments{segments[index] = newSegment(maxSegmentBytes, options.statsEnabled)
    }

    c := &cache{
        hashFunc: options.hashFunc,
        bucketCount: options.bucketCount,
        bucketMask: options.bucketCount - 1,
        segments: segments,
        locks: locks,
        close: make(chan struct{}),
    }
    if options.cleanupEnabled {go c.cleanup(options.cleanTime)
    }
    
    return c, nil
}

这里为了更好的扩大,咱们应用 Options 编程模式,咱们的构造函数次要做三件事:

  • 前置参数查看,对于内部传入的参数,咱们还是要做根本的校验
  • 分片对象初始化
  • 结构缓存对象

这里结构缓存对象时咱们要先计算每个分片的容量,默认整个本地缓存 256M 的数据,而后在平均分到每一片区内,用户能够自行抉择要缓存的数据大小。

第三步:定义分片构造

每个分片构造如下:

type segment struct {hashmap map[uint64]uint32
    entries buffer.IBuffer
    clock   clock
    evictList  *list.List
    stats IStats
}
  • hashmp:存储 key 所对应的存储索引
  • entries:存储 key/value 的底层构造,咱们在第四步的时候介绍,也是代码的外围局部。
  • clock:定义工夫办法
  • evicList:这里咱们应用一个队列来记录 old 索引,当容量有余时进行删除(长期解决方案,以后存储构造不适宜应用 LRU 淘汰算法)
  • stats:缓存的监控数据。

接下来咱们再来看一下每个分片的构造函数:

func newSegment(bytes uint64, statsEnabled bool) *segment {
    if bytes == 0 {panic(fmt.Errorf("bytes cannot be zero"))
    }
    if bytes >= maxSegmentSize{panic(fmt.Errorf("too big bytes=%d; should be smaller than %d", bytes, maxSegmentSize))
    }
    capacity := (bytes + segmentSize - 1) / segmentSize
    entries := buffer.NewBuffer(int(capacity))
    entries.Reset()
    return &segment{
        entries: entries,
        hashmap: make(map[uint64]uint32),
        clock:   &systemClock{},
        evictList: list.New(),
        stats: newStats(statsEnabled),
    }
}

这里次要留神一点:

咱们要依据每个片区的缓存数据大小来计算出容量,与上文的缓存对象初始化步骤对应上了。

第四步:定义缓存构造

缓存对象当初也结构好了,接下来就是本地缓存的外围:定义缓存构造。

bigcachefastcachefreecache都应用字节数组代替 map 存储缓存数据,从而缩小 GC 压力,所以咱们也能够借鉴其思维持续放弃应用字节数组,这里咱们应用二维字节切片存储缓存数据key/value;画个图示意一下:

应用二维数组存储数据的相比于 bigcache 的劣势在于能够间接依据索引删除对应的数据,尽管也会有虫洞的问题,然而咱们能够记录下来虫洞的索引,一直填充。

每个缓存的封装构造如下:

根本思维曾经明确,接下来看一下咱们对存储层的封装:

type Buffer struct {array [][]byte
    capacity int
    index int
    // maxCount = capacity - 1
    count int
    // availableSpace If any objects are removed after the buffer is full, the idle index is logged.
    // Avoid array "wormhole"
    availableSpace map[int]struct{}
    // placeholder record the index that buffer has stored.
    placeholder map[int]struct{}}
  • array [][]byte:存储缓存对象的二维切片
  • capacity:缓存构造的最大容量
  • index:索引,记录缓存所在的地位的索引
  • count:记录缓存数量
  • availableSpace:记录 ” 虫洞 ”,当缓存对象被删除时记录下闲暇地位的索引,不便前面容量满了后应用 ” 虫洞 ”
  • placeholder:记录缓存对象的索引,迭代革除过期缓存能够用上。

buffer 写入数据的流程(不贴代码了):

<img src=”https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4e339104a34d4fabb45bc45a4a830a3a~tplv-k3u1fbpfcp-zoom-1.image” style=”zoom: 33%;” />

第五步:欠缺向缓存写入数据办法

下面咱们定义好了所有须要的构造,接下来就是填充咱们的写入缓存办法就能够了:

func (c *cache) Set(key string, value []byte) error  {hashKey := c.hashFunc.Sum64(key)
    bucketIndex := hashKey&c.bucketMask
    c.locks[bucketIndex].Lock()
    defer c.locks[bucketIndex].Unlock()
    err := c.segments[bucketIndex].set(key, hashKey, value, defaultExpireTime)
    return err
}

func (s *segment) set(key string, hashKey uint64, value []byte, expireTime time.Duration) error {
    if expireTime <= 0{return ErrExpireTimeInvalid}
    expireAt := uint64(s.clock.Epoch(expireTime))

    if previousIndex, ok := s.hashmap[hashKey]; ok {if err := s.entries.Remove(int(previousIndex)); err != nil{return err}
        delete(s.hashmap, hashKey)
    }

    entry := wrapEntry(expireAt, key, hashKey, value)
    for {index, err := s.entries.Push(entry)
        if err == nil {s.hashmap[hashKey] = uint32(index)
            s.evictList.PushFront(index)
            return nil
        }
        ele := s.evictList.Back()
        if err := s.entries.Remove(ele.Value.(int)); err != nil{return err}
        s.evictList.Remove(ele)
    }
}

流程剖析如下:

  • 依据 key 计算哈希值,而后依据分片数获取对应分片地位
  • 如果以后缓存中存在雷同的key,则先删除,在从新插入,会刷新过期工夫
  • 封装存储构造,依据过期工夫戳、key长度、哈希大小、缓存对象进行封装
  • 将数据存入缓存,如果缓存失败,移除最老的数据后再次重试

第六步:欠缺从缓存读取数据办法

第一步依据 key 计算哈希值,再依据分片数获取对应的分片地位:

func (c *cache) Get(key string) ([]byte, error)  {hashKey := c.hashFunc.Sum64(key)
    bucketIndex := hashKey&c.bucketMask
    c.locks[bucketIndex].RLock()
    defer c.locks[hashKey&c.bucketMask].RUnlock()
    entry, err := c.segments[bucketIndex].get(key, hashKey)
    if err != nil{return nil, err}
    return entry,nil
}

第二步执行分片办法获取缓存数据:

  • 先依据哈希值判断 key 是否存在于缓存中,不存返回 key 没有找到
  • 从缓存中读取数据失去缓存中的 key 判断是否产生哈希抵触
  • 判断缓存对象是否过期,过期删除缓存数据(能够依据业务优化须要是否返回以后过期数据)
  • 在每个记录缓存监控数据
func (s *segment) getWarpEntry(key string, hashKey uint64) ([]byte,error) {index, ok := s.hashmap[hashKey]
    if !ok {s.stats.miss()
        return nil, ErrEntryNotFound
    }
    entry, err := s.entries.Get(int(index))
    if err != nil{s.stats.miss()
        return nil, err
    }
    if entry == nil{s.stats.miss()
        return nil, ErrEntryNotFound
    }

    if entryKey := readKeyFromEntry(entry); key != entryKey {s.stats.collision()
        return nil, ErrEntryNotFound
    }
    return entry, nil
}

func (s *segment) get(key string, hashKey uint64) ([]byte, error) {currentTimestamp := s.clock.TimeStamp()
    entry, err := s.getWarpEntry(key, hashKey)
    if err != nil{return nil, err}
    res := readEntry(entry)

    expireAt := int64(readExpireAtFromEntry(entry))
    if currentTimestamp - expireAt >= 0{_ = s.entries.Remove(int(s.hashmap[hashKey]))
        delete(s.hashmap, hashKey)
        return nil, ErrEntryNotFound
    }
    s.stats.hit(key)

    return res, nil
}

第七步:来个测试用例体验一下

先来个简略的测试用例测试一下:

func (h *cacheTestSuite) TestSetAndGet() {cache, err := NewCache()
    assert.Equal(h.T(), nil, err)
    key := "asong"
    value := []byte("公众号:Golang 梦工厂")

    err = cache.Set(key, value)
    assert.Equal(h.T(), nil, err)

    res, err := cache.Get(key)
    assert.Equal(h.T(), nil, err)
    assert.Equal(h.T(), value, res)
    h.T().Logf("get value is %s", string(res))
}

运行后果:

=== RUN   TestCacheTestSuite
=== RUN   TestCacheTestSuite/TestSetAndGet
    cache_test.go:33: get value is 公众号:Golang 梦工厂
--- PASS: TestCacheTestSuite (0.00s)
    --- PASS: TestCacheTestSuite/TestSetAndGet (0.00s)
PASS

功败垂成,基本功能通了,剩下就是跑基准测试、优化、迭代了(不在文章赘述了,能够关注 github 仓库最新动静)。

参考文章

  • https://github.com/allegro/bi…
  • https://github.com/VictoriaMe…
  • https://github.com/coocood/fr…
  • https://github.com/patrickmn/…

总结

实现篇到这里就完结了,然而这个我的项目的编码仍未完结,我会持续以此版本为根底一直迭代优化,该本地缓存的长处:

  • 实现简略、提供给用户的办法简略易懂
  • 应用二维切片作为存储构造,防止了不能删除底层数据的毛病,也在肯定水平上防止了 ” 虫洞 ” 问题。
  • 测试用例齐全,适宜作为小白的入门我的项目

待优化点:

  • 没有应用高效的缓存淘汰算法,可能会导致热点数据被频繁删除
  • 定时删除过期数据会导致锁持有工夫过长,须要优化
  • 敞开缓存实例须要优化解决形式
  • 依据业务场景进行优化(特定业务场景)

迭代点:

  • 增加异步加载缓存性能
  • …… (思考中)

本文代码曾经上传到 github:https://github.com/asong2020/go-localcache

好啦,本文到这里就完结了,我是asong,咱们下期见。

欢送关注公众号:【Golang 梦工厂】

正文完
 0