序
本文主要研究一下rocketmq-client-go的remoteBrokerOffsetStore
remoteBrokerOffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
type remoteBrokerOffsetStore struct { group string OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"` client internal.RMQClient namesrv internal.Namesrvs mutex sync.RWMutex}
- remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性
NewRemoteOffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
// NewRemoteOffsetStore builds an OffsetStore backed by the broker for the
// given consumer group. The returned store starts with an empty offset table.
func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {
	store := new(remoteBrokerOffsetStore)
	store.group = group
	store.client = client
	store.namesrv = namesrv
	store.OffsetTable = make(map[primitive.MessageQueue]int64)
	return store
}
- NewRemoteOffsetStore方法实例化remoteBrokerOffsetStore
persist
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) { r.mutex.Lock() defer r.mutex.Unlock() if len(mqs) == 0 { return } used := make(map[primitive.MessageQueue]struct{}, 0) for _, mq := range mqs { used[*mq] = struct{}{} } for mq, off := range r.OffsetTable { if _, ok := used[mq]; !ok { delete(r.OffsetTable, mq) continue } err := r.updateConsumeOffsetToBroker(r.group, mq, off) if err != nil { rlog.Warning("update offset to broker error", map[string]interface{}{ rlog.LogKeyConsumerGroup: r.group, rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyUnderlayError: err.Error(), "offset": off, }) } else { rlog.Info("update offset to broker success", map[string]interface{}{ rlog.LogKeyConsumerGroup: r.group, rlog.LogKeyMessageQueue: mq.String(), "offset": off, }) } }}
- persist方法遍历OffsetTable，删除不在mqs中的队列，并对其余队列执行r.updateConsumeOffsetToBroker
remove
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) { r.mutex.Lock() defer r.mutex.Unlock() delete(r.OffsetTable, *mq) rlog.Info("delete mq from offset table", map[string]interface{}{ rlog.LogKeyMessageQueue: mq, })}
- remove方法执行
delete(r.OffsetTable, *mq)
read
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 { r.mutex.RLock() switch t { case _ReadFromMemory, _ReadMemoryThenStore: off, exist := r.OffsetTable[*mq] if exist { r.mutex.RUnlock() return off } if t == _ReadFromMemory { r.mutex.RUnlock() return -1 } fallthrough case _ReadFromStore: off, err := r.fetchConsumeOffsetFromBroker(r.group, mq) if err != nil { rlog.Error("fecth offset of mq error", map[string]interface{}{ rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyUnderlayError: err, }) r.mutex.RUnlock() return -1 } r.mutex.RUnlock() r.update(mq, off, true) return off default: } return -1}
- read方法针对_ReadFromStore会执行r.fetchConsumeOffsetFromBroker
update
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) { r.mutex.Lock() defer r.mutex.Unlock() localOffset, exist := r.OffsetTable[*mq] if !exist { r.OffsetTable[*mq] = offset return } if increaseOnly { if localOffset < offset { r.OffsetTable[*mq] = offset } } else { r.OffsetTable[*mq] = offset }}
- update方法更新的是r.OffsetTable[*mq]
fetchConsumeOffsetFromBroker
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) { broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName) if broker == "" { r.namesrv.UpdateTopicRouteInfo(mq.Topic) broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName) } if broker == "" { return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName) } queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{ ConsumerGroup: group, Topic: mq.Topic, QueueId: mq.QueueId, } cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil) res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second) if err != nil { return -1, err } if res.Code != internal.ResSuccess { return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark) } off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64) if err != nil { return -1, err } return off, nil}
- fetchConsumeOffsetFromBroker方法构建QueryConsumerOffsetRequestHeader请求，然后通过r.client.InvokeSync发起请求
updateConsumeOffsetToBroker
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error { broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName) if broker == "" { r.namesrv.UpdateTopicRouteInfo(mq.Topic) broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName) } if broker == "" { return fmt.Errorf("broker: %s address not found", mq.BrokerName) } updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{ ConsumerGroup: group, Topic: mq.Topic, QueueId: mq.QueueId, CommitOffset: off, } cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil) return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)}
- updateConsumeOffsetToBroker方法构建UpdateConsumerOffsetRequestHeader请求，然后通过r.client.InvokeOneWay发起请求
小结
remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性；它提供了NewRemoteOffsetStore、persist、remove、read、update、fetchConsumeOffsetFromBroker、updateConsumeOffsetToBroker方法
doc
- offset_store