乐趣区

关于golang:docker-配置本地-etcd-集群并使用-clientapiv3-管理集群

一、用 docker 搭建集群

etcd 没有在 docker hub 中创立 image,所以天然拉取不到。

本文意在模仿应用步骤,所以创立三个 go 环境的容器,在每个容器中配置 etcd。

1 创立 go 容器

hub 中有 golang 镜像,能够间接拉取:

docker pull golang

拉取到的镜像是基于 debian buster 制作。

创立三个容器:

docker run -itd --name etcd1 golang
docker run -itd --name etcd2 golang
docker run -itd --name etcd3 golang

2 在每个镜像中 clone etcd

docker exec -it etcd1 bash
docker exec -it etcd2 bash
docker exec -it etcd3 bash

在每个容器中克隆:

git clone https://github.com/etcd-io/etcd.git

3 编绎 etcd

在每个容器中都须要执行上面的命令。

cd etcd
./build.sh

编绎脚本会拉取一些 golang 库,所以先设置好 goproxy 是十分有必要的。

go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.cn,direct

编绎实现后,会多一个 bin 目录,外面有两个可执行文件 etcdetcdctl,别离为服务端和客户端文件,搭建集群,应用的是etcd

4 写配置文件

etcd 的目录中有一个 etcd.conf.yml.sample 是示例配置文件,将其复制一份用来作待应用的配置文件:

cp etcd.conf.yml.sample etcd.conf.yml

批改配置文件,上面只列出配置文件中须要批改的局部:

# 节点别名,给人类看的
name: 'infra3'

# 数据文件寄存的目录
data-dir: /var/lib/etcd

# 用逗号分隔的 url 列表,用于与其余节点通信
listen-peer-urls: http://172.17.0.4:2380

# 用逗号分隔的 url 列表,用于与客户端通信
listen-client-urls: http://172.17.0.4:2379,http://localhost:2379

# 用逗号分隔的 url 列表,用于告诉其余节点,与通信端口雷同即可
initial-advertise-peer-urls: http://172.17.0.4:2380

# 用逗号分隔的 url 列表,用于公开告诉客户端
advertise-client-urls: http://172.17.0.4:2379

# 初始化集群配置。集群各节点别名与其 url 的键值对列表
initial-cluster: infra1=http://172.17.0.2:2380,infra2=http://172.17.0.3:2380,infra3=http://172.17.0.4:2380

# 初始化集群 token
initial-cluster-token: 'etcd-cluster-1'

5 应用配置文件运行各节点组成集群

在各个容器中别离执行:

bin/etcd --config-file etcd.conf.yml

集群应该曾经失常运行了。

二、应用 clientv3 通信

1 客户端

客户端构造体 Client 须要实现多个接口:

type Client struct {
    Cluster  // Cluster 接口
    KV  // KV 接口
    Lease  // Lease 接口
    Watcher  // Watcher 接口
    Auth  // Auth 接口
    Maintenance  // Maintenance 接口

    conn *grpc.ClientConn

    cfg      Config
    creds    grpccredentials.TransportCredentials
    resolver *resolver.EtcdManualResolver
    mu       *sync.RWMutex

    ctx    context.Context
    cancel context.CancelFunc

    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password        string
    authTokenBundle credentials.Bundle

    callOpts []grpc.CallOption

    lgMu *sync.RWMutex
    lg   *zap.Logger
}

这些接口在创立 Client 时实现:

func newClient(cfg *Config) (*Client, error) {
    ...
    client.Cluster = NewCluster(client)
    client.KV = NewKV(client)
    client.Lease = NewLease(client)
    client.Watcher = NewWatcher(client)
    client.Auth = NewAuth(client)
    client.Maintenance = NewMaintenance(client)
    ...
}

2 写入

写入操作 (put) 包含创立和更新:key 不存在会创立,key 存在则更新。

Put办法在 KV 接口中申明:

Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

