在微服务架构中,服务注册与发现是必不可少的一环,其中 etcd、zookeeper、consul 在 Golang 中较为常用。
程序对这类中间件的依赖,都建议加一层接口,并基于接口去实现。
下面会分成三大板块去阐明:接口定义、接口的具体实现、示例代码。
接口定义
因为上层调用是基于接口进行的,所以需要将 discover 和 register 接口化。在 discover 发现 endpoint 有变动时,需要调用 callback 去更新本地的缓存,所以也需要 endpoint 的接口。如下:
// Names of the supported registry backends. NewDiscover and NewRegister
// compare a config's BackendType against these values.
const (
	// EtcdBackend selects the etcd implementation.
	EtcdBackend = "etcd"
	// ZookeeperBackend selects the zookeeper implementation.
	ZookeeperBackend = "zookeeper"
	// ConsulBackend selects the consul implementation.
	ConsulBackend = "consul"
)
// 服务发现接口
type Discover interface { // Start watch with block, 须要一个callback去更新本地endpoint Start(callback EndpointCacher) Stop()}
// Register is the service-registration interface implemented by each backend.
type Register interface {
	// Start begins registration of the service endpoint.
	Start() error

	// Stop ends registration.
	Stop() error
}

// EndpointCacher is the endpoint-cache interface that a Discover
// implementation calls back into when the set of endpoints changes.
type EndpointCacher interface {
	// AddOrUpdate stores endpoint together with its attribute payload.
	AddOrUpdate(endpoint string, attribute []byte)

	// Delete removes endpoint.
	Delete(endpoint string)

	// AddError reports an error hit during discovery; the Discover
	// implementations in this file report all of their errors through it.
	AddError(err error)

	// Error reports an error.
	// NOTE(review): how this differs from AddError is not visible here — confirm.
	Error(err error)
}
接口定义完了,还需要有连接注册中心的配置信息。
// DiscoverConfig is the configuration of the service-discovery side.
type DiscoverConfig struct {
	// BackendType is one of etcd|consul|zookeeper.
	BackendType string
	// BackendEndPoints lists the addresses of the registry backend.
	BackendEndPoints []string
	// DiscoverPrefix is the key/path prefix under which services are registered.
	DiscoverPrefix string
	// ServiceName is the name of the service to discover.
	ServiceName string
	// HostName is not read by the backend constructors shown in this file
	// section — TODO confirm its intended use.
	HostName string
}
// RegisterConfig is the configuration of the service-registration side.
type RegisterConfig struct {
	// BackendType is one of etcd|consul|zookeeper.
	BackendType string
	// BackendEndPoints lists the addresses of the registry backend.
	BackendEndPoints []string
	// DiscoverPrefix is the key/path prefix under which the service is registered.
	DiscoverPrefix string
	// ServiceName is the name the service is registered under.
	ServiceName string
	// HeartBeatPeriod is the heartbeat period; the etcd register interprets
	// it as a number of seconds between lease renewals.
	HeartBeatPeriod int64
	// ServiceEndPoint is the service endpoint registered in the backend.
	ServiceEndPoint string
	// Attr is a custom attribute, like: {"hostname": "xxx", "weight": 1}.
	Attr string
	// HealthCheckEndPoint is the host[:port]/path the consul register uses
	// for its HTTP health check (it is prefixed with "http://").
	HealthCheckEndPoint string
}
有了配置之后,需要有注册和服务发现实例的创建方法。
// NewDiscover 创立一个服务发现实例func NewDiscover(cfg *DiscoverConfig) (Discover, error) { switch cfg.BackendType { case EtcdBackend: return newEtcdDiscover(cfg) case ConsulBackend: return newConsulDiscover(cfg) case ZookeeperBackend: return newZookeeperDiscover(cfg) } return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)}// NewRegister 创立一个注册实例func NewRegister(cfg *RegisterConfig) (Register, error) { switch cfg.BackendType { case EtcdBackend: return newEtcdRegister(cfg) case ConsulBackend: return newConsulRegister(cfg) case ZookeeperBackend: return newZookeeperRegister(cfg) } return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)}
这里实现了简易版的 endpoint 缓存,可供参考:主要是将 endpoint 存在 map 中,然后在有变动时做变更。
// LiteEndpoint is a lightweight EndpointCacher implementation that keeps the
// endpoints in an in-memory map. Its methods may be called from several
// goroutines at once; the zero value is ready to use.
type LiteEndpoint struct {
	// Endpoints maps an endpoint to its attribute bytes.
	Endpoints map[string][]byte `json:"value"`
	// lock guards Endpoints and Err for all method calls.
	lock sync.Mutex
	// Err is the error most recently passed to AddError or Error.
	Err error
}

// NewLiteEndpoint returns an empty LiteEndpoint.
func NewLiteEndpoint() *LiteEndpoint {
	return &LiteEndpoint{
		Endpoints: map[string][]byte{},
	}
}

// AddOrUpdate stores attribute under endpoint, replacing any previous value.
func (e *LiteEndpoint) AddOrUpdate(endpoint string, attribute []byte) {
	e.lock.Lock()
	defer e.lock.Unlock()
	if e.Endpoints == nil {
		// Allow a zero-value LiteEndpoint; writing to a nil map would panic.
		e.Endpoints = map[string][]byte{}
	}
	e.Endpoints[endpoint] = attribute
}

// Delete removes endpoint; deleting an unknown endpoint is a no-op.
func (e *LiteEndpoint) Delete(endpoint string) {
	e.lock.Lock()
	defer e.lock.Unlock()
	delete(e.Endpoints, endpoint)
}

// AddError records err in Err. It is required by EndpointCacher and is the
// method the Discover implementations use to report failures.
func (e *LiteEndpoint) AddError(err error) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.Err = err
}

