在微服务中,服务注册与发现是必不可少的一环,其中 etcd、zookeeper、consul 在 golang 中较为常用。

程序对于这类中间件的依赖,都建议加一层接口,基于接口去实现。

下面会分成三大板块去说明:接口定义、接口的具体实现、示例代码。

接口定义

因为上层调用是基于接口的,所以需要将 discover 和 register 接口化。在 discover 发现 endpoint 有变动时,需要调用 callback 去更新本地的缓存,所以也需要 endpoint 的接口。如下:

// Names of the supported registry backends. NewDiscover and NewRegister
// switch on these values via the BackendType field of their config.
const (
	EtcdBackend      = "etcd"
	ZookeeperBackend = "zookeeper"
	ConsulBackend    = "consul"
)

// 服务发现接口

type Discover interface {    // Start watch with block, 须要一个callback去更新本地endpoint    Start(callback EndpointCacher)    Stop()}

// Register is the service-registration interface.
type Register interface {
	// Start registers the service with the backend.
	Start() error
	// Stop removes the registration and releases backend resources.
	Stop() error
}

// EndpointCacher is the endpoint interface: the callback a Discover
// implementation drives to keep a local endpoint cache in sync with the
// registry.
type EndpointCacher interface {
	// AddOrUpdate stores endpoint together with its raw attribute payload.
	AddOrUpdate(endpoint string, attribute []byte)
	// Delete removes endpoint from the cache.
	Delete(endpoint string)
	// AddError reports an error hit while listing or watching the registry.
	AddError(err error)
	// Error also receives an error.
	// NOTE(review): no discover implementation in this file calls Error, and
	// how it is meant to differ from AddError is not shown — confirm.
	Error(err error)
}

接口定义完了,还需要有连接注册中心的配置信息。
// DiscoverConfig is the discovery-side configuration.
type DiscoverConfig struct {
	BackendType      string   // one of etcd|consul|zookeeper
	BackendEndPoints []string // register backend endpoint
	DiscoverPrefix   string   // key/path prefix the services are registered under
	ServiceName      string   // name of the service to discover
	HostName         string   // NOTE(review): not read by any constructor in this file
}

// RegisterConfig is the registration-side configuration.
type RegisterConfig struct {
	BackendType         string   // one of etcd|consul|zookeeper
	BackendEndPoints    []string // register backend endpoint
	DiscoverPrefix      string   // key/path prefix the service is registered under
	ServiceName         string
	HeartBeatPeriod     int64  // heartbeat period; the etcd register treats it as seconds
	ServiceEndPoint     string // register service endpoint to backend
	Attr                string // custom attribute. like: {"hostname": "xxx", "weight": 1}
	HealthCheckEndPoint string // host:port/path probed over HTTP by the consul health check
}
有了配置之后,需要有服务注册和服务发现实例的创建方法。

// NewDiscover 创立一个服务发现实例func NewDiscover(cfg *DiscoverConfig) (Discover, error) {    switch cfg.BackendType {    case EtcdBackend:        return newEtcdDiscover(cfg)    case ConsulBackend:        return newConsulDiscover(cfg)    case ZookeeperBackend:        return newZookeeperDiscover(cfg)    }    return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)}// NewRegister 创立一个注册实例func NewRegister(cfg *RegisterConfig) (Register, error) {    switch cfg.BackendType {    case EtcdBackend:        return newEtcdRegister(cfg)    case ConsulBackend:        return newConsulRegister(cfg)    case ZookeeperBackend:        return newZookeeperRegister(cfg)    }    return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)}

这里实现了简易版的 endpoint 缓存,可供参考:主要是将 endpoint 存在 map 中,然后在有变动时做相应变更。
// LiteEndpoint is a minimal EndpointCacher implementation that keeps the
// discovered endpoints in an in-memory map.
//
// All methods take the internal lock, so a discover goroutine may update the
// cache while other goroutines call List or Attr. Reading or writing the
// exported Endpoints and Err fields directly bypasses that lock.
type LiteEndpoint struct {
	Endpoints map[string][]byte `json:"value"`
	lock      sync.Mutex
	Err       error
}

// NewLiteEndpoint returns an empty, ready-to-use LiteEndpoint.
func NewLiteEndpoint() *LiteEndpoint {
	return &LiteEndpoint{
		Endpoints: map[string][]byte{},
	}
}

// AddOrUpdate stores attribute under endpoint, replacing any previous value.
func (e *LiteEndpoint) AddOrUpdate(endpoint string, attribute []byte) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.Endpoints[endpoint] = attribute
}

