乐趣区

关于micro:go-micro-registry

micro.newService()中 newOptions

func newOptions(opts ...Option) Options {
    opt := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    for _, o := range opts {o(&opt)
    }

    return opt
}

初始化了一堆根底设置,来看看 Registry
registry.DefaultRegistry,
在 registry/registry.go 中的
DefaultRegistry = NewRegistry()

// NewRegistry returns a new default registry which is mdns
func NewRegistry(opts ...Option) Registry {return newRegistry(opts...)
}

func newRegistry(opts ...Option) Registry {
    options := Options{Context: context.Background(),
        Timeout: time.Millisecond * 100,
    }

    for _, o := range opts {o(&options)
    }

    // set the domain
    defaultDomain := DefaultDomain

    d, ok := options.Context.Value("mdns.domain").(string)
    if ok {defaultDomain = d}

    return &mdnsRegistry{
        defaultDomain: defaultDomain,
        globalDomain:  globalDomain,
        opts:          options,
        domains:       make(map[string]services),
        watchers:      make(map[string]*mdnsWatcher),
    }
}

这里做了以下事件:

  1. 初始化并设置 Options
  2. 设置 defaultDomain, 默认 micro,如果 options.Context 中定义了 mdns.domain,则应用这里定义的
  3. 返回 mdnsRegistry{}实例

在 micro server 篇中介绍了 service 的启动过程
service.Run() 中调用了 s.Start()
s.Start() 中调用了s.opts.Server.Start(),这里的 s.opts.Server 就是 micro/defaults.go 中定义的server.DefaultServer = gsrv.NewServer()

那咱们去看 server/grpc/grpc.go 中的Start()

func (g *grpcServer) Start() error {g.RLock()
    if g.started {g.RUnlock()
        return nil
    }
    g.RUnlock()

    config := g.Options()

    // micro: config.Transport.Listen(config.Address)
    var ts net.Listener

    if l := g.getListener(); l != nil {ts = l} else {
        var err error

        // check the tls config for secure connect
        if tc := config.TLSConfig; tc != nil {ts, err = tls.Listen("tcp", config.Address, tc)
            // otherwise just plain tcp listener
        } else {ts, err = net.Listen("tcp", config.Address)
        }
        if err != nil {return err}
    }

    if g.opts.Context != nil {if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 {ts = netutil.LimitListener(ts, c)
        }
    }

    if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())
    }
    g.Lock()
    g.opts.Address = ts.Addr().String()
    g.Unlock()

    // only connect if we're subscribed
    if len(g.subscribers) > 0 {
        // connect to the broker
        if err := config.Broker.Connect(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
            }
            return err
        }

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
        }
    }

    // announce self to the world
    if err := g.Register(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("Server register error: %v", err)
        }
    }

    // micro: go ts.Accept(s.accept)
    go func() {if err := g.srv.Serve(ts); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("gRPC Server start error: %v", err)
            }
        }
    }()

    go func() {t := new(time.Ticker)

        // only process if it exists
        if g.opts.RegisterInterval > time.Duration(0) {
            // new ticker
            t = time.NewTicker(g.opts.RegisterInterval)
        }

        // return error chan
        var ch chan error

    Loop:
        for {
            select {
            // register self on interval
            case <-t.C:
                if err := g.Register(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Error("Server register error:", err)
                    }
                }
            // wait for exit
            case ch = <-g.exit:
                break Loop
            }
        }

        // deregister self
        if err := g.Deregister(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Error("Server deregister error:", err)
            }
        }

        // wait for waitgroup
        if g.wg != nil {g.wg.Wait()
        }

        // stop the grpc server
        exit := make(chan bool)

        go func() {g.srv.GracefulStop()
            close(exit)
        }()

        select {
        case <-exit:
        case <-time.After(time.Second):
            g.srv.Stop()}

        // close transport
        ch <- nil

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
        }
        // disconnect broker
        if err := config.Broker.Disconnect(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
            }
        }
    }()

    // mark the server as started
    g.Lock()
    g.started = true
    g.Unlock()

    return nil
}