// Error records err in Err.
func (e *LiteEndpoint) Error(err error) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.Err = err
}

// List returns the cached endpoints in unspecified order. The result is nil
// when the cache is empty.
func (e *LiteEndpoint) List() []string {
	e.lock.Lock()
	defer e.lock.Unlock()
	if len(e.Endpoints) == 0 {
		return nil
	}
	endpoints := make([]string, 0, len(e.Endpoints))
	for k := range e.Endpoints {
		endpoints = append(endpoints, k)
	}
	return endpoints
}

// Attr returns the attribute stored for endpoint, or nil when endpoint is
// unknown. The returned slice is the stored one, not a copy.
func (e *LiteEndpoint) Attr(endpoint string) []byte {
	e.lock.Lock()
	defer e.lock.Unlock()
	return e.Endpoints[endpoint]
}
接口的具体实现
etcd
注册服务
服务注册到 etcd 后,利用 etcd 租约(lease)的特性,每次续租几秒,并在租约过期前完成续租。当实例异常、无法续租时,该实例在 etcd 端会因租约过期而被删除,从而达到下线异常节点的效果。
func newEtcdRegister(cfg *RegisterConfig) (*etcdRegister, error) { var err error etcdClient, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints}) if err != nil { return nil, err } r := &etcdRegister{ etcdEndpoints: cfg.BackendEndPoints, discoverPrefix: cfg.DiscoverPrefix, serviceName: cfg.ServiceName, endpoint: cfg.ServiceEndPoint, attr: cfg.Attr, ttl: cfg.HeartBeatPeriod, stopCh: make(chan struct{}), etcdClient: etcdClient, } return r, nil}// Start 开启一个协程func (r *etcdRegister) Start() error { go r.keepAlive() return nil}func (r *etcdRegister) Stop() error { close(r.stopCh) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() _, err := r.etcdClient.Delete(ctx, r.key()) //if r.grpcResolver != nil { // return r.grpcResolver.Update(ctx, r.key(), grpcnaming.Update{Op: grpcnaming.Delete, Addr: r.endpoint}) //} return err}// 定时续租func (r *etcdRegister) keepAlive() { duration := time.Duration(r.ttl) * time.Second timer := time.NewTimer(duration) for { select { case <-r.stopCh: return case <-timer.C: if r.leaseID > 0 { if err := r.leaseRenewal(); err != nil { logrus.Warnf("%s leaseid[%x] keepAlive err: %s, try to reset...", r.endpoint, r.leaseID, err.Error()) r.leaseID = 0 } } else { if err := r.register(); err != nil { logrus.Warnf("register endpoint %s error: %s", r.endpoint, err.Error()) } else { logrus.Infof("register endppint %s success", r.endpoint) } } timer.Reset(duration) } }}func (r *etcdRegister) register() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() resp, err := r.etcdClient.Grant(ctx, r.ttl+3) if err != nil { return err } _, err = r.etcdClient.Put(ctx, r.key(), r.attr, clientv3.WithLease(resp.ID)) r.leaseID = resp.ID return err}func (r *etcdRegister) leaseRenewal() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() _, err := r.etcdClient.KeepAliveOnce(ctx, r.leaseID) return err}func (r *etcdRegister) key() string { 
return toEtcdKey(r.discoverPrefix, r.serviceName, r.endpoint)}func toEtcdKey(elem ...string) string { return strings.Join(elem, "/")}func tailKey(key []byte) string { keyStr := string(key) topicSlice := strings.Split(keyStr, "/") if len(topicSlice) != 0 { return topicSlice[len(topicSlice)-1] } return keyStr}
服务发现端
服务发现端需要不断地监听 key 的变动,所以需要 watch;有更新后需要调用 callback 来更新本地的 endpoint 列表。
// etcd discover impltype etcdDiscover struct { ctx context.Context cancel context.CancelFunc etcdClient *clientv3.Client prefix string}func newEtcdDiscover(cfg *DiscoverConfig) (*etcdDiscover, error) { cli, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints}) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) return &etcdDiscover{ ctx: ctx, cancel: cancel, etcdClient: cli, prefix: cfg.DiscoverPrefix, }, nil}func (d *etcdDiscover) Start(callback EndpointCacher) { d.discover(callback)}func (d *etcdDiscover) discover(callback EndpointCacher) { ctx, cancel := context.WithCancel(d.ctx) defer cancel() if err := d.listService(ctx, callback); err != nil { callback.AddError(err) } watch := d.etcdClient.Watch(ctx, d.prefix, clientv3.WithPrefix()) for { select { case <-d.ctx.Done(): return case resp := <-watch: if err := resp.Err(); err != nil { callback.AddError(err) return } for _, event := range resp.Events { if event.Kv == nil { continue } switch event.Type { case mvccpb.PUT: callback.AddOrUpdate(tailKey(event.Kv.Key), event.Kv.Value) case mvccpb.DELETE: callback.Delete(tailKey(event.Kv.Key)) } } } }}func (d *etcdDiscover) Stop() { d.cancel()}func (d *etcdDiscover) listService(ctx context.Context, callback EndpointCacher) error { resp, err := d.etcdClient.Get(ctx, d.prefix, clientv3.WithPrefix()) if err != nil { return err } for _, kv := range resp.Kvs { callback.AddOrUpdate(tailKey(kv.Key), kv.Value) } return nil}
zookeeper
zookeeper 则利用临时节点(ephemeral node)的特性,来实现异常服务下线的功能。
服务注册端
type zookeeperRegister struct { zkEndpoints []string prefix string serviceName string endpoint string attr string ttl int64 stopCh chan struct{} conn *zk.Conn}func newZookeeperRegister(cfg *RegisterConfig) (*zookeeperRegister, error) { r := zookeeperRegister{ zkEndpoints: cfg.BackendEndPoints, prefix: cfg.DiscoverPrefix, serviceName: cfg.ServiceName, endpoint: cfg.ServiceEndPoint, attr: cfg.Attr, ttl: cfg.HeartBeatPeriod, } return &r, nil}func (r *zookeeperRegister) Start() error { var err error r.conn, _, err = zk.Connect(r.zkEndpoints, time.Second*5) if err != nil { return err } return r.register()}func (r *zookeeperRegister) Stop() error { if r.conn != nil { r.conn.Close() } return nil}func (r *zookeeperRegister) register() error { if err := r.createIfNotExist(r.node(), nil, 0); err != nil { return err } return r.createOrUpdateEndpoint(r.key(), []byte(r.attr))}func (r *zookeeperRegister) createOrUpdateEndpoint(path string, data []byte) error { exist, _, err := r.conn.Exists(path) if err != nil { return err } if !exist { _, err = r.conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) if err != nil { return err } return nil } _, stat, err := r.conn.Get(path) if err != nil { return err } _, err = r.conn.Set(path, []byte(r.attr), stat.Version) if err != nil { return err } return nil}func (r *zookeeperRegister) createIfNotExist(path string, data []byte, flag int32) error { exist, _, err := r.conn.Exists(r.node()) if err != nil { return err } if !exist { _, err = r.conn.Create(path, data, flag, zk.WorldACL(zk.PermAll)) if err != nil { return err } } return nil}func (r *zookeeperRegister) node() string { return fmt.Sprintf("%s/%s", r.prefix, r.serviceName)}func (r *zookeeperRegister) key() string { return fmt.Sprintf("%s/%s/%s", r.prefix, r.serviceName, r.endpoint)}
服务发现端
type zookeeperDiscover struct { ctx context.Context cancel context.CancelFunc conn *zk.Conn prefix string serviceName string}func newZookeeperDiscover(cfg *DiscoverConfig) (*zookeeperDiscover, error) { conn, _, err := zk.Connect(cfg.BackendEndPoints, 10*time.Second) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) d := &zookeeperDiscover{ ctx: ctx, cancel: cancel, conn: conn, prefix: cfg.DiscoverPrefix, serviceName: cfg.ServiceName, } return d, nil}func (d *zookeeperDiscover) Start(callback EndpointCacher) { d.discover(callback)}func (d *zookeeperDiscover) Stop() { d.cancel() if d.conn != nil { d.conn.Close() }}func (d *zookeeperDiscover) discover(callback EndpointCacher) { if err := d.listService(callback); err != nil { callback.AddError(err) return } for { snapshot, _, ch, err := d.conn.ChildrenW(d.key()) if err != nil { callback.AddError(err) return } select { case e := <-ch: switch e.Type { case zk.EventNodeCreated, zk.EventNodeChildrenChanged: for _, v := range snapshot { callback.Delete(v) } if err := d.listService(callback); err != nil { callback.AddError(err) } case zk.EventNodeDeleted: for _, v := range snapshot { callback.Delete(v) } } } }}func (d *zookeeperDiscover) getNodeProperty(path string) ([]byte, error) { value, _, err := d.conn.Get(path) return value, err}func (d *zookeeperDiscover) listService(callback EndpointCacher) error { childs, _, err := d.conn.Children(d.key()) if err != nil { return err } for _, c := range childs { value, _, err := d.conn.Get(fmt.Sprintf("%s/%s", d.key(), c)) if err != nil { return err } callback.AddOrUpdate(c, value) } return nil}func (d *zookeeperDiscover) key() string { return fmt.Sprintf("%s/%s", d.prefix, d.serviceName)}
consul
服务注册端
type consulRegister struct { prefix string serviceName string serviceId string endpoint string healthCheckEndpoint string attr string ttl int64 stopCh chan struct{} client *consulapi.Client}func newConsulRegister(cfg *RegisterConfig) (*consulRegister, error) { config := consulapi.DefaultConfig() config.Address = strings.Join(cfg.BackendEndPoints, ",") client, err := consulapi.NewClient(config) if err != nil { return nil, err } r := &consulRegister{ prefix: cfg.DiscoverPrefix, serviceName: cfg.ServiceName, endpoint: cfg.ServiceEndPoint, healthCheckEndpoint: cfg.HealthCheckEndPoint, attr: cfg.Attr, ttl: cfg.HeartBeatPeriod, stopCh: make(chan struct{}), client: client, } return r, nil}func (s *consulRegister) Start() error { return s.register()}func (s *consulRegister) Stop() error { if s.serviceId != "" { return s.client.Agent().ServiceDeregister(s.serviceId) } return nil}func (s *consulRegister) register() error { registration := new(consulapi.AgentServiceRegistration) address, port, err := net.SplitHostPort(s.endpoint) if err != nil { return err } registration.Address = address portInt, err := strconv.Atoi(port) if err != nil { return err } registration.Port = portInt serviceId := fmt.Sprintf("%s_%s", s.serviceName, address) s.serviceId = serviceId registration.Name = s.serviceName registration.ID = serviceId serviceCheck := new(consulapi.AgentServiceCheck) serviceCheck.HTTP = fmt.Sprintf("http://%s", s.healthCheckEndpoint) serviceCheck.Timeout = "2s" serviceCheck.Interval = "2s" serviceCheck.DeregisterCriticalServiceAfter = "30s" registration.Check = serviceCheck return s.client.Agent().ServiceRegister(registration)}
服务发现端
type consulDiscover struct { ctx context.Context cancel context.CancelFunc prefix string serviceName string client *consulapi.Client}func newConsulDiscover(cfg *DiscoverConfig) (*consulDiscover, error) { config := consulapi.DefaultConfig() config.Address = strings.Join(cfg.BackendEndPoints, ",") client, err := consulapi.NewClient(config) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) d := &consulDiscover{ ctx: ctx, cancel: cancel, prefix: cfg.DiscoverPrefix, serviceName: cfg.ServiceName, client: client, } return d, nil}func (d *consulDiscover) Start(callback EndpointCacher) { d.discover(callback)}func (d *consulDiscover) Stop() { d.cancel()}func (d *consulDiscover) discover(callback EndpointCacher) { var lastIndex uint64 for { select { case <-d.ctx.Done(): return default: services, queryMeta, err := d.client.Health().Service( d.serviceName, "", false, &consulapi.QueryOptions{ WaitIndex: lastIndex, }) if err != nil { callback.AddError(err) } lastIndex = queryMeta.LastIndex for _, service := range services { var attr []byte endpoint := fmt.Sprintf("%s:%v", service.Service.Address, service.Service.Port) switch service.Checks.AggregatedStatus() { case consulapi.HealthPassing: if service.Service.Meta != nil { attr, _ = json.Marshal(service.Service.Meta) } callback.AddOrUpdate(endpoint, attr) case consulapi.HealthCritical, consulapi.HealthWarning: callback.Delete(endpoint) } } } }}func (d *consulDiscover) listService() { d.client.Agent().Services()}
示例代码
etcd
服务注册端
// Example: register a demo endpoint in etcd and keep it registered until the
// process receives SIGINT or SIGTERM.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/goeasya/discox"
)

