NSQ源码-NSQD

68次阅读

共计 8689 个字符,预计需要花费 22 分钟才能阅读完成。

看完了 nsqlookupd 我们继续往下看, nsqd 才是他的核心. 里面大量的使用到了 go channel, 相信看完之后对你学习 go 有很大的帮助. 相较于 lookupd 部分无论在代码逻辑和实现上都要复杂很多. 不过基本的代码结构基本上都是一样的, 进程使用 go-srv 来管理, Main 里启动一个 http sever 和一个 tcp server, 这里可以参考下之前文章的进程模型小节, 不过在 nsqd 中会启动另外的两个 goroutine queueScanLoop 和 lookupLoop。下面是一个具体的进程模型。后面的分析都是基于这个进程模型。
NSQD 的启动
启动时序这块儿大体上和 lookupd 中的一致, 我们下面来看看 lookupLoop 和 queueScanLoop.lookupLoop 代码见 nsqd/lookup.go 中 主要做以下几件事情:

和 lookupd 建立连接 (这里是一个长连接)
每隔 15s ping 一下 lookupd
新增或者删除 topic 的时候通知到 lookupd
新增或者删除 channel 的时候通知到 lookupd
动态的更新 options

由于设计到了 nsq 里的 in-flight/deferred message, 我们把 queueScanLoop 放到最后来看.
一条 message 的 LifeLine
下面我们就通过一条 message 的生命周期来看下 nsqd 的工作原理. 根据官方的 QuickStart, 我们可以通过 curl 来 pub 一条消息.
curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/pub?topic=test’
http handler
我们就跟着代码看一下, 首先是 http 对此的处理:
// nsq/nsqd/http.go
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {

reqParams, topic, err := s.getTopicFromQuery(req) // 从 http query 中拿到 topic 信息

}
// nsq/nsqd/http.go
func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
reqParams, err := url.ParseQuery(req.URL.RawQuery)
topicNames, ok := reqParams[“topic”]
return reqParams, s.ctx.nsqd.GetTopic(topicName), nil
}
// nsq/nsqd/nsqd.go
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
// 1. 首先查看 n.topicMap, 确认该 topic 是否已经存在 (存在直接返回)
t, ok := n.topicMap[topicName]
// 2. 否则将新建一个 topic
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t

// 3. 查看该 nsqd 是否设置了 lookupd, 从 lookupd 获取该 tpoic 的 channel 信息
// 这个 topic/channel 已经通过 nsqlookupd 的 api 添加上去的, 但是 nsqd 的本地
// 还没有, 针对这种情况我们需要创建该 channel 对应的 deffer queue 和 inFlight
// queue.
lookupdHTTPAddrs := n.lookupdHTTPAddrs()
if len(lookupdHTTPAddrs) > 0 {
channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
}
// now that all channels are added, start topic messagePump
// 对该 topic 的初始化已经完成下面就是 message
t.Start()
return t
}

topic messagePump
在上面消息初始化完成之后就启动了 tpoic 对应的 messagePump
// nsq/nsqd/topic.go
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {

// 1. do not pass messages before Start(), but avoid blocking Pause()
// or GetChannel()
// 等待 channel 相关的初始化完成,GetTopic 中最后的 t.Start() 才正式启动该 Pump

// 2. main message loop
// 开始从 Memory chan 或者 disk 读取消息
// 如果 topic 对应的 channel 发生了变化,则更新 channel 信息

// 3. 往该 tpoic 对应的每个 channel 写入 message(如果是 deffermessage
// 的话放到对应的 deffer queue 中
// 否则放到该 channel 对应的 memoryMsgChan 中 )。
}
至此也就完成了从 tpoic memoryMsgChan 收到消息投递到 channel memoryMsgChan 的投递, 我们先看下 http 收到消息到通知 pump 处理的过程。
// nsq/nsqd/http.go
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {

msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
err = topic.PutMessage(msg)
if err != nil {
return nil, http_api.Err{503, “EXITING”}
}

return “OK”, nil
}
// nsq/nsqd/topic.go
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New(“exiting”)
}
err := t.put(m)
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1)
return nil
}
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
“TOPIC(%s) ERROR: failed to write message to backend – %s”,
t.name, err)
return err
}
}
return nil
}
这里 memoryMsgChan 的大小我们可以通过 –mem-queue-size 参数来设置,上面这段代码的流程是如果 memoryMsgChan 还没有满的话就把消息放到 memoryMsgChan 中,否则就放到 backend(disk) 中。topic 的 mesasgePump 检测到有新的消息写入的时候就开始工作了,从 memoryMsgChan/backend(disk) 读取消息投递到 channel 对应的 chan 中。还有一点请注意就是 messagePump 中
if len(chans) > 0 && !t.IsPaused() {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
这段代码只有 channel(此 channel 非 golang 里的 channel 而是 nsq 的 channel 类似 nsq_to_file) 存在的时候才会去投递。上面部分就是 msg 从 producer 生产消息到吧消息写到 memoryChan/Disk 的过程,下面我们来看下 consumer 消费消息的过程。
首先是 consumer 从 nsqlookupd 查询到自己所感兴趣的 topic/channel 的 nsqd 信息,然后就是来连接了。
tcp handler
对新的 client 的处理
//nsq/internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
go handler.Handle(clientConn)
}
//nsq/nsqd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
prot.IOLoop(clientConn)
}
针对每个 client 起一个 messagePump 吧 msg 从上面 channel 对应的 chan 写入到 consumer 侧
//nsq/nsqd/protocol_v2.go
func (p *protocolV2) IOLoop(conn net.Conn) error {
client := newClientV2(clientID, conn, p.ctx)
p.ctx.nsqd.AddClient(client.ID, client)

messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)

