概述
Go Micro是一个微服务框架分布式框架,既然是分布式那服务的注册和发现就是不可避免的。Micro又是一个可插拔插件的框架,只要实现下面代码中的接口就可以使用各种不同的服务注册发现。现在代码库中已经可以支持consul,etcd,zk等各种。下面我们来看一下Micro框架是如何注册和发现服务的。
流程
服务端把服务的地址信息保存到Registry,
然后定时的心跳检查,或者定时的重新注册服务。
客户端监听Registry,最好是把服务信息保存到本地,监听服务的变动,更新缓存。
当调用服务端的接口是时,根据客户端的服务列表和负载算法选择服务端进行通信。
- Register() 服务端服务注册
- Deregister() 服务端服务注销
- GetService() 客户端获取服务节点
- ListServices() 获取所有服务节点
- Watch() 客户端获取watcher监听Registry的服务节点信息
type Registry interface { Init(...Option) error Options() Options Register(*Service, ...RegisterOption) error Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) Watch(...WatchOption) (Watcher, error) String() string}
以下我们就以consul作为服务注册发现来分析源码
服务端注册服务
- 当服务开启时,调用consul插件作为服务发现注册。
- 准备服务对象,包括服务名等信息。
- 往consul中写入services节点。
- 根据设置的RegisterInterval时间间隔参数,循环检测服务是否可用监听已注册服务。
- 调用consul设置服务存货时间TTL。
consul添加完节点如下图(下图展示了同一个服务名 开启了2个服务,相当于分布式的两台机器):
源码分析
// 服务开启 调用run方法func (s *service) Run() error { // 调用start函数 if err := s.Start(); err != nil { return err } ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) select { // wait on kill signal case <-ch: // wait on context cancel case <-s.opts.Context.Done(): } return s.Stop()}// 调用服务start方法 func (s *service) Start() error { // 执行服务开始之前函数列表 for _, fn := range s.opts.BeforeStart { if err := fn(); err != nil { return err } } // 服务开始 if err := s.opts.Server.Start(); err != nil { return err } // 执行服务结束之后函数列表 for _, fn := range s.opts.AfterStart { if err := fn(); err != nil { return err } } return nil}// 服务开启函数 func (s *rpcServer) Start() error { // 省略其他逻辑代码。。。 // 调用RegisterCheck检查注册服务是否可用 可从外部注入函数 if err = s.opts.RegisterCheck(s.opts.Context); err != nil { log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err) } else { // 注册服务 if err = s.Register(); err != nil { log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err) } } exit := make(chan bool) // 省略broker 代码 // 开启goroutine 检查服务是否可用,间隔时间为设置的RegisterInterval go func() { t := new(time.Ticker) // only process if it exists if s.opts.RegisterInterval > time.Duration(0) { // new ticker t = time.NewTicker(s.opts.RegisterInterval) } // return error chan var ch chan error Loop: for { select { // register self on interval case <-t.C: s.RLock() registered := s.registered s.RUnlock() if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered { log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err) // deregister self in case of error if err := s.Deregister(); err != nil { log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) } } else { if err := s.Register(); err != nil { log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit case ch = <-s.exit: t.Stop() close(exit) break Loop } } // deregister self if err := s.Deregister(); err != nil { log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) } // wait for requests to finish if s.wg != nil { s.wg.Wait() } // close transport listener ch <- ts.Close() // disconnect the broker config.Broker.Disconnect() // swap back address s.Lock() s.opts.Address = addr s.Unlock() }() return nil}// 调用consul插件中的Register函数func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { // 省略组成注册服务和判断是否错误代码 // 向consul中写入服务 if err := c.Client.Agent().ServiceRegister(asr); err != nil { return err } // save our hash and time check of the service c.Lock() c.register[s.Name] = h c.lastChecked[s.Name] = time.Now() c.Unlock() // if the TTL is 0 we don't mess with the checks if options.TTL == time.Duration(0) { return nil } // pass the healthcheck return c.Client.Agent().PassTTL("service:"+node.Id, "")}// 使用http方式往consul中添加服务func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error { r := a.c.newRequest("PUT", "/v1/agent/service/register") r.obj = service _, resp, err := requireOK(a.c.doRequest(r)) if err != nil { return err } resp.Body.Close() return nil}
订阅服务注册
- 调用broker的Subscribe(订阅方法)
- 调用Register Register函数往consul添加服务节点如下图
// broker.gofunc Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { return DefaultBroker.Subscribe(topic, handler, opts...)}// 订阅func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { options := NewSubscribeOptions(opts...) // parse address for host, port parts := strings.Split(h.Address(), ":") host := strings.Join(parts[:len(parts)-1], ":") port, _ := strconv.Atoi(parts[len(parts)-1]) addr, err := maddr.Extract(host) if err != nil { return nil, err } // create unique id id := h.id + "." + uuid.New().String() var secure bool if h.opts.Secure || h.opts.TLSConfig != nil { secure = true } // register service node := ®istry.Node{ Id: id, Address: fmt.Sprintf("%s:%d", addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), }, } // check for queue group or broadcast queue version := options.Queue if len(version) == 0 { version = broadcastVersion } service := ®istry.Service{ Name: "topic:" + topic, Version: version, Nodes: []*registry.Node{node}, } // generate subscriber subscriber := &httpSubscriber{ opts: options, hb: h, id: id, topic: topic, fn: handler, svc: service, } // subscribe now if err := h.subscribe(subscriber); err != nil { return nil, err } // return the subscriber return subscriber, nil}// 调用Register组件注册服务func (h *httpBroker) subscribe(s *httpSubscriber) error { h.Lock() defer h.Unlock() if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { return err } h.subscribers[s.topic] = append(h.subscribers[s.topic], s) return nil}
总结
在Micro中 Register是一个很重要的组件,之所以Micro被称为分布式框架就是因为有了服务的注册和发现。另外Micro中的订阅功能也是利用了Register组件来实现的。