// Delete removes endpoint; deleting an unknown endpoint is a no-op.
func (e *LiteEndpoint) Delete(endpoint string) {
	e.lock.Lock()
	defer e.lock.Unlock()
	delete(e.Endpoints, endpoint)
}

// AddError records err as the most recent error. The discover
// implementations report failures through this method, and EndpointCacher
// requires it.
func (e *LiteEndpoint) AddError(err error) {
	e.setErr(err)
}

// Error records err as the most recent error, exactly like AddError.
func (e *LiteEndpoint) Error(err error) {
	e.setErr(err)
}

// setErr stores err in Err under the lock.
func (e *LiteEndpoint) setErr(err error) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.Err = err
}

// List returns the known endpoints. The order is unspecified because it
// follows map iteration order.
func (e *LiteEndpoint) List() []string {
	e.lock.Lock()
	defer e.lock.Unlock()
	endpoints := make([]string, 0, len(e.Endpoints))
	for endpoint := range e.Endpoints {
		endpoints = append(endpoints, endpoint)
	}
	return endpoints
}

// Attr returns the attribute stored for endpoint, or nil when the endpoint
// is unknown. The returned slice is the stored one, not a copy.
func (e *LiteEndpoint) Attr(endpoint string) []byte {
	e.lock.Lock()
	defer e.lock.Unlock()
	return e.Endpoints[endpoint]
}

接口的具体实现

etcd

注册服务
服务注册到 etcd 后,利用 etcd 租约(lease)的特性,每次续租几秒,并在租约过期前完成续租。当实例异常、无法续租时,该实例对应的 key 会在 etcd 端因租约过期而被删除,从而达到下线异常节点的效果。

func newEtcdRegister(cfg *RegisterConfig) (*etcdRegister, error) {    var err error    etcdClient, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})    if err != nil {        return nil, err    }    r := &etcdRegister{        etcdEndpoints:  cfg.BackendEndPoints,        discoverPrefix: cfg.DiscoverPrefix,        serviceName:    cfg.ServiceName,        endpoint:       cfg.ServiceEndPoint,        attr:           cfg.Attr,        ttl:            cfg.HeartBeatPeriod,        stopCh:         make(chan struct{}),        etcdClient:     etcdClient,    }    return r, nil}// Start 开启一个协程func (r *etcdRegister) Start() error {    go r.keepAlive()    return nil}func (r *etcdRegister) Stop() error {    close(r.stopCh)    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)    defer cancel()    _, err := r.etcdClient.Delete(ctx, r.key())    //if r.grpcResolver != nil {    //    return r.grpcResolver.Update(ctx, r.key(), grpcnaming.Update{Op: grpcnaming.Delete, Addr: r.endpoint})    //}    return err}// 定时续租func (r *etcdRegister) keepAlive() {    duration := time.Duration(r.ttl) * time.Second    timer := time.NewTimer(duration)    for {        select {        case <-r.stopCh:            return        case <-timer.C:            if r.leaseID > 0 {                if err := r.leaseRenewal(); err != nil {                    logrus.Warnf("%s leaseid[%x] keepAlive err: %s, try to reset...", r.endpoint, r.leaseID, err.Error())                    r.leaseID = 0                }            } else {                if err := r.register(); err != nil {                    logrus.Warnf("register endpoint %s error: %s", r.endpoint, err.Error())                } else {                    logrus.Infof("register endppint %s success", r.endpoint)                }            }            timer.Reset(duration)        }    }}func (r *etcdRegister) register() error {    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)    defer cancel()    resp, 
err := r.etcdClient.Grant(ctx, r.ttl+3)    if err != nil {        return err    }    _, err = r.etcdClient.Put(ctx, r.key(), r.attr, clientv3.WithLease(resp.ID))    r.leaseID = resp.ID    return err}func (r *etcdRegister) leaseRenewal() error {    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)    defer cancel()    _, err := r.etcdClient.KeepAliveOnce(ctx, r.leaseID)    return err}func (r *etcdRegister) key() string {    return toEtcdKey(r.discoverPrefix, r.serviceName, r.endpoint)}func toEtcdKey(elem ...string) string {    return strings.Join(elem, "/")}func tailKey(key []byte) string {    keyStr := string(key)    topicSlice := strings.Split(keyStr, "/")    if len(topicSlice) != 0 {        return topicSlice[len(topicSlice)-1]    }    return keyStr}

