聊聊 rocketmq-client-go 的 PullConsumer

8次阅读

共计 5106 个字符,预计需要花费 13 分钟才能阅读完成。

本文主要研究一下 rocketmq-client-go 的 PullConsumer

PullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PullConsumer is the interface of a pull-style consumer: the caller asks for
// messages explicitly and manages queue offsets through the methods below.
type PullConsumer interface {
    // Start presumably brings the consumer into service (the original comment
    // says only "Start").
    // NOTE(review): defaultPullConsumer.Start in this file is declared as
    // `Start() error`, which does not match this signature — confirm intent.
    Start()

    // Shutdown refuses all new pull operations and finishes all submitted ones.
    // NOTE(review): defaultPullConsumer.Shutdown in this file is declared as
    // `Shutdown() error`, which does not match this signature — confirm intent.
    Shutdown()

    // Pull pulls messages of topic; selector indicates which messages to pull.
    Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

    // PullFrom pulls messages of queue from offset to offset + numbers.
    PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

    // UpdateOffset updates the in-memory offset of queue.
    UpdateOffset(queue *primitive.MessageQueue, offset int64) error

    // PersistOffset persists all in-memory offsets.
    PersistOffset(ctx context.Context) error

    // CurrentOffset returns the current in-memory offset of queue.
    CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}
  • PullConsumer 定义了 Start、Shutdown、Pull、PullFrom、UpdateOffset、PersistOffset、CurrentOffset 方法

defaultPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// defaultPullConsumer is the concrete pull consumer type returned by
// NewPullConsumer. It embeds *defaultConsumer, so that type's fields and
// methods are promoted onto it.
type defaultPullConsumer struct {
    *defaultConsumer

    // NOTE(review): NewPullConsumer in this file populates only the embedded
    // defaultConsumer; the fields below are left at their zero values there —
    // confirm whether they are assigned anywhere else.
    option    consumerOptions
    client    internal.RMQClient
    GroupName string
    Model     MessageModel
    UnitMode  bool

    interceptor primitive.Interceptor
}
  • defaultPullConsumer 内嵌了 *defaultConsumer,并定义了 option、client、GroupName、Model、UnitMode、interceptor 属性

NewPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// NewPullConsumer builds a pull consumer. It starts from the default pull
// consumer options and applies every supplied Option on top, in order.
//
// An error is returned when the name server handle cannot be created from the
// configured name server addresses.
func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
	opts := defaultPullConsumerOptions()
	for i := range options {
		options[i](&opts)
	}

	nameServers, err := internal.NewNamesrv(opts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}

	// The embedded base consumer carries all state; it is created in the
	// "just created" state with the pull consume type.
	base := &defaultConsumer{
		client:        internal.GetOrNewRocketMQClient(opts.ClientOptions, nil),
		consumerGroup: opts.GroupName,
		cType:         _PullConsume,
		state:         int32(internal.StateCreateJust),
		prCh:          make(chan PullRequest, 4),
		model:         opts.ConsumerModel,
		option:        opts,

		namesrv: nameServers,
	}

	return &defaultPullConsumer{defaultConsumer: base}, nil
}
  • NewPullConsumer 方法先实例化 defaultConsumer,再用它创建并返回 defaultPullConsumer

Start

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Start marks the consumer as running and then runs c.start exactly once via
// c.once. The returned error is the result of that single c.start call; calls
// after the first do not run c.start again and return nil.
func (c *defaultPullConsumer) Start() error {
	atomic.StoreInt32(&c.state, int32(internal.StateRunning))

	var startErr error
	c.once.Do(func() {
		startErr = c.start()
	})
	return startErr
}
  • Start 方法先将 state 设置为 StateRunning,然后通过 c.once 执行 defaultPullConsumer 的 start 方法

