关于golang:golang微服务之注册与发现

21次阅读

共计 15579 个字符,预计需要花费 39 分钟才能阅读完成。

在微服务中,服务注册与发现是必不可少的一环,其中 etcd,zookeeper,consul 在 golang 中较为罕用。

程序对于这种中间件的依赖,都倡议加一层接口,基于接口去实现

上面会分层三大板块去阐明

接口定义

因为是下层调用是基于接口调用的,所以须要将 discover 和 register 接口化。在 discover 发现 endpoint 有变动时,须要调用 callback 去更新本地的缓存,所以也须要 endpoint 的接口。如下

const (
    EtcdBackend      = "etcd"
    ZookeeperBackend = "zookeeper"
    ConsulBackend    = "consul"
)

// 服务发现接口

type Discover interface {
    // Start watch with block, 须要一个 callback 去更新本地 endpoint
    Start(callback EndpointCacher)
    Stop()}

// 服务注册接口

type Register interface {Start() error
    Stop() error}

// endpoint 接口
type EndpointCacher interface {AddOrUpdate(endpoint string, attribute []byte)
    Delete(endpoint string)
    AddError(err error)
    Error(err error)
}

接口定义完了,须要有连贯注册核心的信息配置。
// 服务发现端配置
type DiscoverConfig struct {

BackendType      string   // one of etcd|consul|zookeeper
BackendEndPoints []string // register backend endpoint
DiscoverPrefix   string
ServiceName      string
HostName         string

}

// 注册端配置
type RegisterConfig struct {

BackendType         string   // one of etcd|consul|zookeeper
BackendEndPoints    []string // register backend endpoint
DiscoverPrefix      string
ServiceName         string
HeartBeatPeriod     int64
ServiceEndPoint     string // register service endpoint to backend
Attr                string // custom attribute. like: {"hostname": "xxx", "weight": 1}
HealthCheckEndPoint string

}
有了配置之后,须要有注册和服务发现实例的创立办法

// NewDiscover 创立一个服务发现实例
func NewDiscover(cfg *DiscoverConfig) (Discover, error) {
    switch cfg.BackendType {
    case EtcdBackend:
        return newEtcdDiscover(cfg)
    case ConsulBackend:
        return newConsulDiscover(cfg)
    case ZookeeperBackend:
        return newZookeeperDiscover(cfg)
    }
    return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
}



// NewRegister 创立一个注册实例
func NewRegister(cfg *RegisterConfig) (Register, error) {
    switch cfg.BackendType {
    case EtcdBackend:
        return newEtcdRegister(cfg)
    case ConsulBackend:
        return newConsulRegister(cfg)
    case ZookeeperBackend:
        return newZookeeperRegister(cfg)
    }
    return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
}

这里实现了简易版的 endpoint,可供参考,次要是将 endpoint 存在 map 中,而后有变动时做变更
// LiteEndpoint EndpointCacher lite impl

type LiteEndpoint struct {Endpoints map[string][]byte `json:"value"`
    lock      sync.Mutex
    Err       error
}

func NewLiteEndpoint() *LiteEndpoint {
    return &LiteEndpoint{Endpoints: map[string][]byte{},
        lock:      sync.Mutex{},}
}

func (e *LiteEndpoint) AddOrUpdate(endpoint string, attribute []byte) {e.lock.Lock()
    defer e.lock.Unlock()
    e.Endpoints[endpoint] = attribute
}

func (e *LiteEndpoint) Delete(endpoint string) {e.lock.Lock()
    defer e.lock.Unlock()
    delete(e.Endpoints, endpoint)
}

func (e *LiteEndpoint) Error(err error) {e.Err = err}

func (e *LiteEndpoint) List() []string {var endpointSlice []string
    for k, _ := range e.Endpoints {endpointSlice = append(endpointSlice, k)
    }
    return endpointSlice
}

func (e *LiteEndpoint) Attr(endpoint string) []byte {return e.Endpoints[endpoint]
}

接口的具体实现

etcd

注册服务
服务注册到 etcd 后,利用 etcd 租期的个性,每次续租几秒,在续期过期前实现续租。当实例异样时无奈续租,则会在 etcd 端该实例会被过期删除,达到下线异样节点的成果。

func newEtcdRegister(cfg *RegisterConfig) (*etcdRegister, error) {
    var err error
    etcdClient, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
    if err != nil {return nil, err}

    r := &etcdRegister{
        etcdEndpoints:  cfg.BackendEndPoints,
        discoverPrefix: cfg.DiscoverPrefix,
        serviceName:    cfg.ServiceName,
        endpoint:       cfg.ServiceEndPoint,
        attr:           cfg.Attr,
        ttl:            cfg.HeartBeatPeriod,
        stopCh:         make(chan struct{}),
        etcdClient:     etcdClient,
    }
    return r, nil
}

