聊聊rocketmqclientgo的pushConsumer

51次阅读

共计 13930 个字符,预计需要花费 35 分钟才能阅读完成。

本文主要研究一下 rocketmq-client-go 的 pushConsumer

pushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// pushConsumer is the push-style consumer built on top of defaultConsumer.
// It tracks flow-control state per pull loop, holds the per-topic consume
// callbacks, and owns the shutdown signaling used by its background goroutines.
type pushConsumer struct {
    *defaultConsumer
    // queueFlowControlTimes counts cached count/size flow-control hits; the
    // warning is only logged every 1000th time (see pullMessage).
    queueFlowControlTimes        int
    // queueMaxSpanFlowControlTimes counts max-span flow-control hits, same
    // every-1000th logging scheme.
    queueMaxSpanFlowControlTimes int
    // consumeFunc holds *PushConsumerCallback entries added by Subscribe.
    consumeFunc                  utils.Set
    // submitToConsume is set in NewPushConsumer to consumeMessageOrderly or
    // consumeMessageCurrently depending on ConsumeOrderly.
    submitToConsume              func(*processQueue, *primitive.MessageQueue)
    // subscribedTopic records subscribed topic names (values unused).
    subscribedTopic              map[string]string
    interceptor                  primitive.Interceptor
    queueLock                    *QueueLock
    // done is closed by Shutdown to stop all background goroutines.
    done                         chan struct{}
    // closeOnce makes Shutdown idempotent.
    closeOnce                    sync.Once
}
  • pushConsumer 定义了 queueFlowControlTimes、queueMaxSpanFlowControlTimes、consumeFunc、submitToConsume、subscribedTopic、interceptor、queueLock、done、closeOnce 属性

NewPushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// NewPushConsumer builds a pushConsumer from the given options: it resolves
// the name server, applies namespace prefixing to the group name, wires up
// the embedded defaultConsumer, and selects the orderly or concurrent
// consume path according to ConsumeOrderly.
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
	defaultOpts := defaultPushConsumerOptions()
	for _, apply := range opts {
		apply(&defaultOpts)
	}

	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	// A namespace is encoded into the group name as "<namespace>%<group>".
	if defaultOpts.Namespace != "" {
		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
	}

	dc := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup:  defaultOpts.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          defaultOpts.ConsumerModel,
		consumeOrderly: defaultOpts.ConsumeOrderly,
		fromWhere:      defaultOpts.FromWhere,
		allocate:       defaultOpts.Strategy,
		option:         defaultOpts,
		namesrv:        srvs,
	}

	p := &pushConsumer{
		defaultConsumer: dc,
		subscribedTopic: make(map[string]string),
		queueLock:       newQueueLock(),
		done:            make(chan struct{}, 1),
		consumeFunc:     utils.NewSet(),
	}
	dc.mqChanged = p.messageQueueChanged

	// Pick the consume strategy once at construction time.
	if p.consumeOrderly {
		p.submitToConsume = p.consumeMessageOrderly
	} else {
		p.submitToConsume = p.consumeMessageCurrently
	}

	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

	return p, nil
}
  • NewPushConsumer 方法实例化 defaultConsumer 及 pushConsumer

