一、etcd
1、简介
etcd是应用Go语言开发的一个开源的、高可用的分布式key-value存储系统,能够用于配置共享和服务的注册和发现。相似我的项目有`zookeeper`和`consul`。
2、特点
`齐全复制`:集群中的每个节点都能够应用残缺的存档`高可用性`:Etcd可用于防止硬件的单点故障或网络问题`一致性`:每次读取都会返回跨多主机的最新写入`简略`:包含一个定义良好、面向用户的API(gRPC)`平安`:实现了带有可选的客户端证书身份验证的自动化TLS`疾速`:每秒10000次写入的基准速度`牢靠`:应用Raft算法实现了强统一、高可用的服务存储目录
3、利用场景
服务发现配置核心分布式锁 - 1.放弃独占 2.管制时序
注: 奇数节点准则
4、etcd集群
...
5、go 操作 etcd
go get go.etcd.io/etcd/clientv3
官网文档
PUT GET
package mainimport ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3")func main() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { // handle error! fmt.Printf("connect to etcd failed, err:%v\n", err) return } fmt.Println("connect to etcd success") defer cli.Close() // put ctx, cancel := context.WithTimeout(context.Background(), time.Second) _, err = cli.Put(ctx, "kk", "123") cancel() if err != nil { fmt.Printf("put to etcd failed, err:%v\n", err) return } // get ctx, cancel = context.WithTimeout(context.Background(), time.Second) resp, err := cli.Get(ctx, "kk") cancel() if err != nil { fmt.Printf("get from etcd failed, err:%v\n", err) return } for _, ev := range resp.Kvs { fmt.Printf("%s:%s\n", ev.Key, ev.Value) }}
WATCH
package mainimport ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3")func main() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { fmt.Printf("connect to etcd failed, err:%v\n", err) return } fmt.Println("connect to etcd success") defer cli.Close() // watch key:kk change rch := cli.Watch(context.Background(), "kk") // <-chan WatchResponse for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } }}
命令行操作更改键值
etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put kk "123"OKetcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 del kk1etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put kk "321"OK
告诉
watch> ./watchconnect to etcd successType: PUT Key:kk Value:123Type: DELETE Key:kk Value:Type: PUT Key:kk Value:321
LEASE
package mainimport ( "fmt" "time")// etcd leaseimport ( "context" "log" "go.etcd.io/etcd/clientv3")func main() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: time.Second * 5, }) if err != nil { log.Fatal(err) } fmt.Println("connect to etcd success.") defer cli.Close() // 创立一个5秒的租约 resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } // 5秒钟之后, /nazha/ 这个key就会被移除 _, err = cli.Put(context.TODO(), "/kk/", "123", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) }}
KEEPALIVE
package mainimport ( "context" "fmt" "log" "time" "go.etcd.io/etcd/clientv3")// etcd keepAlivefunc main() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: time.Second * 5, }) if err != nil { log.Fatal(err) } fmt.Println("connect to etcd success.") defer cli.Close() resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } _, err = cli.Put(context.TODO(), "/kk/", "123", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) } // the key 'foo' will be kept forever ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID) if kaerr != nil { log.Fatal(kaerr) } for { ka := <-ch fmt.Println("ttl:", ka.TTL) }}
6、分布式锁
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})if err != nil { log.Fatal(err)}defer cli.Close()// 创立两个独自的会话用来演示锁竞争s1, err := concurrency.NewSession(cli)if err != nil { log.Fatal(err)}defer s1.Close()m1 := concurrency.NewMutex(s1, "/my-lock/")s2, err := concurrency.NewSession(cli)if err != nil { log.Fatal(err)}defer s2.Close()m2 := concurrency.NewMutex(s2, "/my-lock/")// 会话s1获取锁if err := m1.Lock(context.TODO()); err != nil { log.Fatal(err)}fmt.Println("acquired lock for s1")m2Locked := make(chan struct{})go func() { defer close(m2Locked) // 期待直到会话s1开释了/my-lock/的锁 if err := m2.Lock(context.TODO()); err != nil { log.Fatal(err) }}()if err := m1.Unlock(context.TODO()); err != nil { log.Fatal(err)}fmt.Println("released lock for s1")<-m2Lockedfmt.Println("acquired lock for s2")
输入
acquired lock for s1released lock for s1acquired lock for s2