This article takes a look at storagetapper's pipe cache.

cache

storagetapper/pipe/cache.go

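type cacheEntry struct {
    pipe Pipe
    cfg  config.PipeConfig
}
var cache map[string]cacheEntry
var lock sync.Mutex
cache is a map from a string key to a cacheEntry, and each cacheEntry holds a Pipe together with the config.PipeConfig it was created from. The package-level lock (a sync.Mutex) guards access to the map.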

CacheGet

storagetapper/pipe/cache.go

// CacheGet returns an instance of pipe with specified config from cache or
// creates new one if it's not in the cache yet
func CacheGet(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
    lock.Lock()
    defer lock.Unlock()
    if cache == nil {
        cache = make(map[string]cacheEntry)
    }
    b, err := json.Marshal(cfg)
    if err != nil {
        return nil, err
    }
    h := sha256.New()
    _, _ = h.Write([]byte(pipeType + "$$$" + fmt.Sprintf("%p", db) + "$$$"))
    _, _ = h.Write(b)
    hs := fmt.Sprintf("%0x", h.Sum(nil))
    p, ok := cache[hs]
    if ok && reflect.DeepEqual(cfg, &p.cfg) {
        return p.pipe, nil
    }
    //FIXME: Implement proper collisions handling
    np, err := Create(pipeType, cfg, db)
    if err != nil {
        return nil, err
    }
    cache[hs] = cacheEntry{np, *cfg}
    log.Debugf("Created and cached new '%v' pipe (hash %v) with config: %+v. Cache size %v", pipeType, hs, *cfg, len(cache))
    return np, nil
}
CacheGet holds the lock for the whole call and creates the map lazily on first use. The cache key is the hex-encoded SHA-256 of three things: pipeType, the pointer address of db (formatted with %p), and the JSON serialization of cfg. It then looks up the cacheEntry for that key. If an entry exists and reflect.DeepEqual reports that its stored cfg matches the requested cfg, the cached pipe is returned. Otherwise a new Pipe is built with Create and stored under the key, replacing any entry already there (the FIXME marks hash-collision handling as unfinished).

CacheDestroy

storagetapper/pipe/cache.go

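// CacheDestroy releases all resources associated with cached pipes
func CacheDestroy() {
    lock.Lock()
    defer lock.Unlock()
    for h, p := range cache {
        log.Debugf("Closing %v pipe (hash %v) with config %+v", p.pipe.Type(), h, p.cfg)
        log.E(p.pipe.Close())
    }
    cache = nil
}
CacheDestroy takes the lock, iterates over the cache, and calls pipe.Close() on each entry, passing the result to log.E. It then sets cache to nil, so the next CacheGet call starts from an empty map.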
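The get-then-destroy lifecycle can be sketched with a toy cache. This is an illustration under assumptions, not the storagetapper implementation: fakePipe, get, and destroy are hypothetical names, and the key is a plain string rather than a hash.

```go
package main

import (
	"fmt"
	"sync"
)

// fakePipe is a hypothetical stand-in for the Pipe interface; it only
// records whether Close was called.
type fakePipe struct {
	name   string
	closed bool
}

func (p *fakePipe) Close() error {
	p.closed = true
	return nil
}

var (
	mu    sync.Mutex
	pipes map[string]*fakePipe
)

// get returns the cached pipe for key, creating it on first use.
func get(key string) *fakePipe {
	mu.Lock()
	defer mu.Unlock()
	if pipes == nil {
		pipes = make(map[string]*fakePipe)
	}
	if p, ok := pipes[key]; ok {
		return p
	}
	p := &fakePipe{name: key}
	pipes[key] = p
	return p
}

// destroy closes every cached pipe and drops the map, as CacheDestroy does.
func destroy() {
	mu.Lock()
	defer mu.Unlock()
	for key, p := range pipes {
		if err := p.Close(); err != nil {
			fmt.Printf("closing %v: %v\n", key, err)
		}
	}
	pipes = nil
}

func main() {
	a := get("kafka")
	b := get("kafka")
	fmt.Println(a == b) // the second call reuses the cached instance
	destroy()
	fmt.Println(a.closed, len(pipes)) // the pipe was closed and the map is gone
	c := get("kafka")
	fmt.Println(a == c) // after destroy, a fresh instance is created
}
```

A caller that still holds a pipe obtained before destroy is left with a closed pipe, so in this sketch destroy is only safe once no such references remain in use.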

Summary

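storagetapper's cache is a map of cacheEntry values, where each cacheEntry holds a Pipe and its config.PipeConfig. CacheGet returns the cached pipe when the key and config match, and otherwise builds one with Create and stores it. CacheDestroy takes the lock, calls pipe.Close() on every cached entry, and resets the cache to nil.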

doc

  • storagetapper