Nsq-原理分析一

45次阅读

共计 16008 个字符,预计需要花费 41 分钟才能阅读完成。

Nsq 是用 go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理、学习 golang channel 知识以及如何用 go 来写分布式,为什么说适合小型小型项目使用因为,nsq 如果没有能力进行二次开发的情况存在的问题还是很多的。


Nsq 模块介绍

nsqd:是一个进程监听了 http、tcp 两种协议,用来创建 topic、channel,分发消息给消费者,向 nsqlooup 注册自己的元数据信息(topic、channel、consumer),自己的服务信息,最核心模块。

nsqlookup:存储了 nsqd 的元数据和服务信息(endpoind),向消费者提供服务发现功能,向 nsqadmin 提供数据查询功能。

nsqadmin:简单的管理界面,展示了 topic、channel 以及 channel 上的消费者,也可以创建 topic、channel

摘自官网
生产者向某个 topic 中发送消息,如果 topic 有一个或者多个 channle,那么该消息会被复制多分发送到每一个 channel 中。类似 rabbitmq 中的 fanout 类型,channle 类似队列。
官方说 nsq 是分布式的消息队列服务,但是在我看来只有 channel 到消费者这部分提现出来分布式的感觉,nsqd 这个模块其实就是单点的,nsqd 将 topic、channel、以及消息都存储在了本地磁盘,官方还建议一个生产者使用一个 nsqd,这样不仅浪费资源还没有数据备份的保障。一旦 nsqd 所在的主机磁损坏,数据都将丢失。

Nsq 源码分析

先部署一个简单的环境,以 centos 操作系统为例

下载
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
解压
tar xvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
cd nsq-1.2.0.linux-amd64.go1.12.9/bin
cp * /bin

启动三个终端,一个用来启动 nsqadmin(管理界面)、nsqlookup(nsqd 服务以及元数据管理)、nsqd(nsq 核心模块,元数据、消息存储以及消息分发), ip 换成自己的真实 ip

终端 1
/bin/nsqd --lookupd-tcp-address 192.168.1.1:4160 -tcp-address 0.0.0.0:4152 -http-address 0.0.0.0:4153  --broadcast-address 192.168.1.1
终端 2
/bin/nsqlookupd --broadcast-address 192.168.1.1:4160
终端 3
/bin/nsqadmin --lookupd-http-address 192.168.1.1:4160

看一下 nsq 的简单使用

cat producer.go
package main
import "github.com/nsqio/go-nsq"
config := nsq.NewConfig()
p, _ := nsq.NewProducer(addr, config)
err := p.Publish("topic", []byte("message"))
if err != nil {fmt.Printf("dispatch task failed %s", err)
}

cat consumer.go
package main
import "github.com/nsqio/go-nsq"

type MyHandler struct {}

func (h *MyHandler) HandleMessage(message *nsq.Message) error {fmt.Printf("consume message %+v\n", message)
}

config := nsq.NewConfig()
c, _ := nsq.NewConsumer("topic", "channel", config)
c.SetLoggerLevel(nsq.LogLevelDebug)
handler := &MyHandler{}
c.AddHandler(handler)
// 这里端口是 4161 是 nsqlookup 的 http 端口,nsqd 和 nsqlookup 都同时监听了 tcp 和 http 两个协议
err := c.ConnectToNSQLookupd("192.168.1.1:4161")
if err != nil {fmt.Printf("Connect nsq lookup failed %+v\n", err)
}

1. 生产者代码分析

go-nsq/producer.go

