本文主要研究一下rocketmq-client-go的pushConsumer

pushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// pushConsumer is the push-style consumer built on top of defaultConsumer.
type pushConsumer struct {
	// Embedded base consumer, constructed in NewPushConsumer.
	*defaultConsumer
	// Incremented by pullMessage each time a pull is postponed because the
	// cached message count or size exceeds its threshold; also used to emit
	// the corresponding warning only on every 1000th occurrence.
	queueFlowControlTimes int
	// Read by pullMessage to throttle the "span too long" warning.
	queueMaxSpanFlowControlTimes int
	// Set of *PushConsumerCallback entries added by Subscribe and looked up
	// by topic in consumeInner.
	consumeFunc utils.Set
	// Consume routine chosen in NewPushConsumer: consumeMessageOrderly when
	// consumeOrderly is set, consumeMessageCurrently otherwise.
	submitToConsume func(*processQueue, *primitive.MessageQueue)
	// Topics registered through Subscribe; only the keys are used (values are
	// always the empty string). Start checks each key for route info.
	subscribedTopic map[string]string
	// Interceptor chain built from option.Interceptors; consumeInner calls the
	// callback through it when it is non-nil.
	interceptor primitive.Interceptor
	// Created with newQueueLock() in NewPushConsumer.
	queueLock *QueueLock
	// Closed by Shutdown; the background goroutines started by Start and
	// pullMessage exit when it is closed.
	done chan struct{}
	// Ensures the body of Shutdown runs only once.
	closeOnce sync.Once
}
  • pushConsumer定义了queueFlowControlTimes、queueMaxSpanFlowControlTimes、consumeFunc、submitToConsume、subscribedTopic、interceptor、queueLock、done、closeOnce属性

NewPushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func NewPushConsumer(opts ...Option) (*pushConsumer, error) {    defaultOpts := defaultPushConsumerOptions()    for _, apply := range opts {        apply(&defaultOpts)    }    srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)    if err != nil {        return nil, errors.Wrap(err, "new Namesrv failed.")    }    if !defaultOpts.Credentials.IsEmpty() {        srvs.SetCredentials(defaultOpts.Credentials)    }    defaultOpts.Namesrv = srvs    if defaultOpts.Namespace != "" {        defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName    }    dc := &defaultConsumer{        client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),        consumerGroup:  defaultOpts.GroupName,        cType:          _PushConsume,        state:          int32(internal.StateCreateJust),        prCh:           make(chan PullRequest, 4),        model:          defaultOpts.ConsumerModel,        consumeOrderly: defaultOpts.ConsumeOrderly,        fromWhere:      defaultOpts.FromWhere,        allocate:       defaultOpts.Strategy,        option:         defaultOpts,        namesrv:        srvs,    }    p := &pushConsumer{        defaultConsumer: dc,        subscribedTopic: make(map[string]string, 0),        queueLock:       newQueueLock(),        done:            make(chan struct{}, 1),        consumeFunc:     utils.NewSet(),    }    dc.mqChanged = p.messageQueueChanged    if p.consumeOrderly {        p.submitToConsume = p.consumeMessageOrderly    } else {        p.submitToConsume = p.consumeMessageCurrently    }    p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)    return p, nil}
  • NewPushConsumer方法实例化defaultConsumer及pushConsumer

