GO 中 ETCD 的编码案例分享
咱们来回顾一下上次咱们说到的 服务注册和发现
- 分享了服务注册和发现是什么
- CAP 定理是什么
- ETCD 是什么,以及 ETCD 和 Zookeeper 的比照
- ETCD 的分布式锁实现的简略原理
要是对 服务注册与发现,ETCD 还有点趣味的话,欢送查看文章 服务注册与发现之 ETCD
明天咱们来看看 GO 如何去操作 ETCD,这个开源的、高可用的分布式 key-value 存储系统
感兴趣的小伙伴能够看看 GO 的 ETCD 官网文档
https://pkg.go.dev/go.etcd.io…
依据官网文档,咱们本次分享几个点
- ETCD 如何装置
- ETCD 外面对于 KEY 的 PUT 和 GET 操作
- WATCH 操作
- Lease 租约
- KeepAlive 保活
- ETCD 分布式锁的实现
ETCD 如何装置
ETCD 的装置和部署
这里咱们就做一个简略的单机部署
- 到
github
上 下载最新的 etcd 包,https://github.com/etcd-io/et… - 解压后,将 etcd 和 etcdctl 拷贝到咱们的
$GOBIN
目录下,或者退出咱们零碎的环境变量即可(目标是 能够间接键入 etcd 零碎可能运行该可执行文件) - 能够应用
etcd --version
查看版本
对于 ETCD 的命令就不在此做过的分享了,明天次要是分享 GO 如何 应用 ETCD
包的装置
本次咱们应用的是 ETCD 的 clientv3
包,咱们执行如下命令即可正确装置 ETCD
go get go.etcd.io/etcd/clientv3
无论你是间接执行下面的命令,还是通过 go mod 的形式,去下载 ETCD 的 clientv3
包,可能会呈现如下问题:
/root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
/root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption
/root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption
如上问题,是因为包抵触了,咱们只须要将如下替换包的命令放到 咱们 go.mod
上面即可
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
例如我的 go.mod 是这样的
module my_etcd
go 1.15
require (
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.2.0 // indirect
go.etcd.io/etcd v3.3.25+incompatible
go.uber.org/zap v1.17.0 // indirect
google.golang.org/grpc v1.38.0 // indirect
)
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
这里顺便插一句,go mod 进行包治理的形式从 GO 1.14 之后就开始有了,go mod 治理包十分不便,这里简略分享一下如何应用
- 在和 main.go 的同级目录下,初始化一个 go mod,执行如下命令
go mod init xxx
- 写好咱们的代码在
main.go
文件中,即可在main.go
的同级目录下执行 go build 进行编译 go 程序 - 若编译呈现上述问题,那么就能够在 生成的 go.mod 文件中 退出上述替换包的语句即可
包装置好了,咱们能够开始进行编码了
ETCD 的 设置 KEY 和获取 KEY 操作
ETCD 的默认端口是这样的:
2379
端口
提供 HTTP API 服务
2380
端口
用来与 peer 通信
咱们开始写一个 GET 和 PUT KEY 的 DEMO
package main
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 设置 log 参数,打印以后工夫 和 以后行数
log.SetFlags(log.Ltime | log.Llongfile)
// ETCD 默认端口号是 2379
// 应用 ETCD 的 clientv3 包
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},
// 超时工夫 10 秒
DialTimeout: 10 * time.Second,
})
if err != nil {log.Printf("connect to etcd error : %v\n", err)
return
}
log.Printf("connect to etcd successfully ...")
// defer 最初敞开 连贯
defer client.Close()
// PUT KEY 为 name , value 为 xiaomotong
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = client.Put(ctx, "name", "xiaomotong")
cancel()
if err != nil {log.Printf("PUT key to etcd error : %v\n", err)
return
}
// 获取 ETCD 的 KEY
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := client.Get(ctx, "name")
cancel()
if err != nil {log.Printf("GET key-value from etcd error : %v\n", err)
return
}
// 遍历读出 KEY 和对应的 value
for _, ev := range resp.Kvs {log.Printf("%s : %s\n", ev.Key, ev.Value)
}
}
感兴趣的小伙伴能够将上述代码拷贝到你的环境中进行运行,即可看到你想要的答案
ETCD 的 WATCH 操 作
WATCH 操作就是拍一个哨兵监控某一个 key 对应值的变动,包含新增,删除,批改
func main() {
// 设置 log 参数,打印以后工夫 和 以后行数
log.SetFlags(log.Ltime | log.Llongfile)
// ETCD 默认端口号是 2379
// 应用 ETCD 的 clientv3 包
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {log.Printf("connect to etcd error : %v\n", err)
return
}
log.Printf("connect to etcd successfully ...")
defer client.Close()
// 派一个哨兵 始终监督 name 的变动
// respCh 是一个通道
respCh := client.Watch(context.Background(), "name")
// 若 respCh 为空,会阻塞在这里
for watchResp := range respCh {
for _, v := range watchResp.Events {
log.Printf("type = %s , Key = %s , Value = %s\n",
v.Type, v.Kv.Key, v.Kv.Value)
}
}
}
上述代码因为 respCh
是一个通道,若外面没有数据的话,上面的 for 循环,会阻塞的等,因而须要咱们本人在终端下面模仿 新增,删除,批改 name 对应的值,那么,咱们的程序就会做出对应的相应
例如,我在终端命令中敲入:etcdctl --endpoints=http://127.0.0.1:2379 put name "xiaomotong"
那么,咱们上述代码运行的程序就会输入如下语句
./my_etcd
22:18:39 /home/xiaomotong/my_etcd/main.go:23: connect to etcd successfully ...
22:18:43 /home/xiaomotong/my_etcd/main.go:31:type = PUT , Key = name , Value = xiaomotong
ETCD 的 LEASE 操作
LEASE,租约,就是将本人的某一个 key 设置一个无效工夫 / 过期工夫,相似于 REDIS
外面的 SETNX
func main() {
// 设置 log 参数,打印以后工夫 和 以后行数
log.SetFlags(log.Ltime | log.Llongfile)
// ETCD 默认端口号是 2379
// 应用 ETCD 的 clientv3 包
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {log.Printf("connect to etcd error : %v\n", err)
return
}
log.Printf("connect to etcd successfully ...")
defer client.Close()
// 咱们创立一个 20 秒钟的租约
resp, err := client.Grant(context.TODO(), 20)
if err != nil {log.Printf("client.Grant error : %v\n", err)
return
}
// 20 秒钟之后, /name 这个 key 就会被移除
_, err = client.Put(context.TODO(), "/name", "xiaomotong", clientv3.WithLease(resp.ID))
if err != nil {log.Printf("client.Put error : %v\n", err)
return
}
}
上述 name , 20 秒钟之后 就会主动生效
ETCD 的保活操作
顺便说一下,keepalived
也是一个开源的组件,用作高可用,感兴趣的能够深刻理解一下
此处的 keepalived
是 保活,这里是 ETCD 的保活,能够在上述代码中做一个调整,上述的 name,不生效
func main() {
// 设置 log 参数,打印以后工夫 和 以后行数
log.SetFlags(log.Ltime | log.Llongfile)
// ETCD 默认端口号是 2379
// 应用 ETCD 的 clientv3 包
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {log.Printf("connect to etcd error : %v\n", err)
return
}
log.Printf("connect to etcd successfully ...")
defer client.Close()
// 咱们创立一个 20 秒钟的租约
resp, err := client.Grant(context.TODO(), 20)
if err != nil {log.Printf("client.Grant error : %v\n", err)
return
}
// 20 秒钟之后, /name 这个 key 就会被移除
_, err = client.Put(context.TODO(), "/name", "xiaomotong", clientv3.WithLease(resp.ID))
if err != nil {log.Printf("client.Put error : %v\n", err)
return
}
// 这个 key name,将永恒被保留
ch, kaerr := client.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {log.Fatal(kaerr)
}
for {
ka := <-ch
log.Println("ttl:", ka.TTL)
}
}
咱能够看看 keepalived
的官网阐明 ,
KeepAlive
使给定的租约永远存活。如果发送到通道的 keepalive
响应没有立刻被应用,租期客户端将至多每秒钟持续向 etcd
服务器发送keepalive
申请,直到应用最新的响应。
// KeepAlive keeps the given lease alive forever. If the keepalive response
// posted to the channel is not consumed immediately, the lease client will
// continue sending keep alive requests to the etcd server at least every
// second until latest response is consumed.
//
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
// alive stream is interrupted in some way the client cannot handle itself;
// given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
// from this closed channel is nil.
//
// If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
// no leader") or canceled by the caller (e.g. context.Canceled), the error
// is returned. Otherwise, it retries.
//
// TODO(v4.0): post errors to last keep alive message before closing
// (see https://github.com/coreos/etcd/pull/7866)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
来看看 ETCD 的分布式锁实现
这里须要引入一个新的包,"github.com/coreos/etcd/clientv3/concurrency"
不过应用 go mod
治理形式的小伙伴就不必操心了,写完代码,间接 go build
,GO 工具 会间接帮咱们下载相干包,并编译好
Go 这一点真的相当不戳
package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
)
func main (){
// 设置 log 参数,打印以后工夫 和 以后行数
log.SetFlags(log.Ltime | log.Llongfile)
// ETCD 默认端口号是 2379
// 应用 ETCD 的 clientv3 包
// Endpoints 需填入 url 列表
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"/name"}})
if err != nil {log.Printf("connect to etcd error : %v\n", err)
return
}
defer client.Close()
// 创立第一个 会话
session1, err := concurrency.NewSession(client)
if err != nil {log.Printf("concurrency.NewSession 1 error : %v\n", err)
return
}
defer session1.Close()
// 设置锁
myMu1 := concurrency.NewMutex(session1, "/lock")
// 创立第二个 会话
session2, err := concurrency.NewSession(client)
if err != nil {log.Printf("concurrency.NewSession 2 error : %v\n", err)
return
}
defer session2.Close()
// 设置锁
myMu2 := concurrency.NewMutex(session2, "/lock")
// 会话 s1 获取锁
if err := myMu1.Lock(context.TODO()); err != nil {log.Printf("myMu1.Lock error : %v\n", err)
return
}
log.Println("Get session1 lock")
m2Chan := make(chan struct{})
go func() {defer close(m2Chan)
// 如果加锁不胜利会阻塞,晓得加锁胜利为止
// 这里是应用一个通道的形式来通信
// 当 myMu2 能加锁胜利,阐明 myMu1 解锁胜利
// 当 myMu2 加锁胜利的时候,会敞开 通道
// 敞开通道,从通道中读出来的就是 nil
if err := myMu2.Lock(context.TODO()); err != nil {log.Printf("myMu2.Lock error : %v\n", err)
return
}
}()
// 解锁
if err := myMu1.Unlock(context.TODO()); err != nil {log.Printf("myMu1.Unlock error : %v\n", err)
return
}
log.Println("Release session1 lock")
// 读取到 nil
<-m2Chan
log.Println("Get session2 lock")
}
在上述代码中,咱们创立 2 个会话来模仿分布式锁
咱们先让第 1 个会话拿到锁,并且第 2 个会话会去尝试加锁
当 第 2 个会话,正确加锁胜利的时候,会敞开一个通道,来确认本人真的加到锁了
上述第 2 个会话加锁的逻辑如下:
- 如果加锁不胜利会阻塞,晓得加锁胜利为止
- 这里是应用一个通道的形式来通信
- 当
myMu2
能加锁胜利,阐明myMu1
解锁胜利 - 当
myMu2
加锁胜利的时候,会敞开m2Chan
通道 - 敞开通道,从
m2Chan
通道中读出来的就是 nil,确认会话 2 加锁胜利
总结
- 分享了 ETCD 的简略单点部署,ETCD 应用到的包装置,以及会遇到的问题
- ETCD 的设置 和 获取 KEY
- ETCD 的 WATCH 监控 KEY 的简化
- ETCD 的租约 和保活机制
- ETCD 的分布式锁的简略实现
如上的编码案例,大家能够拿下来本人运行看看成果,一起学习,一起提高
若想更多的深刻理解和学习,能够看文章最开始说到的官网文档,官网文档中的案例更加详尽
具体的源码也是十分具体的,就怕你学不会
欢送点赞,关注,珍藏
敌人们,你的反对和激励,是我保持分享,提高质量的能源
好了,本次就到这里,下一次 分享 GO 中 string 的实现原理
技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。
我是 小魔童哪吒,欢送点赞关注珍藏,下次见~