// After Config is passed into NewProducer the values are no longer mutable (they are copied).
func NewProducer(addr string, config *Config) (*Producer, error) {err := config.Validate()
    if err != nil {return nil, err}

    p := &Producer{id: atomic.AddInt64(&instCount, 1),

        addr:   addr,
        config: *config,

        logger: make([]logger, int(LogLevelMax+1)),
        logLvl: LogLevelInfo,

        transactionChan: make(chan *ProducerTransaction),
        exitChan:        make(chan int),
        responseChan:    make(chan []byte),
        errorChan:       make(chan []byte),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index, _ := range p.logger {p.logger[index] = l
    }
    return p, nil
}

初始化了 Producer 的结构体

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {return w.sendCommand(Publish(topic, body))
}

指定要往哪个 topic 中发送消息以及要发送的消息

// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {var params = [][]byte{[]byte(topic)}
    return &Command{[]byte("PUB"), params, body}
}

封装了命令

func (w *Producer) sendCommand(cmd *Command) error {doneChan := make(chan *ProducerTransaction)
    // 内部使用了异步发送的方式
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {close(doneChan)
        return err
    }
    // 等待异步发送完成
    t := <-doneChan
    return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    args []interface{}) error {
    // keep track of how many outstanding producers we're dealing with
    // in order to later ensure that we clean them all up...
    atomic.AddInt32(&w.concurrentProducers, 1)
    defer atomic.AddInt32(&w.concurrentProducers, -1)
    // 判断有没有和 nsqd 建立连接,已经建立跳过
    if atomic.LoadInt32(&w.state) != StateConnected {err := w.connect()
        if err != nil {return err}
    }

    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }

    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }
    return nil
}

在上面这段代码中依然没有看到将 PUB command 发送给 nsqd 进程的代码,我们看一下那个 connect 函数

func (w *Producer) connect() error {w.guard.Lock()
    defer w.guard.Unlock()

    if atomic.LoadInt32(&w.stopFlag) == 1 {return ErrStopped}

    switch state := atomic.LoadInt32(&w.state); state {
    case StateInit:
    case StateConnected:
        return nil
    default:
        return ErrNotConnected
    }

    w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

    w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    w.conn.SetLoggerLevel(w.getLogLevel())
    format := fmt.Sprintf("%3d (%%s)", w.id)
    for index := range w.logger {w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), format)
    }
    // 这个主要是消费者在使用。在消费者部分会详细分析
    _, err := w.conn.Connect()
    if err != nil {w.conn.Close()
        w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
        return err
    }
    atomic.StoreInt32(&w.state, StateConnected)
    w.closeChan = make(chan int)
    w.wg.Add(1)
    // 生产者利用这个 goroutine 向 nsqd 发送命令和接收响应
    go w.router()

    return nil
}
func (w *Producer) router() {
    for {
        select {
        // 在上面的 sendCommandAsync 这个方法中只看到了将待发送的命令又包装了一下扔到了一个 channel 中,这里在监听,以及将命令发送给 nsqd
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            if err != nil {w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                w.close()}
            // 接收 nsqd 的响应
        case data := <-w.responseChan:
            w.popTransaction(FrameTypeResponse, data)
        case data := <-w.errorChan:
            w.popTransaction(FrameTypeError, data)
        case <-w.closeChan:
            goto exit
        case <-w.exitChan:
            goto exit
        }
    }

exit:
    w.transactionCleanup()
    w.wg.Done()
    w.log(LogLevelInfo, "exiting router")
}

2. 消费者代码分析