这个过程在 micro server 篇中有介绍,当初只看 registry 局部,注册后开一个协程定时注册

g.Register()注册到服务发现

func (g *grpcServer) Register() error {g.RLock()
    rsvc := g.rsvc
    config := g.opts
    g.RUnlock()

    regFunc := func(service *registry.Service) error {
        var regErr error

        for i := 0; i < 3; i++ {
            // set the ttl and namespace
            rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL),
                registry.RegisterDomain(g.opts.Namespace),
            }

            // attempt to register
            if err := config.Registry.Register(service, rOpts...); err != nil {
                // set the error
                regErr = err
                // backoff then retry
                time.Sleep(backoff.Do(i + 1))
                continue
            }
            // success so nil error
            regErr = nil
            break
        }

        return regErr
    }

    // if service already filled, reuse it and return early
    if rsvc != nil {if err := regFunc(rsvc); err != nil {return err}
        return nil
    }

    var err error
    var advt, host, port string
    var cacheService bool

    // check the advertise address first
    // if it exists then use it, otherwise
    // use the address
    if len(config.Advertise) > 0 {advt = config.Advertise} else {advt = config.Address}

    if cnt := strings.Count(advt, ":"); cnt >= 1 {// ipv6 address in format [host]:port or ipv4 host:port
        host, port, err = net.SplitHostPort(advt)
        if err != nil {return err}
    } else {host = advt}

    if ip := net.ParseIP(host); ip != nil {cacheService = true}

    addr, err := addr.Extract(host)
    if err != nil {return err}

    // make copy of metadata
    md := meta.Copy(config.Metadata)

    // register service
    node := &registry.Node{
        Id:       config.Name + "-" + config.Id,
        Address:  mnet.HostPort(addr, port),
        Metadata: md,
    }

    node.Metadata["broker"] = config.Broker.String()
    node.Metadata["registry"] = config.Registry.String()
    node.Metadata["server"] = g.String()
    node.Metadata["transport"] = g.String()
    node.Metadata["protocol"] = "grpc"

    g.RLock()
    // Maps are ordered randomly, sort the keys for consistency
    var handlerList []string
    for n, e := range g.handlers {
        // Only advertise non internal handlers
        if !e.Options().Internal {handlerList = append(handlerList, n)
        }
    }
    sort.Strings(handlerList)

    var subscriberList []*subscriber
    for e := range g.subscribers {
        // Only advertise non internal subscribers
        if !e.Options().Internal {subscriberList = append(subscriberList, e)
        }
    }
    sort.Slice(subscriberList, func(i, j int) bool {return subscriberList[i].topic > subscriberList[j].topic
    })

    endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
    for _, n := range handlerList {endpoints = append(endpoints, g.handlers[n].Endpoints()...)
    }
    for _, e := range subscriberList {endpoints = append(endpoints, e.Endpoints()...)
    }
    g.RUnlock()

    service := &registry.Service{
        Name:      config.Name,
        Version:   config.Version,
        Nodes:     []*registry.Node{node},
        Endpoints: endpoints,
    }

    g.RLock()
    registered := g.registered
    g.RUnlock()

    if !registered {if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
        }
    }

    // register the service
    if err := regFunc(service); err != nil {return err}

    // already registered? don't need to register subscribers
    if registered {return nil}

    g.Lock()
    defer g.Unlock()

    for sb := range g.subscribers {handler := g.createSubHandler(sb, g.opts)
        var opts []broker.SubscribeOption
        if queue := sb.Options().Queue; len(queue) > 0 {opts = append(opts, broker.Queue(queue))
        }

        if cx := sb.Options().Context; cx != nil {opts = append(opts, broker.SubscribeContext(cx))
        }

        if !sb.Options().AutoAck {opts = append(opts, broker.DisableAutoAck())
        }

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Subscribing to topic: %s", sb.Topic())
        }
        sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
        if err != nil {return err}
        g.subscribers[sb] = []broker.Subscriber{sub}
    }

    g.registered = true
    if cacheService {g.rsvc = service}

    return nil
}

