关于nsq:Nsq-消息队列的使用

nsq 音讯队列的应用: https://nsq.io/overview/quick... 下载 nsq https://nsq.io/deployment/ins... nsqlookupdnsqd --lookupd-tcp-address=127.0.0.1:4160nsqadmin --lookupd-http-address=127.0.0.1:4161发送音讯到音讯队列中curl -d "hello world" "http://127.0.0.1:4151/pub?topic=test"音讯存储到磁盘nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161再次发送音讯到队列中curl -d "hello world 2" "http://127.0.0.1:4151/pub?topic=test"7.拜访nsq 管理员界面http://127.0.0.1:4171/

March 11, 2022 · 1 min · jiezi

golangnsq系列一初识

nsq 最初是由 bitly 公司开源出来的一款简单易用的分布式消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。 它具有以下特性: 分布式。它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。 易于扩展。它支持水平扩展,没有中心化的消息代理(Broker),内置的发现服务让集群中增加节点非常容易。 运维方便。它非常容易配置和部署,灵活性高。 高度集成。现在已经有官方的Golang、Python和JavaScript客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。 1. 首先到官方文档看用法:https://nsq.io/overview/quick... 下载对应的二进制可执行文件,在本地按照上述步骤就可以跑起来了,看下nsqadmin 后台展示如下: 2. docker 环境搭建 nsq参考官方提供资料创建:docker-compose.yml version: '2' # 高版本支持3services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160:4160" # tcp - "4161:4161" # http nsqd: image: nsqio/nsq # 广播地址不填的话默认就是oshostname(或虚拟机名称),那样 lookupd 会连接不上,所以直接写IP command: /nsqd --broadcast-address=10.236.92.208 --lookupd-tcp-address=nsqlookupd:4160 depends_on: - nsqlookupd ports: - "4150:4150" # tcp - "4151:4151" # http nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd ports: - "4171:4171" # http执行 docker-compose up -d 生成对应的三个容器: ...

August 18, 2019 · 2 min · jiezi

How-we-redesign-the-NSQNSQ重塑之客户端

overview有赞的自研版 NSQ 在高可用性以及负载均衡方面进行了改造,自研版的 nsqd 中引入了数据分区以及副本,副本保存在不同的 nsqd 上,达到容灾目的。此外,自研版 NSQ 在原有 Protocol Spec 基础上进行了拓展,支持基于分区的消息生产、消费,以及基于消息分区的有序消费,以及消息追踪功能。 为了充分支持自研版 NSQ 新功能,在要构建 NSQ client 时,需要在兼容原版 NSQ 的基础上,实现额外的设计。本文作为《Building Client Libraries》的拓展,为构建有赞自研版 NSQ client 提供指引。 参考 NSQ 官方的构造 client 指南的结构,接下来的文章分为如下部分: 1 workflow 及配置2 nsqd 发现服务3 nsqd 建连4 发送/接收消息5 顺序消费 本文根据有赞自研版 nsq 的新特性,对 nsq 文档1中构建 nsq client 的专题进行补充。在阅读《Building Client Libraries》的基础上阅读本文,更有助于理解。workflow 及配置通过一张图,浏览一下 nsq client 的工作流程。 client 作为消息 producer 或者 consumer 启动后,负责 lookup 的功能通过 nsqlookupd 进行 nsqd 服务发现。对于服务发现返回的 nsqd 节点,client 进行建连操作以及异常处理。nsq 连接建立后,producer 进行消息发送,consumer 则监听端口接收消息。同时,client 负责响应来自 nsqd 的心跳,以保持连接不被断开。在和 nsqd 消息通信过程中,client 通过 lookup 发现,持续更新 nsq 集群中 topic 以及节点最新信息,并对连接做相应更新操作。当消息发送/消费结束时,client 负责关闭相应 nsqd 连接。文章在接下来讨论这一流程中的关键步骤,对相应功能的实现做更详细的说明。 ...

May 14, 2019 · 4 min · jiezi

NSQ源码-NSQD

