本文主要研究一下cortex的kv.Client

kv.Client

github.com/cortexproject/cortex/pkg/ring/kv/client.go

// Client is a high-level client for key-value stores (such as Etcd and
// Consul) that exposes operations such as CAS and Watch which take callbacks.
// It also deals with serialisation by using a Codec and having an instance of
// the desired type passed in to methods a la json.Unmarshal.
type Client interface {
	// List returns a list of keys under the given prefix. Returned keys will
	// include the prefix.
	List(ctx context.Context, prefix string) ([]string, error)

	// Get a specific key.  Will use a codec to deserialise key to appropriate type.
	// If the key does not exist, Get will return nil and no error.
	Get(ctx context.Context, key string) (interface{}, error)

	// Delete a specific key. Deletions are best-effort and no error will
	// be returned if the key does not exist.
	Delete(ctx context.Context, key string) error

	// CAS stands for Compare-And-Swap.  Will call provided callback f with the
	// current value of the key and allow callback to return a different value.
	// Will then attempt to atomically swap the current value for the new value.
	// If that doesn't succeed will try again - callback will be called again
	// with new value etc.  Guarantees that only a single concurrent CAS
	// succeeds.  Callback can return nil to indicate it is happy with existing
	// value.
	CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error

	// WatchKey calls f whenever the value stored under key changes.
	// NOTE(review): the meaning of f's bool result is not stated here;
	// presumably returning false stops the watch - confirm per implementation.
	WatchKey(ctx context.Context, key string, f func(interface{}) bool)

	// WatchPrefix calls f whenever any value stored under prefix changes.
	// NOTE(review): presumably f receives the changed key and its value, and
	// returning false stops the watch - confirm per implementation.
	WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
}
kv.Client接口定义了List、Get、Delete、CAS、WatchKey、WatchPrefix方法

