一、etcd

1、简介

etcd是应用Go语言开发的一个开源的、高可用的分布式key-value存储系统,能够用于配置共享和服务的注册和发现。相似我的项目有`zookeeper`和`consul`。

2、特点

`齐全复制`:集群中的每个节点都能够应用残缺的存档`高可用性`:Etcd可用于防止硬件的单点故障或网络问题`一致性`:每次读取都会返回跨多主机的最新写入`简略`:包含一个定义良好、面向用户的API(gRPC)`平安`:实现了带有可选的客户端证书身份验证的自动化TLS`疾速`:每秒10000次写入的基准速度`牢靠`:应用Raft算法实现了强统一、高可用的服务存储目录

3、利用场景

服务发现配置核心分布式锁 - 1.放弃独占 2.管制时序

注: 奇数节点准则

4、etcd集群

...

5、go 操作 etcd

go get go.etcd.io/etcd/clientv3

官网文档

PUT GET
package mainimport (    "context"    "fmt"    "time"    "go.etcd.io/etcd/clientv3")func main() {    cli, err := clientv3.New(clientv3.Config{        Endpoints:   []string{"127.0.0.1:2379"},        DialTimeout: 5 * time.Second,    })    if err != nil {        // handle error!        fmt.Printf("connect to etcd failed, err:%v\n", err)        return    }    fmt.Println("connect to etcd success")    defer cli.Close()    // put    ctx, cancel := context.WithTimeout(context.Background(), time.Second)    _, err = cli.Put(ctx, "kk", "123")    cancel()    if err != nil {        fmt.Printf("put to etcd failed, err:%v\n", err)        return    }    // get    ctx, cancel = context.WithTimeout(context.Background(), time.Second)    resp, err := cli.Get(ctx, "kk")    cancel()    if err != nil {        fmt.Printf("get from etcd failed, err:%v\n", err)        return    }    for _, ev := range resp.Kvs {        fmt.Printf("%s:%s\n", ev.Key, ev.Value)    }}
WATCH
package mainimport (    "context"    "fmt"    "time"    "go.etcd.io/etcd/clientv3")func main() {    cli, err := clientv3.New(clientv3.Config{        Endpoints:   []string{"127.0.0.1:2379"},        DialTimeout: 5 * time.Second,    })    if err != nil {        fmt.Printf("connect to etcd failed, err:%v\n", err)        return    }    fmt.Println("connect to etcd success")    defer cli.Close()    // watch key:kk change    rch := cli.Watch(context.Background(), "kk") // <-chan WatchResponse    for wresp := range rch {        for _, ev := range wresp.Events {            fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)        }    }}

命令行操作更改键值

etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put kk "123"OKetcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 del kk1etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put kk "321"OK

告诉

watch> ./watchconnect to etcd successType: PUT Key:kk Value:123Type: DELETE Key:kk Value:Type: PUT Key:kk Value:321
LEASE
package mainimport (    "fmt"    "time")// etcd leaseimport (    "context"    "log"    "go.etcd.io/etcd/clientv3")func main() {    cli, err := clientv3.New(clientv3.Config{        Endpoints:   []string{"127.0.0.1:2379"},        DialTimeout: time.Second * 5,    })    if err != nil {        log.Fatal(err)    }    fmt.Println("connect to etcd success.")    defer cli.Close()    // 创立一个5秒的租约    resp, err := cli.Grant(context.TODO(), 5)    if err != nil {        log.Fatal(err)    }    // 5秒钟之后, /nazha/ 这个key就会被移除    _, err = cli.Put(context.TODO(), "/kk/", "123", clientv3.WithLease(resp.ID))    if err != nil {        log.Fatal(err)    }}

KEEPALIVE

package mainimport (    "context"    "fmt"    "log"    "time"    "go.etcd.io/etcd/clientv3")// etcd keepAlivefunc main() {    cli, err := clientv3.New(clientv3.Config{        Endpoints:   []string{"127.0.0.1:2379"},        DialTimeout: time.Second * 5,    })    if err != nil {        log.Fatal(err)    }    fmt.Println("connect to etcd success.")    defer cli.Close()    resp, err := cli.Grant(context.TODO(), 5)    if err != nil {        log.Fatal(err)    }    _, err = cli.Put(context.TODO(), "/kk/", "123", clientv3.WithLease(resp.ID))    if err != nil {        log.Fatal(err)    }    // the key 'foo' will be kept forever    ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)    if kaerr != nil {        log.Fatal(kaerr)    }    for {        ka := <-ch        fmt.Println("ttl:", ka.TTL)    }}

6、分布式锁

cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})if err != nil {    log.Fatal(err)}defer cli.Close()// 创立两个独自的会话用来演示锁竞争s1, err := concurrency.NewSession(cli)if err != nil {    log.Fatal(err)}defer s1.Close()m1 := concurrency.NewMutex(s1, "/my-lock/")s2, err := concurrency.NewSession(cli)if err != nil {    log.Fatal(err)}defer s2.Close()m2 := concurrency.NewMutex(s2, "/my-lock/")// 会话s1获取锁if err := m1.Lock(context.TODO()); err != nil {    log.Fatal(err)}fmt.Println("acquired lock for s1")m2Locked := make(chan struct{})go func() {    defer close(m2Locked)    // 期待直到会话s1开释了/my-lock/的锁    if err := m2.Lock(context.TODO()); err != nil {        log.Fatal(err)    }}()if err := m1.Unlock(context.TODO()); err != nil {    log.Fatal(err)}fmt.Println("released lock for s1")<-m2Lockedfmt.Println("acquired lock for s2")

输入

acquired lock for s1released lock for s1acquired lock for s2