看完了nsqlookupd我们继续往下看, nsqd才是他的核心. 里面大量的使用到了go channel, 相信看完之后对你学习go有很大的帮助.相较于lookupd部分无论在代码逻辑和实现上都要复杂很多. 不过基本的代码结构基本上都是一样的, 进程使用go-srv来管理, Main里启动一个http sever和一个tcp server, 这里可以参考下之前文章的进程模型小节, 不过在nsqd中会启动另外的两个goroutine queueScanLoop和lookupLoop。下面是一个具体的进程模型。后面的分析都是基于这个进程模型。NSQD的启动启动时序这块儿大体上和lookupd中的一致, 我们下面来看看lookupLoop和queueScanLoop.lookupLoop代码见nsqd/lookup.go中 主要做以下几件事情:和lookupd建立连接(这里是一个长连接)每隔15s ping一下lookupd新增或者删除topic的时候通知到lookupd新增或者删除channel的时候通知到lookupd动态的更新options由于设计到了nsq里的in-flight/deferred message, 我们把queueScanLoop放到最后来看.一条message的LifeLine下面我们就通过一条message的生命周期来看下nsqd的工作原理. 根据官方的QuickStart, 我们可以通过curl来pub一条消息.curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/pub?topic=test’http handler我们就跟着代码看一下, 首先是http对此的处理:// nsq/nsqd/http.gofunc (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { … reqParams, topic, err := s.getTopicFromQuery(req) // 从http query中拿到topic信息 …}// nsq/nsqd/http.gofunc (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) { reqParams, err := url.ParseQuery(req.URL.RawQuery) topicNames, ok := reqParams[“topic”] return reqParams, s.ctx.nsqd.GetTopic(topicName), nil}// nsq/nsqd/nsqd.go// GetTopic performs a thread safe operation// to return a pointer to a Topic object (potentially new)func (n *NSQD) GetTopic(topicName string) *Topic { // 1. 首先查看n.topicMap,确认该topic是否已经存在(存在直接返回) t, ok := n.topicMap[topicName] // 2. 否则将新建一个topic t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t // 3. 查看该nsqd是否设置了lookupd, 从lookupd获取该tpoic的channel信息 // 这个topic/channel已经通过nsqlookupd的api添加上去的, 但是nsqd的本地 // 还没有, 针对这种情况我们需要创建该channel对应的deffer queue和inFlight // queue. lookupdHTTPAddrs := n.lookupdHTTPAddrs() if len(lookupdHTTPAddrs) > 0 { channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs) } // now that all channels are added, start topic messagePump // 对该topic的初始化已经完成下面就是message t.Start() return t}topic messagePump在上面消息初始化完成之后就启动了tpoic对应的messagePump// nsq/nsqd/topic.go// messagePump selects over the in-memory and backend queue and// writes messages to every channel for this topicfunc (t *Topic) messagePump() { // 1. do not pass messages before Start(), but avoid blocking Pause() // or GetChannel() // 等待channel相关的初始化完成,GetTopic中最后的t.Start()才正式启动该Pump // 2. main message loop // 开始从Memory chan或者disk读取消息 // 如果topic对应的channel发生了变化,则更新channel信息 // 3. 往该tpoic对应的每个channel写入message(如果是deffermessage // 的话放到对应的deffer queue中 // 否则放到该channel对应的memoryMsgChan中)。}至此也就完成了从tpoic memoryMsgChan收到消息投递到channel memoryMsgChan的投递, 我们先看下http收到消息到通知pump处理的过程。// nsq/nsqd/http.gofunc (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { … msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, “EXITING”} } return “OK”, nil}// nsq/nsqd/topic.go// PutMessage writes a Message to the queuefunc (t *Topic) PutMessage(m *Message) error { t.RLock() defer t.RUnlock() if atomic.LoadInt32(&t.exitFlag) == 1 { return errors.New(“exiting”) } err := t.put(m) if err != nil { return err } atomic.AddUint64(&t.messageCount, 1) return nil}func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, “TOPIC(%s) ERROR: failed to write message to backend - %s”, t.name, err) return err } } return nil}这里memoryMsgChan的大小我们可以通过–mem-queue-size参数来设置,上面这段代码的流程是如果memoryMsgChan还没有满的话就把消息放到memoryMsgChan中,否则就放到backend(disk)中。topic的mesasgePump检测到有新的消息写入的时候就开始工作了,从memoryMsgChan/backend(disk)读取消息投递到channel对应的chan中。 还有一点请注意就是messagePump中 if len(chans) > 0 && !t.IsPaused() { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() }这段代码只有channel(此channel非golang里的channel而是nsq的channel类似nsq_to_file)存在的时候才会去投递。上面部分就是msg从producer生产消息到吧消息写到memoryChan/Disk的过程,下面我们来看下consumer消费消息的过程。首先是consumer从nsqlookupd查询到自己所感兴趣的topic/channel的nsqd信息, 然后就是来连接了。tcp handler对新的client的处理//nsq/internal/protocol/tcp_server.gofunc TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { go handler.Handle(clientConn)}//nsq/nsqd/tcp.gofunc (p *tcpServer) Handle(clientConn net.Conn) { prot.IOLoop(clientConn)}针对每个client起一个messagePump吧msg从上面channel对应的chan 写入到consumer侧//nsq/nsqd/protocol_v2.gofunc (p *protocolV2) IOLoop(conn net.Conn) error { client := newClientV2(clientID, conn, p.ctx) p.ctx.nsqd.AddClient(client.ID, client) messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) // read the request line, err = client.Reader.ReadSlice(’\n’) response, err = p.Exec(client, params) p.Send(client, frameTypeResponse, response)}//nsq/nsqd/protocol_v2.gofunc (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { switch { case bytes.Equal(params[0], []byte(“FIN”)): return p.FIN(client, params) case bytes.Equal(params[0], []byte(“RDY”)): return p.RDY(client, params) case bytes.Equal(params[0], []byte(“REQ”)): return p.REQ(client, params) case bytes.Equal(params[0], []byte(“PUB”)): return p.PUB(client, params) case bytes.Equal(params[0], []byte(“MPUB”)): return p.MPUB(client, params) case bytes.Equal(params[0], []byte(“DPUB”)): return p.DPUB(client, params) case bytes.Equal(params[0], []byte(“NOP”)): return p.NOP(client, params) case bytes.Equal(params[0], []byte(“TOUCH”)): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte(“SUB”)): return p.SUB(client, params) case bytes.Equal(params[0], []byte(“CLS”)): return p.CLS(client, params) case bytes.Equal(params[0], []byte(“AUTH”)): return p.AUTH(client, params) }}//nsq/nsqd/protocol_v2.gofunc (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { var channel *Channel topic := p.ctx.nsqd.GetTopic(topicName) channel = topic.GetChannel(channelName) channel.AddClient(client.ID, client) // 通知messagePump开始工作 client.SubEventChan <- channel通知topic的messagePump开始工作func (t *Topic) GetChannel(channelName string) *Channel { t.Lock() channel, isNew := t.getOrCreateChannel(channelName) t.Unlock() if isNew { // update messagePump state select { case t.channelUpdateChan <- 1: case <-t.exitChan: } } return channel}message 对应的Pumpfunc (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { for { if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages… // 等待client ready,并且channel的初始化完成 flushed = true } else if flushed { // last iteration we flushed… // do not select on the flusher ticker channel memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { // we’re buffered (if there isn’t any more data we should flush)… // select on the flusher ticker channel, too memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = outputBufferTicker.C } select { case <-flusherChan: // if this case wins, we’re either starved // or we won the race between other channels… // in either case, force flush case <-client.ReadyStateChan: case subChannel = <-subEventChan: // you can’t SUB anymore // channel初始化完成,pump开始工作 subEventChan = nil case identifyData := <-identifyEventChan: // you can’t IDENTIFY anymore case <-heartbeatChan: // heartbeat的处理 case b := <-backendMsgChan: // 1. decode msg // 2. 把msg push到Flight Queue里 // 3. send msg to client case msg := <-memoryMsgChan: // 1. 把msg push到Flight Queue里 // 2. send msg to client case <-client.ExitChan: // exit the routine } }至此我们看的代码就是一条消息从pub到nsqd中到被消费者处理的过程。不过得注意一点,我们在上面的代码分析中,创建topic/channel的部分放到了message Pub的链上, 如果是没有lookupd的模式的话这部分是在client SUB链上的。topic/hannel的管理在NSQ内部通过type NSQD struct { topicMap map[string]*Topic}和type Topic struct { channelMap map[string]*Channel}来维护一个内部的topic/channel状态,然后在提供了如下的接口来管理topic和channel/topic/create - create a new topic/topic/delete - delete a topic/topic/empty - empty a topic/topic/pause - pause message flow for a topic/topic/unpause - unpause message flow for a topic/channel/create - create a new channel/channel/delete - delete a channel/channel/empty - empty a channel/channel/pause - pause message flow for a channel/channel/unpause - unpause message flow for a channelcreate topic/channel的话我们在之前的代码看过了,这里可以重点看下topic/channel delete的时候怎样保证数据优雅的删除的,以及messagePump的退出机制。queueScanLoop的工作// queueScanLoop runs in a single goroutine to process in-flight and deferred// priority queues. It manages a pool of queueScanWorker (configurable max of// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.//// It copies Redis’s probabilistic expiration algorithm: it wakes up every// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount// (default: 20) channels from a locally cached list (refreshed every// QueueScanRefreshInterval (default: 5s)).//// If either of the queues had work to do the channel is considered “dirty”.//// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,// the loop continues without sleep.这里的注释已经说的很明白了,queueScanLoop就是通过动态的调整queueScanWorker的数目来处理in-flight和deffered queue的。在具体的算法上的话参考了redis的随机过期算法。总结阅读源码就是走走停停的过程,从一开始的无从下手到后面的一点点的把它啃透。一开始都觉得很困难,无从下手。以前也是尝试着去看一些经典的开源代码,但都没能坚持下来,有时候人大概是会高估自己的能力的,好多东西自以为看个一两遍就能看懂,其实不然,好多知识只有不断的去研究你才能参透其中的原理。 一定要持续的读,不然过几天之后就忘了前面读的内容 一定要多总结, 总结就是在不断的读的过程,从第一遍读通到你把它表述出来至少需要再读5-10次 多思考,这段时间在地铁上/跑步的时候我会回向一下其中的流程 分享(读懂是一个层面,写出来是一个层面,讲给别人听是另外一个层面)后面我会先看下go-nsqd部分的代码,之后会研究下gnatsd, 两个都是cloud native的消息系统,看下有啥区别。 ...