// Start 开启一个协程
func (r *etcdRegister) Start() error {go r.keepAlive()
    return nil
}

func (r *etcdRegister) Stop() error {close(r.stopCh)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
    _, err := r.etcdClient.Delete(ctx, r.key())
    //if r.grpcResolver != nil {//    return r.grpcResolver.Update(ctx, r.key(), grpcnaming.Update{Op: grpcnaming.Delete, Addr: r.endpoint})
    //}
    return err
}

// 定时续租
func (r *etcdRegister) keepAlive() {duration := time.Duration(r.ttl) * time.Second
    timer := time.NewTimer(duration)
    for {
        select {
        case <-r.stopCh:
            return
        case <-timer.C:
            if r.leaseID > 0 {if err := r.leaseRenewal(); err != nil {logrus.Warnf("%s leaseid[%x] keepAlive err: %s, try to reset...", r.endpoint, r.leaseID, err.Error())
                    r.leaseID = 0
                }
            } else {if err := r.register(); err != nil {logrus.Warnf("register endpoint %s error: %s", r.endpoint, err.Error())
                } else {logrus.Infof("register endppint %s success", r.endpoint)
                }
            }
            timer.Reset(duration)
        }
    }
}

func (r *etcdRegister) register() error {ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
    resp, err := r.etcdClient.Grant(ctx, r.ttl+3)
    if err != nil {return err}

    _, err = r.etcdClient.Put(ctx, r.key(), r.attr, clientv3.WithLease(resp.ID))
    r.leaseID = resp.ID
    return err
}

func (r *etcdRegister) leaseRenewal() error {ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
    _, err := r.etcdClient.KeepAliveOnce(ctx, r.leaseID)
    return err
}

func (r *etcdRegister) key() string {return toEtcdKey(r.discoverPrefix, r.serviceName, r.endpoint)
}

func toEtcdKey(elem ...string) string {return strings.Join(elem, "/")
}

func tailKey(key []byte) string {keyStr := string(key)
    topicSlice := strings.Split(keyStr, "/")
    if len(topicSlice) != 0 {return topicSlice[len(topicSlice)-1]
    }
    return keyStr
}

服务发现端
服务发现端须要一直的监听 key 的变动,所以须要 watch,有更新后须要调用 callback 来更新本地的 endpoint 列表

// etcd discover impl
type etcdDiscover struct {
    ctx        context.Context
    cancel     context.CancelFunc
    etcdClient *clientv3.Client
    prefix     string
}

func newEtcdDiscover(cfg *DiscoverConfig) (*etcdDiscover, error) {cli, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
    if err != nil {return nil, err}
    ctx, cancel := context.WithCancel(context.Background())
    return &etcdDiscover{
        ctx:        ctx,
        cancel:     cancel,
        etcdClient: cli,
        prefix:     cfg.DiscoverPrefix,
    }, nil
}

func (d *etcdDiscover) Start(callback EndpointCacher) {d.discover(callback)
}

func (d *etcdDiscover) discover(callback EndpointCacher) {ctx, cancel := context.WithCancel(d.ctx)
    defer cancel()

    if err := d.listService(ctx, callback); err != nil {callback.AddError(err)
    }

    watch := d.etcdClient.Watch(ctx, d.prefix, clientv3.WithPrefix())
    for {
        select {case <-d.ctx.Done():
            return
        case resp := <-watch:
            if err := resp.Err(); err != nil {callback.AddError(err)
                return
            }
            for _, event := range resp.Events {
                if event.Kv == nil {continue}
                switch event.Type {
                case mvccpb.PUT:
                    callback.AddOrUpdate(tailKey(event.Kv.Key), event.Kv.Value)
                case mvccpb.DELETE:
                    callback.Delete(tailKey(event.Kv.Key))
                }
            }
        }
    }
}

func (d *etcdDiscover) Stop() {d.cancel()
}

func (d *etcdDiscover) listService(ctx context.Context, callback EndpointCacher) error {resp, err := d.etcdClient.Get(ctx, d.prefix, clientv3.WithPrefix())
    if err != nil {return err}
    for _, kv := range resp.Kvs {callback.AddOrUpdate(tailKey(kv.Key), kv.Value)
    }
    return nil
}

zookeeper

zookeeper 则利用长期节点的个性,来做异样服务下线性能
服务注册端

