micro.newService()中newOptions

func newOptions(opts ...Option) Options {    opt := Options{        Auth:      auth.DefaultAuth,        Broker:    broker.DefaultBroker,        Cmd:       cmd.DefaultCmd,        Config:    config.DefaultConfig,        Client:    client.DefaultClient,        Server:    server.DefaultServer,        Store:     store.DefaultStore,        Registry:  registry.DefaultRegistry,        Router:    router.DefaultRouter,        Runtime:   runtime.DefaultRuntime,        Transport: transport.DefaultTransport,        Context:   context.Background(),        Signal:    true,    }    for _, o := range opts {        o(&opt)    }    return opt}

初始化了一堆基础设置,来看看Registry
registry.DefaultRegistry,
在registry/registry.go中的
DefaultRegistry = NewRegistry()

// NewRegistry returns a new default registry which is mdnsfunc NewRegistry(opts ...Option) Registry {    return newRegistry(opts...)}func newRegistry(opts ...Option) Registry {    options := Options{        Context: context.Background(),        Timeout: time.Millisecond * 100,    }    for _, o := range opts {        o(&options)    }    // set the domain    defaultDomain := DefaultDomain    d, ok := options.Context.Value("mdns.domain").(string)    if ok {        defaultDomain = d    }    return &mdnsRegistry{        defaultDomain: defaultDomain,        globalDomain:  globalDomain,        opts:          options,        domains:       make(map[string]services),        watchers:      make(map[string]*mdnsWatcher),    }}

这里做了以下事情:

  1. 初始化并设置Options
  2. 设置defaultDomain,默认micro,如果options.Context中定义了mdns.domain,则使用这里定义的
  3. 返回mdnsRegistry{}实例

在micro server篇中介绍了service的启动过程
service.Run()中调用了s.Start()
s.Start()中调用了s.opts.Server.Start(),这里的s.opts.Server就是micro/defaults.go中定义的server.DefaultServer = gsrv.NewServer()

那咱们去看server/grpc/grpc.go中的Start()