func main() {
	cfg := discox.RegisterConfig{
		BackendType:      discox.EtcdBackend,
		BackendEndPoints: []string{"http://10.1.1.1:23790"},
		DiscoverPrefix:   "/discox/etcddemo",
		ServiceName:      "demo",
		HeartBeatPeriod:  5,
		ServiceEndPoint:  "127.0.0.1:8111",
	}
	service, err := discox.NewRegister(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	if err = service.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	// Wait for a termination signal. A bare `select {}` would never return,
	// so a deferred Stop behind it could never run.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	<-quit

	if err = service.Stop(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
}
服务发现端
// Example: discover the demo service in etcd and print the known endpoints
// every five seconds until the process receives SIGINT or SIGTERM.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/goeasya/discox"
)

func main() {
	cfg := discox.DiscoverConfig{
		BackendEndPoints: []string{"http://10.1.1.1:23790"},
		BackendType:      discox.EtcdBackend,
		DiscoverPrefix:   "/discox/etcddemo",
		ServiceName:      "demo",
	}
	server, err := discox.NewDiscover(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	endpointCacher := discox.NewLiteEndpoint()
	// Start blocks, so it runs in its own goroutine.
	go server.Start(endpointCacher)
	defer server.Stop()

	// Leave the loop on a termination signal so the deferred Stop runs.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			fmt.Println("time 5 seconds")
			endpoints := endpointCacher.List()
			fmt.Println(endpoints, len(endpoints))
		}
	}
}
zookeeper
服务注册端
// Example: register a demo endpoint in zookeeper and keep it registered
// until the process receives SIGINT or SIGTERM.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/goeasya/discox"
)

