序
本文主要研究一下rocketmq-client-go的localFileOffsetStore
OffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
// OffsetStore is the contract for keeping track of the consume offset of each
// message queue. The per-method notes below are derived from the
// localFileOffsetStore implementation; other implementations may differ.
type OffsetStore interface {
	// persist saves the tracked offsets to durable storage. mqs lists the
	// queues concerned.
	persist(mqs []*primitive.MessageQueue)
	// remove takes the queue to drop; presumably it forgets the offset kept for
	// mq — TODO confirm against the implementations.
	remove(mq *primitive.MessageQueue)
	// read returns the offset known for mq. t selects where the value is read
	// from (memory, the backing store, or memory with a store fallback).
	read(mq *primitive.MessageQueue, t readType) int64
	// update records offset for mq. When increaseOnly is true an existing
	// offset is only ever moved forward.
	update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
}
- OffsetStore定义了persist、remove、read、update方法
localFileOffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
type localFileOffsetStore struct { group string path string OffsetTable map[MessageQueueKey]int64 // mutex for offset file mutex sync.Mutex}
- localFileOffsetStore定义了group、path、OffsetTable、mutex属性
NewLocalFileOffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func NewLocalFileOffsetStore(clientID, group string) OffsetStore { store := &localFileOffsetStore{ group: group, path: filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"), OffsetTable: make(map[MessageQueueKey]int64), } store.load() return store}
- NewLocalFileOffsetStore创建localFileOffsetStore,然后执行store.load()
load
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) load() { local.mutex.Lock() defer local.mutex.Unlock() data, err := utils.FileReadAll(local.path) if os.IsNotExist(err) { return } if err != nil { rlog.Info("read from local store error, try to use bak file", map[string]interface{}{ rlog.LogKeyUnderlayError: err, }) data, err = utils.FileReadAll(filepath.Join(local.path, ".bak")) } if err != nil { rlog.Info("read from local store bak file error", map[string]interface{}{ rlog.LogKeyUnderlayError: err, }) return } datas := make(map[MessageQueueKey]int64) wrapper := OffsetSerializeWrapper{ OffsetTable: datas, } err = jsoniter.Unmarshal(data, &wrapper) if err != nil { rlog.Warning("unmarshal local offset error", map[string]interface{}{ "local_path": local.path, rlog.LogKeyUnderlayError: err.Error(), }) return } if datas != nil { local.OffsetTable = datas }}
- load方法通过utils.FileReadAll(local.path)读取data(读取失败时会尝试读取bak文件),然后通过jsoniter.Unmarshal(data, &wrapper)将数据反序列化并赋值给local.OffsetTable
read
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 { switch t { case _ReadFromMemory, _ReadMemoryThenStore: off := readFromMemory(local.OffsetTable, mq) if off >= 0 || (off == -1 && t == _ReadFromMemory) { return off } fallthrough case _ReadFromStore: local.load() return readFromMemory(local.OffsetTable, mq) default: } return -1}
- read方法根据readType决定是直接从内存读取(readFromMemory),还是先执行load()从文件重新加载后再从内存读取(_ReadFromStore);_ReadMemoryThenStore在内存中读不到有效offset时会回退到从文件加载
update
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) { local.mutex.Lock() defer local.mutex.Unlock() rlog.Debug("update offset", map[string]interface{}{ rlog.LogKeyMessageQueue: mq, "new_offset": offset, }) key := MessageQueueKey(*mq) localOffset, exist := local.OffsetTable[key] if !exist { local.OffsetTable[key] = offset return } if increaseOnly { if localOffset < offset { local.OffsetTable[key] = offset } } else { local.OffsetTable[key] = offset }}
- update方法更新local.OffsetTable[key];当increaseOnly为true时,只有新的offset大于已有值才会更新
persist
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) { if len(mqs) == 0 { return } local.mutex.Lock() defer local.mutex.Unlock() wrapper := OffsetSerializeWrapper{ OffsetTable: local.OffsetTable, } data, _ := jsoniter.Marshal(wrapper) utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))}
- persist方法先将local.OffsetTable序列化为JSON,然后执行utils.WriteToFile(local.path, data)写入文件
小结
OffsetStore定义了persist、remove、read、update方法;localFileOffsetStore定义了group、path、OffsetTable、mutex属性
doc
- offset_store