func (g *grpcServer) Start() error {    g.RLock()    if g.started {        g.RUnlock()        return nil    }    g.RUnlock()    config := g.Options()    // micro: config.Transport.Listen(config.Address)    var ts net.Listener    if l := g.getListener(); l != nil {        ts = l    } else {        var err error        // check the tls config for secure connect        if tc := config.TLSConfig; tc != nil {            ts, err = tls.Listen("tcp", config.Address, tc)            // otherwise just plain tcp listener        } else {            ts, err = net.Listen("tcp", config.Address)        }        if err != nil {            return err        }    }    if g.opts.Context != nil {        if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 {            ts = netutil.LimitListener(ts, c)        }    }    if logger.V(logger.InfoLevel, logger.DefaultLogger) {        logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())    }    g.Lock()    g.opts.Address = ts.Addr().String()    g.Unlock()    // only connect if we're subscribed    if len(g.subscribers) > 0 {        // connect to the broker        if err := config.Broker.Connect(); err != nil {            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {                logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)            }            return err        }        if logger.V(logger.InfoLevel, logger.DefaultLogger) {            logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())        }    }    // announce self to the world    if err := g.Register(); err != nil {        if logger.V(logger.ErrorLevel, logger.DefaultLogger) {            logger.Errorf("Server register error: %v", err)        }    }    // micro: go ts.Accept(s.accept)    go func() {        if err := g.srv.Serve(ts); err != nil {            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {                logger.Errorf("gRPC Server start error: %v", err)         
   }        }    }()    go func() {        t := new(time.Ticker)        // only process if it exists        if g.opts.RegisterInterval > time.Duration(0) {            // new ticker            t = time.NewTicker(g.opts.RegisterInterval)        }        // return error chan        var ch chan error    Loop:        for {            select {            // register self on interval            case <-t.C:                if err := g.Register(); err != nil {                    if logger.V(logger.ErrorLevel, logger.DefaultLogger) {                        logger.Error("Server register error: ", err)                    }                }            // wait for exit            case ch = <-g.exit:                break Loop            }        }        // deregister self        if err := g.Deregister(); err != nil {            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {                logger.Error("Server deregister error: ", err)            }        }        // wait for waitgroup        if g.wg != nil {            g.wg.Wait()        }        // stop the grpc server        exit := make(chan bool)        go func() {            g.srv.GracefulStop()            close(exit)        }()        select {        case <-exit:        case <-time.After(time.Second):            g.srv.Stop()        }        // close transport        ch <- nil        if logger.V(logger.InfoLevel, logger.DefaultLogger) {            logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())        }        // disconnect broker        if err := config.Broker.Disconnect(); err != nil {            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {                logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)            }        }    }()    // mark the server as started    g.Lock()    g.started = true    g.Unlock()    return nil}

这个过程在micro server篇中有介绍,现在只看registry部分,注册后开一个协程定时注册

g.Register()注册到服务发现

func (g *grpcServer) Register() error {    g.RLock()    rsvc := g.rsvc    config := g.opts    g.RUnlock()    regFunc := func(service *registry.Service) error {        var regErr error        for i := 0; i < 3; i++ {            // set the ttl and namespace            rOpts := []registry.RegisterOption{                registry.RegisterTTL(config.RegisterTTL),                registry.RegisterDomain(g.opts.Namespace),            }            // attempt to register            if err := config.Registry.Register(service, rOpts...); err != nil {                // set the error                regErr = err                // backoff then retry                time.Sleep(backoff.Do(i + 1))                continue            }            // success so nil error            regErr = nil            break        }        return regErr    }    // if service already filled, reuse it and return early    if rsvc != nil {        if err := regFunc(rsvc); err != nil {            return err        }        return nil    }    var err error    var advt, host, port string    var cacheService bool    // check the advertise address first    // if it exists then use it, otherwise    // use the address    if len(config.Advertise) > 0 {        advt = config.Advertise    } else {        advt = config.Address    }    if cnt := strings.Count(advt, ":"); cnt >= 1 {        // ipv6 address in format [host]:port or ipv4 host:port        host, port, err = net.SplitHostPort(advt)        if err != nil {            return err        }    } else {        host = advt    }    if ip := net.ParseIP(host); ip != nil {        cacheService = true    }    addr, err := addr.Extract(host)    if err != nil {        return err    }    // make copy of metadata    md := meta.Copy(config.Metadata)    // register service    node := &registry.Node{        Id:       config.Name + "-" + config.Id,        Address:  mnet.HostPort(addr, port),        Metadata: md,    }    node.Metadata["broker"] = config.Broker.String()    
node.Metadata["registry"] = config.Registry.String()    node.Metadata["server"] = g.String()    node.Metadata["transport"] = g.String()    node.Metadata["protocol"] = "grpc"    g.RLock()    // Maps are ordered randomly, sort the keys for consistency    var handlerList []string    for n, e := range g.handlers {        // Only advertise non internal handlers        if !e.Options().Internal {            handlerList = append(handlerList, n)        }    }    sort.Strings(handlerList)    var subscriberList []*subscriber    for e := range g.subscribers {        // Only advertise non internal subscribers        if !e.Options().Internal {            subscriberList = append(subscriberList, e)        }    }    sort.Slice(subscriberList, func(i, j int) bool {        return subscriberList[i].topic > subscriberList[j].topic    })    endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))    for _, n := range handlerList {        endpoints = append(endpoints, g.handlers[n].Endpoints()...)    }    for _, e := range subscriberList {        endpoints = append(endpoints, e.Endpoints()...)    }    g.RUnlock()    service := &registry.Service{        Name:      config.Name,        Version:   config.Version,        Nodes:     []*registry.Node{node},        Endpoints: endpoints,    }    g.RLock()    registered := g.registered    g.RUnlock()    if !registered {        if logger.V(logger.InfoLevel, logger.DefaultLogger) {            logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)        }    }    // register the service    if err := regFunc(service); err != nil {        return err    }    // already registered? 
don't need to register subscribers    if registered {        return nil    }    g.Lock()    defer g.Unlock()    for sb := range g.subscribers {        handler := g.createSubHandler(sb, g.opts)        var opts []broker.SubscribeOption        if queue := sb.Options().Queue; len(queue) > 0 {            opts = append(opts, broker.Queue(queue))        }        if cx := sb.Options().Context; cx != nil {            opts = append(opts, broker.SubscribeContext(cx))        }        if !sb.Options().AutoAck {            opts = append(opts, broker.DisableAutoAck())        }        if logger.V(logger.InfoLevel, logger.DefaultLogger) {            logger.Infof("Subscribing to topic: %s", sb.Topic())        }        sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)        if err != nil {            return err        }        g.subscribers[sb] = []broker.Subscriber{sub}    }    g.registered = true    if cacheService {        g.rsvc = service    }    return nil}

这里做了以下事情:

  1. rsvc := g.rsvc,定义regFunc()

    1. 定义[]registry.RegisterOption{}
    2. 调用config.Registry.Register()注册,失败会退避后重试,最多尝试3次
  2. rsvc不为空就直接调用regFunc()注册并返回;为空就往下走,继续注册流程
  3. 解析并验证host,port,复制metadata,定义registry.Node{},在metadata中增加broker,registry,server,transport,protocol
  4. g.handlers放到handlerList中(只取非内部的handler),排个序,保持一致性。g.subscribers也放到subscriberList(同样只取非内部的),按topic降序排序。最后都放入endpoints
  5. 定义registry.Service{},调用regFunc()注册,如果没有错误,且此前已经注册过(订阅无需再处理)就返回
  6. 处理订阅

到registry/mdns_registry.go中看看Register()

func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {    m.Lock()    // parse the options    var options RegisterOptions    for _, o := range opts {        o(&options)    }    if len(options.Domain) == 0 {        options.Domain = m.defaultDomain    }    // create the domain in the memory store if it doesn't yet exist    if _, ok := m.domains[options.Domain]; !ok {        m.domains[options.Domain] = make(services)    }    // create the wildcard entry used for list queries in this domain    entries, ok := m.domains[options.Domain][service.Name]    if !ok {        entry, err := createServiceMDNSEntry(service.Name, options.Domain)        if err != nil {            m.Unlock()            return err        }        entries = append(entries, entry)    }    var gerr error    for _, node := range service.Nodes {        var seen bool        for _, entry := range entries {            if node.Id == entry.id {                seen = true                break            }        }        // this node has already been registered, continue        if seen {            continue        }        txt, err := encode(&mdnsTxt{            Service:   service.Name,            Version:   service.Version,            Endpoints: service.Endpoints,            Metadata:  node.Metadata,        })        if err != nil {            gerr = err            continue        }        host, pt, err := net.SplitHostPort(node.Address)        if err != nil {            gerr = err            continue        }        port, _ := strconv.Atoi(pt)        if logger.V(logger.DebugLevel, logger.DefaultLogger) {            logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)        }        // we got here, new node        s, err := mdns.NewMDNSService(            node.Id,            service.Name,            options.Domain+".",            "",            port,            []net.IP{net.ParseIP(host)},            txt,        )        if err != 
nil {            gerr = err            continue        }        srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true})        if err != nil {            gerr = err            continue        }        entries = append(entries, &mdnsEntry{id: node.Id, node: srv})    }    // save the mdns entry    m.domains[options.Domain][service.Name] = entries    m.Unlock()    // register in the global Domain so it can be queried as one    if options.Domain != m.globalDomain {        srv := *service        srv.Nodes = nil        for _, n := range service.Nodes {            node := n            // set the original domain in node metadata            if node.Metadata == nil {                node.Metadata = map[string]string{"domain": options.Domain}            } else {                node.Metadata["domain"] = options.Domain            }            srv.Nodes = append(srv.Nodes, node)        }        if err := m.Register(service, append(opts, RegisterDomain(m.globalDomain))...); err != nil {            gerr = err        }    }    return gerr}

这里做了以下事情:

  1. 设置options、entries
  2. 创建m.domains[options.Domain](不存在时),并赋值entries
  3. 循环service.Nodes中的每个node,在entries中看有没有注册过,有就跳过
  4. 编码mdnsTxt{},调用mdns.NewMDNSService()得到新node对应的mdns service,再调用mdns.NewServer()得到mdns.Server,包装到mdnsEntry{},放入entries,再存入m.domains[options.Domain][service.Name]
  5. 如果options.Domain != m.globalDomain,为每个node设置Metadata["domain"]为options.Domain,再注册到global Domain中

这里是默认的mdns实现,实际使用中可以指定consul,etcd等,具体的流程请见各自的Register()