Start

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// Start brings the consumer up. The once-guarded section registers the
// consumer with the client, starts the embedded defaultConsumer, spawns a
// goroutine that dispatches incoming PullRequests to pullMessage, and (only
// for orderly consumption) a goroutine that periodically refreshes
// broker-side queue locks. After that it refreshes topic routes — failing
// fast if any subscribed topic has no route — sends a heartbeat and triggers
// an immediate rebalance.
func (pc *pushConsumer) Start() error {
	var err error
	pc.once.Do(func() {
		rlog.Info("the consumer start beginning", map[string]interface{}{
			rlog.LogKeyConsumerGroup: pc.consumerGroup,
			"messageModel":           pc.model,
			"unitMode":               pc.unitMode,
		})
		atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
		pc.validate()

		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
		if err != nil {
			rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			err = ErrCreated
			return
		}

		err = pc.defaultConsumer.start()
		if err != nil {
			return
		}

		// Dispatch pull requests until Shutdown closes pc.done.
		go func() {
			// todo start clean msg expired
			for {
				select {
				case pr := <-pc.prCh:
					go func() {
						pc.pullMessage(&pr)
					}()
				case <-pc.done:
					rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		}()

		// Orderly consumption needs broker-side queue locks; refresh them
		// periodically. The goroutine exits immediately in concurrent mode.
		go primitive.WithRecover(func() {
			// initial lock.
			if !pc.consumeOrderly {
				return
			}

			time.Sleep(1000 * time.Millisecond)
			pc.lockAll()

			lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
			defer lockTicker.Stop()
			for {
				select {
				case <-lockTicker.C:
					pc.lockAll()
				case <-pc.done:
					rlog.Info("push consumer close tick.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})
	})

	if err != nil {
		return err
	}

	pc.client.UpdateTopicRouteInfo()
	// Every subscribed topic must have route info, otherwise abort startup.
	for k := range pc.subscribedTopic {
		_, exist := pc.topicSubscribeInfoTable.Load(k)
		if !exist {
			pc.client.Shutdown()
			return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
		}
	}
	pc.client.CheckClientInBroker()
	pc.client.SendHeartbeatToAllBrokerWithLock()
	pc.client.RebalanceImmediately()

	return err
}
  • Start 方法执行 pc.client.RegisterConsumer 及 pc.defaultConsumer.start(),然后异步执行 pc.pullMessage(&pr);对于 consumeOrderly 则通过 time.NewTicker 创建 lockTicker,定时执行 pc.lockAll();之后执行 pc.client.UpdateTopicRouteInfo()、pc.client.CheckClientInBroker()、pc.client.SendHeartbeatToAllBrokerWithLock() 及 pc.client.RebalanceImmediately()

Shutdown

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// Shutdown stops the consumer exactly once: it closes pc.done to terminate
// all background goroutines, unregisters the group from the client, and
// shuts down the embedded defaultConsumer. Subsequent calls are no-ops and
// return the error (if any) from the first call... actually they return nil
// since err is only set inside the once body.
func (pc *pushConsumer) Shutdown() error {
	var err error
	pc.closeOnce.Do(func() {
		close(pc.done)
		pc.client.UnregisterConsumer(pc.consumerGroup)
		err = pc.defaultConsumer.shutdown()
	})
	return err
}
  • Shutdown 方法则执行 pc.client.UnregisterConsumer 及 pc.defaultConsumer.shutdown()

Subscribe

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// Subscribe registers callback f for the given topic and selector. It is
// only permitted before Start (state must still be StateCreateJust). When a
// namespace is configured, the topic is rewritten to "<namespace>%<topic>".
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
	f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
	if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
		return errors.New("subscribe topic only started before")
	}
	if pc.option.Namespace != "" {
		topic = pc.option.Namespace + "%" + topic
	}

	data := buildSubscriptionData(topic, selector)
	pc.subscriptionDataTable.Store(topic, data)
	pc.subscribedTopic[topic] = ""

	pc.consumeFunc.Add(&PushConsumerCallback{
		f:     f,
		topic: topic,
	})
	return nil
}
  • Subscribe 方法先通过 buildSubscriptionData 构建 data,之后执行 pc.subscriptionDataTable.Store(topic, data) 及 pc.consumeFunc.Add

pullMessage

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// pullMessage runs the pull loop for one message queue. It first launches a
// consume goroutine that keeps feeding the process queue into the consume
// callback, then loops pulling batches from the broker: each iteration
// applies count/size/span flow control (or, in orderly mode, waits for the
// broker-side queue lock), builds a PullMessageRequestHeader, resolves the
// broker, and handles the pull result status. Error/flow-control paths set
// sleepTime and jump to NEXT so the next iteration delays before retrying.
// The loop exits when the consumer is shut down or the queue is dropped.
func (pc *pushConsumer) pullMessage(request *PullRequest) {
	rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{
		rlog.LogKeyPullRequest: request.String(),
	})
	var sleepTime time.Duration
	pq := request.pq

	// Consume side: submit the process queue to the consume callback until
	// shutdown.
	go primitive.WithRecover(func() {
		for {
			select {
			case <-pc.done:
				rlog.Info("push consumer close pullMessage.", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			default:
				pc.submitToConsume(request.pq, request.mq)
			}
		}
	})

	// Pull side.
	for {
	NEXT:
		select {
		case <-pc.done:
			rlog.Info("push consumer close message handle.", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			return
		default:
		}

		if pq.IsDroppd() {
			rlog.Debug("the request was dropped, so stop task", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
			})
			return
		}
		if sleepTime > 0 {
			rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v",
				request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
			time.Sleep(sleepTime)
		}
		// reset time
		sleepTime = pc.option.PullInterval
		pq.lastPullTime = time.Now()
		err := pc.makeSureStateOK()
		if err != nil {
			rlog.Warning("consumer state error", map[string]interface{}{
				rlog.LogKeyUnderlayError: err.Error(),
			})
			sleepTime = _PullDelayTimeWhenError
			goto NEXT
		}

		if pc.pause {
			rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
				pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
			sleepTime = _PullDelayTimeWhenSuspend
			goto NEXT
		}

		// Flow control on the number of cached messages.
		cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
		if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
			if pc.queueFlowControlTimes%1000 == 0 {
				rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
					"PullThresholdForQueue": pc.option.PullThresholdForQueue,
					"minOffset":             pq.Min(),
					"maxOffset":             pq.Max(),
					"count":                 pq.msgCache,
					"size(MiB)":             cachedMessageSizeInMiB,
					"flowControlTimes":      pc.queueFlowControlTimes,
					rlog.LogKeyPullRequest:  request.String(),
				})
			}
			pc.queueFlowControlTimes++
			sleepTime = _PullDelayTimeWhenFlowControl
			goto NEXT
		}

		// Flow control on the total cached message size.
		if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
			if pc.queueFlowControlTimes%1000 == 0 {
				rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
					"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
					"minOffset":                 pq.Min(),
					"maxOffset":                 pq.Max(),
					"count":                     pq.msgCache,
					"size(MiB)":                 cachedMessageSizeInMiB,
					"flowControlTimes":          pc.queueFlowControlTimes,
					rlog.LogKeyPullRequest:      request.String(),
				})
			}
			pc.queueFlowControlTimes++
			sleepTime = _PullDelayTimeWhenFlowControl
			goto NEXT
		}

		if !pc.consumeOrderly {
			// Concurrent mode: bound the offset span of in-flight messages.
			if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
				if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
					rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{
						"ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,
						"minOffset":                  pq.Min(),
						"maxOffset":                  pq.Max(),
						"maxSpan":                    pq.getMaxSpan(),
						"flowControlTimes":           pc.queueFlowControlTimes,
						rlog.LogKeyPullRequest:       request.String(),
					})
				}
				sleepTime = _PullDelayTimeWhenFlowControl
				goto NEXT
			}
		} else {
			// Orderly mode: only pull while the queue is locked on the
			// broker; on the first locked pull, re-read the offset from the
			// broker because it may have moved.
			if pq.IsLock() {
				if !request.lockedFirst {
					offset := pc.computePullFromWhere(request.mq)
					brokerBusy := offset < request.nextOffset
					rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{
						rlog.LogKeyPullRequest:      request.String(),
						rlog.LogKeyValueChangedFrom: request.nextOffset,
						rlog.LogKeyValueChangedTo:   offset,
						"brokerBusy":                brokerBusy,
					})
					if brokerBusy {
						rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than"+
							"broker consume offset", map[string]interface{}{"offset": offset})
					}
					request.lockedFirst = true
					request.nextOffset = offset
				}
			} else {
				rlog.Info("pull message later because not locked in broker", map[string]interface{}{
					rlog.LogKeyPullRequest: request.String(),
				})
				sleepTime = _PullDelayTimeWhenError
				goto NEXT
			}
		}

		v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
		if !exist {
			rlog.Info("find the consumer's subscription failed", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
			})
			sleepTime = _PullDelayTimeWhenError
			goto NEXT
		}
		beginTime := time.Now()
		var (
			commitOffsetEnable bool
			commitOffsetValue  int64
			subExpression      string
		)

		// In clustering mode, piggyback the locally cached offset so the
		// broker can commit it.
		if pc.model == Clustering {
			commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
			if commitOffsetValue > 0 {
				commitOffsetEnable = true
			}
		}

		sd := v.(*internal.SubscriptionData)
		classFilter := sd.ClassFilterMode
		if pc.option.PostSubscriptionWhenPull && classFilter {
			subExpression = sd.SubString
		}

		sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)

		pullRequest := &internal.PullMessageRequestHeader{
			ConsumerGroup:        pc.consumerGroup,
			Topic:                request.mq.Topic,
			QueueId:              int32(request.mq.QueueId),
			QueueOffset:          request.nextOffset,
			MaxMsgNums:           pc.option.PullBatchSize,
			SysFlag:              sysFlag,
			CommitOffset:         commitOffsetValue,
			SubExpression:        _SubAll,
			ExpressionType:       string(TAG),
			SuspendTimeoutMillis: 20 * time.Second,
		}
		//
		//if data.ExpType == string(TAG) {
		//    pullRequest.SubVersion = 0
		//} else {
		//    pullRequest.SubVersion = data.SubVersion
		//}

		brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
		if brokerResult == nil {
			rlog.Warning("no broker found for mq", map[string]interface{}{
				rlog.LogKeyPullRequest: request.mq.String(),
			})
			sleepTime = _PullDelayTimeWhenError
			goto NEXT
		}

		// Offsets are never committed to a slave broker.
		if brokerResult.Slave {
			pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
		}

		result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
		if err != nil {
			rlog.Warning("pull message from broker error", map[string]interface{}{
				rlog.LogKeyBroker:        brokerResult.BrokerAddr,
				rlog.LogKeyUnderlayError: err.Error(),
			})
			sleepTime = _PullDelayTimeWhenError
			goto NEXT
		}

		if result.Status == primitive.PullBrokerTimeout {
			rlog.Warning("pull broker timeout", map[string]interface{}{
				rlog.LogKeyBroker: brokerResult.BrokerAddr,
			})
			sleepTime = _PullDelayTimeWhenError
			goto NEXT
		}

		switch result.Status {
		case primitive.PullFound:
			rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
			prevRequestOffset := request.nextOffset
			request.nextOffset = result.NextBeginOffset

			rt := time.Since(beginTime) / time.Millisecond
			increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))

			pc.processPullResult(request.mq, result, sd)

			msgFounded := result.GetMessageExts()
			firstMsgOffset := int64(math.MaxInt64)
			if msgFounded != nil && len(msgFounded) != 0 {
				firstMsgOffset = msgFounded[0].QueueOffset
				increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
				pq.putMessage(msgFounded...)
			}
			if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
				rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
					"nextBeginOffset":   result.NextBeginOffset,
					"firstMsgOffset":    firstMsgOffset,
					"prevRequestOffset": prevRequestOffset,
				})
			}
		case primitive.PullNoNewMsg:
			rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
				request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
		case primitive.PullNoMsgMatched:
			request.nextOffset = result.NextBeginOffset
			pc.correctTagsOffset(request)
		case primitive.PullOffsetIllegal:
			// Drop the queue, persist the corrected offset, and let a later
			// rebalance recreate the pull request.
			rlog.Warning("the pull request offset illegal", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
				"result":               result.String(),
			})
			request.nextOffset = result.NextBeginOffset
			pq.WithDropped(true)
			time.Sleep(10 * time.Second)
			pc.storage.update(request.mq, request.nextOffset, false)
			pc.storage.persist([]*primitive.MessageQueue{request.mq})
			pc.processQueueTable.Delete(request.mq)
			rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
		default:
			rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
			sleepTime = _PullDelayTimeWhenError
		}
	}
}
  • pullMessage 方法会创建 internal.PullMessageRequestHeader,之后通过 pc.defaultConsumer.tryFindBroker 获取 brokerResult,之后执行 pc.client.PullMessage 获取 result;对于 result.Status 为 primitive.PullFound 执行 pc.processPullResult、pq.putMessage 提交到 processQueue;pc.submitToConsume(request.pq, request.mq) 对于 p.consumeOrderly 执行的是 p.consumeMessageOrderly,否则执行的是 p.consumeMessageCurrently,它们都会执行 pc.consumeInner