type zookeeperRegister struct {zkEndpoints []string
    prefix      string
    serviceName string
    endpoint    string
    attr        string
    ttl         int64
    stopCh      chan struct{}
    conn        *zk.Conn
}

func newZookeeperRegister(cfg *RegisterConfig) (*zookeeperRegister, error) {
    r := zookeeperRegister{
        zkEndpoints: cfg.BackendEndPoints,
        prefix:      cfg.DiscoverPrefix,
        serviceName: cfg.ServiceName,
        endpoint:    cfg.ServiceEndPoint,
        attr:        cfg.Attr,
        ttl:         cfg.HeartBeatPeriod,
    }
    return &r, nil
}

func (r *zookeeperRegister) Start() error {
    var err error
    r.conn, _, err = zk.Connect(r.zkEndpoints, time.Second*5)
    if err != nil {return err}

    return r.register()}

func (r *zookeeperRegister) Stop() error {
    if r.conn != nil {r.conn.Close()
    }
    return nil
}

func (r *zookeeperRegister) register() error {if err := r.createIfNotExist(r.node(), nil, 0); err != nil {return err}

    return r.createOrUpdateEndpoint(r.key(), []byte(r.attr))
}

func (r *zookeeperRegister) createOrUpdateEndpoint(path string, data []byte) error {exist, _, err := r.conn.Exists(path)
    if err != nil {return err}

    if !exist {_, err = r.conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if err != nil {return err}
        return nil
    }

    _, stat, err := r.conn.Get(path)
    if err != nil {return err}
    _, err = r.conn.Set(path, []byte(r.attr), stat.Version)
    if err != nil {return err}

    return nil

}

func (r *zookeeperRegister) createIfNotExist(path string, data []byte, flag int32) error {exist, _, err := r.conn.Exists(r.node())
    if err != nil {return err}

    if !exist {_, err = r.conn.Create(path, data, flag, zk.WorldACL(zk.PermAll))
        if err != nil {return err}
    }
    return nil
}

func (r *zookeeperRegister) node() string {return fmt.Sprintf("%s/%s", r.prefix, r.serviceName)
}

func (r *zookeeperRegister) key() string {return fmt.Sprintf("%s/%s/%s", r.prefix, r.serviceName, r.endpoint)
}

服务发现端

type zookeeperDiscover struct {
    ctx         context.Context
    cancel      context.CancelFunc
    conn        *zk.Conn
    prefix      string
    serviceName string
}

func newZookeeperDiscover(cfg *DiscoverConfig) (*zookeeperDiscover, error) {conn, _, err := zk.Connect(cfg.BackendEndPoints, 10*time.Second)
    if err != nil {return nil, err}

    ctx, cancel := context.WithCancel(context.Background())
    d := &zookeeperDiscover{
        ctx:         ctx,
        cancel:      cancel,
        conn:        conn,
        prefix:      cfg.DiscoverPrefix,
        serviceName: cfg.ServiceName,
    }
    return d, nil
}

func (d *zookeeperDiscover) Start(callback EndpointCacher) {d.discover(callback)
}

func (d *zookeeperDiscover) Stop() {d.cancel()
    if d.conn != nil {d.conn.Close()
    }
}

func (d *zookeeperDiscover) discover(callback EndpointCacher) {if err := d.listService(callback); err != nil {callback.AddError(err)
        return
    }
    for {snapshot, _, ch, err := d.conn.ChildrenW(d.key())
        if err != nil {callback.AddError(err)
            return
        }
        select {
        case e := <-ch:
            switch e.Type {
            case zk.EventNodeCreated, zk.EventNodeChildrenChanged:
                for _, v := range snapshot {callback.Delete(v)
                }
                if err := d.listService(callback); err != nil {callback.AddError(err)
                }
            case zk.EventNodeDeleted:
                for _, v := range snapshot {callback.Delete(v)
                }
            }
        }
    }
}

func (d *zookeeperDiscover) getNodeProperty(path string) ([]byte, error) {value, _, err := d.conn.Get(path)
    return value, err
}

func (d *zookeeperDiscover) listService(callback EndpointCacher) error {childs, _, err := d.conn.Children(d.key())
    if err != nil {return err}

    for _, c := range childs {value, _, err := d.conn.Get(fmt.Sprintf("%s/%s", d.key(), c))
        if err != nil {return err}
        callback.AddOrUpdate(c, value)
    }
    return nil
}

func (d *zookeeperDiscover) key() string {return fmt.Sprintf("%s/%s", d.prefix, d.serviceName)
}

