序
本文次要钻研一下 storagetapper 的 cache
cache
storagetapper/pipe/cache.go
type cacheEntry struct {
pipe Pipe
cfg config.PipeConfig
}
var cache map[string]cacheEntry
var lock sync.Mutex
cache 是一个 cacheEntry 的 map,cacheEntry 定义了 Pipe 和 config.PipeConfig
CacheGet
storagetapper/pipe/cache.go
// CacheGet returns an instance of pipe with specified config from cache or
// creates new one if it's not in the cache yet
func CacheGet(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {lock.Lock()
defer lock.Unlock()
if cache == nil {cache = make(map[string]cacheEntry)
}
b, err := json.Marshal(cfg)
if err != nil {return nil, err}
h := sha256.New()
_, _ = h.Write([]byte(pipeType + "$$$" + fmt.Sprintf("%p", db) + "$$$"))
_, _ = h.Write(b)
hs := fmt.Sprintf("%0x", h.Sum(nil))
p, ok := cache[hs]
if ok && reflect.DeepEqual(cfg, &p.cfg) {return p.pipe, nil}
//FIXME: Implement proper collisions handling
np, err := Create(pipeType, cfg, db)
if err != nil {return nil, err}
cache[hs] = cacheEntry{np, *cfg}
log.Debugf("Created and cached new'%v'pipe (hash %v) with config: %+v. Cache size %v", pipeType, hs, *cfg, len(cache))
return np, nil
}
CacheGet 办法加锁操作 cache,首先通过 sha256 来对 pipeType 及 db 来作为 cache 的 key,而后取出 cacheEntry,若存在则判断 cfg 与 cacheEntry 的 cfg 是否一样,如果一样则返回 cacheEntry 的 pipe;否则通过 Create 创立 Pipe,而后放入 cache 中
CacheDestroy
storagetapper/pipe/cache.go
// CacheDestroy releases all resources associated with cached pipes
func CacheDestroy() {lock.Lock()
defer lock.Unlock()
for h, p := range cache {log.Debugf("Closing %v pipe (hash %v) with config %+v", p.pipe.Type(), h, p.cfg)
log.E(p.pipe.Close())
}
cache = nil
}
CacheDestroy 办法通过加锁遍历 cache,挨个执行 pipe.Close()
小结
storagetapper 的 cache 是一个 cacheEntry 的 map,cacheEntry 定义了 Pipe 和 config.PipeConfig;CacheGet 办法会获取 cache,获取不到则 Create;CacheDestroy 则会通过加锁遍历 cache,挨个执行 pipe.Close()。
doc
- storagetapper