转载自:实现etcd服务注册与发现

0.1、目录构造

.├── api│   └── main.go├── common│   └── common.go├── docker-compose.yml├── etcd│   └── Dockerfile├── go.mod├── go.sum├── rpc│   ├── courseware│   │   ├── courseware.pb.go│   │   └── courseware_grpc.pb.go│   ├── courseware.proto│   └── main.go└── server    ├── service_discovery.go    └── service_registration.go

1、docker-compose部署一个3节点的集群

我的项目根目录下创立etcd目录,并在目录下新增Dockerfile文件

FROM bitnami/etcd:latestLABEL maintainer="liuyuede123 <liufutianoppo@163.com>"

我的项目根目录下新增docker-compose.yml

version: '3.5'# 网络配置networks:  backend:    driver: bridge# 服务容器配置services:  etcd1:                                  # 自定义容器名称    build:      context: etcd                    # 指定构建应用的 Dockerfile 文件    environment:      - TZ=Asia/Shanghai      - ALLOW_NONE_AUTHENTICATION=yes      - ETCD_NAME=etcd1      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380      - ETCD_INITIAL_CLUSTER_STATE=new    ports:                               # 设置端口映射      - "12379:2379"      - "12380:2380"    networks:      - backend    restart: always  etcd2: # 自定义容器名称    build:      context: etcd                    # 指定构建应用的 Dockerfile 文件    environment:      - TZ=Asia/Shanghai      - ALLOW_NONE_AUTHENTICATION=yes      - ETCD_NAME=etcd2      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380      - ETCD_INITIAL_CLUSTER_STATE=new    ports: # 设置端口映射      - "22379:2379"      - "22380:2380"    networks:      - backend    restart: always  etcd3: # 自定义容器名称    build:      context: etcd                    # 指定构建应用的 Dockerfile 文件    environment:      - TZ=Asia/Shanghai      - ALLOW_NONE_AUTHENTICATION=yes      - ETCD_NAME=etcd3      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380      - ETCD_INITIAL_CLUSTER_STATE=new    ports: # 设置端口映射      - "32379:2379"      - "32380:2380"    networks:      - backend    restart: always

相干参数概念:

  1. ETCD_INITIAL_ADVERTISE_PEER_URLS:该成员节点在整个集群中的通信地址列表,这个地址用来传输集群数据的地址。因而这个地址必须是能够连贯集群中所有的成员的。
  2. ETCD_LISTEN_PEER_URLS:该节点与其余节点通信时所监听的地址列表,多个地址应用逗号隔开,其格局能够划分为scheme://IP:PORT,这里的scheme能够是http、https
  3. ETCD_LISTEN_CLIENT_URLS:该节点与客户端通信时监听的地址列表
  4. ETCD_ADVERTISE_CLIENT_URLS:播送给集群中其余成员本人的客户端地址列表
  5. ETCD_INITIAL_CLUSTER_TOKEN:初始化集群token
  6. ETCD_INITIAL_CLUSTER:配置集群外部所有成员地址,其格局为:ETCD_NAME=ETCD_INITIAL_ADVERTISE_PEER_URLS,如果有多个应用逗号隔开
  7. ETCD_INITIAL_CLUSTER_STATE:初始化集群状态,new示意新建

启动集群

docker-compose up -dCreating network "etcd_backend" with driver "bridge"Creating etcd_etcd1_1 ... doneCreating etcd_etcd2_1 ... doneCreating etcd_etcd3_1 ... done

测试集群可用性

# 登录其中一个节点docker exec -it 5f97bf0b446f6e6514576fc1eb46c2f60d2c2b3e3f3ee3b1ad6219414fa915c8 /bin/sh# 写入一个键值etcdctl put name "liuyuede"OK# 查看etcdctl get namenameliuyuede# 登录另外俩个节点docker exec -it a6ccc9b6e5cc81ee7c779e2b9e7235cd6d814e92fbc66b7e4846798acff8ee2a /bin/shetcdctl get namenameliuyuededocker exec -it 6817fa89e3e9e422628e0049910b672df389c62d41bf2349a0f77e22c99e5270 /bin/shetcdctl get namenameliuyuede

etcd集群采纳的是raft协定,个别至多为俩个集群,只有一个master,如果删除到只剩一个节点以后节点也不能提供服务

查看集群状况

etcdctl --endpoints=http://0.0.0.0:12379,0.0.0.0:22379,0.0.0.0:32379 endpoint status --write-out=table+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+|       ENDPOINT       |        ID        | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+| http://0.0.0.0:12379 | ade526d28b1f92f7 |   3.5.4 |   20 kB |      true |      false |         3 |         13 |                 13 |        ||        0.0.0.0:22379 | d282ac2ce600c1ce |   3.5.4 |   20 kB |     false |      false |         3 |         13 |                 13 |        ||        0.0.0.0:32379 | bd388e7810915853 |   3.5.4 |   20 kB |     false |      false |         3 |         13 |                 13 |        |+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+

2、减少服务注册性能