consul

服务注册端

type consulRegister struct {
    prefix              string
    serviceName         string
    serviceId           string
    endpoint            string
    healthCheckEndpoint string
    attr                string
    ttl                 int64
    stopCh              chan struct{}
    client              *consulapi.Client
}

func newConsulRegister(cfg *RegisterConfig) (*consulRegister, error) {config := consulapi.DefaultConfig()
    config.Address = strings.Join(cfg.BackendEndPoints, ",")
    client, err := consulapi.NewClient(config)
    if err != nil {return nil, err}

    r := &consulRegister{
        prefix:              cfg.DiscoverPrefix,
        serviceName:         cfg.ServiceName,
        endpoint:            cfg.ServiceEndPoint,
        healthCheckEndpoint: cfg.HealthCheckEndPoint,
        attr:                cfg.Attr,
        ttl:                 cfg.HeartBeatPeriod,
        stopCh:              make(chan struct{}),
        client:              client,
    }
    return r, nil
}

func (s *consulRegister) Start() error {return s.register()
}

func (s *consulRegister) Stop() error {
    if s.serviceId != "" {return s.client.Agent().ServiceDeregister(s.serviceId)
    }
    return nil
}

func (s *consulRegister) register() error {registration := new(consulapi.AgentServiceRegistration)

    address, port, err := net.SplitHostPort(s.endpoint)
    if err != nil {return err}

    registration.Address = address
    portInt, err := strconv.Atoi(port)
    if err != nil {return err}
    registration.Port = portInt

    serviceId := fmt.Sprintf("%s_%s", s.serviceName, address)
    s.serviceId = serviceId

    registration.Name = s.serviceName
    registration.ID = serviceId

    serviceCheck := new(consulapi.AgentServiceCheck)
    serviceCheck.HTTP = fmt.Sprintf("http://%s", s.healthCheckEndpoint)
    serviceCheck.Timeout = "2s"
    serviceCheck.Interval = "2s"
    serviceCheck.DeregisterCriticalServiceAfter = "30s"

    registration.Check = serviceCheck
    return s.client.Agent().ServiceRegister(registration)
}

服务发现端

type consulDiscover struct {
    ctx         context.Context
    cancel      context.CancelFunc
    prefix      string
    serviceName string
    client      *consulapi.Client
}

func newConsulDiscover(cfg *DiscoverConfig) (*consulDiscover, error) {config := consulapi.DefaultConfig()
    config.Address = strings.Join(cfg.BackendEndPoints, ",")
    client, err := consulapi.NewClient(config)
    if err != nil {return nil, err}

    ctx, cancel := context.WithCancel(context.Background())
    d := &consulDiscover{
        ctx:         ctx,
        cancel:      cancel,
        prefix:      cfg.DiscoverPrefix,
        serviceName: cfg.ServiceName,
        client:      client,
    }
    return d, nil
}

func (d *consulDiscover) Start(callback EndpointCacher) {d.discover(callback)
}

func (d *consulDiscover) Stop() {d.cancel()
}

func (d *consulDiscover) discover(callback EndpointCacher) {
    var lastIndex uint64
    for {
        select {case <-d.ctx.Done():
            return
        default:
            services, queryMeta, err := d.client.Health().Service(
                d.serviceName, "", false, &consulapi.QueryOptions{WaitIndex: lastIndex,})
            if err != nil {callback.AddError(err)
            }
            lastIndex = queryMeta.LastIndex

            for _, service := range services {var attr []byte
                endpoint := fmt.Sprintf("%s:%v", service.Service.Address, service.Service.Port)
                switch service.Checks.AggregatedStatus() {
                case consulapi.HealthPassing:
                    if service.Service.Meta != nil {attr, _ = json.Marshal(service.Service.Meta)
                    }
                    callback.AddOrUpdate(endpoint, attr)
                case consulapi.HealthCritical, consulapi.HealthWarning:
                    callback.Delete(endpoint)
                }
            }
        }
    }
}

func (d *consulDiscover) listService() {d.client.Agent().Services()}

示例代码

etcd

服务发现端

package main

import (
    "fmt"
    "os"
    
    "github.com/goeasya/discox"
)

func main() {
    cfg := discox.RegisterConfig{
        BackendType:      discox.EtcdBackend,
        BackendEndPoints: []string{"http://10.1.1.1:23790"},
        DiscoverPrefix:   "/discox/etcddemo",
        ServiceName:      "demo",
        HeartBeatPeriod:  5,
        ServiceEndPoint:  "127.0.0.1:8111",
    }
    service, err := discox.NewRegister(&cfg)
    if err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }

    if err = service.Start(); err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }
    defer service.Stop()
    select {}}