Client

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Client implements kv.Client interface, by using memberlist.KVtype Client struct {    kv    *KV // reference to singleton memberlist-based KV    codec codec.Codec}// List is part of kv.Client interface.func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {    err := c.awaitKVRunningOrStopping(ctx)    if err != nil {        return nil, err    }    return c.kv.List(prefix), nil}// Get is part of kv.Client interface.func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {    err := c.awaitKVRunningOrStopping(ctx)    if err != nil {        return nil, err    }    return c.kv.Get(key, c.codec)}// Delete is part of kv.Client interface.func (c *Client) Delete(ctx context.Context, key string) error {    return errors.New("memberlist does not support Delete")}// CAS is part of kv.Client interfacefunc (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {    err := c.awaitKVRunningOrStopping(ctx)    if err != nil {        return err    }    return c.kv.CAS(ctx, key, c.codec, f)}// WatchKey is part of kv.Client interface.func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {    err := c.awaitKVRunningOrStopping(ctx)    if err != nil {        return    }    c.kv.WatchKey(ctx, key, c.codec, f)}// WatchPrefix calls f whenever any value stored under prefix changes.// Part of kv.Client interface.func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {    err := c.awaitKVRunningOrStopping(ctx)    if err != nil {        return    }    c.kv.WatchPrefix(ctx, prefix, c.codec, f)}
Client实现了kv.Client接口,其List、Get、CAS、WatchKey、WatchPrefix方法均代理给kv,其Delete方法直接返回error(memberlist不支持Delete)

KV.List

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// List returns all known keys under a given prefix.// No communication with other nodes in the cluster is done here.func (m *KV) List(prefix string) []string {    m.storeMu.Lock()    defer m.storeMu.Unlock()    var keys []string    for k := range m.store {        if strings.HasPrefix(k, prefix) {            keys = append(keys, k)        }    }    return keys}
KV.List方法遍历m.store,收集并返回所有以指定prefix开头的key

KV.Get

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Get returns current value associated with given key.// No communication with other nodes in the cluster is done here.func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {    val, _, err := m.get(key, codec)    return val, err}// Returns current value with removed tombstones.func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {    m.storeMu.Lock()    v := m.store[key]    m.storeMu.Unlock()    out = nil    if v.value != nil {        out, err = codec.Decode(v.value)        if err != nil {            return nil, 0, err        }        if mr, ok := out.(Mergeable); ok {            // remove ALL tombstones before returning to client.            // No need for clients to see them.            mr.RemoveTombstones(time.Time{})        }    }    return out, v.version, nil}
KV.Get方法主要是从m.store[key]获取数据,用codec解码后返回;如果解码结果实现了Mergeable,返回前会先移除其中的tombstone

KV.CAS

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error {    var lastError error = nilouter:    for retries := m.maxCasRetries; retries > 0; retries-- {        m.casAttempts.Inc()        if lastError == errNoChangeDetected {            // We only get here, if 'f' reports some change, but Merge function reports no change. This can happen            // with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution.            // By waiting for one second, we hope that Merge will be able to detect change from 'f' function.            select {            case <-time.After(noChangeDetectedRetrySleep):                // ok            case <-ctx.Done():                lastError = ctx.Err()                break outer            }        }        change, newver, retry, err := m.trySingleCas(key, codec, f)        if err != nil {            level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry)            lastError = err            if !retry {                break            }            continue        }        if change != nil {            m.casSuccesses.Inc()            m.notifyWatchers(key)            if m.State() == services.Running {                m.broadcastNewValue(key, change, newver, codec)            } else {                level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)            }        }        return nil    }    if lastError == errVersionMismatch {        // this is more likely error than version mismatch.        lastError = errTooManyRetries    }    m.casFailures.Inc()    return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError)}
KV.CAS通过for循环最多执行m.maxCasRetries次m.trySingleCas操作;成功且有变更时会通知watchers并广播新值,全部失败则返回error

KV.WatchKey

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) {    // keep one extra notification, to avoid missing notification if we're busy running the function    w := make(chan string, 1)    // register watcher    m.watchersMu.Lock()    m.watchers[key] = append(m.watchers[key], w)    m.watchersMu.Unlock()    defer func() {        // unregister watcher on exit        m.watchersMu.Lock()        defer m.watchersMu.Unlock()        removeWatcherChannel(key, w, m.watchers)    }()    for {        select {        case <-w:            // value changed            val, _, err := m.get(key, codec)            if err != nil {                level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)                continue            }            if !f(val) {                return            }        case <-m.shutdown:            // stop watching on shutdown            return        case <-ctx.Done():            return        }    }}
KV.WatchKey方法会往m.watchers[key]追加channel,然后for循环select等待channel的写入;当f返回false、m.shutdown触发或ctx结束时退出并注销该channel

KV.WatchPrefix

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool) {    // we use bigger buffer here, since keys are interesting and we don't want to lose them.    w := make(chan string, 16)    // register watcher    m.watchersMu.Lock()    m.prefixWatchers[prefix] = append(m.prefixWatchers[prefix], w)    m.watchersMu.Unlock()    defer func() {        // unregister watcher on exit        m.watchersMu.Lock()        defer m.watchersMu.Unlock()        removeWatcherChannel(prefix, w, m.prefixWatchers)    }()    for {        select {        case key := <-w:            val, _, err := m.get(key, codec)            if err != nil {                level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)                continue            }            if !f(key, val) {                return            }        case <-m.shutdown:            // stop watching on shutdown            return        case <-ctx.Done():            return        }    }}
KV.WatchPrefix方法与WatchKey类似,不过它channel的缓冲长度为16,追加到的是m.prefixWatchers[prefix],并且回调f会同时收到发生变化的key及其值

小结

cortex的kv.Client接口定义了List、Get、Delete、CAS、WatchKey、WatchPrefix方法;Client实现了kv.Client接口,其List、Get、CAS、WatchKey、WatchPrefix方法均代理给kv,其Delete方法返回error。

doc

  • cortex