func main() {
	cfg := discox.RegisterConfig{
		BackendType:      discox.ZookeeperBackend,
		BackendEndPoints: []string{"10.1.1.1:2181"},
		DiscoverPrefix:   "/soaservices",
		ServiceName:      "demo",
		HeartBeatPeriod:  5,
		ServiceEndPoint:  "127.0.0.1:8111",
	}
	service, err := discox.NewRegister(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	if err = service.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	// Wait for a termination signal. A bare `select {}` would never return,
	// so a deferred Stop behind it could never run.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	<-quit

	if err = service.Stop(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
}
服务发现端
// Example: discover the demo service in zookeeper and print the known
// endpoints every five seconds until the process receives SIGINT or SIGTERM.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/goeasya/discox"
)

func main() {
	cfg := discox.DiscoverConfig{
		BackendEndPoints: []string{"10.1.1.1:2181"},
		BackendType:      discox.ZookeeperBackend,
		DiscoverPrefix:   "/soaservices",
		ServiceName:      "demo",
	}
	server, err := discox.NewDiscover(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	endpointCacher := discox.NewLiteEndpoint()
	// Start blocks, so it runs in its own goroutine.
	go server.Start(endpointCacher)
	defer server.Stop()

	// Leave the loop on a termination signal so the deferred Stop runs.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			fmt.Println("time 5 seconds")
			endpoints := endpointCacher.List()
			fmt.Println(endpoints, len(endpoints))
		}
	}
}
consul
服务注册端
// Example: serve an HTTP health-check endpoint, register the service in
// consul with that check, and stay registered until the process receives
// SIGINT or SIGTERM.
package main