服务注册的流程

  1. 向etcd新增一个蕴含rpc服务信息的键值对,并设置租约(比方5秒过期)
  2. 利用保活函数KeepAlive一直续约
package serverimport (    "context"    "encoding/json"    "errors"    clientv3 "go.etcd.io/etcd/client/v3"    "time")type ServiceInfo struct {    Name string    Ip   string}type Service struct {    ServiceInfo ServiceInfo    stop        chan error    leaseId     clientv3.LeaseID    client      *clientv3.Client}func NewService(serviceInfo ServiceInfo, endpoints []string) (service *Service, err error) {    client, err := clientv3.New(clientv3.Config{        Endpoints:   endpoints,        DialTimeout: time.Second * 10,    })    if err != nil {        return nil, err    }    service = &Service{        ServiceInfo: serviceInfo,        client:      client,    }    return}func (s *Service) Start(ctx context.Context) (err error) {    alive, err := s.KeepAlive(ctx)    if err != nil {        return    }    for {        select {        case err = <-s.stop: // 服务端敞开返回谬误            return err        case <-s.client.Ctx().Done(): // etcd敞开            return errors.New("server closed")        case _, ok := <-alive:            if !ok { // 保活通道敞开                return s.revoke(ctx)            }        }    }}func (s *Service) KeepAlive(ctx context.Context) (<-chan *clientv3.LeaseKeepAliveResponse, error) {    info := s.ServiceInfo    key := s.getKey()    val, _ := json.Marshal(info)    // 创立租约    leaseResp, err := s.client.Grant(ctx, 5)    if err != nil {        return nil, err    }    // 写入etcd    _, err = s.client.Put(ctx, key, string(val), clientv3.WithLease(leaseResp.ID))    if err != nil {        return nil, err    }    s.leaseId = leaseResp.ID    return s.client.KeepAlive(ctx, leaseResp.ID)}// 勾销租约func (s *Service) revoke(ctx context.Context) error {    _, err := s.client.Revoke(ctx, s.leaseId)    return err}func (s *Service) getKey() string {    return s.ServiceInfo.Name + "/" + s.ServiceInfo.Ip}

3、减少服务发现

服务发现流程

  1. 实现grpc中resolver.Builder接口的Build办法
  2. 通过etcdclient获取并监听grpc服务(是否有新增或者删除)
  3. 更新到resolver.State,State 蕴含与 ClientConn 相干的以后 Resolver 状态,包含grpc的地址resolver.Address
package serverimport (    "context"    "encoding/json"    "fmt"    "go.etcd.io/etcd/api/v3/mvccpb"    clientv3 "go.etcd.io/etcd/client/v3"    "google.golang.org/grpc/resolver")type Discovery struct {    endpoints  []string    service    string    client     *clientv3.Client    clientConn resolver.ClientConn}func NewDiscovery(endpoints []string, service string) resolver.Builder {    return &Discovery{        endpoints: endpoints,        service:   service,    }}func (d *Discovery) ResolveNow(rn resolver.ResolveNowOptions) {}func (d *Discovery) Close() {}func (d *Discovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {    var err error    d.client, err = clientv3.New(clientv3.Config{        Endpoints: d.endpoints,    })    if err != nil {        return nil, err    }    d.clientConn = cc    go d.watch(d.service)    return d, nil}func (d *Discovery) Scheme() string {    return "etcd"}func (d *Discovery) watch(service string) {    addrM := make(map[string]resolver.Address)    state := resolver.State{}    update := func() {        addrList := make([]resolver.Address, 0, len(addrM))        for _, address := range addrM {            addrList = append(addrList, address)        }        state.Addresses = addrList        err := d.clientConn.UpdateState(state)        if err != nil {            fmt.Println("更新地址出错:", err)        }    }    resp, err := d.client.Get(context.Background(), service, clientv3.WithPrefix())    if err != nil {        fmt.Println("获取地址出错:", err)    } else {        for i, kv := range resp.Kvs {            info := &ServiceInfo{}            err = json.Unmarshal(kv.Value, info)            if err != nil {                fmt.Println("解析value失败:", err)            }            addrM[string(resp.Kvs[i].Key)] = resolver.Address{                Addr:       info.Ip,                ServerName: info.Name,            }        }    }    update()    dch := d.client.Watch(context.Background(), service, clientv3.WithPrefix(), clientv3.WithPrevKV())    for response := range dch {        for _, event := range response.Events {            switch event.Type {            case mvccpb.PUT:                info := &ServiceInfo{}                err = json.Unmarshal(event.Kv.Value, info)                if err != nil {                    fmt.Println("监听时解析value报错:", err)                } else {                    addrM[string(event.Kv.Key)] = resolver.Address{Addr: info.Ip}                }                fmt.Println(string(event.Kv.Key))            case mvccpb.DELETE:                delete(addrM, string(event.Kv.Key))                fmt.Println(string(event.Kv.Key))            }        }        update()    }}

4、grpc课件服务

common参数

package commonconst CoursewareRpc = "rpc.courseware"var Endpoints = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}

生成课件服务grpc

syntax = "proto3";package rpc;option go_package = "./courseware";message GetRequest {  uint64 Id = 1;}message GetResponse {  uint64 Id = 1;  string Code = 2;  string Name = 3;  uint64 Type = 4;}service Courseware {  rpc Get(GetRequest) returns(GetResponse);}
protoc --go_out=./ --go-grpc_out=./ courseware.proto

课件服务入口

package mainimport (    "context"    "fmt"    "go-demo/etcd/common"    "go-demo/etcd/rpc/courseware"    "go-demo/etcd/server"    "google.golang.org/grpc"    "google.golang.org/grpc/reflection"    "net"    "os"    "strings"    "time")var Port stringtype service struct {    courseware.UnsafeCoursewareServer}func (s *service) Get(ctx context.Context, req *courseware.GetRequest) (res *courseware.GetResponse, err error) {    fmt.Println("获取课件详情 port:", Port, " time:", time.Now())    return &courseware.GetResponse{        Id:   1,        Code: "HD4544",        Name: "多媒体课件",        Type: 4,    }, nil}func main() {    args := os.Args[1:]    if len(args) == 0 {        panic("短少port参数:port=8400")    }    for _, arg := range args {        ports := strings.Split(arg, "=")        if len(ports) < 2 || ports[0] != "port" {            panic("port参数格局谬误:port=8400")        }        Port = ports[1]    }    listen, err := net.Listen("tcp", ":"+Port)    if err != nil {        fmt.Println("failed to listen", err)        return    }    s := grpc.NewServer()    courseware.RegisterCoursewareServer(s, &service{})    reflection.Register(s)  // 注册到etcd    newService, err := server.NewService(server.ServiceInfo{        Name: common.CoursewareRpc,        Ip:   "127.0.0.1:" + Port,    }, common.Endpoints)    if err != nil {        fmt.Println("增加到etcd失败:", err)        return    }    go func() {        err = newService.Start(context.Background())        if err != nil {            fmt.Println("开启服务注册失败:", err)        }    }()    if err = s.Serve(listen); err != nil {        fmt.Println("开启rpc服务失败:", err)    }}

5、api服务

package mainimport (    "context"    "fmt"    "go-demo/etcd/common"    "go-demo/etcd/rpc/courseware"    "go-demo/etcd/server"    "google.golang.org/grpc"    "google.golang.org/grpc/credentials/insecure"    "google.golang.org/grpc/resolver"    "time")func main() {    d := server.NewDiscovery(common.Endpoints, common.CoursewareRpc)    resolver.Register(d)    for {        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)    // 通过etcd注册核心和grpc服务建设连贯        conn, err := grpc.DialContext(ctx,            fmt.Sprintf(d.Scheme()+":///"+common.CoursewareRpc),            grpc.WithTransportCredentials(insecure.NewCredentials()),            grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),            grpc.WithBlock(),        )        if err != nil {            fmt.Println("和rpc建设连贯失败:", err)            return        }        client := courseware.NewCoursewareClient(conn)        get, err := client.Get(ctx, &courseware.GetRequest{Id: 1})        if err != nil {            fmt.Println("获取课件失败:", err)            return        }        fmt.Println(get)        time.Sleep(3 * time.Second)        cancel()    }}

6、测试

开启3个服务,能够看到客户端通过负载平衡随机到一个服务申请

go run main.go port=8400获取课件详情 port: 8400  time: 2022-08-25 18:47:43.784942 +0800 CST m=+78.228450885获取课件详情 port: 8400  time: 2022-08-25 18:47:52.925858 +0800 CST m=+87.369721731获取课件详情 port: 8400  time: 2022-08-25 18:48:02.001177 +0800 CST m=+96.445393312获取课件详情 port: 8400  time: 2022-08-25 18:48:05.060066 +0800 CST m=+99.504401028获取课件详情 port: 8400  time: 2022-08-25 18:48:14.154148 +0800 CST m=+108.598836458go run main.go port=8500获取课件详情 port: 8500  time: 2022-08-25 18:47:46.832479 +0800 CST m=+62.822399701获取课件详情 port: 8500  time: 2022-08-25 18:47:49.844536 +0800 CST m=+65.834573960获取课件详情 port: 8500  time: 2022-08-25 18:47:55.955638 +0800 CST m=+71.945912584获取课件详情 port: 8500  time: 2022-08-25 18:48:17.168293 +0800 CST m=+93.159391485获取课件详情 port: 8500  time: 2022-08-25 18:48:20.182787 +0800 CST m=+96.174002796go run main.go port=8600获取课件详情 port: 8600  time: 2022-08-25 18:47:58.968283 +0800 CST m=+1.317052360获取课件详情 port: 8600  time: 2022-08-25 18:48:08.106493 +0800 CST m=+10.455617422获取课件详情 port: 8600  time: 2022-08-25 18:48:11.125212 +0800 CST m=+13.474453269