共计 13930 个字符,预计需要花费 35 分钟才能阅读完成。
序
本文次要钻研一下 rocketmq-client-go 的 pushConsumer
pushConsumer
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
type pushConsumer struct {
*defaultConsumer
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
consumeFunc utils.Set
submitToConsume func(*processQueue, *primitive.MessageQueue)
subscribedTopic map[string]string
interceptor primitive.Interceptor
queueLock *QueueLock
done chan struct{}
closeOnce sync.Once
}
- pushConsumer 定义了 queueFlowControlTimes、queueMaxSpanFlowControlTimes、consumeFunc、submitToConsume、subscribedTopic、interceptor、queueLock、done、closeOnce 属性
NewPushConsumer
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {defaultOpts := defaultPushConsumerOptions()
for _, apply := range opts {apply(&defaultOpts)
}
srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
if err != nil {return nil, errors.Wrap(err, "new Namesrv failed.")
}
if !defaultOpts.Credentials.IsEmpty() {srvs.SetCredentials(defaultOpts.Credentials)
}
defaultOpts.Namesrv = srvs
if defaultOpts.Namespace != "" {defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName}
dc := &defaultConsumer{client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
cType: _PushConsume,
state: int32(internal.StateCreateJust),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
consumeOrderly: defaultOpts.ConsumeOrderly,
fromWhere: defaultOpts.FromWhere,
allocate: defaultOpts.Strategy,
option: defaultOpts,
namesrv: srvs,
}
p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
queueLock: newQueueLock(),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {p.submitToConsume = p.consumeMessageOrderly} else {p.submitToConsume = p.consumeMessageCurrently}
p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)
return p, nil
}
- NewPushConsumer 办法实例化 defaultConsumer 及 pushConsumer
Start
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) Start() error {
var err error
pc.once.Do(func() {rlog.Info("the consumer start beginning", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
"messageModel": pc.model,
"unitMode": pc.unitMode,
})
atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
pc.validate()
err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{rlog.LogKeyConsumerGroup: pc.consumerGroup,})
err = ErrCreated
return
}
err = pc.defaultConsumer.start()
if err != nil {return}
go func() {
// todo start clean msg expired
for {
select {
case pr := <-pc.prCh:
go func() {pc.pullMessage(&pr)
}()
case <-pc.done:
rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{rlog.LogKeyConsumerGroup: pc.consumerGroup,})
return
}
}
}()
go primitive.WithRecover(func() {
// initial lock.
if !pc.consumeOrderly {return}
time.Sleep(1000 * time.Millisecond)
pc.lockAll()
lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
defer lockTicker.Stop()
for {
select {
case <-lockTicker.C:
pc.lockAll()
case <-pc.done:
rlog.Info("push consumer close tick.", map[string]interface{}{rlog.LogKeyConsumerGroup: pc.consumerGroup,})
return
}
}
})
})
if err != nil {return err}
pc.client.UpdateTopicRouteInfo()
for k := range pc.subscribedTopic {_, exist := pc.topicSubscribeInfoTable.Load(k)
if !exist {pc.client.Shutdown()
return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
}
}
pc.client.CheckClientInBroker()
pc.client.SendHeartbeatToAllBrokerWithLock()
pc.client.RebalanceImmediately()
return err
}
- Start 办法执行 pc.client.RegisterConsumer 及 pc.defaultConsumer.start(),而后异步执行 pc.pullMessage(&pr);对于非 consumeOrderly 则通过 time.NewTicker 创立 lockTicker,执行 pc.lockAll();之后执行 pc.client.UpdateTopicRouteInfo()、pc.client.CheckClientInBroker()、pc.client.SendHeartbeatToAllBrokerWithLock() 及 pc.client.RebalanceImmediately()
Shutdown
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {close(pc.done)
pc.client.UnregisterConsumer(pc.consumerGroup)
err = pc.defaultConsumer.shutdown()})
return err
}
- Shutdown 办法则执行 pc.client.UnregisterConsumer 及 pc.defaultConsumer.shutdown()
Subscribe
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {return errors.New("subscribe topic only started before")
}
if pc.option.Namespace != "" {topic = pc.option.Namespace + "%" + topic}
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
pc.consumeFunc.Add(&PushConsumerCallback{
f: f,
topic: topic,
})
return nil
}
- Subscribe 办法先通过 buildSubscriptionData 构建 data,之后执行 pc.subscriptionDataTable.Store(topic, data) 及 pc.consumeFunc.Add
pullMessage
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) pullMessage(request *PullRequest) {rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{rlog.LogKeyPullRequest: request.String(),
})
var sleepTime time.Duration
pq := request.pq
go primitive.WithRecover(func() {
for {
select {
case <-pc.done:
rlog.Info("push consumer close pullMessage.", map[string]interface{}{rlog.LogKeyConsumerGroup: pc.consumerGroup,})
return
default:
pc.submitToConsume(request.pq, request.mq)
}
}
})
for {
NEXT:
select {
case <-pc.done:
rlog.Info("push consumer close message handle.", map[string]interface{}{rlog.LogKeyConsumerGroup: pc.consumerGroup,})
return
default:
}
if pq.IsDroppd() {rlog.Debug("the request was dropped, so stop task", map[string]interface{}{rlog.LogKeyPullRequest: request.String(),
})
return
}
if sleepTime > 0 {rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval
pq.lastPullTime = time.Now()
err := pc.makeSureStateOK()
if err != nil {rlog.Warning("consumer state error", map[string]interface{}{rlog.LogKeyUnderlayError: err.Error(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if pc.pause {rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
sleepTime = _PullDelayTimeWhenSuspend
goto NEXT
}
cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
if pc.queueFlowControlTimes%1000 == 0 {rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdForQueue": pc.option.PullThresholdForQueue,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.msgCache,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
if pc.queueFlowControlTimes%1000 == 0 {rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.msgCache,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if !pc.consumeOrderly {if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
if pc.queueMaxSpanFlowControlTimes%1000 == 0 {rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{"ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,"minOffset": pq.Min(),"maxOffset": pq.Max(),"maxSpan": pq.getMaxSpan(),"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),})
}
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
} else {if pq.IsLock() {
if !request.lockedFirst {offset := pc.computePullFromWhere(request.mq)
brokerBusy := offset < request.nextOffset
rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{rlog.LogKeyPullRequest: request.String(),
rlog.LogKeyValueChangedFrom: request.nextOffset,
rlog.LogKeyValueChangedTo: offset,
"brokerBusy": brokerBusy,
})
if brokerBusy {rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than"+
"broker consume offset", map[string]interface{}{"offset": offset})
}
request.lockedFirst = true
request.nextOffset = offset
}
} else {rlog.Info("pull message later because not locked in broker", map[string]interface{}{rlog.LogKeyPullRequest: request.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
}
v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
if !exist {rlog.Info("find the consumer's subscription failed", map[string]interface{}{rlog.LogKeyPullRequest: request.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
beginTime := time.Now()
var (
commitOffsetEnable bool
commitOffsetValue int64
subExpression string
)
if pc.model == Clustering {commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
if commitOffsetValue > 0 {commitOffsetEnable = true}
}
sd := v.(*internal.SubscriptionData)
classFilter := sd.ClassFilterMode
if pc.option.PostSubscriptionWhenPull && classFilter {subExpression = sd.SubString}
sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
pullRequest := &internal.PullMessageRequestHeader{
ConsumerGroup: pc.consumerGroup,
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize,
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: _SubAll,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
}
//
//if data.ExpType == string(TAG) {
// pullRequest.SubVersion = 0
//} else {
// pullRequest.SubVersion = data.SubVersion
//}
brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
if brokerResult == nil {rlog.Warning("no broker found for mq", map[string]interface{}{rlog.LogKeyPullRequest: request.mq.String(),
})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if brokerResult.Slave {pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
}
result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {rlog.Warning("pull message from broker error", map[string]interface{}{
rlog.LogKeyBroker: brokerResult.BrokerAddr,
rlog.LogKeyUnderlayError: err.Error(),})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if result.Status == primitive.PullBrokerTimeout {rlog.Warning("pull broker timeout", map[string]interface{}{rlog.LogKeyBroker: brokerResult.BrokerAddr,})
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
switch result.Status {
case primitive.PullFound:
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
rt := time.Now().Sub(beginTime) / time.Millisecond
increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
pc.processPullResult(request.mq, result, sd)
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
if msgFounded != nil && len(msgFounded) != 0 {firstMsgOffset = msgFounded[0].QueueOffset
increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
"nextBeginOffset": result.NextBeginOffset,
"firstMsgOffset": firstMsgOffset,
"prevRequestOffset": prevRequestOffset,
})
}
case primitive.PullNoNewMsg:
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
case primitive.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
case primitive.PullOffsetIllegal:
rlog.Warning("the pull request offset illegal", map[string]interface{}{rlog.LogKeyPullRequest: request.String(),
"result": result.String(),})
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.processQueueTable.Delete(request.mq)
rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError
}
}
}
- pullMessage 办法会创立 internal.PullMessageRequestHeader,之后通过 pc.defaultConsumer.tryFindBroker 获取 brokerResult,之后执行 pc.client.PullMessage 获取 result;对于 result.Status 为 primitive.PullFound 执行 pc.processPullResult、pq.putMessage 提交到 processQueue;pc.submitToConsume(request.pq, request.mq) 对于 p.consumeOrderly 执行的是 p.consumeMessageOrderly,否则执行的是 p.consumeMessageCurrently,他们都会执行 pc.consumeInner
consumeInner
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {if len(subMsgs) == 0 {return ConsumeRetryLater, errors.New("msg list empty")
}
f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
// fix lost retry message
if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
}
if !exist {return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
}
callback, ok := f.(*PushConsumerCallback)
if !ok {return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
}
if pc.interceptor == nil {return callback.f(ctx, subMsgs...)
} else {
var container ConsumeResultHolder
err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {msgs := req.([]*primitive.MessageExt)
r, e := callback.f(ctx, msgs...)
realReply := reply.(*ConsumeResultHolder)
realReply.ConsumeResult = r
msgCtx, _ := primitive.GetConsumerCtx(ctx)
msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
if realReply.ConsumeResult == ConsumeSuccess {msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
} else {msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
}
return e
})
return container.ConsumeResult, err
}
}
- consumeInner 办法会触发
f.(*PushConsumerCallback)
小结
pushConsumer 是对 pull 模式的封装,拉到音讯之后若 consumeOrderly 则执行 consumeMessageOrderly,否则执行的是 consumeMessageCurrently,他们外部调用了 consumeInner,会触发 PushConsumerCallback 回调
doc
- push_consumer
正文完