序
本文次要钻研一下 rocketmq-client-go 的 localFileOffsetStore
OffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
type OffsetStore interface {persist(mqs []*primitive.MessageQueue)
remove(mq *primitive.MessageQueue)
read(mq *primitive.MessageQueue, t readType) int64
update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
}
- OffsetStore 定义了 persist、remove、read、update 办法
localFileOffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
type localFileOffsetStore struct {
group string
path string
OffsetTable map[MessageQueueKey]int64
// mutex for offset file
mutex sync.Mutex
}
- localFileOffsetStore 定义了 group、path、OffsetTable、mutex 属性
NewLocalFileOffsetStore
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
store := &localFileOffsetStore{
group: group,
path: filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"),
OffsetTable: make(map[MessageQueueKey]int64),
}
store.load()
return store
}
- NewLocalFileOffsetStore 创立 localFileOffsetStore,而后执行 store.load()
load
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) load() {local.mutex.Lock()
defer local.mutex.Unlock()
data, err := utils.FileReadAll(local.path)
if os.IsNotExist(err) {return}
if err != nil {rlog.Info("read from local store error, try to use bak file", map[string]interface{}{rlog.LogKeyUnderlayError: err,})
data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
}
if err != nil {rlog.Info("read from local store bak file error", map[string]interface{}{rlog.LogKeyUnderlayError: err,})
return
}
datas := make(map[MessageQueueKey]int64)
wrapper := OffsetSerializeWrapper{OffsetTable: datas,}
err = jsoniter.Unmarshal(data, &wrapper)
if err != nil {rlog.Warning("unmarshal local offset error", map[string]interface{}{
"local_path": local.path,
rlog.LogKeyUnderlayError: err.Error(),})
return
}
if datas != nil {local.OffsetTable = datas}
}
- load 办法通过 utils.FileReadAll(local.path) 读取 data,而后通过 jsoniter.Unmarshal(data, &wrapper) 将数据组装到 local.OffsetTable
read
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
switch t {
case _ReadFromMemory, _ReadMemoryThenStore:
off := readFromMemory(local.OffsetTable, mq)
if off >= 0 || (off == -1 && t == _ReadFromMemory) {return off}
fallthrough
case _ReadFromStore:
local.load()
return readFromMemory(local.OffsetTable, mq)
default:
}
return -1
}
- read 办法依据 readType 来执行是 readFromMemory 还是执行 ReadFromStore
update
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {local.mutex.Lock()
defer local.mutex.Unlock()
rlog.Debug("update offset", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
"new_offset": offset,
})
key := MessageQueueKey(*mq)
localOffset, exist := local.OffsetTable[key]
if !exist {local.OffsetTable[key] = offset
return
}
if increaseOnly {
if localOffset < offset {local.OffsetTable[key] = offset
}
} else {local.OffsetTable[key] = offset
}
}
- update 办法更新 local.OffsetTable[key]
persist
rocketmq-client-go-v2.0.0/consumer/offset_store.go
func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {if len(mqs) == 0 {return}
local.mutex.Lock()
defer local.mutex.Unlock()
wrapper := OffsetSerializeWrapper{OffsetTable: local.OffsetTable,}
data, _ := jsoniter.Marshal(wrapper)
utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
}
- persist 办法执行 utils.WriteToFile(local.path, data)
小结
OffsetStore 定义了 persist、remove、read、update 办法;localFileOffsetStore 定义了 group、path、OffsetTable、mutex 属性
doc
- offset_store