乐趣区

Go-Micro-Register-源码分析

概述

Go Micro 是一个微服务框架分布式框架,既然是分布式那服务的注册和发现就是不可避免的。Micro 又是一个可插拔插件的框架,只要实现下面代码中的接口就可以使用各种不同的服务注册发现。现在代码库中已经可以支持 consul,etcd,zk 等各种。下面我们来看一下 Micro 框架是如何注册和发现服务的。

流程

服务端把服务的地址信息保存到 Registry,
然后定时的心跳检查,或者定时的重新注册服务。
客户端监听 Registry,最好是把服务信息保存到本地,监听服务的变动,更新缓存。
当调用服务端的接口是时,根据客户端的服务列表和负载算法选择服务端进行通信。

  • Register() 服务端服务注册
  • Deregister() 服务端服务注销
  • GetService() 客户端获取服务节点
  • ListServices() 获取所有服务节点
  • Watch()客户端获取 watcher 监听 Registry 的服务节点信息
type Registry interface {Init(...Option) error
    Options() Options
    Register(*Service, ...RegisterOption) error
    Deregister(*Service) error
    GetService(string) ([]*Service, error)
    ListServices() ([]*Service, error)
    Watch(...WatchOption) (Watcher, error)
    String() string}

以下我们就以 consul 作为服务注册发现来分析源码

服务端注册服务

  1. 当服务开启时,调用 consul 插件作为服务发现注册。
  2. 准备服务对象,包括服务名等信息。
  3. 往 consul 中写入 services 节点。
  4. 根据设置的 RegisterInterval 时间间隔参数,循环检测服务是否可用监听已注册服务。
  5. 调用 consul 设置服务存货时间 TTL。

consul 添加完节点如下图 (下图展示了同一个服务名 开启了 2 个服务,相当于分布式的两台机器):

源码分析

// 服务开启 调用 run 方法
func (s *service) Run() error {
    // 调用 start 函数
    if err := s.Start(); err != nil {return err}

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)

    select {
    // wait on kill signal
    case <-ch:
    // wait on context cancel
    case <-s.opts.Context.Done():}

    return s.Stop()}
// 调用服务 start 方法 
func (s *service) Start() error {
    // 执行服务开始之前函数列表
    for _, fn := range s.opts.BeforeStart {if err := fn(); err != nil {return err}
    }
    // 服务开始
    if err := s.opts.Server.Start(); err != nil {return err}
    // 执行服务结束之后函数列表
    for _, fn := range s.opts.AfterStart {if err := fn(); err != nil {return err}
    }

    return nil
}
// 服务开启函数 
func (s *rpcServer) Start() error {
    // 省略其他逻辑代码。。。// 调用 RegisterCheck 检查注册服务是否可用 可从外部注入函数
    if err = s.opts.RegisterCheck(s.opts.Context); err != nil {log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)
    } else {
        // 注册服务
        if err = s.Register(); err != nil {log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
        }
    }

    exit := make(chan bool)

    // 省略 broker 代码
    // 开启 goroutine 检查服务是否可用,间隔时间为设置的 RegisterInterval
    go func() {t := new(time.Ticker)

        // only process if it exists
        if s.opts.RegisterInterval > time.Duration(0) {
            // new ticker
            t = time.NewTicker(s.opts.RegisterInterval)
        }

        // return error chan
        var ch chan error
    
    Loop:
        for {
            select {
            // register self on interval
            case <-t.C:
                s.RLock()
                registered := s.registered
                s.RUnlock()
                if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered {log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
                    // deregister self in case of error
                    if err := s.Deregister(); err != nil {log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
                    }
                } else {if err := s.Register(); err != nil {log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
                    }
                }
            // wait for exit
            case ch = <-s.exit:
                t.Stop()
                close(exit)
                break Loop
            }
        }

        // deregister self
        if err := s.Deregister(); err != nil {log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
        }

        // wait for requests to finish
        if s.wg != nil {s.wg.Wait()
        }

        // close transport listener
        ch <- ts.Close()

        // disconnect the broker
        config.Broker.Disconnect()

        // swap back address
        s.Lock()
        s.opts.Address = addr
        s.Unlock()}()

    return nil
}
// 调用 consul 插件中的 Register 函数
func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
    // 省略组成注册服务和判断是否错误代码
    
    // 向 consul 中写入服务
    if err := c.Client.Agent().ServiceRegister(asr); err != nil {return err}

    // save our hash and time check of the service
    c.Lock()
    c.register[s.Name] = h
    c.lastChecked[s.Name] = time.Now()
    c.Unlock()

    // if the TTL is 0 we don't mess with the checks
    if options.TTL == time.Duration(0) {return nil}

    // pass the healthcheck
    return c.Client.Agent().PassTTL("service:"+node.Id, "")
}
// 使用 http 方式往 consul 中添加服务
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {r := a.c.newRequest("PUT", "/v1/agent/service/register")
    r.obj = service
    _, resp, err := requireOK(a.c.doRequest(r))
    if err != nil {return err}
    resp.Body.Close()
    return nil
}

订阅服务注册

  1. 调用 broker 的 Subscribe(订阅方法)
  2. 调用 Register Register 函数往 consul 添加服务节点如下图

// broker.go
func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {return DefaultBroker.Subscribe(topic, handler, opts...)
}
// 订阅
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {options := NewSubscribeOptions(opts...)

    // parse address for host, port
    parts := strings.Split(h.Address(), ":")
    host := strings.Join(parts[:len(parts)-1], ":")
    port, _ := strconv.Atoi(parts[len(parts)-1])

    addr, err := maddr.Extract(host)
    if err != nil {return nil, err}

    // create unique id
    id := h.id + "." + uuid.New().String()

    var secure bool

    if h.opts.Secure || h.opts.TLSConfig != nil {secure = true}

    // register service
    node := &registry.Node{
        Id:      id,
        Address: fmt.Sprintf("%s:%d", addr, port),
        Metadata: map[string]string{"secure": fmt.Sprintf("%t", secure),
        },
    }

    // check for queue group or broadcast queue
    version := options.Queue
    if len(version) == 0 {version = broadcastVersion}

    service := &registry.Service{
        Name:    "topic:" + topic,
        Version: version,
        Nodes:   []*registry.Node{node},
    }

    // generate subscriber
    subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    id,
        topic: topic,
        fn:    handler,
        svc:   service,
    }

    // subscribe now
    if err := h.subscribe(subscriber); err != nil {return nil, err}

    // return the subscriber
    return subscriber, nil
}
// 调用 Register 组件注册服务
func (h *httpBroker) subscribe(s *httpSubscriber) error {h.Lock()
    defer h.Unlock()

    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {return err}

    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
    return nil
}

总结

在 Micro 中 Register 是一个很重要的组件,之所以 Micro 被称为分布式框架就是因为有了服务的注册和发现。另外 Micro 中的订阅功能也是利用了 Register 组件来实现的。

退出移动版