概述

Go Micro是一个微服务框架分布式框架,既然是分布式那服务的注册和发现就是不可避免的。Micro又是一个可插拔插件的框架,只要实现下面代码中的接口就可以使用各种不同的服务注册发现。现在代码库中已经可以支持consul,etcd,zk等各种。下面我们来看一下Micro框架是如何注册和发现服务的。

流程

服务端把服务的地址信息保存到Registry,
然后定时的心跳检查,或者定时的重新注册服务。
客户端监听Registry,最好是把服务信息保存到本地,监听服务的变动,更新缓存。
当调用服务端的接口是时,根据客户端的服务列表和负载算法选择服务端进行通信。

  • Register() 服务端服务注册
  • Deregister() 服务端服务注销
  • GetService() 客户端获取服务节点
  • ListServices() 获取所有服务节点
  • Watch() 客户端获取watcher监听Registry的服务节点信息
type Registry interface {    Init(...Option) error    Options() Options    Register(*Service, ...RegisterOption) error    Deregister(*Service) error    GetService(string) ([]*Service, error)    ListServices() ([]*Service, error)    Watch(...WatchOption) (Watcher, error)    String() string}

以下我们就以consul作为服务注册发现来分析源码

服务端注册服务

  1. 当服务开启时,调用consul插件作为服务发现注册。
  2. 准备服务对象,包括服务名等信息。
  3. 往consul中写入services节点。
  4. 根据设置的RegisterInterval时间间隔参数,循环检测服务是否可用监听已注册服务。
  5. 调用consul设置服务存货时间TTL。

consul添加完节点如下图(下图展示了同一个服务名 开启了2个服务,相当于分布式的两台机器):

源码分析

// 服务开启 调用run方法func (s *service) Run() error {    // 调用start函数    if err := s.Start(); err != nil {        return err    }    ch := make(chan os.Signal, 1)    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)    select {    // wait on kill signal    case <-ch:    // wait on context cancel    case <-s.opts.Context.Done():    }    return s.Stop()}// 调用服务start方法 func (s *service) Start() error {    // 执行服务开始之前函数列表    for _, fn := range s.opts.BeforeStart {        if err := fn(); err != nil {            return err        }    }    // 服务开始    if err := s.opts.Server.Start(); err != nil {        return err    }    // 执行服务结束之后函数列表    for _, fn := range s.opts.AfterStart {        if err := fn(); err != nil {            return err        }    }    return nil}// 服务开启函数 func (s *rpcServer) Start() error {    // 省略其他逻辑代码。。。    // 调用RegisterCheck检查注册服务是否可用 可从外部注入函数    if err = s.opts.RegisterCheck(s.opts.Context); err != nil {        log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)    } else {        // 注册服务        if err = s.Register(); err != nil {            log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)        }    }    exit := make(chan bool)    // 省略broker 代码    // 开启goroutine 检查服务是否可用,间隔时间为设置的RegisterInterval    go func() {        t := new(time.Ticker)        // only process if it exists        if s.opts.RegisterInterval > time.Duration(0) {            // new ticker            t = time.NewTicker(s.opts.RegisterInterval)        }        // return error chan        var ch chan error        Loop:        for {            select {            // register self on interval            case <-t.C:                s.RLock()                registered := s.registered                s.RUnlock()                if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered {                    log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)                    // deregister self in case of error                    if err := s.Deregister(); err != nil {                        log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)                    }                } else {                    if err := s.Register(); err != nil {                        log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)                    }                }            // wait for exit            case ch = <-s.exit:                t.Stop()                close(exit)                break Loop            }        }        // deregister self        if err := s.Deregister(); err != nil {            log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)        }        // wait for requests to finish        if s.wg != nil {            s.wg.Wait()        }        // close transport listener        ch <- ts.Close()        // disconnect the broker        config.Broker.Disconnect()        // swap back address        s.Lock()        s.opts.Address = addr        s.Unlock()    }()    return nil}// 调用consul插件中的Register函数func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {    // 省略组成注册服务和判断是否错误代码        // 向consul中写入服务    if err := c.Client.Agent().ServiceRegister(asr); err != nil {        return err    }    // save our hash and time check of the service    c.Lock()    c.register[s.Name] = h    c.lastChecked[s.Name] = time.Now()    c.Unlock()    // if the TTL is 0 we don't mess with the checks    if options.TTL == time.Duration(0) {        return nil    }    // pass the healthcheck    return c.Client.Agent().PassTTL("service:"+node.Id, "")}// 使用http方式往consul中添加服务func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {    r := a.c.newRequest("PUT", "/v1/agent/service/register")    r.obj = service    _, resp, err := requireOK(a.c.doRequest(r))    if err != nil {        return err    }    resp.Body.Close()    return nil}

订阅服务注册

  1. 调用broker的Subscribe(订阅方法)
  2. 调用Register Register函数往consul添加服务节点如下图

// broker.gofunc Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {    return DefaultBroker.Subscribe(topic, handler, opts...)}// 订阅func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {    options := NewSubscribeOptions(opts...)    // parse address for host, port    parts := strings.Split(h.Address(), ":")    host := strings.Join(parts[:len(parts)-1], ":")    port, _ := strconv.Atoi(parts[len(parts)-1])    addr, err := maddr.Extract(host)    if err != nil {        return nil, err    }    // create unique id    id := h.id + "." + uuid.New().String()    var secure bool    if h.opts.Secure || h.opts.TLSConfig != nil {        secure = true    }    // register service    node := &registry.Node{        Id:      id,        Address: fmt.Sprintf("%s:%d", addr, port),        Metadata: map[string]string{            "secure": fmt.Sprintf("%t", secure),        },    }    // check for queue group or broadcast queue    version := options.Queue    if len(version) == 0 {        version = broadcastVersion    }    service := &registry.Service{        Name:    "topic:" + topic,        Version: version,        Nodes:   []*registry.Node{node},    }    // generate subscriber    subscriber := &httpSubscriber{        opts:  options,        hb:    h,        id:    id,        topic: topic,        fn:    handler,        svc:   service,    }    // subscribe now    if err := h.subscribe(subscriber); err != nil {        return nil, err    }    // return the subscriber    return subscriber, nil}// 调用Register组件注册服务func (h *httpBroker) subscribe(s *httpSubscriber) error {    h.Lock()    defer h.Unlock()    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {        return err    }    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)    return nil}

总结

在Micro中 Register是一个很重要的组件,之所以Micro被称为分布式框架就是因为有了服务的注册和发现。另外Micro中的订阅功能也是利用了Register组件来实现的。