共计 15579 个字符,预计需要花费 39 分钟才能阅读完成。
在微服务中,服务注册与发现是必不可少的一环,其中 etcd,zookeeper,consul 在 golang 中较为罕用。
程序对于这种中间件的依赖,都倡议加一层接口,基于接口去实现
上面会分层三大板块去阐明
接口定义
因为是下层调用是基于接口调用的,所以须要将 discover 和 register 接口化。在 discover 发现 endpoint 有变动时,须要调用 callback 去更新本地的缓存,所以也须要 endpoint 的接口。如下
const (
EtcdBackend = "etcd"
ZookeeperBackend = "zookeeper"
ConsulBackend = "consul"
)
// 服务发现接口
type Discover interface {
// Start watch with block, 须要一个 callback 去更新本地 endpoint
Start(callback EndpointCacher)
Stop()}
// 服务注册接口
type Register interface {Start() error
Stop() error}
// endpoint 接口
type EndpointCacher interface {AddOrUpdate(endpoint string, attribute []byte)
Delete(endpoint string)
AddError(err error)
Error(err error)
}
接口定义完了,须要有连贯注册核心的信息配置。
// 服务发现端配置
type DiscoverConfig struct {
BackendType string // one of etcd|consul|zookeeper
BackendEndPoints []string // register backend endpoint
DiscoverPrefix string
ServiceName string
HostName string
}
// 注册端配置
type RegisterConfig struct {
BackendType string // one of etcd|consul|zookeeper
BackendEndPoints []string // register backend endpoint
DiscoverPrefix string
ServiceName string
HeartBeatPeriod int64
ServiceEndPoint string // register service endpoint to backend
Attr string // custom attribute. like: {"hostname": "xxx", "weight": 1}
HealthCheckEndPoint string
}
有了配置之后,须要有注册和服务发现实例的创立办法
// NewDiscover 创立一个服务发现实例
func NewDiscover(cfg *DiscoverConfig) (Discover, error) {
switch cfg.BackendType {
case EtcdBackend:
return newEtcdDiscover(cfg)
case ConsulBackend:
return newConsulDiscover(cfg)
case ZookeeperBackend:
return newZookeeperDiscover(cfg)
}
return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
}
// NewRegister 创立一个注册实例
func NewRegister(cfg *RegisterConfig) (Register, error) {
switch cfg.BackendType {
case EtcdBackend:
return newEtcdRegister(cfg)
case ConsulBackend:
return newConsulRegister(cfg)
case ZookeeperBackend:
return newZookeeperRegister(cfg)
}
return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
}
这里实现了简易版的 endpoint,可供参考,次要是将 endpoint 存在 map 中,而后有变动时做变更
// LiteEndpoint EndpointCacher lite impl
type LiteEndpoint struct {Endpoints map[string][]byte `json:"value"`
lock sync.Mutex
Err error
}
func NewLiteEndpoint() *LiteEndpoint {
return &LiteEndpoint{Endpoints: map[string][]byte{},
lock: sync.Mutex{},}
}
func (e *LiteEndpoint) AddOrUpdate(endpoint string, attribute []byte) {e.lock.Lock()
defer e.lock.Unlock()
e.Endpoints[endpoint] = attribute
}
func (e *LiteEndpoint) Delete(endpoint string) {e.lock.Lock()
defer e.lock.Unlock()
delete(e.Endpoints, endpoint)
}
func (e *LiteEndpoint) Error(err error) {e.Err = err}
func (e *LiteEndpoint) List() []string {var endpointSlice []string
for k, _ := range e.Endpoints {endpointSlice = append(endpointSlice, k)
}
return endpointSlice
}
func (e *LiteEndpoint) Attr(endpoint string) []byte {return e.Endpoints[endpoint]
}
接口的具体实现
etcd
注册服务
服务注册到 etcd 后,利用 etcd 租期的个性,每次续租几秒,在续期过期前实现续租。当实例异样时无奈续租,则会在 etcd 端该实例会被过期删除,达到下线异样节点的成果。
func newEtcdRegister(cfg *RegisterConfig) (*etcdRegister, error) {
var err error
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
if err != nil {return nil, err}
r := &etcdRegister{
etcdEndpoints: cfg.BackendEndPoints,
discoverPrefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
endpoint: cfg.ServiceEndPoint,
attr: cfg.Attr,
ttl: cfg.HeartBeatPeriod,
stopCh: make(chan struct{}),
etcdClient: etcdClient,
}
return r, nil
}
// Start 开启一个协程
func (r *etcdRegister) Start() error {go r.keepAlive()
return nil
}
func (r *etcdRegister) Stop() error {close(r.stopCh)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := r.etcdClient.Delete(ctx, r.key())
//if r.grpcResolver != nil {// return r.grpcResolver.Update(ctx, r.key(), grpcnaming.Update{Op: grpcnaming.Delete, Addr: r.endpoint})
//}
return err
}
// 定时续租
func (r *etcdRegister) keepAlive() {duration := time.Duration(r.ttl) * time.Second
timer := time.NewTimer(duration)
for {
select {
case <-r.stopCh:
return
case <-timer.C:
if r.leaseID > 0 {if err := r.leaseRenewal(); err != nil {logrus.Warnf("%s leaseid[%x] keepAlive err: %s, try to reset...", r.endpoint, r.leaseID, err.Error())
r.leaseID = 0
}
} else {if err := r.register(); err != nil {logrus.Warnf("register endpoint %s error: %s", r.endpoint, err.Error())
} else {logrus.Infof("register endppint %s success", r.endpoint)
}
}
timer.Reset(duration)
}
}
}
func (r *etcdRegister) register() error {ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
resp, err := r.etcdClient.Grant(ctx, r.ttl+3)
if err != nil {return err}
_, err = r.etcdClient.Put(ctx, r.key(), r.attr, clientv3.WithLease(resp.ID))
r.leaseID = resp.ID
return err
}
func (r *etcdRegister) leaseRenewal() error {ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := r.etcdClient.KeepAliveOnce(ctx, r.leaseID)
return err
}
func (r *etcdRegister) key() string {return toEtcdKey(r.discoverPrefix, r.serviceName, r.endpoint)
}
func toEtcdKey(elem ...string) string {return strings.Join(elem, "/")
}
func tailKey(key []byte) string {keyStr := string(key)
topicSlice := strings.Split(keyStr, "/")
if len(topicSlice) != 0 {return topicSlice[len(topicSlice)-1]
}
return keyStr
}
服务发现端
服务发现端须要一直的监听 key 的变动,所以须要 watch,有更新后须要调用 callback 来更新本地的 endpoint 列表
// etcd discover impl
type etcdDiscover struct {
ctx context.Context
cancel context.CancelFunc
etcdClient *clientv3.Client
prefix string
}
func newEtcdDiscover(cfg *DiscoverConfig) (*etcdDiscover, error) {cli, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
if err != nil {return nil, err}
ctx, cancel := context.WithCancel(context.Background())
return &etcdDiscover{
ctx: ctx,
cancel: cancel,
etcdClient: cli,
prefix: cfg.DiscoverPrefix,
}, nil
}
func (d *etcdDiscover) Start(callback EndpointCacher) {d.discover(callback)
}
func (d *etcdDiscover) discover(callback EndpointCacher) {ctx, cancel := context.WithCancel(d.ctx)
defer cancel()
if err := d.listService(ctx, callback); err != nil {callback.AddError(err)
}
watch := d.etcdClient.Watch(ctx, d.prefix, clientv3.WithPrefix())
for {
select {case <-d.ctx.Done():
return
case resp := <-watch:
if err := resp.Err(); err != nil {callback.AddError(err)
return
}
for _, event := range resp.Events {
if event.Kv == nil {continue}
switch event.Type {
case mvccpb.PUT:
callback.AddOrUpdate(tailKey(event.Kv.Key), event.Kv.Value)
case mvccpb.DELETE:
callback.Delete(tailKey(event.Kv.Key))
}
}
}
}
}
func (d *etcdDiscover) Stop() {d.cancel()
}
func (d *etcdDiscover) listService(ctx context.Context, callback EndpointCacher) error {resp, err := d.etcdClient.Get(ctx, d.prefix, clientv3.WithPrefix())
if err != nil {return err}
for _, kv := range resp.Kvs {callback.AddOrUpdate(tailKey(kv.Key), kv.Value)
}
return nil
}
zookeeper
zookeeper 则利用长期节点的个性,来做异样服务下线性能
服务注册端
type zookeeperRegister struct {zkEndpoints []string
prefix string
serviceName string
endpoint string
attr string
ttl int64
stopCh chan struct{}
conn *zk.Conn
}
func newZookeeperRegister(cfg *RegisterConfig) (*zookeeperRegister, error) {
r := zookeeperRegister{
zkEndpoints: cfg.BackendEndPoints,
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
endpoint: cfg.ServiceEndPoint,
attr: cfg.Attr,
ttl: cfg.HeartBeatPeriod,
}
return &r, nil
}
func (r *zookeeperRegister) Start() error {
var err error
r.conn, _, err = zk.Connect(r.zkEndpoints, time.Second*5)
if err != nil {return err}
return r.register()}
func (r *zookeeperRegister) Stop() error {
if r.conn != nil {r.conn.Close()
}
return nil
}
func (r *zookeeperRegister) register() error {if err := r.createIfNotExist(r.node(), nil, 0); err != nil {return err}
return r.createOrUpdateEndpoint(r.key(), []byte(r.attr))
}
func (r *zookeeperRegister) createOrUpdateEndpoint(path string, data []byte) error {exist, _, err := r.conn.Exists(path)
if err != nil {return err}
if !exist {_, err = r.conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {return err}
return nil
}
_, stat, err := r.conn.Get(path)
if err != nil {return err}
_, err = r.conn.Set(path, []byte(r.attr), stat.Version)
if err != nil {return err}
return nil
}
func (r *zookeeperRegister) createIfNotExist(path string, data []byte, flag int32) error {exist, _, err := r.conn.Exists(r.node())
if err != nil {return err}
if !exist {_, err = r.conn.Create(path, data, flag, zk.WorldACL(zk.PermAll))
if err != nil {return err}
}
return nil
}
func (r *zookeeperRegister) node() string {return fmt.Sprintf("%s/%s", r.prefix, r.serviceName)
}
func (r *zookeeperRegister) key() string {return fmt.Sprintf("%s/%s/%s", r.prefix, r.serviceName, r.endpoint)
}
服务发现端
type zookeeperDiscover struct {
ctx context.Context
cancel context.CancelFunc
conn *zk.Conn
prefix string
serviceName string
}
func newZookeeperDiscover(cfg *DiscoverConfig) (*zookeeperDiscover, error) {conn, _, err := zk.Connect(cfg.BackendEndPoints, 10*time.Second)
if err != nil {return nil, err}
ctx, cancel := context.WithCancel(context.Background())
d := &zookeeperDiscover{
ctx: ctx,
cancel: cancel,
conn: conn,
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
}
return d, nil
}
func (d *zookeeperDiscover) Start(callback EndpointCacher) {d.discover(callback)
}
func (d *zookeeperDiscover) Stop() {d.cancel()
if d.conn != nil {d.conn.Close()
}
}
func (d *zookeeperDiscover) discover(callback EndpointCacher) {if err := d.listService(callback); err != nil {callback.AddError(err)
return
}
for {snapshot, _, ch, err := d.conn.ChildrenW(d.key())
if err != nil {callback.AddError(err)
return
}
select {
case e := <-ch:
switch e.Type {
case zk.EventNodeCreated, zk.EventNodeChildrenChanged:
for _, v := range snapshot {callback.Delete(v)
}
if err := d.listService(callback); err != nil {callback.AddError(err)
}
case zk.EventNodeDeleted:
for _, v := range snapshot {callback.Delete(v)
}
}
}
}
}
func (d *zookeeperDiscover) getNodeProperty(path string) ([]byte, error) {value, _, err := d.conn.Get(path)
return value, err
}
func (d *zookeeperDiscover) listService(callback EndpointCacher) error {childs, _, err := d.conn.Children(d.key())
if err != nil {return err}
for _, c := range childs {value, _, err := d.conn.Get(fmt.Sprintf("%s/%s", d.key(), c))
if err != nil {return err}
callback.AddOrUpdate(c, value)
}
return nil
}
func (d *zookeeperDiscover) key() string {return fmt.Sprintf("%s/%s", d.prefix, d.serviceName)
}
consul
服务注册端
type consulRegister struct {
prefix string
serviceName string
serviceId string
endpoint string
healthCheckEndpoint string
attr string
ttl int64
stopCh chan struct{}
client *consulapi.Client
}
func newConsulRegister(cfg *RegisterConfig) (*consulRegister, error) {config := consulapi.DefaultConfig()
config.Address = strings.Join(cfg.BackendEndPoints, ",")
client, err := consulapi.NewClient(config)
if err != nil {return nil, err}
r := &consulRegister{
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
endpoint: cfg.ServiceEndPoint,
healthCheckEndpoint: cfg.HealthCheckEndPoint,
attr: cfg.Attr,
ttl: cfg.HeartBeatPeriod,
stopCh: make(chan struct{}),
client: client,
}
return r, nil
}
func (s *consulRegister) Start() error {return s.register()
}
func (s *consulRegister) Stop() error {
if s.serviceId != "" {return s.client.Agent().ServiceDeregister(s.serviceId)
}
return nil
}
func (s *consulRegister) register() error {registration := new(consulapi.AgentServiceRegistration)
address, port, err := net.SplitHostPort(s.endpoint)
if err != nil {return err}
registration.Address = address
portInt, err := strconv.Atoi(port)
if err != nil {return err}
registration.Port = portInt
serviceId := fmt.Sprintf("%s_%s", s.serviceName, address)
s.serviceId = serviceId
registration.Name = s.serviceName
registration.ID = serviceId
serviceCheck := new(consulapi.AgentServiceCheck)
serviceCheck.HTTP = fmt.Sprintf("http://%s", s.healthCheckEndpoint)
serviceCheck.Timeout = "2s"
serviceCheck.Interval = "2s"
serviceCheck.DeregisterCriticalServiceAfter = "30s"
registration.Check = serviceCheck
return s.client.Agent().ServiceRegister(registration)
}
服务发现端
type consulDiscover struct {
ctx context.Context
cancel context.CancelFunc
prefix string
serviceName string
client *consulapi.Client
}
func newConsulDiscover(cfg *DiscoverConfig) (*consulDiscover, error) {config := consulapi.DefaultConfig()
config.Address = strings.Join(cfg.BackendEndPoints, ",")
client, err := consulapi.NewClient(config)
if err != nil {return nil, err}
ctx, cancel := context.WithCancel(context.Background())
d := &consulDiscover{
ctx: ctx,
cancel: cancel,
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
client: client,
}
return d, nil
}
func (d *consulDiscover) Start(callback EndpointCacher) {d.discover(callback)
}
func (d *consulDiscover) Stop() {d.cancel()
}
func (d *consulDiscover) discover(callback EndpointCacher) {
var lastIndex uint64
for {
select {case <-d.ctx.Done():
return
default:
services, queryMeta, err := d.client.Health().Service(
d.serviceName, "", false, &consulapi.QueryOptions{WaitIndex: lastIndex,})
if err != nil {callback.AddError(err)
}
lastIndex = queryMeta.LastIndex
for _, service := range services {var attr []byte
endpoint := fmt.Sprintf("%s:%v", service.Service.Address, service.Service.Port)
switch service.Checks.AggregatedStatus() {
case consulapi.HealthPassing:
if service.Service.Meta != nil {attr, _ = json.Marshal(service.Service.Meta)
}
callback.AddOrUpdate(endpoint, attr)
case consulapi.HealthCritical, consulapi.HealthWarning:
callback.Delete(endpoint)
}
}
}
}
}
func (d *consulDiscover) listService() {d.client.Agent().Services()}
示例代码
etcd
服务发现端
package main
import (
"fmt"
"os"
"github.com/goeasya/discox"
)
func main() {
cfg := discox.RegisterConfig{
BackendType: discox.EtcdBackend,
BackendEndPoints: []string{"http://10.1.1.1:23790"},
DiscoverPrefix: "/discox/etcddemo",
ServiceName: "demo",
HeartBeatPeriod: 5,
ServiceEndPoint: "127.0.0.1:8111",
}
service, err := discox.NewRegister(&cfg)
if err != nil {fmt.Println(err.Error())
os.Exit(1)
}
if err = service.Start(); err != nil {fmt.Println(err.Error())
os.Exit(1)
}
defer service.Stop()
select {}}
服务注册端
package main
import (
"fmt"
"os"
"time"
"github.com/goeasya/discox"
)
func main() {timer := time.NewTimer(time.Second * 5)
cfg := discox.DiscoverConfig{BackendEndPoints: []string{"http://10.1.1.1:23790"},
BackendType: discox.EtcdBackend,
DiscoverPrefix: "/discox/etcddemo",
ServiceName: "demo",
}
server, err := discox.NewDiscover(&cfg)
if err != nil {fmt.Println(err.Error())
os.Exit(1)
}
endpointCacher := discox.NewLiteEndpoint()
go server.Start(endpointCacher)
defer server.Stop()
for {
select {
case <-timer.C:
fmt.Println("time 5 seconds")
endpoints := endpointCacher.List()
fmt.Println(endpoints, len(endpoints))
timer.Reset(time.Second * 5)
}
}
}
zookeeper
服务发现端
package main
import (
"fmt"
"github.com/goeasya/discox"
"os"
)
func main() {
cfg := discox.RegisterConfig{
BackendType: discox.ZookeeperBackend,
BackendEndPoints: []string{"10.1.1.1:2181"},
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
HeartBeatPeriod: 5,
ServiceEndPoint: "127.0.0.1:8111",
}
service, err := discox.NewRegister(&cfg)
if err != nil {fmt.Println(err.Error())
os.Exit(1)
}
if err = service.Start(); err != nil {fmt.Println(err.Error())
os.Exit(1)
}
defer service.Stop()
select {}}
服务注册端
package main
import (
"fmt"
"github.com/goeasya/discox"
"os"
"time"
)
func main() {timer := time.NewTimer(time.Second * 5)
cfg := discox.DiscoverConfig{BackendEndPoints: []string{"10.1.1.1:2181"},
BackendType: discox.ZookeeperBackend,
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
}
server, err := discox.NewDiscover(&cfg)
if err != nil {fmt.Println(err.Error())
os.Exit(1)
}
endpointCacher := discox.NewLiteEndpoint()
go server.Start(endpointCacher)
defer server.Stop()
for {
select {
case <-timer.C:
fmt.Println("time 5 seconds")
endpoints := endpointCacher.List()
fmt.Println(endpoints, len(endpoints))
timer.Reset(time.Second * 5)
}
}
}
consul
服务发现端
package main
import (
"fmt"
"net/http"
"os"
"github.com/goeasya/discox"
)
func main() {
cfg := discox.RegisterConfig{
BackendType: discox.ConsulBackend,
BackendEndPoints: []string{"consul.test.com"},
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
HeartBeatPeriod: 5,
ServiceEndPoint: "172.18.1.1:8080",
HealthCheckEndPoint: "172.18.1.1:8080/check",
}
service, err := discox.NewRegister(&cfg)
if err != nil {fmt.Println(err.Error())
os.Exit(1)
}
if err = service.Start(); err != nil {fmt.Println(err.Error())
os.Exit(1)
}
http.HandleFunc("/check", consulCheck)
go http.ListenAndServe(":8080", nil)
defer service.Stop()
select {}}
var count int64
func consulCheck(w http.ResponseWriter, r *http.Request) {s := "consulCheck" + fmt.Sprint(count) + "remote:" + r.RemoteAddr + " " + r.URL.String()
fmt.Println(s)
fmt.Fprintln(w, s)
count++
}
服务注册端
package main
import (
"fmt"
"os"
"time"
"github.com/goeasya/discox"
)
func main() {timer := time.NewTimer(time.Second * 5)
cfg := discox.DiscoverConfig{BackendEndPoints: []string{"consul.test.com"},
BackendType: discox.ConsulBackend,
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
}
server, err := discox.NewDiscover(&cfg)
if err != nil {fmt.Println(err.Error())
os.Exit(1)
}
endpointCacher := discox.NewLiteEndpoint()
go server.Start(endpointCacher)
defer server.Stop()
for {
select {
case <-timer.C:
fmt.Println("time 5 seconds")
endpoints := endpointCacher.List()
fmt.Println(endpoints, len(endpoints))
timer.Reset(time.Second * 5)
}
}
}
代码 github 地址 https://github.com/goeasya/di…