概述Go Micro是一个微服务框架分布式框架,既然是分布式那服务的注册和发现就是不可避免的。Micro又是一个可插拔插件的框架,只要实现下面代码中的接口就可以使用各种不同的服务注册发现。现在代码库中已经可以支持consul,etcd,zk等各种。下面我们来看一下Micro框架是如何注册和发现服务的。
流程服务端把服务的地址信息保存到Registry, 然后定时的心跳检查,或者定时的重新注册服务。客户端监听Registry,最好是把服务信息保存到本地,监听服务的变动,更新缓存。当调用服务端的接口是时,根据客户端的服务列表和负载算法选择服务端进行通信。
Register() 服务端服务注册Deregister() 服务端服务注销GetService() 客户端获取服务节点ListServices() 获取所有服务节点Watch() 客户端获取watcher监听Registry的服务节点信息type Registry interface { Init(...Option) error Options() Options Register(*Service, ...RegisterOption) error Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) Watch(...WatchOption) (Watcher, error) String() string}以下我们就以consul作为服务注册发现来分析源码服务端注册服务当服务开启时,调用consul插件作为服务发现注册。准备服务对象,包括服务名等信息。往consul中写入services节点。根据设置的RegisterInterval时间间隔参数,循环检测服务是否可用监听已注册服务。调用consul设置服务存货时间TTL。consul添加完节点如下图(下图展示了同一个服务名 开启了2个服务,相当于分布式的两台机器):
源码分析
// 服务开启 调用run方法func (s *service) Run() error { // 调用start函数 if err := s.Start(); err != nil { return err } ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) select { // wait on kill signal case <-ch: // wait on context cancel case <-s.opts.Context.Done(): } return s.Stop()}// 调用服务start方法 func (s *service) Start() error { // 执行服务开始之前函数列表 for _, fn := range s.opts.BeforeStart { if err := fn(); err != nil { return err } } // 服务开始 if err := s.opts.Server.Start(); err != nil { return err } // 执行服务结束之后函数列表 for _, fn := range s.opts.AfterStart { if err := fn(); err != nil { return err } } return nil}// 服务开启函数 func (s *rpcServer) Start() error { // 省略其他逻辑代码。。。 // 调用RegisterCheck检查注册服务是否可用 可从外部注入函数 if err = s.opts.RegisterCheck(s.opts.Context); err != nil { log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err) } else { // 注册服务 if err = s.Register(); err != nil { log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err) } } exit := make(chan bool) // 省略broker 代码 // 开启goroutine 检查服务是否可用,间隔时间为设置的RegisterInterval go func() { t := new(time.Ticker) // only process if it exists if s.opts.RegisterInterval > time.Duration(0) { // new ticker t = time.NewTicker(s.opts.RegisterInterval) } // return error chan var ch chan error Loop: for { select { // register self on interval case <-t.C: s.RLock() registered := s.registered s.RUnlock() if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered { log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err) // deregister self in case of error if err := s.Deregister(); err != nil { log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) } } else { if err := s.Register(); err != nil { log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit case ch = <-s.exit: t.Stop() close(exit) break Loop } } // deregister self if err := s.Deregister(); err != nil { log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) } // wait for requests to finish if s.wg != nil { s.wg.Wait() } // close transport listener ch <- ts.Close() // disconnect the broker config.Broker.Disconnect() // swap back address s.Lock() s.opts.Address = addr s.Unlock() }() return nil}// 调用consul插件中的Register函数func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { // 省略组成注册服务和判断是否错误代码 // 向consul中写入服务 if err := c.Client.Agent().ServiceRegister(asr); err != nil { return err } // save our hash and time check of the service c.Lock() c.register[s.Name] = h c.lastChecked[s.Name] = time.Now() c.Unlock() // if the TTL is 0 we don't mess with the checks if options.TTL == time.Duration(0) { return nil } // pass the healthcheck return c.Client.Agent().PassTTL("service:"+node.Id, "")}// 使用http方式往consul中添加服务func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error { r := a.c.newRequest("PUT", "/v1/agent/service/register") r.obj = service _, resp, err := requireOK(a.c.doRequest(r)) if err != nil { return err } resp.Body.Close() return nil}订阅服务注册调用broker的Subscribe(订阅方法)调用Register Register函数往consul添加服务节点如下图
...