micro.newService()中newOptions
func newOptions(opts ...Option) Options { opt := Options{ Auth: auth.DefaultAuth, Broker: broker.DefaultBroker, Cmd: cmd.DefaultCmd, Config: config.DefaultConfig, Client: client.DefaultClient, Server: server.DefaultServer, Store: store.DefaultStore, Registry: registry.DefaultRegistry, Router: router.DefaultRouter, Runtime: runtime.DefaultRuntime, Transport: transport.DefaultTransport, Context: context.Background(), Signal: true, } for _, o := range opts { o(&opt) } return opt}
初始化了一堆根底设置,来看看Registryregistry.DefaultRegistry,
在registry/registry.go中的DefaultRegistry = NewRegistry()
// NewRegistry returns a new default registry which is mdnsfunc NewRegistry(opts ...Option) Registry { return newRegistry(opts...)}func newRegistry(opts ...Option) Registry { options := Options{ Context: context.Background(), Timeout: time.Millisecond * 100, } for _, o := range opts { o(&options) } // set the domain defaultDomain := DefaultDomain d, ok := options.Context.Value("mdns.domain").(string) if ok { defaultDomain = d } return &mdnsRegistry{ defaultDomain: defaultDomain, globalDomain: globalDomain, opts: options, domains: make(map[string]services), watchers: make(map[string]*mdnsWatcher), }}
这里做了以下事件:
- 初始化并设置Options
- 设置defaultDomain,默认micro,如果options.Context中定义了mdns.domain,则应用这里定义的
- 返回mdnsRegistry{}实例
在micro server篇中介绍了service的启动过程service.Run()
中调用了s.Start()
,s.Start()
中调用了s.opts.Server.Start()
,这里的s.opts.Server就是micro/defaults.go中定义的server.DefaultServer = gsrv.NewServer()
那咱们去看server/grpc/grpc.go中的Start()
func (g *grpcServer) Start() error { g.RLock() if g.started { g.RUnlock() return nil } g.RUnlock() config := g.Options() // micro: config.Transport.Listen(config.Address) var ts net.Listener if l := g.getListener(); l != nil { ts = l } else { var err error // check the tls config for secure connect if tc := config.TLSConfig; tc != nil { ts, err = tls.Listen("tcp", config.Address, tc) // otherwise just plain tcp listener } else { ts, err = net.Listen("tcp", config.Address) } if err != nil { return err } } if g.opts.Context != nil { if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 { ts = netutil.LimitListener(ts, c) } } if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Server [grpc] Listening on %s", ts.Addr().String()) } g.Lock() g.opts.Address = ts.Addr().String() g.Unlock() // only connect if we're subscribed if len(g.subscribers) > 0 { // connect to the broker if err := config.Broker.Connect(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) } return err } if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } // announce self to the world if err := g.Register(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Server register error: %v", err) } } // micro: go ts.Accept(s.accept) go func() { if err := g.srv.Serve(ts); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("gRPC Server start error: %v", err) } } }() go func() { t := new(time.Ticker) // only process if it exists if g.opts.RegisterInterval > time.Duration(0) { // new ticker t = time.NewTicker(g.opts.RegisterInterval) } // return error chan var ch chan error Loop: for { select { // register self on interval case <-t.C: if err := g.Register(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Error("Server register error: ", err) } } // wait for exit case ch = <-g.exit: break Loop } } // deregister self if err := g.Deregister(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Error("Server deregister error: ", err) } } // wait for waitgroup if g.wg != nil { g.wg.Wait() } // stop the grpc server exit := make(chan bool) go func() { g.srv.GracefulStop() close(exit) }() select { case <-exit: case <-time.After(time.Second): g.srv.Stop() } // close transport ch <- nil if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker if err := config.Broker.Disconnect(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) } } }() // mark the server as started g.Lock() g.started = true g.Unlock() return nil}
这个过程在micro server篇中有介绍,当初只看registry局部,注册后开一个协程定时注册
g.Register()
注册到服务发现
func (g *grpcServer) Register() error { g.RLock() rsvc := g.rsvc config := g.opts g.RUnlock() regFunc := func(service *registry.Service) error { var regErr error for i := 0; i < 3; i++ { // set the ttl and namespace rOpts := []registry.RegisterOption{ registry.RegisterTTL(config.RegisterTTL), registry.RegisterDomain(g.opts.Namespace), } // attempt to register if err := config.Registry.Register(service, rOpts...); err != nil { // set the error regErr = err // backoff then retry time.Sleep(backoff.Do(i + 1)) continue } // success so nil error regErr = nil break } return regErr } // if service already filled, reuse it and return early if rsvc != nil { if err := regFunc(rsvc); err != nil { return err } return nil } var err error var advt, host, port string var cacheService bool // check the advertise address first // if it exists then use it, otherwise // use the address if len(config.Advertise) > 0 { advt = config.Advertise } else { advt = config.Address } if cnt := strings.Count(advt, ":"); cnt >= 1 { // ipv6 address in format [host]:port or ipv4 host:port host, port, err = net.SplitHostPort(advt) if err != nil { return err } } else { host = advt } if ip := net.ParseIP(host); ip != nil { cacheService = true } addr, err := addr.Extract(host) if err != nil { return err } // make copy of metadata md := meta.Copy(config.Metadata) // register service node := ®istry.Node{ Id: config.Name + "-" + config.Id, Address: mnet.HostPort(addr, port), Metadata: md, } node.Metadata["broker"] = config.Broker.String() node.Metadata["registry"] = config.Registry.String() node.Metadata["server"] = g.String() node.Metadata["transport"] = g.String() node.Metadata["protocol"] = "grpc" g.RLock() // Maps are ordered randomly, sort the keys for consistency var handlerList []string for n, e := range g.handlers { // Only advertise non internal handlers if !e.Options().Internal { handlerList = append(handlerList, n) } } sort.Strings(handlerList) var subscriberList []*subscriber for e := range g.subscribers { // Only advertise non internal subscribers if !e.Options().Internal { subscriberList = append(subscriberList, e) } } sort.Slice(subscriberList, func(i, j int) bool { return subscriberList[i].topic > subscriberList[j].topic }) endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) for _, n := range handlerList { endpoints = append(endpoints, g.handlers[n].Endpoints()...) } for _, e := range subscriberList { endpoints = append(endpoints, e.Endpoints()...) } g.RUnlock() service := ®istry.Service{ Name: config.Name, Version: config.Version, Nodes: []*registry.Node{node}, Endpoints: endpoints, } g.RLock() registered := g.registered g.RUnlock() if !registered { if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) } } // register the service if err := regFunc(service); err != nil { return err } // already registered? don't need to register subscribers if registered { return nil } g.Lock() defer g.Unlock() for sb := range g.subscribers { handler := g.createSubHandler(sb, g.opts) var opts []broker.SubscribeOption if queue := sb.Options().Queue; len(queue) > 0 { opts = append(opts, broker.Queue(queue)) } if cx := sb.Options().Context; cx != nil { opts = append(opts, broker.SubscribeContext(cx)) } if !sb.Options().AutoAck { opts = append(opts, broker.DisableAutoAck()) } if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { return err } g.subscribers[sb] = []broker.Subscriber{sub} } g.registered = true if cacheService { g.rsvc = service } return nil}
这里做了以下事件:
rsvc := g.rsvc,定义regFunc()
- 定义[]registry.RegisterOption{}
- 调用config.Registry.Register()注册,失败会重试3次
- rsvc不为空就调用regFunc()注册了并返回了,空就往下走,持续注册
- 验证host,port,复制metadata,定义registry.Node{},在metadata中减少broker,registry,server,transport,protocol
- g.handlers放到handlerList中(非外部handle),排个序,放弃一致性。g.subscribers也放到subscriberList,按topic排序。最初都放入endpoints
- 定义registry.Service{},调用regFunc()注册,如果没有谬误,也没有订阅须要解决就返回
- 解决订阅
到registry/mdns_registry.go中看看Register()
func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error { m.Lock() // parse the options var options RegisterOptions for _, o := range opts { o(&options) } if len(options.Domain) == 0 { options.Domain = m.defaultDomain } // create the domain in the memory store if it doesn't yet exist if _, ok := m.domains[options.Domain]; !ok { m.domains[options.Domain] = make(services) } // create the wildcard entry used for list queries in this domain entries, ok := m.domains[options.Domain][service.Name] if !ok { entry, err := createServiceMDNSEntry(service.Name, options.Domain) if err != nil { m.Unlock() return err } entries = append(entries, entry) } var gerr error for _, node := range service.Nodes { var seen bool for _, entry := range entries { if node.Id == entry.id { seen = true break } } // this node has already been registered, continue if seen { continue } txt, err := encode(&mdnsTxt{ Service: service.Name, Version: service.Version, Endpoints: service.Endpoints, Metadata: node.Metadata, }) if err != nil { gerr = err continue } host, pt, err := net.SplitHostPort(node.Address) if err != nil { gerr = err continue } port, _ := strconv.Atoi(pt) if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host) } // we got here, new node s, err := mdns.NewMDNSService( node.Id, service.Name, options.Domain+".", "", port, []net.IP{net.ParseIP(host)}, txt, ) if err != nil { gerr = err continue } srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true}) if err != nil { gerr = err continue } entries = append(entries, &mdnsEntry{id: node.Id, node: srv}) } // save the mdns entry m.domains[options.Domain][service.Name] = entries m.Unlock() // register in the global Domain so it can be queried as one if options.Domain != m.globalDomain { srv := *service srv.Nodes = nil for _, n := range service.Nodes { node := n // set the original domain in node metadata if node.Metadata == nil { node.Metadata = map[string]string{"domain": options.Domain} } else { node.Metadata["domain"] = options.Domain } srv.Nodes = append(srv.Nodes, node) } if err := m.Register(service, append(opts, RegisterDomain(m.globalDomain))...); err != nil { gerr = err } } return gerr}
这里做了以下事件:
- 设置optionsentries
- 创立m.domains[options.Domain],并赋值entries
- 循环每个service.Nodes,entries看有没有注册过,有就跳过
- 编码mdnsTxt{},调用mdns.NewMDNSService()失去一个新node,在调用mdns.NewServer()失去mdns.Server,包装到mdnsEntry{},放入entries,在存入m.domainsoptions.Domain
- 如果options.Domain != m.globalDomain,设置service.node中的Metadata["domain"]为options.Domain,注册到global Domain中
这里是默认的mdns实现,理论应用中能够指定consul,etcd等,具体的流程请见各自的Register()