乐趣区

golang-etcd简明教程

etcd 是一个高可用强一致性的键值仓库在很多分布式系统架构中得到了广泛的应用,本教程结合一些简单的例子介绍 golang 版本的 etcd/clientv3 中提供的主要功能及其使用方法。

如果还不熟悉 etcd 推荐先阅读:

看图轻松了解 etcd

etcd 常用操作介绍

Let’s get started now!

安装 package

我们使用 v3 版本的 etcd client,首先通过 go get 下载并编译安装etcd clinet v3

go get github.com/coreos/etcd/clientv3

该命令会将包下载到 $GOPATH/src/github.com/coreos/etcd/clientv3 中,所有相关依赖包会自动下载编译,包括 protobufgrpc 等。

官方文档地址:https://godoc.org/github.com/…

文档中列出了 Go 官方实现的 etcd client 中支持的所有方法,方法还是很多的,我们主要梳理一下使用 etcd 时经常用到的主要 API 并进行演示。

连接客户端

用程序访问 etcd 首先要创建 client,它需要传入一个 Config 配置,这里传了 2 个选项:

  • Endpoints:etcd 的多个节点服务地址。
  • DialTimeout:创建 client 的首次连接超时时间,这里传了 5 秒,如果 5 秒都没有连接成功就会返回 err;一旦 client 创建成功,我们就不用再关心后续底层连接的状态了,client 内部会重连。
cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"localhost:2379"},
   // Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}
   DialTimeout: 5 * time.Second,
})

返回的client,它的类型具体如下:

type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance
    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password string
    // contains filtered or unexported fields
}

类型中的成员是 etcd 客户端几何核心功能模块的具体实现,它们分别用于:

  • Cluster:向集群里增加 etcd 服务端节点之类,属于管理员操作。
  • KV:我们主要使用的功能,即 K - V 键值库的操作。
  • Lease:租约相关操作,比如申请一个 TTL=10 秒的租约(应用给 key 可以实现键值的自动过期)。
  • Watcher:观察订阅,从而监听最新的数据变化。
  • Auth:管理 etcd 的用户和权限,属于管理员操作。
  • Maintenance:维护 etcd,比如主动迁移 etcd 的 leader 节点,属于管理员操作。

我们需要使用什么功能,就去 client 里获取对应的成员即可。