服务注册端

package main

import (
    "fmt"
    "os"
    "time"
    
    "github.com/goeasya/discox"
)

func main() {timer := time.NewTimer(time.Second * 5)
    cfg := discox.DiscoverConfig{BackendEndPoints: []string{"http://10.1.1.1:23790"},
        BackendType:      discox.EtcdBackend,
        DiscoverPrefix:   "/discox/etcddemo",
        ServiceName:      "demo",
    }
    server, err := discox.NewDiscover(&cfg)
    if err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }

    endpointCacher := discox.NewLiteEndpoint()
    go server.Start(endpointCacher)
    defer server.Stop()

    for {
        select {
        case <-timer.C:
            fmt.Println("time 5 seconds")
            endpoints := endpointCacher.List()
            fmt.Println(endpoints, len(endpoints))
            timer.Reset(time.Second * 5)
        }
    }
}

zookeeper

服务发现端

package main

import (
    "fmt"
    "github.com/goeasya/discox"
    "os"
)

func main() {
    cfg := discox.RegisterConfig{
        BackendType:      discox.ZookeeperBackend,
        BackendEndPoints: []string{"10.1.1.1:2181"},
        DiscoverPrefix:   "/soaservices",
        ServiceName:      "demo",
        HeartBeatPeriod:  5,
        ServiceEndPoint:  "127.0.0.1:8111",
    }
    service, err := discox.NewRegister(&cfg)
    if err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }

    if err = service.Start(); err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }
    defer service.Stop()
    select {}}

服务注册端

package main

import (
    "fmt"
    "github.com/goeasya/discox"
    "os"
    "time"
)

func main() {timer := time.NewTimer(time.Second * 5)
    cfg := discox.DiscoverConfig{BackendEndPoints: []string{"10.1.1.1:2181"},
        BackendType:      discox.ZookeeperBackend,
        DiscoverPrefix:   "/soaservices",
        ServiceName:      "demo",
    }
    server, err := discox.NewDiscover(&cfg)
    if err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }

    endpointCacher := discox.NewLiteEndpoint()
    go server.Start(endpointCacher)
    defer server.Stop()

    for {
        select {
        case <-timer.C:
            fmt.Println("time 5 seconds")
            endpoints := endpointCacher.List()
            fmt.Println(endpoints, len(endpoints))
            timer.Reset(time.Second * 5)
        }
    }
}

consul

服务发现端

package main

import (
    "fmt"
    "net/http"
    "os"
    
    "github.com/goeasya/discox"
)

func main() {
    cfg := discox.RegisterConfig{
        BackendType:         discox.ConsulBackend,
        BackendEndPoints:    []string{"consul.test.com"},
        DiscoverPrefix:      "/soaservices",
        ServiceName:         "demo",
        HeartBeatPeriod:     5,
        ServiceEndPoint:     "172.18.1.1:8080",
        HealthCheckEndPoint: "172.18.1.1:8080/check",
    }
    service, err := discox.NewRegister(&cfg)
    if err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }

    if err = service.Start(); err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }
    http.HandleFunc("/check", consulCheck)
    go http.ListenAndServe(":8080", nil)
    defer service.Stop()
    select {}}

var count int64

func consulCheck(w http.ResponseWriter, r *http.Request) {s := "consulCheck" + fmt.Sprint(count) + "remote:" + r.RemoteAddr + " " + r.URL.String()
    fmt.Println(s)
    fmt.Fprintln(w, s)
    count++
}

服务注册端

package main

import (
    "fmt"
    "os"
    "time"
    
    "github.com/goeasya/discox"
)

func main() {timer := time.NewTimer(time.Second * 5)
    cfg := discox.DiscoverConfig{BackendEndPoints: []string{"consul.test.com"},
        BackendType:      discox.ConsulBackend,
        DiscoverPrefix:   "/soaservices",
        ServiceName:      "demo",
    }
    server, err := discox.NewDiscover(&cfg)
    if err != nil {fmt.Println(err.Error())
        os.Exit(1)
    }

    endpointCacher := discox.NewLiteEndpoint()
    go server.Start(endpointCacher)
    defer server.Stop()

    for {
        select {
        case <-timer.C:
            fmt.Println("time 5 seconds")
            endpoints := endpointCacher.List()
            fmt.Println(endpoints, len(endpoints))
            timer.Reset(time.Second * 5)
        }
    }
}

代码 github 地址 https://github.com/goeasya/di…

正文完
 0