consumeInner

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

// consumeInner resolves the registered callback for the messages' topic and
// invokes it, routing through the interceptor chain when one is configured.
// It returns ConsumeRetryLater when the message list is empty or no matching
// callback can be found.
func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
	if len(subMsgs) == 0 {
		return ConsumeRetryLater, errors.New("msg list empty")
	}

	f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)

	// fix lost retry message
	if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
		f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
	}

	if !exist {
		return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
	}

	callback, ok := f.(*PushConsumerCallback)
	if !ok {
		return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
	}

	// Fast path: no interceptor configured.
	if pc.interceptor == nil {
		return callback.f(ctx, subMsgs...)
	}

	// Run the callback through the interceptor chain, recording the consume
	// outcome in the per-message consumer context.
	var container ConsumeResultHolder
	err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
		msgs := req.([]*primitive.MessageExt)
		r, e := callback.f(ctx, msgs...)

		realReply := reply.(*ConsumeResultHolder)
		realReply.ConsumeResult = r

		msgCtx, _ := primitive.GetConsumerCtx(ctx)
		msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
		if realReply.ConsumeResult == ConsumeSuccess {
			msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
		} else {
			msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
		}
		return e
	})
	return container.ConsumeResult, err
}
  • consumeInner 方法会触发 f.(*PushConsumerCallback)

小结

pushConsumer 是对 pull 模式的封装,拉到消息之后若 consumeOrderly 则执行 consumeMessageOrderly,否则执行的是 consumeMessageCurrently,它们内部调用了 consumeInner,会触发 PushConsumerCallback 回调

doc

  • push_consumer

正文完
 0