// NewConsumer creates a new instance of Consumer for the specified topic/channel
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into NewConsumer the values are no longer mutable (they are copied).
// 指定要监听的订阅的 topic 和 channel
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {if err := config.Validate(); err != nil {return nil, err}

    if !IsValidTopicName(topic) {return nil, errors.New("invalid topic name")
    }

    if !IsValidChannelName(channel) {return nil, errors.New("invalid channel name")
    }

    r := &Consumer{id: atomic.AddInt64(&instCount, 1),

        topic:   topic,
        channel: channel,
        config:  *config,

        logger:      make([]logger, LogLevelMax+1),
        logLvl:      LogLevelInfo,
        maxInFlight: int32(config.MaxInFlight),

        incomingMessages: make(chan *Message),

        rdyRetryTimers:     make(map[string]*time.Timer),
        pendingConnections: make(map[string]*Conn),
        connections:        make(map[string]*Conn),

        lookupdRecheckChan: make(chan int, 1),

        rng: rand.New(rand.NewSource(time.Now().UnixNano())),

        StopChan: make(chan int),
        exitChan: make(chan int),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index := range r.logger {r.logger[index] = l
    }

    r.wg.Add(1)
    // 因为 nsq 是推送 push 的方式消费消息,所以早消费者端会控制消费的速度,限流作用,可以配置可以自动更新
    go r.rdyLoop()
    return r, nil
}

初始化 Consumer 结构体

初始化后需要添加消息处理函数 AddHandler

// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddHandler(handler Handler) {r.AddConcurrentHandlers(handler, 1)
}

// AddConcurrentHandlers sets the Handler for messages received by this Consumer.  It
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {if atomic.LoadInt32(&r.connectedFlag) == 1 {panic("already connected")
    }

    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    for i := 0; i < concurrency; i++ {
        // 可以设置并发
        go r.handlerLoop(handler)
    }
}

func (r *Consumer) handlerLoop(handler Handler) {r.log(LogLevelDebug, "starting Handler")

    for {
        // 不断的接收 nsqd 发送过来的请求,readloop 这个死循环方法会向这个 channel 仍消息进来,后面我们会说到
        message, ok := <-r.incomingMessages
        if !ok {goto exit}

        if r.shouldFailMessage(message, handler) {message.Finish()
            continue
        }
       // 使用我们添加的消息处理函数来消费消息
        err := handler.HandleMessage(message)
        if err != nil {r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {message.Requeue(-1)
            }
            continue
        }
       // 当一条消息处理完成是否从队列中移除,相当于提交,默认消费完一条消息自动提交,可以设置批量提交
        if !message.IsAutoResponseDisabled() {message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {r.exit()
    }
}

func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
    // message passed the max number of attempts
    if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
        r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
            message.ID, message.Attempts)

        logger, ok := handler.(FailedMessageLogger)
        if ok {logger.LogFailedMessage(message)
        }

        return true
    }
    return false
}

func (r *Consumer) exit() {r.exitHandler.Do(func() {close(r.exitChan)
        r.wg.Wait()
        close(r.StopChan)
    })
}
// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
// producers for the configured topic.
//
// A goroutine is spawned to handle continual polling.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {if atomic.LoadInt32(&r.stopFlag) == 1 {return errors.New("consumer stopped")
    }
    if atomic.LoadInt32(&r.runningHandlers) == 0 {return errors.New("no handlers")
    }

    if err := validatedLookupAddr(addr); err != nil {return err}

    atomic.StoreInt32(&r.connectedFlag, 1)

    r.mtx.Lock()
    for _, x := range r.lookupdHTTPAddrs {
        if x == addr {r.mtx.Unlock()
            return nil
        }
    }
    r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    numLookupd := len(r.lookupdHTTPAddrs)
    r.mtx.Unlock()

    // if this is the first one, kick off the go loop
    if numLookupd == 1 {r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()}

    return nil
}

消费者需要连接到 nsqlookup,从 nsqlookup 中查询到 nsqd 的服务信息,然后进行连接

// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
    retries := 0

retry:
    endpoint := r.nextLookupdEndpoint()

    r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

    var data lookupResp
    err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    if err != nil {r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
        retries++
        if retries < 3 {r.log(LogLevelInfo, "retrying with next nsqlookupd")
            goto retry
        }
        return
    }

    var nsqdAddrs []string
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // apply filter
    if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
    }
    // 获取 nsqlookup 中所以的 nsqd 信息,然后进行连接
    for _, addr := range nsqdAddrs {err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
}

官方不建议消费者端直接连接 nsqd,

