关于中间件:nsqlookupd高性能消息中间件-NSQ-解析

32次阅读

共计 4608 个字符,预计需要花费 12 分钟才能阅读完成。

摘要:本篇将会联合源码介绍 nsqlookupd 的实现细节。

本文分享自华为云社区《高性能消息中间件 NSQ 解析 -nsqlookupd 实现细节介绍》,原文作者:aoho。

本篇将会联合源码介绍 nsqlookupd 的实现细节。nsqlookupd 次要流程与 nsqd 执行逻辑类似,区别在于具体运行的工作不同。

nsqlookupd 是 nsq 治理集群拓扑信息以及用于注册和发现 nsqd 服务。所以,也能够把 nsqlookupd 了解为注册发现服务。当 nsq 集群中有多个 nsqlookupd 服务时,因为每个 nsqd 都会向所有的 nsqlookupd 上报本地信息,因而 nsqlookupd 具备最终一致性。

入口函数

在 nsq/apps/nsqlookupd/main.go 能够找到执行入口文件。

// 位于 apps/nsqlookupd/main.go:45
func main() {prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {if env.IsWindowsService() {dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {opts := nsqlookupd.NewOptions()

  flagSet := nsqlookupdFlagSet(opts)
  ...
}

同样,通过第三方 svc 包进行优雅的后盾过程治理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例。

// 位于 apps/nsqlookupd/main.go:80
options.Resolve(opts, flagSet, cfg)
  nsqlookupd, err := nsqlookupd.New(opts)
  if err != nil {logFatal("failed to instantiate nsqlookupd", err)
  }
  p.nsqlookupd = nsqlookupd

  go func() {err := p.nsqlookupd.Main()
    if err != nil {p.Stop()
      os.Exit(1)
    }
  }()

初始化配置参数(优先级:flagSet- 命令行参数 > cfg- 配置文件 > opts- 默认值),开启协程,进入 nsqlookupd.Main() 主函数。

监听申请

咱们来看下 nsqlookupd 是如何监听申请的,代码实现如下:

// 位于 nsqlookupd/nsqlookupd.go:53
func (l *NSQLookupd) Main() error {ctx := &Context{l}

  exitCh := make(chan error)
  var once sync.Once
  exitFunc := func(err error) {once.Do(func() {
      if err != nil {l.logf(LOG_FATAL, "%s", err)
      }
      exitCh <- err
    })
  }

  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
  })
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
  })

  err := <-exitCh
  return err
}

开启 goroutine 执行 tcpServer, httpServer,别离监听 nsqd, nsqadmin 的客户端申请。

解决申请

// 位于 internal/protocol/tcp_server.go:17
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {logf(lg.INFO, "TCP: listening on %s", listener.Addr())

  for {clientConn, err := listener.Accept()
    if err != nil {if nerr, ok := err.(net.Error); ok && nerr.Temporary() {logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }

  logf(lg.INFO, "TCP: closing %s", listener.Addr())

  return nil
}

TCPServer 循环监听客户端申请,建设长连贯进行通信,并开启 handler 解决每一个客户端 conn。

装璜 http 路由

httpServer 通过 http_api.Decorate 装璜器实现对各 http 路由进行 handler 装璜,如加 log 日志、V1 协定版本号的对立格局输入等;

func newHTTPServer(ctx *Context) *httpServer {log := http_api.Log(ctx.nsqlookupd.logf)

  router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
  s := &httpServer{
    ctx:    ctx,
    router: router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
  router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
  router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
  router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
}

解决客户端命令

tcp 解析 V1 协定,外部协定封装的 prot.IOLoop(conn) 进行循环解决客户端命令,直到客户端命令全副解析处理完毕才敞开连贯。

var prot protocol.Protocol
  switch protocolMagic {
  case "V1":
    prot = &LookupProtocolV1{ctx: p.ctx}
  default:
    protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic'%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)

执行命令

通过外部协定进行 p.Exec(执行命令)、p.SendResponse(返回后果),保障每个 nsqd 节点都能正确的进行服务注册 (register) 与登记 (unregister),并进行心跳检测(ping) 节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。

for {line, err = reader.ReadString('n')
    if err != nil {break}

    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    var response []byte
    response, err = p.Exec(client, reader, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {ctx = "-" + parentErr.Error()
      }
      _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
      if sendErr != nil {p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }
      continue
    }

    if response != nil {_, err = protocol.SendResponse(client, response)
      if err != nil {break}
    }
  }

  conn.Close()

nsqlookupd 服务同时开启 tcp 和 http 两个监听服务,nsqd 会作为客户端,连上 nsqlookupd 的 tcp 服务,并上报本人的 topic 和 channel 信息,以及通过心跳机制判断 nsqd 状态;还有个 http 服务提供给 nsqadmin 获取集群信息。

小结

本文次要介绍 nsqlookupd 的实现,nsqlookupd 同样是一个守护过程,负责管理拓扑信息。客户端通过查问 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点播送话题(topic)和通道(channel)信息。有两个接口:TCP 接口,nsqd 用它来播送。HTTP 接口,客户端用它来发现和治理。

下一篇文章,将会持续介绍 nsq 中其余模块实现的细节。

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0