服务发现端
服务发现端需要不断地监听 key 的变动,所以需要 watch;有更新后需要调用 callback 来更新本地的 endpoint 列表。

// etcd discover impltype etcdDiscover struct {    ctx        context.Context    cancel     context.CancelFunc    etcdClient *clientv3.Client    prefix     string}func newEtcdDiscover(cfg *DiscoverConfig) (*etcdDiscover, error) {    cli, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})    if err != nil {        return nil, err    }    ctx, cancel := context.WithCancel(context.Background())    return &etcdDiscover{        ctx:        ctx,        cancel:     cancel,        etcdClient: cli,        prefix:     cfg.DiscoverPrefix,    }, nil}func (d *etcdDiscover) Start(callback EndpointCacher) {    d.discover(callback)}func (d *etcdDiscover) discover(callback EndpointCacher) {    ctx, cancel := context.WithCancel(d.ctx)    defer cancel()    if err := d.listService(ctx, callback); err != nil {        callback.AddError(err)    }    watch := d.etcdClient.Watch(ctx, d.prefix, clientv3.WithPrefix())    for {        select {        case <-d.ctx.Done():            return        case resp := <-watch:            if err := resp.Err(); err != nil {                callback.AddError(err)                return            }            for _, event := range resp.Events {                if event.Kv == nil {                    continue                }                switch event.Type {                case mvccpb.PUT:                    callback.AddOrUpdate(tailKey(event.Kv.Key), event.Kv.Value)                case mvccpb.DELETE:                    callback.Delete(tailKey(event.Kv.Key))                }            }        }    }}func (d *etcdDiscover) Stop() {    d.cancel()}func (d *etcdDiscover) listService(ctx context.Context, callback EndpointCacher) error {    resp, err := d.etcdClient.Get(ctx, d.prefix, clientv3.WithPrefix())    if err != nil {        return err    }    for _, kv := range resp.Kvs {        callback.AddOrUpdate(tailKey(kv.Key), kv.Value)    }    return nil}

zookeeper

zookeeper 则利用临时节点(ephemeral node)的特性,来实现异常服务的下线功能。
服务注册端

