网上看了好多,都是抄个官网 README,很多重要的货色不说分明。只好本人钻研了一下。

自己博客,关键词 Less-Bug.com ,欢送关注。

NSQ 的全家桶介绍

  • nsqd:守护过程,客户端通信。默认端口 4150(TCP) 4151(HTTP)
  • nsqlookupd:相当于一个路由器。客户端能够经由它发现生产者、nsqd 播送的话题。一个 nsqlookupd 可能治理一群 nsqd。默认端口::4160(TCP),:4161(HTTP)
  • nsqadmin:在线面板,可能通过浏览器间接拜访。默认端口 :4171

从命令行启动

能够间接下载二进制文件。开三个终端,别离执行:

nsqlookupdnsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1nsqadmin --lookupd-http-address=127.0.0.1:4161

go-nsq 的应用

我封装了一个包:

package mqimport (    "encoding/json"    "fmt"    "time"    "github.com/nsqio/go-nsq"    "go.uber.org/zap")type MessageQueueConfig struct {    NsqAddr         string    NsqLookupdAddr  string    SupportedTopics []string}type MessageQueue struct {    config    MessageQueueConfig    producer  *nsq.Producer    consumers map[string]*nsq.Consumer}func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {    zap.L().Debug("New message queue")    producer, err := initProducer(config.NsqAddr)    if err != nil {        return nil, err    }    consumers := make(map[string]*nsq.Consumer)    for _, topic := range config.SupportedTopics {        nsq.Register(topic,"default")        consumers[topic], err = initConsumer(topic, "default", config.NsqAddr)        if err != nil {            return        }    }    return &MessageQueue{        config:    config,        producer:  producer,        consumers: consumers,    }, nil}func (mq *MessageQueue) Run() {    for name, c := range mq.consumers {        zap.L().Info("Run consumer for " + name)        // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)        c.ConnectToNSQD(mq.config.NsqAddr)    }}func initProducer(addr string) (producer *nsq.Producer, err error) {    zap.L().Debug("initProducer to " + addr)    config := nsq.NewConfig()    producer, err = nsq.NewProducer(addr, config)        return}func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {    zap.L().Debug("initConsumer to " + topic + "/" + channel)    config := nsq.NewConfig()    config.LookupdPollInterval = 15 * time.Second    c, err = nsq.NewConsumer(topic, channel, config)    return}func (mq *MessageQueue) Pub(name string, data interface{}) (err error) {    body, err := json.Marshal(data)    if err != nil {        return    }    zap.L().Info("Pub " + name + " to mq. data = " + string(body))    return mq.producer.Publish(name, body)}type Messagehandler func(v []byte)func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) {    zap.L().Info("Subscribe " + name)    v, ok := mq.consumers[name]    if !ok {        err = fmt.Errorf("No such topic: " + name)        return    }    v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {        handler(message.Body)        return nil    }))    return}

应用示例:

    m, err := mq.NewMessageQueue(mq.MessageQueueConfig{        NsqAddr:         "127.0.0.1:4150",        NsqLookupdAddr:  "127.0.0.1:4161",        SupportedTopics: []string{"hello"},    })    if err != nil {        zap.L().Fatal("Message queue error: " + err.Error())    }    m.Sub("hello", func(resp []byte) {        zap.L().Info("S1 Got: " + string(resp))    })    m.Sub("hello", func(resp []byte) {        zap.L().Info("S2 Got: " + string(resp))    })    m.Run()    err = m.Pub("hello", "world")    if err != nil {        zap.L().Fatal("Message queue error: " + err.Error())    }    err = m.Pub("hello", "tom")    if err != nil {        zap.L().Fatal("Message queue error: " + err.Error())    }    sigChan := make(chan os.Signal, 1)    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)    <-sigChan    os.Exit(0);

次要是进行解耦合,这样万一咱们换成 Kalfa 之类的队列,就能够不必动业务代码。

输入后果:

2021-11-07T19:13:41.886+0800    DEBUG   mq/mq.go:29     New message queue2021-11-07T19:13:41.886+0800    DEBUG   mq/mq.go:58     initProducer to 127.0.0.1:41502021-11-07T19:13:41.887+0800    DEBUG   mq/mq.go:65     initConsumer to hello/default2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:84     Subscribe hello2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:84     Subscribe hello2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:51     Run consumer for hello2021/11/07 19:13:41 INF    2 [hello/default] (127.0.0.1:4150) connecting to nsqd2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:77     Pub hello to mq. data = "world"2021/11/07 19:13:41 INF    1 (127.0.0.1:4150) connecting to nsqd2021-11-07T19:13:41.888+0800    INFO    mq/mq.go:77     Pub hello to mq. data = "tom"2021-11-07T19:13:41.888+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "world"2021-11-07T19:13:41.888+0800    INFO    buqi-admin-backend/main.go:63   S2 Got: "tom"

从输入后果咱们能够确认一个事实,就是对于订阅了同一个 topic,同一个 channel 的不同消费者,当音讯涌入时,将会负载平衡——每个 Handler 只会收到一个音讯

遇到的问题

TOPIC_NOT_FOUND

遇到两个起因。

其一是大小写,Topic 名是大小写敏感的,因而 Hellohello 是两个不同的 topic,写代码时应该标准操作:抽取常量,并保护一个所有 Topic 的列表。

其二是 Topic 未创立。第一次 pub 之后,对应的 topic/channel 能力创立。倡议写个脚本调用 /topic/create 接口一次性创立好,不然前面第二次重试订阅的时候能力收到音讯,造成不可意料的提早。

发现客户端轮询 HTTP

这是因为 NsqLookupd 自身是一个中介,能够治理一堆不同 IP 的 nsqd,那么咱们就不可能永远只连贯一个 nsq,所以就要轮询来确认有哪些客户端。

对于小我的项目,能够绕过 NsqLookupd:

        // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)        c.ConnectToNSQD(mq.config.NsqAddr)

如何让多个消费者生产同一个 topic?

显然,依据 nsq 的机制,咱们须要让同一个 topic 的消费者应用不同的通道。一种办法是随机化 channel,比方应用一个递增量作为 channel 名。

第二种办法是依据用处定义 channel 名。

第三种办法:据说能够应用 AddConcurrentHandlers,尚未钻研。

第四种办法:咱们把 Handler 中介化,应用一个消费者去生产,然而手动将音讯送入应用层的一个自定义的流水线,让流水线的 filter 去解决音讯。我猜这样还能防止一些临界区问题。

咱们试一下第四种办法。(代码已公布到 GIST,Github 用户名 Pluveto)

实现流水线 Handler

package mqimport (    "encoding/json"    "fmt"    "time"    "github.com/nsqio/go-nsq"    "go.uber.org/zap")type MessageQueueConfig struct {    NsqAddr         string    NsqLookupdAddr  string    EnableLookupd   bool    SupportedTopics []string}type MessageQueue struct {    subscribers map[string]Subscriber    config      MessageQueueConfig    producer    *nsq.Producer}type Messagehandler func(v []byte) bool// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 niltype LinkedHandlerNode struct {    Handler  *Messagehandler    Index    int    NextNode *LinkedHandlerNode}type Subscriber struct {    HandlerHeadNode *LinkedHandlerNode    Consumer        *nsq.Consumer    Handler         nsq.HandlerFunc}func createProducer(addr string) (producer *nsq.Producer, err error) {    zap.L().Debug("initProducer to " + addr)    config := nsq.NewConfig()    producer, err = nsq.NewProducer(addr, config)    return}func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {    zap.L().Debug("initConsumer to " + topic + "/" + channel)    config := nsq.NewConfig()    config.LookupdPollInterval = 15 * time.Second    c, err = nsq.NewConsumer(topic, channel, config)    return}func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {        zap.L().Debug("New message queue")    producer, err := createProducer(config.NsqAddr)    if err != nil {        return nil, err    }    subscribers := make(map[string]Subscriber)    for _, topic := range config.SupportedTopics {        nsq.Register(topic, "default")        consumer, err := createConsumer(topic, "default", config.NsqAddr)        if err != nil {            return nil, err        }        // 头节点不参加理论应用,所以 Index = -1        headNode := &LinkedHandlerNode{Index: -1}        hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error {            // 循环链式调用各个 Handler            curNode := headNode.NextNode            // 当不存在任何用户定义的 Handler 时抛出正告            if(nil == curNode){                return fmt.Errorf("No handler provided!")            }            for nil != curNode {                msg := message.Body                zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic)                stop := (*curNode.Handler)(msg)                if stop {                    zap.S().Debugf("the message has stopped spreading ")                    break                }                curNode = curNode.NextNode            }            return nil        })        consumer.AddHandler(hubHandler)        subscribers[topic] = Subscriber{            Consumer:        consumer,            HandlerHeadNode: headNode,        }    }    return &MessageQueue{        config:      config,        producer:    producer,        subscribers: subscribers,    }, nil}func (mq *MessageQueue) Run() {    for name, s := range mq.subscribers {        zap.L().Info("Run consumer for " + name)        if mq.config.EnableLookupd {            s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)        } else {            s.Consumer.ConnectToNSQD(mq.config.NsqAddr)        }    }}func (mq *MessageQueue) IsTopicSupported(topic string) bool {    for _, v := range mq.config.SupportedTopics {        if v == topic {            return true        }    }    return false}// Pub 向音讯队列发送一个音讯func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) {    if !mq.IsTopicSupported(topic) {        err = fmt.Errorf("unsupported topic name: " + topic)        return    }    body, err := json.Marshal(data)    if err != nil {        return    }    zap.L().Info("Pub " + topic + " to mq. data = " + string(body))    return mq.producer.Publish(topic, body)}// Sub 从音讯队列订阅一个音讯func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) {    if !mq.IsTopicSupported(topic) {        err = fmt.Errorf("unsupported topic name: " + topic)        return    }    zap.L().Info("Subscribe " + topic)    subscriber, ok := mq.subscribers[topic]    if !ok {        err = fmt.Errorf("No such topic: " + topic)        return    }    // 到达最初一个无效链表节点    curNode := subscriber.HandlerHeadNode    for nil != curNode.NextNode {        curNode = curNode.NextNode    }    // 创立节点    curNode.NextNode = &LinkedHandlerNode{        Handler:  &handler,        Index:    1 + curNode.Index,        NextNode: nil,    }    return}

这里的思维是给每个消费者事后创立惟一的 Handler,这个 Handler 会顺次调用链表中的各个具体的 Handler。当用户订阅 Topic 时,将用户提供的 Handler 增加到链表开端。

应用示例:

    m, err := mq.NewMessageQueue(mq.MessageQueueConfig{        NsqAddr:         "127.0.0.1:4150",        NsqLookupdAddr:  "127.0.0.1:4161",        SupportedTopics: []string{"hello"},        EnableLookupd:   false,    })    if err != nil {        zap.L().Fatal("Message queue error: " + err.Error())    }    m.Sub("hello", func(resp []byte) bool {        zap.L().Info("S1 Got: " + string(resp))        return false    })    m.Sub("hello", func(resp []byte) bool {        zap.L().Info("S2 Got: " + string(resp))        return true    })    m.Sub("hello", func(resp []byte) bool {        zap.L().Info("S3 Got: " + string(resp))        return false    })    m.Run()    err = m.Pub("hello", "world")    if err != nil {        zap.L().Fatal("Message queue error: " + err.Error())    }    err = m.Pub("hello", "tom")    if err != nil {        zap.L().Fatal("Message queue error: " + err.Error())    }    sigChan := make(chan os.Signal, 1)    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)    <-sigChan    os.Exit(0)

输入:

2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:40     New message queue2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:89     initProducer to 127.0.0.1:41502021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:96     initConsumer to hello/default2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:82     Run consumer for hello2021/11/07 20:30:38 INF    2 [hello/default] (127.0.0.1:4150) connecting to nsqd2021-11-07T20:30:38.454+0800    INFO    mq/mq.go:108    Pub hello to mq. data = "world"2021/11/07 20:30:38 INF    1 (127.0.0.1:4150) connecting to nsqd2021-11-07T20:30:38.455+0800    INFO    mq/mq.go:108    Pub hello to mq. data = "tom"2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[0] for hello is invoked2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "world"2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[1] for hello is invoked2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:64   S2 Got: "world"2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:60     the message has stopped spreading 2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[0] for hello is invoked2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "tom"2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[1] for hello is invoked2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:64   S2 Got: "tom"2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:60     the message has stopped spreading ^C

能够看到,Handler 返回 true 时,就能够阻断音讯的流传。