Start

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) Start() error {    var err error    pc.once.Do(func() {        rlog.Info("the consumer start beginning", map[string]interface{}{            rlog.LogKeyConsumerGroup: pc.consumerGroup,            "messageModel":           pc.model,            "unitMode":               pc.unitMode,        })        atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))        pc.validate()        err = pc.client.RegisterConsumer(pc.consumerGroup, pc)        if err != nil {            rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{                rlog.LogKeyConsumerGroup: pc.consumerGroup,            })            err = ErrCreated            return        }        err = pc.defaultConsumer.start()        if err != nil {            return        }        go func() {            // todo start clean msg expired            for {                select {                case pr := <-pc.prCh:                    go func() {                        pc.pullMessage(&pr)                    }()                case <-pc.done:                    rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{                        rlog.LogKeyConsumerGroup: pc.consumerGroup,                    })                    return                }            }        }()        go primitive.WithRecover(func() {            // initial lock.            
if !pc.consumeOrderly {                return            }            time.Sleep(1000 * time.Millisecond)            pc.lockAll()            lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)            defer lockTicker.Stop()            for {                select {                case <-lockTicker.C:                    pc.lockAll()                case <-pc.done:                    rlog.Info("push consumer close tick.", map[string]interface{}{                        rlog.LogKeyConsumerGroup: pc.consumerGroup,                    })                    return                }            }        })    })    if err != nil {        return err    }    pc.client.UpdateTopicRouteInfo()    for k := range pc.subscribedTopic {        _, exist := pc.topicSubscribeInfoTable.Load(k)        if !exist {            pc.client.Shutdown()            return fmt.Errorf("the topic=%s route info not found, it may not exist", k)        }    }    pc.client.CheckClientInBroker()    pc.client.SendHeartbeatToAllBrokerWithLock()    pc.client.RebalanceImmediately()    return err}
  • Start方法执行pc.client.RegisterConsumer及pc.defaultConsumer.start(),然后启动goroutine从pc.prCh接收PullRequest并异步执行pc.pullMessage(&pr);对于consumeOrderly(非consumeOrderly时该goroutine直接返回)则先执行一次pc.lockAll(),再通过time.NewTicker创建lockTicker定时执行pc.lockAll();之后执行pc.client.UpdateTopicRouteInfo()、pc.client.CheckClientInBroker()、pc.client.SendHeartbeatToAllBrokerWithLock()及pc.client.RebalanceImmediately()

Shutdown

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) Shutdown() error {    var err error    pc.closeOnce.Do(func() {        close(pc.done)        pc.client.UnregisterConsumer(pc.consumerGroup)        err = pc.defaultConsumer.shutdown()    })    return err}
  • Shutdown方法先close(pc.done),然后执行pc.client.UnregisterConsumer及pc.defaultConsumer.shutdown()

Subscribe

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,    f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {    if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {        return errors.New("subscribe topic only started before")    }    if pc.option.Namespace != "" {        topic = pc.option.Namespace + "%" + topic    }    data := buildSubscriptionData(topic, selector)    pc.subscriptionDataTable.Store(topic, data)    pc.subscribedTopic[topic] = ""    pc.consumeFunc.Add(&PushConsumerCallback{        f:     f,        topic: topic,    })    return nil}
  • Subscribe方法先通过buildSubscriptionData构建data,之后执行pc.subscriptionDataTable.Store(topic, data)及pc.consumeFunc.Add

