序
本文次要钻研一下rocketmq-client-go的PullConsumer
PullConsumer
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
type PullConsumer interface { // Start Start() // Shutdown refuse all new pull operation, finish all submitted. Shutdown() // Pull pull message of topic, selector indicate which queue to pull. Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) // PullFrom pull messages of queue from the offset to offset + numbers PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) // updateOffset update offset of queue in mem UpdateOffset(queue *primitive.MessageQueue, offset int64) error // PersistOffset persist all offset in mem. PersistOffset(ctx context.Context) error // CurrentOffset return the current offset of queue in mem. CurrentOffset(queue *primitive.MessageQueue) (int64, error)}
- PullConsumer定义了Start、Shutdown、Pull、UpdateOffset、PersistOffset、CurrentOffset办法
defaultPullConsumer
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
type defaultPullConsumer struct { *defaultConsumer option consumerOptions client internal.RMQClient GroupName string Model MessageModel UnitMode bool interceptor primitive.Interceptor}
- defaultPullConsumer定义了consumerOptions、client、GroupName、Model、UnitMode属性
NewPullConsumer
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) { defaultOpts := defaultPullConsumerOptions() for _, apply := range options { apply(&defaultOpts) } srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs) if err != nil { return nil, errors.Wrap(err, "new Namesrv failed.") } dc := &defaultConsumer{ client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: defaultOpts.GroupName, cType: _PullConsume, state: int32(internal.StateCreateJust), prCh: make(chan PullRequest, 4), model: defaultOpts.ConsumerModel, option: defaultOpts, namesrv: srvs, } c := &defaultPullConsumer{ defaultConsumer: dc, } return c, nil}
- NewPullConsumer办法实例化defaultConsumer
Start
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
func (c *defaultPullConsumer) Start() error { atomic.StoreInt32(&c.state, int32(internal.StateRunning)) var err error c.once.Do(func() { err = c.start() if err != nil { return } }) return err}
- Start办法执行defaultPullConsumer的start办法
Pull
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) { mq := c.getNextQueueOf(topic) if mq == nil { return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic) } data := buildSubscriptionData(mq.Topic, selector) result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers) if err != nil { return nil, err } c.processPullResult(mq, result, data) return result, nil}
- Pull办法先通过c.getNextQueueOf(topic)获取mq,而后通过buildSubscriptionData(mq.Topic, selector)结构data,之后执行c.pull,最初执行c.processPullResult(mq, result, data)
getNextQueueOf
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue { queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic) if err != nil && len(queues) > 0 { rlog.Error("get next mq error", map[string]interface{}{ rlog.LogKeyTopic: topic, rlog.LogKeyUnderlayError: err.Error(), }) return nil } var index int64 v, exist := queueCounterTable.Load(topic) if !exist { index = -1 queueCounterTable.Store(topic, 0) } else { index = v.(int64) } return queues[int(atomic.AddInt64(&index, 1))%len(queues)]}
- getNextQueueOf办法先通过c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)获取queues,而后执行queueCounterTable.Load(topic)获取index
PullFrom
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
// PullFrom pull messages of queue from the offset to offset + numbersfunc (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) { if err := c.checkPull(ctx, queue, offset, numbers); err != nil { return nil, err } selector := MessageSelector{} data := buildSubscriptionData(queue.Topic, selector) return c.pull(ctx, queue, data, offset, numbers)}
- PullFrom办法先执行c.checkPull,之后通过buildSubscriptionData结构subscriptionData,最初通过c.pull(ctx, queue, data, offset, numbers)拉取数据
UpdateOffset
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
// updateOffset update offset of queue in memfunc (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error { return c.updateOffset(queue, offset)}
- UpdateOffset办法通过defaultPullConsumer来提交
PersistOffset
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
// PersistOffset persist all offset in mem.func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error { return c.persistConsumerOffset()}
- PersistOffset办法通过defaultPullConsumer来长久化
CurrentOffset
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
// CurrentOffset return the current offset of queue in mem.func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) { v := c.queryOffset(queue) return v, nil}
- CurrentOffset办法执行c.queryOffset(queue)
Shutdown
rocketmq-client-go-v2.0.0/consumer/pull_consumer.go
// Shutdown close defaultConsumer, refuse new request.func (c *defaultPullConsumer) Shutdown() error { return c.defaultConsumer.shutdown()}
- Shutdown办法则执行c.defaultConsumer.shutdown()
小结
PullConsumer定义了Start、Shutdown、Pull、UpdateOffset、PersistOffset、CurrentOffset办法
doc
- pull_consumer