// ConnectToNSQD takes a nsqd address to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically.  This method is useful when you want to connect to a single, local,
// instance.
func (r *Consumer) ConnectToNSQD(addr string) error {if atomic.LoadInt32(&r.stopFlag) == 1 {return errors.New("consumer stopped")
    }

    if atomic.LoadInt32(&r.runningHandlers) == 0 {return errors.New("no handlers")
    }

    atomic.StoreInt32(&r.connectedFlag, 1)
    // 初始化
    conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    conn.SetLoggerLevel(r.getLogLevel())
    format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
    for index := range r.logger {conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
    }
    r.mtx.Lock()
    _, pendingOk := r.pendingConnections[addr]
    _, ok := r.connections[addr]
    if ok || pendingOk {r.mtx.Unlock()
        return ErrAlreadyConnected
    }
    r.pendingConnections[addr] = conn
    if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    }
    r.mtx.Unlock()

    r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)

    cleanupConnection := func() {r.mtx.Lock()
        delete(r.pendingConnections, addr)
        r.mtx.Unlock()
        conn.Close()}
    // 进行连接,在分析生产者时看到过,这里是 consumer 和 nsqd 建立了连接的地方
    resp, err := conn.Connect()
    if err != nil {cleanupConnection()
        return err
    }

    if resp != nil {if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
            r.log(LogLevelWarning,
                "(%s) max RDY count %d < consumer max in flight %d, truncation possible",
                conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
        }
    }
    // consumer 向 nsqd 发送订阅命令,此时 consumer 会将自己注册到 nsqd 中,更准确的说法是 consumer 将自己注册到了 topic 下的 channel 的 client 列表中,有消息到来时 channle 会随机向自己的客户端列表发送消息
    cmd := Subscribe(r.topic, r.channel)
    err = conn.WriteCommand(cmd)
    if err != nil {cleanupConnection()
        return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
            conn, r.topic, r.channel, err.Error())
    }

    r.mtx.Lock()
    delete(r.pendingConnections, addr)
    r.connections[addr] = conn
    r.mtx.Unlock()

    // pre-emptive signal to existing connections to lower their RDY count
    for _, c := range r.conns() {r.maybeUpdateRDY(c)
    }

    return nil

go-nsq/conn.go

// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
    dialer := &net.Dialer{
        LocalAddr: c.config.LocalAddr,
        Timeout:   c.config.DialTimeout,
    }
    // 生产者或者消费者在这里与 nsqd 建立 tcp 连接
    conn, err := dialer.Dial("tcp", c.addr)
    if err != nil {return nil, err}
    c.conn = conn.(*net.TCPConn)
    c.r = conn
    c.w = conn
    // 建立连接后先发送 4 字节信息表示使用哪种协议,目前有 v1 和 v2 两种协议
    _, err = c.Write(MagicV2)
    if err != nil {c.Close()
        return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
    }
    // 告诉 nsqd 关于自己的一些基本信息,比如心跳间隔、处理消息的超时、client id 等等
    resp, err := c.identify()
    if err != nil {return nil, err}

    if resp != nil && resp.AuthRequired {
        if c.config.AuthSecret == "" {c.log(LogLevelError, "Auth Required")
            return nil, errors.New("Auth Required")
        }
        err := c.auth(c.config.AuthSecret)
        if err != nil {c.log(LogLevelError, "Auth Failed %s", err)
            return nil, err
        }
    }

    c.wg.Add(2)
    atomic.StoreInt32(&c.readLoopRunning, 1)
    // 这两个 goroutine 很重要
    go c.readLoop()
    go c.writeLoop()
    return resp, nil
}
func (c *Conn) readLoop() {delegate := &connMessageDelegate{c}
    for {if atomic.LoadInt32(&c.closeFlag) == 1 {goto exit}
        // 从 nsqd 获取消息
        frameType, data, err := ReadUnpackedResponse(c)
        if err != nil {if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {goto exit}
            if !strings.Contains(err.Error(), "use of closed network connection") {c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
            }
            goto exit
        }
        // 心跳检测默认 30s 检查一次,后面会细说一下这里
        if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {c.log(LogLevelDebug, "heartbeat received")
            c.delegate.OnHeartbeat(c)
            err := c.WriteCommand(Nop())
            if err != nil {c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            continue
        }

        switch frameType {
        // 处理相应信息
        case FrameTypeResponse:
            c.delegate.OnResponse(c, data)
            // 接收消息进行消费
        case FrameTypeMessage:
            msg, err := DecodeMessage(data)
            if err != nil {c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            msg.Delegate = delegate
            msg.NSQDAddress = c.String()

            atomic.AddInt64(&c.messagesInFlight, 1)
            atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
             // 这里将从 nsqd 那边获取到的消息扔到了一个 channel 中,这个 channel 就是上面 handlerloop 死循环中在等待消息的 channel
            c.delegate.OnMessage(c, msg)
        case FrameTypeError:
            c.log(LogLevelError, "protocol error - %s", data)
            c.delegate.OnError(c, data)
        default:
            c.log(LogLevelError, "IO error - %s", err)
            c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
        }
    }

exit:
    atomic.StoreInt32(&c.readLoopRunning, 0)
    // start the connection close
    messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
    if messagesInFlight == 0 {
        // if we exited readLoop with no messages in flight
        // we need to explicitly trigger the close because
        // writeLoop won't
        c.close()} else {c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
    }
    c.wg.Done()
    c.log(LogLevelInfo, "readLoop exiting")
}
func (c *Conn) writeLoop() {
    for {
        select {
        case <-c.exitChan:
            c.log(LogLevelInfo, "breaking out of writeLoop")
            // Indicate drainReady because we will not pull any more off msgResponseChan
            close(c.drainReady)
            goto exit
        case cmd := <-c.cmdChan:
            err := c.WriteCommand(cmd)
            if err != nil {c.log(LogLevelError, "error sending command %s - %s", cmd, err)
                c.close()
                continue
            }
        case resp := <-c.msgResponseChan:
            // Decrement this here so it is correct even if we can't respond to nsqd
            msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

            if resp.success {c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
                c.delegate.OnMessageFinished(c, resp.msg)
                c.delegate.OnResume(c)
            } else {c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
                c.delegate.OnMessageRequeued(c, resp.msg)
                if resp.backoff {c.delegate.OnBackoff(c)
                } else {c.delegate.OnContinue(c)
                }
            }

            err := c.WriteCommand(resp.cmd)
            if err != nil {c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
                c.close()
                continue
            }

            if msgsInFlight == 0 &&
                atomic.LoadInt32(&c.closeFlag) == 1 {c.close()
                continue
            }
        }
    }

exit:
    c.wg.Done()
    c.log(LogLevelInfo, "writeLoop exiting")
}

当消息处理完成 consumer 会通过 writeloop 向 nsqd 发送 FIN 命令,告诉 nsqd 我有哪些消息消费完成可以从队列中移除了。
其实上面是 go nsq 这个客户端的代码,还没有看到 nsq 本身的代码,先总结一下。然后继续看 nsqd 的代码
生产者

  1. 生产者先初始化 Producerj 结构体,然后设置一些配置
  2. 生产者和 nsqd 建立 tcp 连接
  3. 协商版本
  4. 生产者启动一个 route 协程,这个协程用来不断的向 nsqd 发送 PUB 指令,同时携带消息

消费者

  1. 消费者初始化 Consumer 结构体
  2. 消费者通过 nsqlookup 和 nsqd 建立 tcp 连接,nsqd 可能是一个也可能是多个
  3. 协商版本
  4. 建立连接后发送自己的识别信息给 nsqd,携带一些基本配置信息,比如心跳间隔、消息消费超时、客户端 id 等等
  5. 启动 RDY 限流机制
  6. 启动 readloop、writeloop

正文完
 0