共计 9992 个字符,预计需要花费 25 分钟才能阅读完成。
micro.newService()中 newOptions
func newOptions(opts ...Option) Options {
opt := Options{
Auth: auth.DefaultAuth,
Broker: broker.DefaultBroker,
Cmd: cmd.DefaultCmd,
Config: config.DefaultConfig,
Client: client.DefaultClient,
Server: server.DefaultServer,
Store: store.DefaultStore,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Runtime: runtime.DefaultRuntime,
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
}
for _, o := range opts {o(&opt)
}
return opt
}
初始化了一堆根底设置,来看看 Registryregistry.DefaultRegistry,
在 registry/registry.go 中的DefaultRegistry = NewRegistry()
// NewRegistry returns a new default registry which is mdns
func NewRegistry(opts ...Option) Registry {return newRegistry(opts...)
}
func newRegistry(opts ...Option) Registry {
options := Options{Context: context.Background(),
Timeout: time.Millisecond * 100,
}
for _, o := range opts {o(&options)
}
// set the domain
defaultDomain := DefaultDomain
d, ok := options.Context.Value("mdns.domain").(string)
if ok {defaultDomain = d}
return &mdnsRegistry{
defaultDomain: defaultDomain,
globalDomain: globalDomain,
opts: options,
domains: make(map[string]services),
watchers: make(map[string]*mdnsWatcher),
}
}
这里做了以下事件:
- 初始化并设置 Options
- 设置 defaultDomain, 默认 micro,如果 options.Context 中定义了 mdns.domain,则应用这里定义的
- 返回 mdnsRegistry{}实例
在 micro server 篇中介绍了 service 的启动过程 service.Run()
中调用了 s.Start()
,s.Start()
中调用了s.opts.Server.Start()
,这里的 s.opts.Server 就是 micro/defaults.go 中定义的server.DefaultServer = gsrv.NewServer()
那咱们去看 server/grpc/grpc.go 中的Start()
func (g *grpcServer) Start() error {g.RLock()
if g.started {g.RUnlock()
return nil
}
g.RUnlock()
config := g.Options()
// micro: config.Transport.Listen(config.Address)
var ts net.Listener
if l := g.getListener(); l != nil {ts = l} else {
var err error
// check the tls config for secure connect
if tc := config.TLSConfig; tc != nil {ts, err = tls.Listen("tcp", config.Address, tc)
// otherwise just plain tcp listener
} else {ts, err = net.Listen("tcp", config.Address)
}
if err != nil {return err}
}
if g.opts.Context != nil {if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 {ts = netutil.LimitListener(ts, c)
}
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())
}
g.Lock()
g.opts.Address = ts.Addr().String()
g.Unlock()
// only connect if we're subscribed
if len(g.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
}
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
}
// announce self to the world
if err := g.Register(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("Server register error: %v", err)
}
}
// micro: go ts.Accept(s.accept)
go func() {if err := g.srv.Serve(ts); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("gRPC Server start error: %v", err)
}
}
}()
go func() {t := new(time.Ticker)
// only process if it exists
if g.opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(g.opts.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
if err := g.Register(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Error("Server register error:", err)
}
}
// wait for exit
case ch = <-g.exit:
break Loop
}
}
// deregister self
if err := g.Deregister(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Error("Server deregister error:", err)
}
}
// wait for waitgroup
if g.wg != nil {g.wg.Wait()
}
// stop the grpc server
exit := make(chan bool)
go func() {g.srv.GracefulStop()
close(exit)
}()
select {
case <-exit:
case <-time.After(time.Second):
g.srv.Stop()}
// close transport
ch <- nil
if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
if err := config.Broker.Disconnect(); err != nil {if logger.V(logger.ErrorLevel, logger.DefaultLogger) {logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}()
// mark the server as started
g.Lock()
g.started = true
g.Unlock()
return nil
}
这个过程在 micro server 篇中有介绍,当初只看 registry 局部,注册后开一个协程定时注册
g.Register()
注册到服务发现
func (g *grpcServer) Register() error {g.RLock()
rsvc := g.rsvc
config := g.opts
g.RUnlock()
regFunc := func(service *registry.Service) error {
var regErr error
for i := 0; i < 3; i++ {
// set the ttl and namespace
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL),
registry.RegisterDomain(g.opts.Namespace),
}
// attempt to register
if err := config.Registry.Register(service, rOpts...); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
}
return regErr
}
// if service already filled, reuse it and return early
if rsvc != nil {if err := regFunc(rsvc); err != nil {return err}
return nil
}
var err error
var advt, host, port string
var cacheService bool
// check the advertise address first
// if it exists then use it, otherwise
// use the address
if len(config.Advertise) > 0 {advt = config.Advertise} else {advt = config.Address}
if cnt := strings.Count(advt, ":"); cnt >= 1 {// ipv6 address in format [host]:port or ipv4 host:port
host, port, err = net.SplitHostPort(advt)
if err != nil {return err}
} else {host = advt}
if ip := net.ParseIP(host); ip != nil {cacheService = true}
addr, err := addr.Extract(host)
if err != nil {return err}
// make copy of metadata
md := meta.Copy(config.Metadata)
// register service
node := ®istry.Node{
Id: config.Name + "-" + config.Id,
Address: mnet.HostPort(addr, port),
Metadata: md,
}
node.Metadata["broker"] = config.Broker.String()
node.Metadata["registry"] = config.Registry.String()
node.Metadata["server"] = g.String()
node.Metadata["transport"] = g.String()
node.Metadata["protocol"] = "grpc"
g.RLock()
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range g.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {handlerList = append(handlerList, n)
}
}
sort.Strings(handlerList)
var subscriberList []*subscriber
for e := range g.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {return subscriberList[i].topic > subscriberList[j].topic
})
endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, n := range handlerList {endpoints = append(endpoints, g.handlers[n].Endpoints()...)
}
for _, e := range subscriberList {endpoints = append(endpoints, e.Endpoints()...)
}
g.RUnlock()
service := ®istry.Service{
Name: config.Name,
Version: config.Version,
Nodes: []*registry.Node{node},
Endpoints: endpoints,
}
g.RLock()
registered := g.registered
g.RUnlock()
if !registered {if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
}
// register the service
if err := regFunc(service); err != nil {return err}
// already registered? don't need to register subscribers
if registered {return nil}
g.Lock()
defer g.Unlock()
for sb := range g.subscribers {handler := g.createSubHandler(sb, g.opts)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {opts = append(opts, broker.Queue(queue))
}
if cx := sb.Options().Context; cx != nil {opts = append(opts, broker.SubscribeContext(cx))
}
if !sb.Options().AutoAck {opts = append(opts, broker.DisableAutoAck())
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {logger.Infof("Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil {return err}
g.subscribers[sb] = []broker.Subscriber{sub}
}
g.registered = true
if cacheService {g.rsvc = service}
return nil
}
这里做了以下事件:
-
rsvc := g.rsvc,定义 regFunc()
- 定义[]registry.RegisterOption{}
- 调用 config.Registry.Register()注册,失败会重试 3 次
- rsvc 不为空就调用 regFunc()注册了并返回了,空就往下走,持续注册
- 验证 host,port,复制 metadata,定义 registry.Node{}, 在 metadata 中减少 broker,registry,server,transport,protocol
- g.handlers 放到 handlerList 中(非外部 handle),排个序,放弃一致性。g.subscribers 也放到 subscriberList,按 topic 排序。最初都放入 endpoints
- 定义 registry.Service{},调用 regFunc()注册,如果没有谬误,也没有订阅须要解决就返回
- 解决订阅
到 registry/mdns_registry.go 中看看Register()
func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {m.Lock()
// parse the options
var options RegisterOptions
for _, o := range opts {o(&options)
}
if len(options.Domain) == 0 {options.Domain = m.defaultDomain}
// create the domain in the memory store if it doesn't yet exist
if _, ok := m.domains[options.Domain]; !ok {m.domains[options.Domain] = make(services)
}
// create the wildcard entry used for list queries in this domain
entries, ok := m.domains[options.Domain][service.Name]
if !ok {entry, err := createServiceMDNSEntry(service.Name, options.Domain)
if err != nil {m.Unlock()
return err
}
entries = append(entries, entry)
}
var gerr error
for _, node := range service.Nodes {
var seen bool
for _, entry := range entries {
if node.Id == entry.id {
seen = true
break
}
}
// this node has already been registered, continue
if seen {continue}
txt, err := encode(&mdnsTxt{
Service: service.Name,
Version: service.Version,
Endpoints: service.Endpoints,
Metadata: node.Metadata,
})
if err != nil {
gerr = err
continue
}
host, pt, err := net.SplitHostPort(node.Address)
if err != nil {
gerr = err
continue
}
port, _ := strconv.Atoi(pt)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
}
// we got here, new node
s, err := mdns.NewMDNSService(
node.Id,
service.Name,
options.Domain+".",
"",
port,
[]net.IP{net.ParseIP(host)},
txt,
)
if err != nil {
gerr = err
continue
}
srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true})
if err != nil {
gerr = err
continue
}
entries = append(entries, &mdnsEntry{id: node.Id, node: srv})
}
// save the mdns entry
m.domains[options.Domain][service.Name] = entries
m.Unlock()
// register in the global Domain so it can be queried as one
if options.Domain != m.globalDomain {
srv := *service
srv.Nodes = nil
for _, n := range service.Nodes {
node := n
// set the original domain in node metadata
if node.Metadata == nil {node.Metadata = map[string]string{"domain": options.Domain}
} else {node.Metadata["domain"] = options.Domain
}
srv.Nodes = append(srv.Nodes, node)
}
if err := m.Register(service, append(opts, RegisterDomain(m.globalDomain))...); err != nil {gerr = err}
}
return gerr
}
这里做了以下事件:
- 设置 optionsentries
- 创立 m.domains[options.Domain], 并赋值 entries
- 循环每个 service.Nodes,entries 看有没有注册过,有就跳过
- 编码 mdnsTxt{}, 调用 mdns.NewMDNSService()失去一个新 node,在调用 mdns.NewServer()失去 mdns.Server,包装到 mdnsEntry{},放入 entries,在存入 m.domainsoptions.Domain
- 如果 options.Domain != m.globalDomain,设置 service.node 中的 Metadata[“domain”]为 options.Domain,注册到 global Domain 中
这里是默认的 mdns 实现,理论应用中能够指定 consul,etcd 等,具体的流程请见各自的Register()
正文完