本文次要钻研一下rocketmq-client-go的PullConsumer

PullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

type PullConsumer interface {    // Start    Start()    // Shutdown refuse all new pull operation, finish all submitted.    Shutdown()    // Pull pull message of topic,  selector indicate which queue to pull.    Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)    // PullFrom pull messages of queue from the offset to offset + numbers    PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)    // updateOffset update offset of queue in mem    UpdateOffset(queue *primitive.MessageQueue, offset int64) error    // PersistOffset persist all offset in mem.    PersistOffset(ctx context.Context) error    // CurrentOffset return the current offset of queue in mem.    CurrentOffset(queue *primitive.MessageQueue) (int64, error)}
  • PullConsumer定义了Start、Shutdown、Pull、UpdateOffset、PersistOffset、CurrentOffset办法

defaultPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

type defaultPullConsumer struct {    *defaultConsumer    option    consumerOptions    client    internal.RMQClient    GroupName string    Model     MessageModel    UnitMode  bool    interceptor primitive.Interceptor}
  • defaultPullConsumer定义了consumerOptions、client、GroupName、Model、UnitMode属性

NewPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {    defaultOpts := defaultPullConsumerOptions()    for _, apply := range options {        apply(&defaultOpts)    }    srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)    if err != nil {        return nil, errors.Wrap(err, "new Namesrv failed.")    }    dc := &defaultConsumer{        client:        internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),        consumerGroup: defaultOpts.GroupName,        cType:         _PullConsume,        state:         int32(internal.StateCreateJust),        prCh:          make(chan PullRequest, 4),        model:         defaultOpts.ConsumerModel,        option:        defaultOpts,        namesrv: srvs,    }    c := &defaultPullConsumer{        defaultConsumer: dc,    }    return c, nil}
  • NewPullConsumer办法实例化defaultConsumer

Start

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) Start() error {    atomic.StoreInt32(&c.state, int32(internal.StateRunning))    var err error    c.once.Do(func() {        err = c.start()        if err != nil {            return        }    })    return err}
  • Start办法执行defaultPullConsumer的start办法

Pull

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) {    mq := c.getNextQueueOf(topic)    if mq == nil {        return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)    }    data := buildSubscriptionData(mq.Topic, selector)    result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)    if err != nil {        return nil, err    }    c.processPullResult(mq, result, data)    return result, nil}
  • Pull办法先通过c.getNextQueueOf(topic)获取mq,而后通过buildSubscriptionData(mq.Topic, selector)结构data,之后执行c.pull,最初执行c.processPullResult(mq, result, data)

getNextQueueOf

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {    queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)    if err != nil && len(queues) > 0 {        rlog.Error("get next mq error", map[string]interface{}{            rlog.LogKeyTopic:         topic,            rlog.LogKeyUnderlayError: err.Error(),        })        return nil    }    var index int64    v, exist := queueCounterTable.Load(topic)    if !exist {        index = -1        queueCounterTable.Store(topic, 0)    } else {        index = v.(int64)    }    return queues[int(atomic.AddInt64(&index, 1))%len(queues)]}
  • getNextQueueOf办法先通过c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)获取queues,而后执行queueCounterTable.Load(topic)获取index

PullFrom

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PullFrom pull messages of queue from the offset to offset + numbersfunc (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {    if err := c.checkPull(ctx, queue, offset, numbers); err != nil {        return nil, err    }    selector := MessageSelector{}    data := buildSubscriptionData(queue.Topic, selector)    return c.pull(ctx, queue, data, offset, numbers)}
  • PullFrom办法先执行c.checkPull,之后通过buildSubscriptionData结构subscriptionData,最初通过c.pull(ctx, queue, data, offset, numbers)拉取数据

UpdateOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// updateOffset update offset of queue in memfunc (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error {    return c.updateOffset(queue, offset)}
  • UpdateOffset办法通过defaultPullConsumer来提交

PersistOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PersistOffset persist all offset in mem.func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {    return c.persistConsumerOffset()}
  • PersistOffset办法通过defaultPullConsumer来长久化

CurrentOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// CurrentOffset return the current offset of queue in mem.func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) {    v := c.queryOffset(queue)    return v, nil}
  • CurrentOffset办法执行c.queryOffset(queue)

Shutdown

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Shutdown close defaultConsumer, refuse new request.func (c *defaultPullConsumer) Shutdown() error {    return c.defaultConsumer.shutdown()}
  • Shutdown办法则执行c.defaultConsumer.shutdown()

小结

PullConsumer定义了Start、Shutdown、Pull、UpdateOffset、PersistOffset、CurrentOffset办法

doc

  • pull_consumer