import (
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"

	"github.com/goeasya/discox"
)

func main() {
	// Bring the health endpoint up before registering, so that it is
	// already being served when the check is registered.
	http.HandleFunc("/check", consulCheck)
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			fmt.Println(err.Error())
			os.Exit(1)
		}
	}()

	cfg := discox.RegisterConfig{
		BackendType:         discox.ConsulBackend,
		BackendEndPoints:    []string{"consul.test.com"},
		DiscoverPrefix:      "/soaservices",
		ServiceName:         "demo",
		HeartBeatPeriod:     5,
		ServiceEndPoint:     "172.18.1.1:8080",
		HealthCheckEndPoint: "172.18.1.1:8080/check",
	}
	service, err := discox.NewRegister(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	if err = service.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	// Wait for a termination signal. A bare `select {}` would never return,
	// so a deferred Stop behind it could never run.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	<-quit

	if err = service.Stop(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
}

// count is the number of health-check requests served so far. Handlers run
// concurrently, so it is only touched through sync/atomic.
var count int64

// consulCheck answers a health check with a line naming the request's
// sequence number, remote address and URL, and prints the same line.
func consulCheck(w http.ResponseWriter, r *http.Request) {
	// The first request reports 0, as AddInt64 returns the incremented value.
	n := atomic.AddInt64(&count, 1) - 1
	s := "consulCheck" + fmt.Sprint(n) + "remote:" + r.RemoteAddr + " " + r.URL.String()
	fmt.Println(s)
	fmt.Fprintln(w, s)
}
服务发现端
// Example: discover the demo service in consul and print the known endpoints
// every five seconds until the process receives SIGINT or SIGTERM.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/goeasya/discox"
)

func main() {
	cfg := discox.DiscoverConfig{
		BackendEndPoints: []string{"consul.test.com"},
		BackendType:      discox.ConsulBackend,
		DiscoverPrefix:   "/soaservices",
		ServiceName:      "demo",
	}
	server, err := discox.NewDiscover(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	endpointCacher := discox.NewLiteEndpoint()
	// Start blocks, so it runs in its own goroutine.
	go server.Start(endpointCacher)
	defer server.Stop()

	// Leave the loop on a termination signal so the deferred Stop runs.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			fmt.Println("time 5 seconds")
			endpoints := endpointCacher.List()
			fmt.Println(endpoints, len(endpoints))
		}
	}
}
代码 GitHub 地址:https://github.com/goeasya/discox