December 1, 2018 · 5 min · jiezi

NSQ源码-nsqlookupd

为什么选择nsq之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点rabbitmq的代码,以及后来的emqtt都看过一点, 所以对消息队列这块是情有独钟。转到go后也在关注消息队列这块,nsq是一个golng的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。nsq的架构与代码结构nsq的的话主要有三个模块构成, 这里直接复制官方的介绍:nsqd: is the daemon that receives, queues, and delivers messages to clients.nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).这里是一个消息投递的过程, 显示了消息怎么从nsqd到达consumer, 缺少了producer和nsqlookupd. nsqlookupd主要提供了两个功能:向nsqd提供一个topic和channel的注册信息对consumser提供了toic和channel的查询功能然后consumer查询到nsqd之后就是上面看到的动态图了, consumer直接和nsqd通信, 下面是一个更全面一点的时序图整个项目的代码结构也是围绕上面的三个模块构建:internal(公共部分的实现)nsqadmin(对nsqadmin的时间)nsqd(对nsqd的实现)nsqlookupd(对nsqlookupd的实现)总共也就这四个package,是不是有很想看下去的冲动(smile).lookupd的启动流程经过上面的介绍,我们对lookupd有里简单的认识.首先他是一个独立的进程, 为topic和channel的发现服务. 但不参与时间的消息投递. 对lookup的实现是在nsq/apps/nsqlookupd/nsqlookupd.go和nsq/nsqlookupd/中. lookupd的启动是使用了一个叫go-srv的windows wrapper.通过在nsq/apps/nsqlookupd/nsqlookupd.go中实现:type Service interface { // Init is called before the program/service is started and after it’s // determined if the program is running as a Windows Service. Init(Environment) error // Start is called after Init. This method must be non-blocking. Start() error // Stop is called in response to os.Interrupt, os.Kill, or when a // Windows Service is stopped. Stop() error}来完成整个进程的管理,go-srv帮助我们做了系统信号的管理, 下面来看下lookupd的启动流程,实例化一个NSQLookupd对象// apps/nsqlookupd/nsqlookupd.go daemon := nsqlookupd.New(opts) // 实例化一个NSQLookupd的对象 err := daemon.Main() // 开始启动NSQLookupd // nsq/nsqlookupd/nsqlookupd.gofunc New(opts *Options) *NSQLookupd { …. n := &NSQLookupd{ opts: opts, // 启动参数 DB: NewRegistrationDB(), // 内从里面的一个数据库,主要用来存储tpoic/channel以及nsqd的消息 } … return n}开始启动// Main starts an instance of nsqlookupd and returns an// error if there was a problem starting up.func (l *NSQLookupd) Main() error { ctx := &Context{l} // 启动两场go routine来处理tcp/http的请求 tcpListener, err := net.Listen(“tcp”, l.opts.TCPAddress) if err != nil { return fmt.Errorf(“listen (%s) failed - %s”, l.opts.TCPAddress, err) } httpListener, err := net.Listen(“tcp”, l.opts.HTTPAddress) if err != nil { return fmt.Errorf(“listen (%s) failed - %s”, l.opts.TCPAddress, err) } l.tcpListener = tcpListener l.httpListener = httpListener tcpServer := &tcpServer{ctx: ctx} l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.logf) }) httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { http_api.Serve(httpListener, httpServer, “HTTP”, l.logf) }) return nil}下面是一个lookupd里面的进程模型lookupd里的主要数据结构在上面创建一个instance的时候我们看到创建一个NewRegistrationDB()的函数, 这里就是存储lookupd所有数据结构的地方.每个topic/channe/clientl就是一个Registration的key, 然后value对应的就是该topic/channel对应的nsqd信息.所有的接口都是在操作上面的那个数据结构.lookupd和其他模块的交互在进程模型中我们看到一个tcp server和一个http seerver, 和其他模块之间的交互都是在里面完成的.看下tcp server的处理有新的tcp连接进来,创建一个新的go routine去服务该请求// /nsq/internal/tcp_server.gofunc TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { for { … go handler.Handle(clientConn) }实例化一个protocol对象// /nsq/nsqlookupd/tcp.gofunc (p *tcpServer) Handle(clientConn net.Conn) { … prot.IOLoop(clientConn) …}对请求的具体处理// /nsq/nsqlookupd/lookup_protocol_v1.gofunc (p *LookupProtocolV1) IOLoop(conn net.Conn) error { … p.Exec(client, reader, params) …}// /nsq/nsqlookupd/lookup_protocol_v1.gofunc (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case “PING”: // NSQD的心跳包 return p.PING(client, params) case “IDENTIFY”: // NQSD启动时候的indentify就是我们上面看到的peerInfo return p.IDENTIFY(client, reader, params[1:]) case “REGISTER”: // 注册topic/channel信息到lookupd return p.REGISTER(client, reader, params[1:]) case “UNREGISTER”: // unregister topic/lookup 信息 return p.UNREGISTER(client, reader, params[1:]) } return nil, protocol.NewFatalClientErr(nil, “E_INVALID”, fmt.Sprintf(“invalid command %s”, params[0]))}上面就是整个tcp server的流程, 每个连接都是一个go routine. 相对tcp server来说的话http server就简单很多, 如果你对httprouter熟悉的话就更简单了就是对RegistrationDB的增删查改. http测的api的话可以参考:官方的文档总结lookupd是其中比较简单的模块,通过源码的学习我们可以更好的掌握go的一些技巧,也鼓励大家通过一一些开源的代码来掌握语言的一些技巧。其实通过lookupd我们可以抽象一套自己的HTTP/TCP服务端架构来。 ...

December 1, 2018 · 2 min · jiezi