// read the request
line, err = client.Reader.ReadSlice(‘\n’)
response, err = p.Exec(client, params)
p.Send(client, frameTypeResponse, response)

}
//nsq/nsqd/protocol_v2.go
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
switch {
case bytes.Equal(params[0], []byte(“FIN”)):
return p.FIN(client, params)
case bytes.Equal(params[0], []byte(“RDY”)):
return p.RDY(client, params)
case bytes.Equal(params[0], []byte(“REQ”)):
return p.REQ(client, params)
case bytes.Equal(params[0], []byte(“PUB”)):
return p.PUB(client, params)
case bytes.Equal(params[0], []byte(“MPUB”)):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte(“DPUB”)):
return p.DPUB(client, params)
case bytes.Equal(params[0], []byte(“NOP”)):
return p.NOP(client, params)
case bytes.Equal(params[0], []byte(“TOUCH”)):
return p.TOUCH(client, params)
case bytes.Equal(params[0], []byte(“SUB”)):
return p.SUB(client, params)
case bytes.Equal(params[0], []byte(“CLS”)):
return p.CLS(client, params)
case bytes.Equal(params[0], []byte(“AUTH”)):
return p.AUTH(client, params)
}
}

//nsq/nsqd/protocol_v2.go
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
var channel *Channel
topic := p.ctx.nsqd.GetTopic(topicName)
channel = topic.GetChannel(channelName)
channel.AddClient(client.ID, client)

// 通知 messagePump 开始工作
client.SubEventChan <- channel
通知 topic 的 messagePump 开始工作
func (t *Topic) GetChannel(channelName string) *Channel {
t.Lock()
channel, isNew := t.getOrCreateChannel(channelName)
t.Unlock()

if isNew {
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
}

return channel
}
message 对应的 Pump
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
for {
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages…
// 等待 client ready,并且 channel 的初始化完成
flushed = true
} else if flushed {
// last iteration we flushed…
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
// we’re buffered (if there isn’t any more data we should flush)…
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}

select {
case <-flusherChan:
// if this case wins, we’re either starved
// or we won the race between other channels…
// in either case, force flush
case <-client.ReadyStateChan:
case subChannel = <-subEventChan:
// you can’t SUB anymore
// channel 初始化完成,pump 开始工作
subEventChan = nil
case identifyData := <-identifyEventChan:
// you can’t IDENTIFY anymore
case <-heartbeatChan:
// heartbeat 的处理
case b := <-backendMsgChan:
// 1. decode msg
// 2. 把 msg push 到 Flight Queue 里
// 3. send msg to client
case msg := <-memoryMsgChan:
// 1. 把 msg push 到 Flight Queue 里
// 2. send msg to client
case <-client.ExitChan:
// exit the routine
}
}

至此我们看的代码就是一条消息从 pub 到 nsqd 中到被消费者处理的过程。不过得注意一点,我们在上面的代码分析中,创建 topic/channel 的部分放到了 message Pub 的链上,如果是没有 lookupd 的模式的话这部分是在 client SUB 链上的。
topic/hannel 的管理
在 NSQ 内部通过
type NSQD struct {
topicMap map[string]*Topic
}

type Topic struct {
channelMap map[string]*Channel
}
来维护一个内部的 topic/channel 状态,然后在提供了如下的接口来管理 topic 和 channel
/topic/create – create a new topic
/topic/delete – delete a topic
/topic/empty – empty a topic
/topic/pause – pause message flow for a topic
/topic/unpause – unpause message flow for a topic
/channel/create – create a new channel
/channel/delete – delete a channel
/channel/empty – empty a channel
/channel/pause – pause message flow for a channel
/channel/unpause – unpause message flow for a channel
create topic/channel 的话我们在之前的代码看过了,这里可以重点看下 topic/channel delete 的时候怎样保证数据优雅的删除的,以及 messagePump 的退出机制。
queueScanLoop 的工作
// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis’s probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered “dirty”.
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.
这里的注释已经说的很明白了,queueScanLoop 就是通过动态的调整 queueScanWorker 的数目来处理 in-flight 和 deffered queue 的。在具体的算法上的话参考了 redis 的随机过期算法。
总结
阅读源码就是走走停停的过程,从一开始的无从下手到后面的一点点的把它啃透。一开始都觉得很困难,无从下手。以前也是尝试着去看一些经典的开源代码,但都没能坚持下来,有时候人大概是会高估自己的能力的,好多东西自以为看个一两遍就能看懂,其实不然,好多知识只有不断的去研究你才能参透其中的原理。
一定要持续的读,不然过几天之后就忘了前面读的内容
一定要多总结,总结就是在不断的读的过程,从第一遍读通到你把它表述出来至少需要再读 5 -10 次
多思考,这段时间在地铁上 / 跑步的时候我会回向一下其中的流程
分享 (读懂是一个层面,写出来是一个层面,讲给别人听是另外一个层面)
后面我会先看下 go-nsqd 部分的代码,之后会研究下 gnatsd, 两个都是 cloud native 的消息系统,看下有啥区别。

正文完
 0