Pull

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Pull pulls messages of topic from the queue chosen by getNextQueueOf,
// starting at that queue's next offset, and requests numbers messages.
//
// ctx is forwarded to the underlying pull so that the caller's cancellation
// and deadline are honoured. selector is used to build the subscription data
// sent with the request.
//
// It returns an error when no queue is available for topic or when the pull
// itself fails. On success the result is passed through processPullResult
// before being returned.
func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) {
	if ctx == nil {
		// Earlier versions ignored ctx entirely, so callers passing nil must
		// keep working.
		ctx = context.Background()
	}

	mq := c.getNextQueueOf(topic)
	if mq == nil {
		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
	}

	data := buildSubscriptionData(mq.Topic, selector)
	result, err := c.pull(ctx, mq, data, c.nextOffsetOf(mq), numbers)
	if err != nil {
		return nil, err
	}

	c.processPullResult(mq, result, data)
	return result, nil
}
  • Pull 方法先通过 c.getNextQueueOf(topic) 获取 mq,然后通过 buildSubscriptionData(mq.Topic, selector) 构造 data,之后执行 c.pull,最后执行 c.processPullResult(mq, result, data)

getNextQueueOf

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// getNextQueueOf picks the next message queue of topic in round-robin order.
//
// The queues are fetched from the name server and the per-topic position is
// kept in queueCounterTable as an int64. It returns nil when the lookup fails
// or when the topic has no queues.
//
// The load/store pair on queueCounterTable is not atomic as a whole, so
// concurrent callers for the same topic may be handed the same queue.
func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
	queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
	if err != nil {
		rlog.Error("get next mq error", map[string]interface{}{
			rlog.LogKeyTopic:         topic,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return nil
	}
	if len(queues) == 0 {
		// Nothing to choose from; the modulo below would divide by zero.
		return nil
	}

	// The first call for a topic starts at index 0; later calls advance by one
	// and wrap around. The checked assertion tolerates an entry that is not an
	// int64 by restarting from 0 instead of panicking.
	var next int64
	if v, ok := queueCounterTable.Load(topic); ok {
		if prev, isInt64 := v.(int64); isInt64 && prev >= 0 {
			next = (prev + 1) % int64(len(queues))
		}
	}
	// Write the position back so the next call actually moves on.
	queueCounterTable.Store(topic, next)

	return queues[next]
}
  • getNextQueueOf 方法先通过 c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic) 获取 queues,然后执行 queueCounterTable.Load(topic) 获取 index,最后按 index 对 len(queues) 取模,从 queues 中选出一个 MessageQueue 返回

PullFrom

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PullFrom pulls messages of queue from offset to offset + numbers.
//
// The arguments are validated with checkPull first. The pull is then issued
// with subscription data built from the queue's topic and an empty
// MessageSelector.
func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
	err := c.checkPull(ctx, queue, offset, numbers)
	if err != nil {
		return nil, err
	}

	subscription := buildSubscriptionData(queue.Topic, MessageSelector{})
	return c.pull(ctx, queue, subscription, offset, numbers)
}
  • PullFrom 方法先执行 c.checkPull,之后通过 buildSubscriptionData 构造 subscriptionData,最后通过 c.pull(ctx, queue, data, offset, numbers) 拉取数据

UpdateOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// UpdateOffset updates the in-memory offset of queue by delegating to
// c.updateOffset and returns whatever error that call reports.
func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error {
	err := c.updateOffset(queue, offset)
	return err
}
  • UpdateOffset 方法通过执行 c.updateOffset(queue, offset) 来更新 offset

PersistOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PersistOffset persists all in-memory offsets by delegating to
// c.persistConsumerOffset. The ctx argument is accepted but not used here.
func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
	err := c.persistConsumerOffset()
	return err
}
  • PersistOffset 方法通过执行 c.persistConsumerOffset() 来持久化 offset

CurrentOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// CurrentOffset returns the current in-memory offset of queue, as reported by
// c.queryOffset. The error result is always nil.
func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) {
	return c.queryOffset(queue), nil
}
  • CurrentOffset 方法执行 c.queryOffset(queue) 并返回其结果

Shutdown

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Shutdown closes the embedded defaultConsumer, which refuses new requests,
// and returns the error reported by its shutdown.
func (c *defaultPullConsumer) Shutdown() error {
	err := c.defaultConsumer.shutdown()
	return err
}
  • Shutdown 方法则执行 c.defaultConsumer.shutdown()

小结

PullConsumer 定义了 Start、Shutdown、Pull、PullFrom、UpdateOffset、PersistOffset、CurrentOffset 方法

doc

  • pull_consumer
正文完
 0