这里做了以下事件:

  1. rsvc := g.rsvc,定义 regFunc()

    1. 定义[]registry.RegisterOption{}
    2. 调用 config.Registry.Register()注册,失败会重试 3 次
  2. rsvc 不为空就调用 regFunc()注册了并返回了,空就往下走,持续注册
  3. 验证 host,port,复制 metadata,定义 registry.Node{}, 在 metadata 中减少 broker,registry,server,transport,protocol
  4. g.handlers 放到 handlerList 中(非外部 handle),排个序,放弃一致性。g.subscribers 也放到 subscriberList,按 topic 排序。最初都放入 endpoints
  5. 定义 registry.Service{},调用 regFunc()注册,如果没有谬误,也没有订阅须要解决就返回
  6. 解决订阅

到 registry/mdns_registry.go 中看看Register()

func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {m.Lock()

    // parse the options
    var options RegisterOptions
    for _, o := range opts {o(&options)
    }
    if len(options.Domain) == 0 {options.Domain = m.defaultDomain}

    // create the domain in the memory store if it doesn't yet exist
    if _, ok := m.domains[options.Domain]; !ok {m.domains[options.Domain] = make(services)
    }

    // create the wildcard entry used for list queries in this domain
    entries, ok := m.domains[options.Domain][service.Name]
    if !ok {entry, err := createServiceMDNSEntry(service.Name, options.Domain)
        if err != nil {m.Unlock()
            return err
        }
        entries = append(entries, entry)
    }

    var gerr error
    for _, node := range service.Nodes {
        var seen bool

        for _, entry := range entries {
            if node.Id == entry.id {
                seen = true
                break
            }
        }

        // this node has already been registered, continue
        if seen {continue}

        txt, err := encode(&mdnsTxt{
            Service:   service.Name,
            Version:   service.Version,
            Endpoints: service.Endpoints,
            Metadata:  node.Metadata,
        })

        if err != nil {
            gerr = err
            continue
        }

        host, pt, err := net.SplitHostPort(node.Address)
        if err != nil {
            gerr = err
            continue
        }
        port, _ := strconv.Atoi(pt)

        if logger.V(logger.DebugLevel, logger.DefaultLogger) {logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
        }
        // we got here, new node
        s, err := mdns.NewMDNSService(
            node.Id,
            service.Name,
            options.Domain+".",
            "",
            port,
            []net.IP{net.ParseIP(host)},
            txt,
        )
        if err != nil {
            gerr = err
            continue
        }

        srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true})
        if err != nil {
            gerr = err
            continue
        }

        entries = append(entries, &mdnsEntry{id: node.Id, node: srv})
    }

    // save the mdns entry
    m.domains[options.Domain][service.Name] = entries
    m.Unlock()

    // register in the global Domain so it can be queried as one
    if options.Domain != m.globalDomain {
        srv := *service
        srv.Nodes = nil

        for _, n := range service.Nodes {
            node := n

            // set the original domain in node metadata
            if node.Metadata == nil {node.Metadata = map[string]string{"domain": options.Domain}
            } else {node.Metadata["domain"] = options.Domain
            }

            srv.Nodes = append(srv.Nodes, node)
        }

        if err := m.Register(service, append(opts, RegisterDomain(m.globalDomain))...); err != nil {gerr = err}
    }

    return gerr
}

这里做了以下事件:

  1. 设置 optionsentries
  2. 创立 m.domains[options.Domain], 并赋值 entries
  3. 循环每个 service.Nodes,entries 看有没有注册过,有就跳过
  4. 编码 mdnsTxt{}, 调用 mdns.NewMDNSService()失去一个新 node,在调用 mdns.NewServer()失去 mdns.Server,包装到 mdnsEntry{},放入 entries,在存入 m.domainsoptions.Domain
  5. 如果 options.Domain != m.globalDomain,设置 service.node 中的 Metadata[“domain”]为 options.Domain,注册到 global Domain 中

这里是默认的 mdns 实现,理论应用中能够指定 consul,etcd 等,具体的流程请见各自的Register()

退出移动版