第一个参数是一个上下文,而 Client 构造体内是有上下文属性的,在调用 Put 办法时能够间接应用 Client 的上下文属性,也能够定义一个新的上下文 context.TODO()context.Background()context.WithCanle()context.WithTimeout() 等,依据理论须要定义上下文。

Client的上下文属性会在 client.Close() 时勾销,而手动创立的上下文须要手动勾销。

package main

import (
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    client, err := clientv3.New(clientv3.Config{Endpoints:   []string{"172.17.0.2:2379", "172.17.0.3:2379", "172.17.0.4:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {panic(err)
    }
    defer client.Close()

    ctx := client.Ctx()
    // ctx, cancel := context.WithCancel(context.Background())

    putResp, err := client.Put(ctx, "test", "test0")
    if err != nil {panic(err)
    }

    fmt.Println("response:", putResp)
    fmt.Println("created or updated -> key: test, value: test0")
    // cancel()}

后果:

response:  &{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:22 raft_term:4  <nil> {} [] 0}
created or updated -> key: test, value: test0

3 查问

与写入相似,调用 Get 办法:

Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

3.1 查问一个后果

func main() {
    ...
    ctx := client.Ctx()
  
    getResp, err := client.Get(ctx, "test")
    if err != nil {panic(err)
    }
    fmt.Println("get response:", getResp)
}

后果:

get response: &{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:2 raft_term:2  [key:"test" create_revision:2 mod_revision:2 version:1 value:"test0"] false 1 {} [] 0}

3.2 查问蕴含前缀的多个后果

应用前缀查问时,就须要用到 OpOption 参数了。

前缀查问的办法为:

func WithPrefix() OpOption {return func(op *Op) {if len(op.key) == 0 {op.key, op.end = []byte{0}, []byte{0}
            return
        }
        op.end = getPrefix(op.key)
    }
}

向 etcd 中增加 test1、test2、test3 后,应用前缀查问:

    getResp, err := client.Get(ctx, "test", clientv3.WithPrefix())
    if err != nil {panic(err)
    }

    for _, kv := range getResp.Kvs {fmt.Printf("%s = %s\n", kv.Key, string(kv.Value))
    }

后果:

test = test0
test1 = 1
test2 = 2
test3 = 3

3.3 查问指定范畴的多个后果

范畴查问的办法为:

func WithRange(endKey string) OpOption {return func(op *Op) {op.end = []byte(endKey) }
}

上面查问 test 到 test2 的后果:

    getResp, err := client.Get(ctx, "test", clientv3.WithRange("test3"))  // 不蕴含 test3
    if err != nil {panic(err)
    }

    for _, kv := range getResp.Kvs {fmt.Printf("%s = %s\n", kv.Key, string(kv.Value))
    }

后果:

test = test0
test1 = 1
test2 = 2

3.4 查问历史版本的值

这里就须要用到 put 或 get 响应中的 Revision 属性了,代表每次批改的版本号,查问时指定想要查问的版本号即可查问对应历史版本的值。

版本号是全局版本号,不是某个值的版本号,任何一个值被创立、批改或删除,版本号都会减少 1。

在 3.1 的响应中,咱们可能看到每条查问后果中都有 create_revisionmod_revision两个属性,别离对应的创立时的版本号和最初一次批改时的版本号,依据这两个版本号能够进行历史版本的查问:

    getResp, err := client.Get(ctx, "test", clientv3.WithPrefix(), clientv3.WithRev(11))
    if err != nil {panic(err)
    }

    for _, kv := range getResp.Kvs {fmt.Println(kv)
    }

须要留神的是,在响应中有一个属性revision,代表以后最新版本号,要查问的版本号不能大于此值。

本例中,revision=12,所以咱们查问 11 号的值,后果为:

key:"test" create_revision:2 mod_revision:11 version:2 value:"00" 
key:"test1" create_revision:3 mod_revision:10 version:2 value:"10" 
key:"test2" create_revision:4 mod_revision:9 version:2 value:"20" 
key:"test3" create_revision:5 mod_revision:8 version:4 value:"30"

能够看到,查问到的后果最新版本号小于要查问的版本号时,返回的是最新版本的值。

3.5 查问大于等于某个键的值的所有后果

比方查问 key 大于等于 test2 的值:

    getResp, err := client.Get(ctx, "test2", clientv3.WithFromKey())
    if err != nil {panic(err)
    }

    for _, kv := range getResp.Kvs {fmt.Println(kv)
    }

后果:

key:"test2" create_revision:4 mod_revision:9 version:2 value:"20" 
key:"test3" create_revision:5 mod_revision:8 version:4 value:"30" 

看一下 WithFromKey() 办法的具体实现:

func WithFromKey() OpOption {return func(op *Op) {if len(op.key) == 0 {op.key = []byte{0}
        }
        op.end = []byte("\x00")
    }
}

浏览 Get 源码中能够晓得,在调用 Get 时,会初始化 Op 构造体:

ret := Op{t: tRange, key: []byte(key)}

将传入的 key 作为起始 key,WithFromKey()将空字符串作为完结 key(意为无限大)。

由此能够查问到所有大于起始二进制 key 的 转为二进制后的 key。

3.6 其余 options

除上述 options 外,还有以下 options 可用:

  • WithSerializable:让 Get 申请可序列化,可能升高服务端响应提早
  • WithSort:排序,须要与 WithPrefixWithRange合用,能够依据 key、version、revisions 或 value 进行排序
  • WithLimit:限度查问后果的数量
  • WithLease:为 Put 申请增加租约 id
  • WithKeysOnly:只查问符合条件的所有 key,不查问对应的 value

源码中还有很多 options,不一一列举了,能够自行浏览源码:https://github.com/etcd-io/et…

4 租约

租约的意义就是节点的只能存活租约设置的工夫。租约的存活工夫 TTL 如果到期,租约就会过期,并且所有附带的节点都会被删除。

租约不影响集群版本。

4.1 批准租约

    grantResp, err := client.Grant(ctx, 10)
    if err != nil {panic(err)
    }
    fmt.Println("新租约 ID:", grantResp.ID)

后果:

新租约 ID:4635744352028668180

4.2 破除租约

能够应用租约 id 破除租约,破除时其附带的节点也会被删除。

    revokeResp, err := client.Revoke(ctx, grantResp.ID)
    if err != nil {panic(err)
    }
    fmt.Println("租约已破除:", revokeResp)

后果:

租约已破除:&{cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2  {} [] 0}

4.3 获取租约剩余时间

租约过期时,剩余时间会是 -1。

    liveResp, err := client.TimeToLive(ctx, grantResp.ID)
    if err != nil {panic(err)
    }
    fmt.Printf("残余 %d 秒 \n", liveResp.TTL)

    time.Sleep(6 * time.Second)

    liveResp, err = client.TimeToLive(ctx, grantResp.ID)
    if err != nil {panic(err)
    }
    fmt.Printf("残余 %d 秒 \n", liveResp.TTL)

后果:

新租约 ID:2426447259829618775
残余 4 秒
残余 -1 秒

4.4 获取无效的所有租约

    leasesResp, err := client.Leases(ctx)
    if err != nil {panic(err)
    }
    fmt.Println(leasesResp.Leases)

返回的 Leases 构造体:

type LeaseStatus struct {
    ID LeaseID `json:"id"`
    // TODO: TTL int64
}

后果:

新租约 ID:4635744352028668184
[{4635744352028668184}]

4.5 主动续约

主动续约 KeepAlive,会返回一个LeaseKeepAliveResponse 的只读管道,续约胜利时后向这个管道内发送响应,咱们只须要解决接管到的响应即可:

    keepResp, err := client.KeepAlive(ctx, grantResp.ID)
    if err != nil {panic(err)
    }

    for resp := range keepResp {fmt.Println("续约胜利:", resp)
        liveResp, err := client.TimeToLive(ctx, grantResp.ID)
        if err != nil {panic(err)
        }
        fmt.Printf("残余 %d 秒 \n", liveResp.TTL)
    }

后果:

续约胜利:cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 
残余 9 秒
续约胜利:cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 
残余 9 秒
续约胜利:cluster_id:4577776708217222017 member_id:10966956327032670700 revision:43 raft_term:2 
...

4.6 续约一次

    time.Sleep(6 * time.Second)

    liveResp, err := client.TimeToLive(ctx, grantResp.ID)
    if err != nil {panic(err)
    }
    fmt.Printf("残余 %d 秒 \n", liveResp.TTL)

    keepResp, err := client.KeepAliveOnce(ctx, grantResp.ID)
    if err != nil {panic(err)
    }
    fmt.Println("续约胜利:", keepResp)

    liveResp, err = client.TimeToLive(ctx, grantResp.ID)
    if err != nil {panic(err)
    }
    fmt.Printf("残余 %d 秒 \n", liveResp.TTL)

后果:

残余 3 秒
续约胜利:cluster_id:4577776708217222017 member_id:10491173242589593685 revision:43 raft_term:2 
残余 9 秒

4.7 破除以后客户端批准的所有租约

开释所有租约的办法为:

client.Lease.Close()

但我测试时发现调用这个办法后,没有一个租约被开释,所以此条待欠缺,未能发现是我应用的问题,还是代码的问题。

5 删除

能够删除一个或符合条件的 key。

5.1 删除一个

    delResp, err := client.Delete(ctx, "test")
    if err != nil {panic(err)
    }
    fmt.Println(delResp)

没有报错即为删除胜利,但此处须要留神,删除一个不存在的键并不会报错,只是版本号不会发生变化,响应中的 Deleted 属性值为 0。

5.2 删除多个

    delResp, err := client.Delete(ctx, "test", clientv3.WithPrefix())
    if err != nil {panic(err)
    }
    fmt.Printf("删除了 %d 个值 \n", delResp.Deleted)

options 与 Get 中相似。

后果:

删除了 4 个值

6 监听

etcd 的监听是 Watch 办法。

6.1 监听一个节点

以监听 test 节点为例:

    wch := client.Watch(ctx, "test")

    for e := range wch {fmt.Println(e.Events[0])
    }

监听后,会返回一个装有 WatchResponse 的管道,咱们只须要遍历这个管道,即可监听 test 节点的所有变动。

WatchResponse构造体中存储节点变动的是一个事件的切片,监听一个节点时,外面不会有多个事件。

运行程序后,用其余程序修改 test 节点的值,监听程序会失去如下输入:

&{PUT key:"test" create_revision:24 mod_revision:27 version:4 value:"510"  <nil> {} [] 0}
&{PUT key:"test" create_revision:24 mod_revision:28 version:5 value:"10"  <nil> {} [] 0}
&{DELETE key:"test" mod_revision:29  <nil> {} [] 0}

6.2 监听多个节点

监听也能够应用 option,参考第 3 节中对于 option 的形容。

    wch := client.Watch(ctx, "test", clientv3.WithPrefix())

    for resp := range wch {
        for _, e := range resp.Events {  // 监听多个节点,可能会有多个事件,所以这里对事件切片进行遍历
            fmt.Println(e)
        }
    }

后果:

&{PUT key:"test" create_revision:37 mod_revision:38 version:2 value:"110"  <nil> {} [] 0}
&{PUT key:"test1" create_revision:34 mod_revision:39 version:2 value:"111"  <nil> {} [] 0}
&{DELETE key:"test" mod_revision:40  <nil> {} [] 0}
&{DELETE key:"test1" mod_revision:40  <nil> {} [] 0}
&{DELETE key:"test2" mod_revision:40  <nil> {} [] 0}
&{DELETE key:"test3" mod_revision:40  <nil> {} [] 0}

6.3 从节点的指定历史版本开始监听

默认的监听是监听以后版本之后的变动,但一些场景下须要从历史版本开始监听。

比方,一个程序始终在监听 etcd 的某一个节点,运行过程中,程序产生异样退出,程序退出后到重新启动前是无奈获取 etcd 的节点的所有改变的。如果就这样启动程序,程序获取到的是最新的版本,与程序退出前须要的参数并不统一,必然会引起其余问题,导致程序不能像预期一样运行。所以,程序应该在启动时,从退出前取得的最初一个版本号开始监听。

示例:

程序退出前,test节点的版本号是 38,但随着其余程序的调用更新,以后集群的版本号曾经被更新到了 40,程序重新启动时,就须要从 38 号开始监听:

wch := client.Watch(ctx, "test", clientv3.WithRev(38))

    for resp := range wch {
        for _, e := range resp.Events {fmt.Println(e)
        }
    }

历史监听启动时会立刻获取到从 38 到 40,test节点的所有改变:

&{PUT key:"test" create_revision:37 mod_revision:38 version:2 value:"110"  <nil> {} [] 0}
&{DELETE key:"test" mod_revision:40  <nil> {} [] 0}

7 压缩版本号

etcd 默认状况下会保留历史版本号以便程序能够读取历史版本,但如果不加以控制,版本号会有限叠加,历史数据也会有限保留,这样的话就会产生大量的无用的历史数据。

为了防止这个问题,etcd 提供了压缩性能:删除指定版本之前的历史版本和历史数据,被删除的所有数据将无法访问。

比方,以后版本号是 43,咱们想删除 42 之前的数据:

    comResp, err := client.Compact(ctx, 42)
    if err != nil {panic(err)
    }
    fmt.Println(comResp)

    // 获取 42 之前的版本
    getResp, err := client.Get(ctx, "test", clientv3.WithRev(40))
    if err != nil {panic(err)
    }
    fmt.Println(getResp)

后果:

&{cluster_id:4577776708217222017 member_id:10491173242589593685 revision:42 raft_term:2  {} [] 0}
{"level":"warn","ts":"2021-04-20T18:50:53.431+0800","caller":"v3@v3.5.0-alpha.0/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc000140380/#initially=[172.17.0.2:2379;172.17.0.3:2379;172.17.0.4:2379]","attempt":0,"error":"rpc error: code = OutOfRange desc = etcdserver: mvcc: required revision has been compacted"}
panic: etcdserver: mvcc: required revision has been compacted

能够看到,40 版本号曾经被压缩,无奈再拜访。

8 事务

etcd 中事务是原子性过程,只反对 If().Then().Else().Commit() 这种表白。

If中反对传入多个比拟条件 Cmp,如果条件都满足,执行Then 中的 Op,否则执行Else 中的Op,最初Commit

四舍五入示例:

如果 test 的值小于 5,将 test 的值改为 0,否则改为 10。

    txn := client.Txn(ctx)

    getResp, err := client.Get(ctx, "test")
    if err != nil {panic(err)
    }
    fmt.Println("事务前的值:", string(getResp.Kvs[0].Value))

    txnResp, err := txn.If(clientv3.Compare(clientv3.Value("test"), "<", "5")).
        Then(clientv3.OpPut("test", "0")).
        Else(clientv3.OpPut("test", "9")).
        Commit()
    if err != nil {panic(err)
    }

    if txnResp.Succeeded {fmt.Println("小于 5,舍")
    } else {fmt.Println("大于 5,入")
    }

    getResp, err = client.Get(ctx, "test")
    if err != nil {panic(err)
    }
    fmt.Println("事务提交胜利后的值:", string(getResp.Kvs[0].Value))

后果:

事务前的值:5
大于 5,入
事务提交胜利后的值:9

9 用户和角色 / Auth 身份认证

etcd 中的用户能够授予角色权限。root 用户

auth 的残缺示例(选自官网 example 文件):

unc main() {endpoints := []string{"172.17.0.2:2379", "172.17.0.3:2379", "172.17.0.4:2379"}
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {panic(err)
    }
    defer client.Close()
    
  // 增加角色 root
    _, err = client.RoleAdd(context.TODO(), "root")
    if err != nil {panic(err)
    }
    fmt.Println("已增加角色:root")
    
  // 增加用户
    _, err = client.UserAdd(context.TODO(), "root", "123")
    if err != nil {panic(err)
    }
    fmt.Println("已创立用户,用户名:root,明码:123")
    
  // 为用户授 root 授予 root 角色权限
    _, err = client.UserGrantRole(context.TODO(), "root", "root")
    if err != nil {panic(err)
    }
    fmt.Println("已为用户 root 授予 root 角色权限")
  
  // 增加角色 r
    _, err = client.RoleAdd(context.TODO(), "r")
    if err != nil {panic(err)
    }
    fmt.Println("已增加角色:r")
    
  // 为角色 r 设置权限,对 [test, test3) 有写权限
    _, err = client.RoleGrantPermission(context.TODO(),
        "r",
        "test",
        "test3",
        clientv3.PermissionType(clientv3.PermWrite),
    )
    if err != nil {panic(err)
    }
    fmt.Println("角色 r 对节点 test 到 test3 增加写权限")
    
  // 增加用户 u
    _, err = client.UserAdd(context.TODO(), "u", "123")
    if err != nil {panic(err)
    }
    fmt.Println("已创立用户,用户名:u,明码:123")
    
  // 为用户 u 授予 角色 r 的权限
    _, err = client.UserGrantRole(context.TODO(), "u", "r")
    if err != nil {panic(err)
    }
    fmt.Println("已为用户 u 授予角色 r 的权限")
    
  // 开启集权的身份认证
    _, err = client.AuthEnable(context.TODO())
    if err != nil {panic(err)
    }
    fmt.Println("已为集群开启权限认证")
    
  // 以指定用户创立客户端
    authClient, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
        Username:    "u",
        Password:    "123",
    })
    if err != nil {panic(err)
    }
    defer authClient.Close()
    
  // 增加或批改节点 test1
    _, err = authClient.Put(context.TODO(), "test1", "1")
    if err != nil {panic(err)
    }
    
  // 开启事务
    _, err = authClient.Txn(context.TODO()).
        If(clientv3.Compare(clientv3.Value("test4"), ">", "1")).
        Then(clientv3.OpPut("test4", "yes")).
        Else(clientv3.OpPut("test4", "no")).
        Commit()
    fmt.Println(err)

  // 以 root 用户创立客户端
    rootClient, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
        Username:    "root",
        Password:    "123",
    })
    if err != nil {panic(err)
    }
    defer rootClient.Close()
    
  // 获取角色 r 的信息
    resp, err := rootClient.RoleGet(context.TODO(), "r")
    if err != nil {panic(err)
    }
    fmt.Printf("用户 u 的权限:起始 key %q,完结 key %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd)
    
  // 敞开集群的身份认证
    _, err = rootClient.AuthDisable(context.TODO())
    if err != nil {panic(err)
    }
}

后果:

已增加角色:root
已创立用户,用户名:root,明码:123
已为用户 root 授予 root 角色权限
已增加角色:r
角色 r 对节点 test 到 test3 增加写权限
已创立用户,用户名:u,明码:123
已为用户 u 授予角色 r 的权限
已为集群开启权限认证
{"level":"warn","ts":"2021-04-21T09:01:38.789+0800","caller":"v3@v3.5.0-alpha.0/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc0003ba1c0/#initially=[172.17.0.2:2379;172.17.0.3:2379;172.17.0.4:2379]","attempt":0,"error":"rpc error: code = PermissionDenied desc = etcdserver: permission denied"}
etcdserver: permission denied
用户 u 的权限:起始 key "test",完结 key "test3"
退出移动版