type zookeeperRegister struct {    zkEndpoints []string    prefix      string    serviceName string    endpoint    string    attr        string    ttl         int64    stopCh      chan struct{}    conn        *zk.Conn}func newZookeeperRegister(cfg *RegisterConfig) (*zookeeperRegister, error) {    r := zookeeperRegister{        zkEndpoints: cfg.BackendEndPoints,        prefix:      cfg.DiscoverPrefix,        serviceName: cfg.ServiceName,        endpoint:    cfg.ServiceEndPoint,        attr:        cfg.Attr,        ttl:         cfg.HeartBeatPeriod,    }    return &r, nil}func (r *zookeeperRegister) Start() error {    var err error    r.conn, _, err = zk.Connect(r.zkEndpoints, time.Second*5)    if err != nil {        return err    }    return r.register()}func (r *zookeeperRegister) Stop() error {    if r.conn != nil {        r.conn.Close()    }    return nil}func (r *zookeeperRegister) register() error {    if err := r.createIfNotExist(r.node(), nil, 0); err != nil {        return err    }    return r.createOrUpdateEndpoint(r.key(), []byte(r.attr))}func (r *zookeeperRegister) createOrUpdateEndpoint(path string, data []byte) error {    exist, _, err := r.conn.Exists(path)    if err != nil {        return err    }    if !exist {        _, err = r.conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))        if err != nil {            return err        }        return nil    }    _, stat, err := r.conn.Get(path)    if err != nil {        return err    }    _, err = r.conn.Set(path, []byte(r.attr), stat.Version)    if err != nil {        return err    }    return nil}func (r *zookeeperRegister) createIfNotExist(path string, data []byte, flag int32) error {    exist, _, err := r.conn.Exists(r.node())    if err != nil {        return err    }    if !exist {        _, err = r.conn.Create(path, data, flag, zk.WorldACL(zk.PermAll))        if err != nil {            return err        }    }    return nil}func (r *zookeeperRegister) node() string {    return 
fmt.Sprintf("%s/%s", r.prefix, r.serviceName)}func (r *zookeeperRegister) key() string {    return fmt.Sprintf("%s/%s/%s", r.prefix, r.serviceName, r.endpoint)}

服务发现端

type zookeeperDiscover struct {    ctx         context.Context    cancel      context.CancelFunc    conn        *zk.Conn    prefix      string    serviceName string}func newZookeeperDiscover(cfg *DiscoverConfig) (*zookeeperDiscover, error) {    conn, _, err := zk.Connect(cfg.BackendEndPoints, 10*time.Second)    if err != nil {        return nil, err    }    ctx, cancel := context.WithCancel(context.Background())    d := &zookeeperDiscover{        ctx:         ctx,        cancel:      cancel,        conn:        conn,        prefix:      cfg.DiscoverPrefix,        serviceName: cfg.ServiceName,    }    return d, nil}func (d *zookeeperDiscover) Start(callback EndpointCacher) {    d.discover(callback)}func (d *zookeeperDiscover) Stop() {    d.cancel()    if d.conn != nil {        d.conn.Close()    }}func (d *zookeeperDiscover) discover(callback EndpointCacher) {    if err := d.listService(callback); err != nil {        callback.AddError(err)        return    }    for {        snapshot, _, ch, err := d.conn.ChildrenW(d.key())        if err != nil {            callback.AddError(err)            return        }        select {        case e := <-ch:            switch e.Type {            case zk.EventNodeCreated, zk.EventNodeChildrenChanged:                for _, v := range snapshot {                    callback.Delete(v)                }                if err := d.listService(callback); err != nil {                    callback.AddError(err)                }            case zk.EventNodeDeleted:                for _, v := range snapshot {                    callback.Delete(v)                }            }        }    }}func (d *zookeeperDiscover) getNodeProperty(path string) ([]byte, error) {    value, _, err := d.conn.Get(path)    return value, err}func (d *zookeeperDiscover) listService(callback EndpointCacher) error {    childs, _, err := d.conn.Children(d.key())    if err != nil {        return err    }    for _, c := range childs {        value, _, err := 
d.conn.Get(fmt.Sprintf("%s/%s", d.key(), c))        if err != nil {            return err        }        callback.AddOrUpdate(c, value)    }    return nil}func (d *zookeeperDiscover) key() string {    return fmt.Sprintf("%s/%s", d.prefix, d.serviceName)}

consul

服务注册端

