共计 3946 个字符,预计需要花费 10 分钟才能阅读完成。
为什么选择 nsq
之前一直在用 erlang 做电信产品的开发,对 erlang 的一些生态也比较了解,和 erlang 相关的产品在互联网公司使用最多的应该就是 rabbitmq 了,也许很多人听说过 erlang 就是因为他们公司在使用 rabbitmq。在之前也看过一点 rabbitmq 的代码,以及后来的 emqtt 都看过一点, 所以对消息队列这块是情有独钟。转到 go 后也在关注消息队列这块,nsq 是一个 golng 的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。
nsq 的架构与代码结构
nsq 的的话主要有三个模块构成, 这里直接复制官方的介绍:
nsqd: is the daemon that receives, queues, and delivers messages to clients.
nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.
nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).
这里是一个消息投递的过程, 显示了消息怎么从 nsqd 到达 consumer, 缺少了 producer 和 nsqlookupd. nsqlookupd 主要提供了两个功能:
向 nsqd 提供一个 topic 和 channel 的注册信息
对 consumser 提供了 toic 和 channel 的查询功能然后
consumer 查询到 nsqd 之后就是上面看到的动态图了, consumer 直接和 nsqd 通信, 下面是一个更全面一点的时序图整个项目的代码结构也是围绕上面的三个模块构建:
internal(公共部分的实现)
nsqadmin(对 nsqadmin 的时间)
nsqd(对 nsqd 的实现)
nsqlookupd(对 nsqlookupd 的实现)
总共也就这四个 package, 是不是有很想看下去的冲动 (smile).
lookupd 的启动流程
经过上面的介绍, 我们对 lookupd 有里简单的认识. 首先他是一个独立的进程, 为 topic 和 channel 的发现服务. 但不参与时间的消息投递. 对 lookup 的实现是在 nsq/apps/nsqlookupd/nsqlookupd.go 和 nsq/nsqlookupd/ 中. lookupd 的启动是使用了一个叫 go-srv 的 windows wrapper. 通过在 nsq/apps/nsqlookupd/nsqlookupd.go 中实现:
type Service interface {
// Init is called before the program/service is started and after it’s
// determined if the program is running as a Windows Service.
Init(Environment) error
// Start is called after Init. This method must be non-blocking.
Start() error
// Stop is called in response to os.Interrupt, os.Kill, or when a
// Windows Service is stopped.
Stop() error
}
来完成整个进程的管理,go-srv 帮助我们做了系统信号的管理, 下面来看下 lookupd 的启动流程,
实例化一个 NSQLookupd 对象
// apps/nsqlookupd/nsqlookupd.go
daemon := nsqlookupd.New(opts) // 实例化一个 NSQLookupd 的对象
err := daemon.Main() // 开始启动 NSQLookupd
// nsq/nsqlookupd/nsqlookupd.go
func New(opts *Options) *NSQLookupd {
….
n := &NSQLookupd{
opts: opts, // 启动参数
DB: NewRegistrationDB(), // 内从里面的一个数据库, 主要用来存储 tpoic/channel 以及 nsqd 的消息
}
…
return n
}
开始启动
// Main starts an instance of nsqlookupd and returns an
// error if there was a problem starting up.
func (l *NSQLookupd) Main() error {
ctx := &Context{l}
// 启动两场 go routine 来处理 tcp/http 的请求
tcpListener, err := net.Listen(“tcp”, l.opts.TCPAddress)
if err != nil {
return fmt.Errorf(“listen (%s) failed – %s”, l.opts.TCPAddress, err)
}
httpListener, err := net.Listen(“tcp”, l.opts.HTTPAddress)
if err != nil {
return fmt.Errorf(“listen (%s) failed – %s”, l.opts.TCPAddress, err)
}
l.tcpListener = tcpListener
l.httpListener = httpListener
tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
})
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
http_api.Serve(httpListener, httpServer, “HTTP”, l.logf)
})
return nil
}
下面是一个 lookupd 里面的进程模型
lookupd 里的主要数据结构
在上面创建一个 instance 的时候我们看到创建一个 NewRegistrationDB() 的函数, 这里就是存储 lookupd 所有数据结构的地方. 每个 topic/channe/clientl 就是一个 Registration 的 key, 然后 value 对应的就是该 topic/channel 对应的 nsqd 信息. 所有的接口都是在操作上面的那个数据结构.
lookupd 和其他模块的交互
在进程模型中我们看到一个 tcp server 和一个 http seerver, 和其他模块之间的交互都是在里面完成的. 看下 tcp server 的处理
有新的 tcp 连接进来, 创建一个新的 go routine 去服务该请求
// /nsq/internal/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
for {
…
go handler.Handle(clientConn)
}
实例化一个 protocol 对象
// /nsq/nsqlookupd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
…
prot.IOLoop(clientConn)
…
}
对请求的具体处理
// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
…
p.Exec(client, reader, params)
…
}
// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
switch params[0] {
case “PING”: // NSQD 的心跳包
return p.PING(client, params)
case “IDENTIFY”: // NQSD 启动时候的 indentify 就是我们上面看到的 peerInfo
return p.IDENTIFY(client, reader, params[1:])
case “REGISTER”: // 注册 topic/channel 信息到 lookupd
return p.REGISTER(client, reader, params[1:])
case “UNREGISTER”: // unregister topic/lookup 信息
return p.UNREGISTER(client, reader, params[1:])
}
return nil, protocol.NewFatalClientErr(nil, “E_INVALID”, fmt.Sprintf(“invalid command %s”, params[0]))
}
上面就是整个 tcp server 的流程, 每个连接都是一个 go routine. 相对 tcp server 来说的话 http server 就简单很多, 如果你对 httprouter 熟悉的话就更简单了就是对 RegistrationDB 的增删查改. http 测的 api 的话可以参考: 官方的文档
总结
lookupd 是其中比较简单的模块,通过源码的学习我们可以更好的掌握 go 的一些技巧,也鼓励大家通过一一些开源的代码来掌握语言的一些技巧。其实通过 lookupd 我们可以抽象一套自己的 HTTP/TCP 服务端架构来。