本文次要钻研一下rocketmq-client-go的remoteBrokerOffsetStore

remoteBrokerOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

type remoteBrokerOffsetStore struct {    group       string    OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`    client      internal.RMQClient    namesrv     internal.Namesrvs    mutex       sync.RWMutex}
  • remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性

NewRemoteOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {    return &remoteBrokerOffsetStore{        group:       group,        client:      client,        namesrv:     namesrv,        OffsetTable: make(map[primitive.MessageQueue]int64),    }}
  • NewRemoteOffsetStore办法实例化remoteBrokerOffsetStore

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {    r.mutex.Lock()    defer r.mutex.Unlock()    if len(mqs) == 0 {        return    }    used := make(map[primitive.MessageQueue]struct{}, 0)    for _, mq := range mqs {        used[*mq] = struct{}{}    }    for mq, off := range r.OffsetTable {        if _, ok := used[mq]; !ok {            delete(r.OffsetTable, mq)            continue        }        err := r.updateConsumeOffsetToBroker(r.group, mq, off)        if err != nil {            rlog.Warning("update offset to broker error", map[string]interface{}{                rlog.LogKeyConsumerGroup: r.group,                rlog.LogKeyMessageQueue:  mq.String(),                rlog.LogKeyUnderlayError: err.Error(),                "offset":                 off,            })        } else {            rlog.Info("update offset to broker success", map[string]interface{}{                rlog.LogKeyConsumerGroup: r.group,                rlog.LogKeyMessageQueue:  mq.String(),                "offset":                 off,            })        }    }}
  • persist办法遍历OffsetTable,执行r.updateConsumeOffsetToBroker

remove

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {    r.mutex.Lock()    defer r.mutex.Unlock()    delete(r.OffsetTable, *mq)    rlog.Info("delete mq from offset table", map[string]interface{}{        rlog.LogKeyMessageQueue: mq,    })}
  • remove办法执行delete(r.OffsetTable, *mq)

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {    r.mutex.RLock()    switch t {    case _ReadFromMemory, _ReadMemoryThenStore:        off, exist := r.OffsetTable[*mq]        if exist {            r.mutex.RUnlock()            return off        }        if t == _ReadFromMemory {            r.mutex.RUnlock()            return -1        }        fallthrough    case _ReadFromStore:        off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)        if err != nil {            rlog.Error("fecth offset of mq error", map[string]interface{}{                rlog.LogKeyMessageQueue:  mq.String(),                rlog.LogKeyUnderlayError: err,            })            r.mutex.RUnlock()            return -1        }        r.mutex.RUnlock()        r.update(mq, off, true)        return off    default:    }    return -1}
  • read办法针对_ReadFromStore会执行r.fetchConsumeOffsetFromBroker

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {    r.mutex.Lock()    defer r.mutex.Unlock()    localOffset, exist := r.OffsetTable[*mq]    if !exist {        r.OffsetTable[*mq] = offset        return    }    if increaseOnly {        if localOffset < offset {            r.OffsetTable[*mq] = offset        }    } else {        r.OffsetTable[*mq] = offset    }}
  • update办法更新的是r.OffsetTable[*mq]

fetchConsumeOffsetFromBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {    broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)    if broker == "" {        r.namesrv.UpdateTopicRouteInfo(mq.Topic)        broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)    }    if broker == "" {        return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)    }    queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{        ConsumerGroup: group,        Topic:         mq.Topic,        QueueId:       mq.QueueId,    }    cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)    res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)    if err != nil {        return -1, err    }    if res.Code != internal.ResSuccess {        return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)    }    off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)    if err != nil {        return -1, err    }    return off, nil}
  • fetchConsumeOffsetFromBroker办法构建QueryConsumerOffsetRequestHeader申请,而后通过r.client.InvokeSync发动申请

updateConsumeOffsetToBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error {    broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)    if broker == "" {        r.namesrv.UpdateTopicRouteInfo(mq.Topic)        broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)    }    if broker == "" {        return fmt.Errorf("broker: %s address not found", mq.BrokerName)    }    updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{        ConsumerGroup: group,        Topic:         mq.Topic,        QueueId:       mq.QueueId,        CommitOffset:  off,    }    cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)    return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)}
  • updateConsumeOffsetToBroker办法构建UpdateConsumerOffsetRequestHeader申请,而后通过r.client.InvokeOneWay发动申请

小结

remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性;它提供了NewRemoteOffsetStore、persist、remove、read、update、fetchConsumeOffsetFromBroker、updateConsumeOffsetToBroker办法

doc

  • offset_store