关于golang:聊聊cortex的kvClient

43次阅读

共计 7375 个字符,预计需要花费 19 分钟才能阅读完成。

本文次要钻研一下 cortex 的 kv.Client

kv.Client

github.com/cortexproject/cortex/pkg/ring/kv/client.go

// Client is a high-level client for key-value stores (such as Etcd and
// Consul) that exposes operations such as CAS and Watch which take callbacks.
// It also deals with serialisation by using a Codec and having a instance of
// the the desired type passed in to methods ala json.Unmarshal.
type Client interface {
    // List returns a list of keys under the given prefix. Returned keys will
    // include the prefix.
    List(ctx context.Context, prefix string) ([]string, error)

    // Get a specific key.  Will use a codec to deserialise key to appropriate type.
    // If the key does not exist, Get will return nil and no error.
    Get(ctx context.Context, key string) (interface{}, error)

    // Delete a specific key. Deletions are best-effort and no error will
    // be returned if the key does not exist.
    Delete(ctx context.Context, key string) error

    // CAS stands for Compare-And-Swap.  Will call provided callback f with the
    // current value of the key and allow callback to return a different value.
    // Will then attempt to atomically swap the current value for the new value.
    // If that doesn't succeed will try again - callback will be called again
    // with new value etc.  Guarantees that only a single concurrent CAS
    // succeeds.  Callback can return nil to indicate it is happy with existing
    // value.
    CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error

    // WatchKey calls f whenever the value stored under key changes.
    WatchKey(ctx context.Context, key string, f func(interface{}) bool)

    // WatchPrefix calls f whenever any value stored under prefix changes.
    WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
}

kv.Client 接口定义了 List、Get、Delete、CAS、WatchKey、WatchPrefix 办法

Client

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Client implements kv.Client interface, by using memberlist.KV
type Client struct {
    kv    *KV // reference to singleton memberlist-based KV
    codec codec.Codec
}

// List is part of kv.Client interface.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {return nil, err}

    return c.kv.List(prefix), nil
}

// Get is part of kv.Client interface.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {return nil, err}

    return c.kv.Get(key, c.codec)
}

// Delete is part of kv.Client interface.
func (c *Client) Delete(ctx context.Context, key string) error {return errors.New("memberlist does not support Delete")
}

// CAS is part of kv.Client interface
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {return err}

    return c.kv.CAS(ctx, key, c.codec, f)
}

// WatchKey is part of kv.Client interface.
func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {return}

    c.kv.WatchKey(ctx, key, c.codec, f)
}

// WatchPrefix calls f whenever any value stored under prefix changes.
// Part of kv.Client interface.
func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {return}

    c.kv.WatchPrefix(ctx, prefix, c.codec, f)
}

Client 实现了 kv.Client 接口,其 List、Get、CAS、WatchKey、WatchPrefix 办法均代理给 kv,其 Delete 办法返回 error

KV.List

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// List returns all known keys under a given prefix.
// No communication with other nodes in the cluster is done here.
func (m *KV) List(prefix string) []string {m.storeMu.Lock()
    defer m.storeMu.Unlock()

    var keys []string
    for k := range m.store {if strings.HasPrefix(k, prefix) {keys = append(keys, k)
        }
    }
    return keys
}

KV.List 办法遍历 m.store,查找是否有指定 prefix 的 key

KV.Get

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Get returns current value associated with given key.
// No communication with other nodes in the cluster is done here.
func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {val, _, err := m.get(key, codec)
    return val, err
}

// Returns current value with removed tombstones.
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {m.storeMu.Lock()
    v := m.store[key]
    m.storeMu.Unlock()

    out = nil
    if v.value != nil {out, err = codec.Decode(v.value)
        if err != nil {return nil, 0, err}

        if mr, ok := out.(Mergeable); ok {
            // remove ALL tombstones before returning to client.
            // No need for clients to see them.
            mr.RemoveTombstones(time.Time{})
        }
    }

    return out, v.version, nil
}

KV.Get 办法次要是从 m.store[key] 获取数据

KV.CAS

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error {
    var lastError error = nil

outer:
    for retries := m.maxCasRetries; retries > 0; retries-- {m.casAttempts.Inc()

        if lastError == errNoChangeDetected {
            // We only get here, if 'f' reports some change, but Merge function reports no change. This can happen
            // with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution.
            // By waiting for one second, we hope that Merge will be able to detect change from 'f' function.

            select {case <-time.After(noChangeDetectedRetrySleep):
                // ok
            case <-ctx.Done():
                lastError = ctx.Err()
                break outer
            }
        }

        change, newver, retry, err := m.trySingleCas(key, codec, f)
        if err != nil {level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry)

            lastError = err
            if !retry {break}
            continue
        }

        if change != nil {m.casSuccesses.Inc()
            m.notifyWatchers(key)

            if m.State() == services.Running {m.broadcastNewValue(key, change, newver, codec)
            } else {level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
            }
        }

        return nil
    }

    if lastError == errVersionMismatch {
        // this is more likely error than version mismatch.
        lastError = errTooManyRetries
    }

    m.casFailures.Inc()
    return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError)
}

KV.CAS 通过 for 循环 m.maxCasRetries 执行 m.trySingleCas 操作

KV.WatchKey

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) {
    // keep one extra notification, to avoid missing notification if we're busy running the function
    w := make(chan string, 1)

    // register watcher
    m.watchersMu.Lock()
    m.watchers[key] = append(m.watchers[key], w)
    m.watchersMu.Unlock()

    defer func() {
        // unregister watcher on exit
        m.watchersMu.Lock()
        defer m.watchersMu.Unlock()

        removeWatcherChannel(key, w, m.watchers)
    }()

    for {
        select {
        case <-w:
            // value changed
            val, _, err := m.get(key, codec)
            if err != nil {level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
                continue
            }

            if !f(val) {return}

        case <-m.shutdown:
            // stop watching on shutdown
            return

        case <-ctx.Done():
            return
        }
    }
}

KV.WatchKey 办法会往 m.watchers[key] 追加 channel,而后 for 循环 select 期待 channel 的写入

KV.WatchPrefix

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool) {
    // we use bigger buffer here, since keys are interesting and we don't want to lose them.
    w := make(chan string, 16)

    // register watcher
    m.watchersMu.Lock()
    m.prefixWatchers[prefix] = append(m.prefixWatchers[prefix], w)
    m.watchersMu.Unlock()

    defer func() {
        // unregister watcher on exit
        m.watchersMu.Lock()
        defer m.watchersMu.Unlock()

        removeWatcherChannel(prefix, w, m.prefixWatchers)
    }()

    for {
        select {
        case key := <-w:
            val, _, err := m.get(key, codec)
            if err != nil {level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
                continue
            }

            if !f(key, val) {return}

        case <-m.shutdown:
            // stop watching on shutdown
            return

        case <-ctx.Done():
            return
        }
    }
}

KV.WatchPrefix 办法与 WatchKey 类型,不过它 channel 的长度为 16,追加到的是 m.prefixWatchers[prefix]

小结

cortex 的 kv.Client 接口定义了 List、Get、Delete、CAS、WatchKey、WatchPrefix 办法;Client 实现了 kv.Client 接口,其 List、Get、CAS、WatchKey、WatchPrefix 办法均代理给 kv,其 Delete 办法返回 error。

doc

  • cortex

正文完
 0