type consulRegister struct {    prefix              string    serviceName         string    serviceId           string    endpoint            string    healthCheckEndpoint string    attr                string    ttl                 int64    stopCh              chan struct{}    client              *consulapi.Client}func newConsulRegister(cfg *RegisterConfig) (*consulRegister, error) {    config := consulapi.DefaultConfig()    config.Address = strings.Join(cfg.BackendEndPoints, ",")    client, err := consulapi.NewClient(config)    if err != nil {        return nil, err    }    r := &consulRegister{        prefix:              cfg.DiscoverPrefix,        serviceName:         cfg.ServiceName,        endpoint:            cfg.ServiceEndPoint,        healthCheckEndpoint: cfg.HealthCheckEndPoint,        attr:                cfg.Attr,        ttl:                 cfg.HeartBeatPeriod,        stopCh:              make(chan struct{}),        client:              client,    }    return r, nil}func (s *consulRegister) Start() error {    return s.register()}func (s *consulRegister) Stop() error {    if s.serviceId != "" {        return s.client.Agent().ServiceDeregister(s.serviceId)    }    return nil}func (s *consulRegister) register() error {    registration := new(consulapi.AgentServiceRegistration)    address, port, err := net.SplitHostPort(s.endpoint)    if err != nil {        return err    }    registration.Address = address    portInt, err := strconv.Atoi(port)    if err != nil {        return err    }    registration.Port = portInt    serviceId := fmt.Sprintf("%s_%s", s.serviceName, address)    s.serviceId = serviceId    registration.Name = s.serviceName    registration.ID = serviceId    serviceCheck := new(consulapi.AgentServiceCheck)    serviceCheck.HTTP = fmt.Sprintf("http://%s", s.healthCheckEndpoint)    serviceCheck.Timeout = "2s"    serviceCheck.Interval = "2s"    serviceCheck.DeregisterCriticalServiceAfter = "30s"    registration.Check = serviceCheck    return 
s.client.Agent().ServiceRegister(registration)}

服务发现端

type consulDiscover struct {    ctx         context.Context    cancel      context.CancelFunc    prefix      string    serviceName string    client      *consulapi.Client}func newConsulDiscover(cfg *DiscoverConfig) (*consulDiscover, error) {    config := consulapi.DefaultConfig()    config.Address = strings.Join(cfg.BackendEndPoints, ",")    client, err := consulapi.NewClient(config)    if err != nil {        return nil, err    }    ctx, cancel := context.WithCancel(context.Background())    d := &consulDiscover{        ctx:         ctx,        cancel:      cancel,        prefix:      cfg.DiscoverPrefix,        serviceName: cfg.ServiceName,        client:      client,    }    return d, nil}func (d *consulDiscover) Start(callback EndpointCacher) {    d.discover(callback)}func (d *consulDiscover) Stop() {    d.cancel()}func (d *consulDiscover) discover(callback EndpointCacher) {    var lastIndex uint64    for {        select {        case <-d.ctx.Done():            return        default:            services, queryMeta, err := d.client.Health().Service(                d.serviceName, "", false, &consulapi.QueryOptions{                    WaitIndex: lastIndex,                })            if err != nil {                callback.AddError(err)            }            lastIndex = queryMeta.LastIndex            for _, service := range services {                var attr []byte                endpoint := fmt.Sprintf("%s:%v", service.Service.Address, service.Service.Port)                switch service.Checks.AggregatedStatus() {                case consulapi.HealthPassing:                    if service.Service.Meta != nil {                        attr, _ = json.Marshal(service.Service.Meta)                    }                    callback.AddOrUpdate(endpoint, attr)                case consulapi.HealthCritical, consulapi.HealthWarning:                    callback.Delete(endpoint)                }            }        }    }}func (d *consulDiscover) listService() {    
d.client.Agent().Services()}

示例代码

etcd

服务注册端

package mainimport (    "fmt"    "os"        "github.com/goeasya/discox")func main() {    cfg := discox.RegisterConfig{        BackendType:      discox.EtcdBackend,        BackendEndPoints: []string{"http://10.1.1.1:23790"},        DiscoverPrefix:   "/discox/etcddemo",        ServiceName:      "demo",        HeartBeatPeriod:  5,        ServiceEndPoint:  "127.0.0.1:8111",    }    service, err := discox.NewRegister(&cfg)    if err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    if err = service.Start(); err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    defer service.Stop()    select {}}

服务发现端