Client.KV是一个interface`,提供了关于 K - V 操作的所有方法:

type KV interface {Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

    // Delete deletes a key, or optionally using WithRange(end), [key, end).
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

    // Compact compacts etcd KV history before the given rev.
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

    Do(ctx context.Context, op Op) (OpResponse, error)

    // Txn creates a transaction.
    Txn(ctx context.Context) Txn
}

我们通过方法 clientv3.NewKV() 来获得 KV 接口的实现(实现中内置了错误重试机制):

kv := clientv3.NewKV(cli)

接下来,我们将通过 kv 操作 etcd 中的数据。

Put

putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")

第一个参数是 goroutine 的上下文Context。后面两个参数分别是 key 和 value,对于 etcd 来说,key=/test/key1 只是一个字符串而已,但是对我们而言却可以模拟出目录层级关系。

Put 函数的声明如下:

// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

除了上面例子中的三个的参数,还支持一个变长参数,可以传递一些控制项来影响 Put 的行为,例如可以携带一个 lease ID 来支持 key 过期。

Put 操作返回的是 PutResponse,不同的 KV 操作对应不同的 response 结构,所有 KV 操作返回的 response 结构如下:

type (
   CompactResponse pb.CompactionResponse
   PutResponse     pb.PutResponse
   GetResponse     pb.RangeResponse
   DeleteResponse  pb.DeleteRangeResponse
   TxnResponse     pb.TxnResponse
)

程序代码里导入 clientv3 后在 GoLand 中可以很快定位到 PutResponse 的定义文件中,PutResponse 只是 pb.PutResponse 的类型别名,通过 Goland 跳转过去后可以看到 PutResponse 的详细定义。

type PutResponse struct {
   Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
   // if prev_kv is set in the request, the previous key-value pair will be returned.
   PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}

Header 里保存的主要是本次更新的 revision 信息,而 PrevKv 可以返回 Put 覆盖之前的 value 是什么(目前是 nil,后面会说原因),把返回的 PutResponse 打印出来看一下:

fmt.Printf("PutResponse: %v, err: %v", putResp, err)

// output
// PutResponse: &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:3 raft_term:7  <nil>}, err: <nil>%

我们需要判断 err 来确定操作是否成功。

我们再 Put 其他 2 个 key,用于后续演示:

kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testspam", "spam")

现在 /test 目录下有两个键: key1 和 key2,而 /testspam 并不归属于 /test 目录

Get

使用 KV 的 Get 方法来读取给定键的值:

getResp, err := kv.Get(context.TODO(), "/test/key1")

其函数声明如下:

// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

和 Put 类似,函数注释里提示我们可以传递一些控制参数来影响 Get 的行为,比如:WithFromKey 表示读取从参数 key 开始递增的所有 key,而不是读取单个 key。

在上面的例子中,我没有传递 opOption,所以就是获取 key=/test/key1 的最新版本数据。

这里 err 并不能反馈出 key 是否存在(只能反馈出本次操作因为各种原因异常了),我们需要通过 GetResponse(实际上是 pb.RangeResponse)判断 key 是否存在:

type RangeResponse struct {
    Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
    // kvs is the list of key-value pairs matched by the range request.
    // kvs is empty when count is requested.
    Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
    // more indicates if there are more keys to return in the requested range.
    More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
    // count is set to the number of keys within the range when requested.
    Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}

Kvs 字段,保存了本次 Get 查询到的所有 k - v 对,因为上述例子只 Get 了一个单 key,所以只需要判断一下 len(Kvs)是否等于 1 即可知道 key 是否存在。

RangeResponse.MoreCount,当我们使用withLimit() 等选项进行 Get 时会发挥作用,相当于翻页查询。

接下来,我们通过给 Get 查询增加 WithPrefix 选项,获取 /test 目录下的所有子元素:

rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())

WithPrefix()是指查找以 /test/ 为前缀的所有 key,因此可以模拟出查找子目录的效果。

etcd是一个有序的 k - v 存储,因此 /test/ 为前缀的 key 总是顺序排列在一起。

withPrefix()实际上会转化为范围查询,它根据前缀 /test/ 生成了一个前闭后开的 key range:[“/test/”,“/test0”),为什么呢?因为比 / 大的字符是 0,所以以/test0 作为范围的末尾,就可以扫描到所有以 /test/ 为前缀的 key 了。

在之前,我们 Put 了一个 /testspam 键值,因为不符合 /test/ 前缀(注意末尾的 /),所以就不会被这次 Get 获取到。但是,如果查询的前缀是 /test,那么/testspam 就会被返回,使用时一定要特别注意。

打印 rangeResp.Kvs 可以看到获得了两个键值:

[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!"  key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!"]

Lease

etcd 客户端的 Lease 对象可以通过以下的代码获取到

lease := clientv3.NewLease(cli)

lease 对象是 Lease 接口的实现,Lease 接口的声明如下:

type Lease interface {
    // Grant 创建一个新租约
    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

    // Revoke 销毁给定租约 ID 的租约
    Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

    // TimeToLive retrieves the lease information of the given lease ID.
    TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

    // Leases retrieves all leases.
    Leases(ctx context.Context) (*LeaseLeasesResponse, error)

    // KeepAlive keeps the given lease alive forever.
    KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

    // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
    // should be used instead of KeepAliveOnce.
    KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

    // Close releases all resources Lease keeps for efficient communication
    // with the etcd server.
    Close() error}

Lease 提供了以下功能:

  • Grant:分配一个租约。
  • Revoke:释放一个租约。
  • TimeToLive:获取剩余 TTL 时间。
  • Leases:列举所有 etcd 中的租约。
  • KeepAlive:自动定时的续约某个租约。
  • KeepAliveOnce:为某个租约续约一次。
  • Close:释放当前客户端建立的所有租约。

要想实现 key 自动过期,首先得创建一个租约,下面的代码创建一个 TTL 为 10 秒的租约:

grantResp, err := lease.Grant(context.TODO(), 10)

返回的 grantResponse 的结构体声明如下:

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
    *pb.ResponseHeader
    ID    LeaseID
    TTL   int64
    Error string
}

在应用程序代码中主要使用到的是租约 ID。

接下来我们用这个 Lease 往 etcd 中存储一个 10 秒过期的 key:

kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))

这里特别需要注意,有一种情况是在 Put 之前 Lease 已经过期了,那么这个 Put 操作会返回 error,此时你需要重新分配 Lease。

当我们实现服务注册时,需要主动给 Lease 进行续约,通常是以小于 TTL 的间隔循环调用 Lease 的 KeepAliveOnce()方法对租约进行续期,一旦某个服务节点出错无法完成租约的续期,等 key 过期后客户端即无法在查询服务时获得对应节点的服务,这样就通过租约到期实现了服务的错误隔离。

keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)

或者使用 KeepAlive() 方法,其会返回 <-chan *LeaseKeepAliveResponse 只读通道,每次自动续租成功后会向通道中发送信号。一般都用 KeepAlive() 方法

KeepAlive 和 Put 一样,如果在执行之前 Lease 就已经过期了,那么需要重新分配 Lease。etcd 并没有提供 API 来实现原子的 Put with Lease,需要我们自己判断 err 重新分配 Lease。

Op

Op 字面意思就是”操作”,Get 和 Put 都属于 Op,只是为了简化用户开发而开放的特殊 API。

KV 对象有一个 Do 方法接受一个 Op:

// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)

其参数 Op 是一个抽象的操作,可以是 Put/Get/Delete…;而 OpResponse 是一个抽象的结果,可以是 PutResponse/GetResponse…

可以通过 Client 中定义的一些方法来创建 Op:

  • func OpDelete(key string, opts …OpOption) Op
  • func OpGet(key string, opts …OpOption) Op
  • func OpPut(key, val string, opts …OpOption) Op
  • func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op

其实和直接调用 KV.Put,KV.GET 没什么区别。

下面是一个例子:

cli, err := clientv3.New(clientv3.Config{
    Endpoints:   endpoints,
    DialTimeout: dialTimeout,
})
if err != nil {log.Fatal(err)
}
defer cli.Close()

ops := []clientv3.Op{clientv3.OpPut("put-key", "123"),
    clientv3.OpGet("put-key"),
    clientv3.OpPut("put-key", "456")}

for _, op := range ops {if _, err := cli.Do(context.TODO(), op); err != nil {log.Fatal(err)
    }
}

把 Op 交给 Do 方法执行,返回的 opResp 结构如下:

type OpResponse struct {
    put *PutResponse
    get *GetResponse
    del *DeleteResponse
    txn *TxnResponse
}

你的操作是什么类型,你就用哪个指针来访问对应的结果。

Txn 事务

etcd 中事务是原子执行的,只支持 if … then … else …这种表达。首先来看一下 Txn 中定义的方法:

type Txn interface {
    // If takes a list of comparison. If all comparisons passed in succeed,
    // the operations passed into Then() will be executed. Or the operations
    // passed into Else() will be executed.
    If(cs ...Cmp) Txn

    // Then takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() succeed.
    Then(ops ...Op) Txn

    // Else takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() fail.
    Else(ops ...Op) Txn

    // Commit tries to commit the transaction.
    Commit() (*TxnResponse, error)
}

Txn 必须是这样使用的:If(满足条件) Then(执行若干 Op) Else(执行若干 Op)。

If 中支持传入多个 Cmp 比较条件,如果所有条件满足,则执行 Then 中的 Op(上一节介绍过 Op),否则执行 Else 中的 Op。

首先,我们需要开启一个事务,这是通过 KV 对象的方法实现的:

txn := kv.Txn(context.TODO())

下面的测试程序,判断如果 k1 的值大于 v1 并且 k1 的版本号是 2,则 Put 键值 k2 和 k3,否则 Put 键值 k4 和 k5。

kv.Txn(context.TODO()).If(clientv3.Compare(clientv3.Value(k1), ">", v1),
 clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()

类似于 clientv3.Value()用于指定 key 属性的,有这么几个方法:

  • func CreateRevision(key string) Cmp:key=xxx 的创建版本必须满足…
  • func LeaseValue(key string) Cmp:key=xxx 的 Lease ID 必须满足…
  • func ModRevision(key string) Cmp:key=xxx 的最后修改版本必须满足…
  • func Value(key string) Cmp:key=xxx 的创建值必须满足…
  • func Version(key string) Cmp:key=xxx 的累计更新次数必须满足…

Watch

Watch 用于监听某个键的变化, Watch调用后返回一个WatchChan,它的类型声明如下:

type WatchChan <-chan WatchResponse

type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event

    CompactRevision int64

    Canceled bool

    Created bool
}

当监听的 key 有变化后会向 WatchChan 发送WatchResponse。Watch 的典型应用场景是应用于系统配置的热加载,我们可以在系统读取到存储在 etcd key 中的配置后,用 Watch 监听 key 的变化。在单独的 goroutine 中接收 WatchChan 发送过来的数据,并将更新应用到系统设置的配置变量中,比如像下面这样在 goroutine 中更新变量 appConfig,这样系统就实现了配置变量的热加载。

type AppConfig struct {
  config1 string
  config2 string
}

var appConfig Appconfig

func watchConfig(clt *clientv3.Client, key string, ss interface{}) {watchCh := clt.Watch(context.TODO(), key)
    go func() {
        for res := range watchCh {value := res.Events[0].Kv.Value
            if err := json.Unmarshal(value, ss); err != nil {fmt.Println("now", time.Now(), "watchConfig err", err)
                continue
            }
            fmt.Println("now", time.Now(), "watchConfig", ss)
        }
    }()}

watchConfig(client, "config_key", &appConfig)

golang etcd clientv3 的主要功能就是这些,希望能帮大家梳理出学习脉络,这样工作中应用到 etcd 时再看官方文档就会容易很多。

退出移动版