pullMessage

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) pullMessage(request *PullRequest) {    rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{        rlog.LogKeyPullRequest: request.String(),    })    var sleepTime time.Duration    pq := request.pq    go primitive.WithRecover(func() {        for {            select {            case <-pc.done:                rlog.Info("push consumer close pullMessage.", map[string]interface{}{                    rlog.LogKeyConsumerGroup: pc.consumerGroup,                })                return            default:                pc.submitToConsume(request.pq, request.mq)            }        }    })    for {    NEXT:        select {        case <-pc.done:            rlog.Info("push consumer close message handle.", map[string]interface{}{                rlog.LogKeyConsumerGroup: pc.consumerGroup,            })            return        default:        }        if pq.IsDroppd() {            rlog.Debug("the request was dropped, so stop task", map[string]interface{}{                rlog.LogKeyPullRequest: request.String(),            })            return        }        if sleepTime > 0 {            rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)            time.Sleep(sleepTime)        }        // reset time        sleepTime = pc.option.PullInterval        pq.lastPullTime = time.Now()        err := pc.makeSureStateOK()        if err != nil {            rlog.Warning("consumer state error", map[string]interface{}{                rlog.LogKeyUnderlayError: err.Error(),            })            sleepTime = _PullDelayTimeWhenError            goto NEXT        }        if pc.pause {            rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",                pc.option.InstanceName, pc.consumerGroup, request.String()), nil)            sleepTime = _PullDelayTimeWhenSuspend            goto NEXT        }        
cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)        if pq.cachedMsgCount > pc.option.PullThresholdForQueue {            if pc.queueFlowControlTimes%1000 == 0 {                rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{                    "PullThresholdForQueue": pc.option.PullThresholdForQueue,                    "minOffset":             pq.Min(),                    "maxOffset":             pq.Max(),                    "count":                 pq.msgCache,                    "size(MiB)":             cachedMessageSizeInMiB,                    "flowControlTimes":      pc.queueFlowControlTimes,                    rlog.LogKeyPullRequest:  request.String(),                })            }            pc.queueFlowControlTimes++            sleepTime = _PullDelayTimeWhenFlowControl            goto NEXT        }        if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {            if pc.queueFlowControlTimes%1000 == 0 {                rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{                    "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,                    "minOffset":                 pq.Min(),                    "maxOffset":                 pq.Max(),                    "count":                     pq.msgCache,                    "size(MiB)":                 cachedMessageSizeInMiB,                    "flowControlTimes":          pc.queueFlowControlTimes,                    rlog.LogKeyPullRequest:      request.String(),                })            }            pc.queueFlowControlTimes++            sleepTime = _PullDelayTimeWhenFlowControl            goto NEXT        }        if !pc.consumeOrderly {            if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {                if pc.queueMaxSpanFlowControlTimes%1000 == 0 {                    rlog.Warning("the queue's messages span too long, so do flow 
control", map[string]interface{}{                        "ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,                        "minOffset":                  pq.Min(),                        "maxOffset":                  pq.Max(),                        "maxSpan":                    pq.getMaxSpan(),                        "flowControlTimes":           pc.queueFlowControlTimes,                        rlog.LogKeyPullRequest:       request.String(),                    })                }                sleepTime = _PullDelayTimeWhenFlowControl                goto NEXT            }        } else {            if pq.IsLock() {                if !request.lockedFirst {                    offset := pc.computePullFromWhere(request.mq)                    brokerBusy := offset < request.nextOffset                    rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{                        rlog.LogKeyPullRequest:      request.String(),                        rlog.LogKeyValueChangedFrom: request.nextOffset,                        rlog.LogKeyValueChangedTo:   offset,                        "brokerBusy":                brokerBusy,                    })                    if brokerBusy {                        rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+                            "broker consume offset", map[string]interface{}{"offset": offset})                    }                    request.lockedFirst = true                    request.nextOffset = offset                }            } else {                rlog.Info("pull message later because not locked in broker", map[string]interface{}{                    rlog.LogKeyPullRequest: request.String(),                })                sleepTime = _PullDelayTimeWhenError                goto NEXT            }        }        v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)        if !exist { 
           rlog.Info("find the consumer's subscription failed", map[string]interface{}{                rlog.LogKeyPullRequest: request.String(),            })            sleepTime = _PullDelayTimeWhenError            goto NEXT        }        beginTime := time.Now()        var (            commitOffsetEnable bool            commitOffsetValue  int64            subExpression      string        )        if pc.model == Clustering {            commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)            if commitOffsetValue > 0 {                commitOffsetEnable = true            }        }        sd := v.(*internal.SubscriptionData)        classFilter := sd.ClassFilterMode        if pc.option.PostSubscriptionWhenPull && classFilter {            subExpression = sd.SubString        }        sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)        pullRequest := &internal.PullMessageRequestHeader{            ConsumerGroup:        pc.consumerGroup,            Topic:                request.mq.Topic,            QueueId:              int32(request.mq.QueueId),            QueueOffset:          request.nextOffset,            MaxMsgNums:           pc.option.PullBatchSize,            SysFlag:              sysFlag,            CommitOffset:         commitOffsetValue,            SubExpression:        _SubAll,            ExpressionType:       string(TAG),            SuspendTimeoutMillis: 20 * time.Second,        }        //        //if data.ExpType == string(TAG) {        //    pullRequest.SubVersion = 0        //} else {        //    pullRequest.SubVersion = data.SubVersion        //}        brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)        if brokerResult == nil {            rlog.Warning("no broker found for mq", map[string]interface{}{                rlog.LogKeyPullRequest: request.mq.String(),            })            sleepTime = _PullDelayTimeWhenError            goto NEXT        }        if brokerResult.Slave { 
           pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)        }        result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)        if err != nil {            rlog.Warning("pull message from broker error", map[string]interface{}{                rlog.LogKeyBroker:        brokerResult.BrokerAddr,                rlog.LogKeyUnderlayError: err.Error(),            })            sleepTime = _PullDelayTimeWhenError            goto NEXT        }        if result.Status == primitive.PullBrokerTimeout {            rlog.Warning("pull broker timeout", map[string]interface{}{                rlog.LogKeyBroker: brokerResult.BrokerAddr,            })            sleepTime = _PullDelayTimeWhenError            goto NEXT        }        switch result.Status {        case primitive.PullFound:            rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)            prevRequestOffset := request.nextOffset            request.nextOffset = result.NextBeginOffset            rt := time.Now().Sub(beginTime) / time.Millisecond            increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))            pc.processPullResult(request.mq, result, sd)            msgFounded := result.GetMessageExts()            firstMsgOffset := int64(math.MaxInt64)            if msgFounded != nil && len(msgFounded) != 0 {                firstMsgOffset = msgFounded[0].QueueOffset                increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))                pq.putMessage(msgFounded...)            
}            if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {                rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{                    "nextBeginOffset":   result.NextBeginOffset,                    "firstMsgOffset":    firstMsgOffset,                    "prevRequestOffset": prevRequestOffset,                })            }        case primitive.PullNoNewMsg:            rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",                request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)        case primitive.PullNoMsgMatched:            request.nextOffset = result.NextBeginOffset            pc.correctTagsOffset(request)        case primitive.PullOffsetIllegal:            rlog.Warning("the pull request offset illegal", map[string]interface{}{                rlog.LogKeyPullRequest: request.String(),                "result":               result.String(),            })            request.nextOffset = result.NextBeginOffset            pq.WithDropped(true)            time.Sleep(10 * time.Second)            pc.storage.update(request.mq, request.nextOffset, false)            pc.storage.persist([]*primitive.MessageQueue{request.mq})            pc.processQueueTable.Delete(request.mq)            rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)        default:            rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)            sleepTime = _PullDelayTimeWhenError        }    }}
  • pullMessage方法会创建internal.PullMessageRequestHeader,之后通过pc.defaultConsumer.tryFindBroker获取brokerResult,之后执行pc.client.PullMessage获取result;对于result.Status为primitive.PullFound执行pc.processPullResult、pq.putMessage提交到processQueue;pc.submitToConsume(request.pq, request.mq)对于p.consumeOrderly执行的是p.consumeMessageOrderly,否则执行的是p.consumeMessageCurrently,它们都会执行pc.consumeInner

consumeInner

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {    if len(subMsgs) == 0 {        return ConsumeRetryLater, errors.New("msg list empty")    }    f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)    // fix lost retry message    if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {        f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))    }    if !exist {        return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)    }    callback, ok := f.(*PushConsumerCallback)    if !ok {        return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)    }    if pc.interceptor == nil {        return callback.f(ctx, subMsgs...)    } else {        var container ConsumeResultHolder        err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {            msgs := req.([]*primitive.MessageExt)            r, e := callback.f(ctx, msgs...)            realReply := reply.(*ConsumeResultHolder)            realReply.ConsumeResult = r            msgCtx, _ := primitive.GetConsumerCtx(ctx)            msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess            if realReply.ConsumeResult == ConsumeSuccess {                msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)            } else {                msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)            }            return e        })        return container.ConsumeResult, err    }}
  • consumeInner方法会根据topic找到对应的PushConsumerCallback(即f.(*PushConsumerCallback))并触发其回调

小结

pushConsumer是对pull模式的封装,拉到消息之后若consumeOrderly则执行consumeMessageOrderly,否则执行的是consumeMessageCurrently,它们内部调用了consumeInner,会触发PushConsumerCallback回调

doc

  • push_consumer