package mainimport (    "fmt"    "os"    "time"        "github.com/goeasya/discox")func main() {    timer := time.NewTimer(time.Second * 5)    cfg := discox.DiscoverConfig{        BackendEndPoints: []string{"http://10.1.1.1:23790"},        BackendType:      discox.EtcdBackend,        DiscoverPrefix:   "/discox/etcddemo",        ServiceName:      "demo",    }    server, err := discox.NewDiscover(&cfg)    if err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    endpointCacher := discox.NewLiteEndpoint()    go server.Start(endpointCacher)    defer server.Stop()    for {        select {        case <-timer.C:            fmt.Println("time 5 seconds")            endpoints := endpointCacher.List()            fmt.Println(endpoints, len(endpoints))            timer.Reset(time.Second * 5)        }    }}

zookeeper

服务注册端

package mainimport (    "fmt"    "github.com/goeasya/discox"    "os")func main() {    cfg := discox.RegisterConfig{        BackendType:      discox.ZookeeperBackend,        BackendEndPoints: []string{"10.1.1.1:2181"},        DiscoverPrefix:   "/soaservices",        ServiceName:      "demo",        HeartBeatPeriod:  5,        ServiceEndPoint:  "127.0.0.1:8111",    }    service, err := discox.NewRegister(&cfg)    if err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    if err = service.Start(); err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    defer service.Stop()    select {}}

服务发现端

package mainimport (    "fmt"    "github.com/goeasya/discox"    "os"    "time")func main() {    timer := time.NewTimer(time.Second * 5)    cfg := discox.DiscoverConfig{        BackendEndPoints: []string{"10.1.1.1:2181"},        BackendType:      discox.ZookeeperBackend,        DiscoverPrefix:   "/soaservices",        ServiceName:      "demo",    }    server, err := discox.NewDiscover(&cfg)    if err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    endpointCacher := discox.NewLiteEndpoint()    go server.Start(endpointCacher)    defer server.Stop()    for {        select {        case <-timer.C:            fmt.Println("time 5 seconds")            endpoints := endpointCacher.List()            fmt.Println(endpoints, len(endpoints))            timer.Reset(time.Second * 5)        }    }}

consul

服务注册端

package mainimport (    "fmt"    "net/http"    "os"        "github.com/goeasya/discox")func main() {    cfg := discox.RegisterConfig{        BackendType:         discox.ConsulBackend,        BackendEndPoints:    []string{"consul.test.com"},        DiscoverPrefix:      "/soaservices",        ServiceName:         "demo",        HeartBeatPeriod:     5,        ServiceEndPoint:     "172.18.1.1:8080",        HealthCheckEndPoint: "172.18.1.1:8080/check",    }    service, err := discox.NewRegister(&cfg)    if err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    if err = service.Start(); err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    http.HandleFunc("/check", consulCheck)    go http.ListenAndServe(":8080", nil)    defer service.Stop()    select {}}var count int64func consulCheck(w http.ResponseWriter, r *http.Request) {    s := "consulCheck" + fmt.Sprint(count) + "remote:" + r.RemoteAddr + " " + r.URL.String()    fmt.Println(s)    fmt.Fprintln(w, s)    count++}

服务发现端

package mainimport (    "fmt"    "os"    "time"        "github.com/goeasya/discox")func main() {    timer := time.NewTimer(time.Second * 5)    cfg := discox.DiscoverConfig{        BackendEndPoints: []string{"consul.test.com"},        BackendType:      discox.ConsulBackend,        DiscoverPrefix:   "/soaservices",        ServiceName:      "demo",    }    server, err := discox.NewDiscover(&cfg)    if err != nil {        fmt.Println(err.Error())        os.Exit(1)    }    endpointCacher := discox.NewLiteEndpoint()    go server.Start(endpointCacher)    defer server.Stop()    for {        select {        case <-timer.C:            fmt.Println("time 5 seconds")            endpoints := endpointCacher.List()            fmt.Println(endpoints, len(endpoints))            timer.Reset(time.Second * 5)        }    }}

代码 GitHub 地址:https://github.com/goeasya/discox