本文主要研究一下rocketmq-client-go的localFileOffsetStore

OffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

type OffsetStore interface {    persist(mqs []*primitive.MessageQueue)    remove(mq *primitive.MessageQueue)    read(mq *primitive.MessageQueue, t readType) int64    update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)}
  • OffsetStore定义了persist、remove、read、update方法

localFileOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

type localFileOffsetStore struct {    group       string    path        string    OffsetTable map[MessageQueueKey]int64    // mutex for offset file    mutex sync.Mutex}
  • localFileOffsetStore定义了group、path、OffsetTable、mutex属性

NewLocalFileOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func NewLocalFileOffsetStore(clientID, group string) OffsetStore {    store := &localFileOffsetStore{        group:       group,        path:        filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"),        OffsetTable: make(map[MessageQueueKey]int64),    }    store.load()    return store}
  • NewLocalFileOffsetStore创建localFileOffsetStore,然后执行store.load()

load

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) load() {    local.mutex.Lock()    defer local.mutex.Unlock()    data, err := utils.FileReadAll(local.path)    if os.IsNotExist(err) {        return    }    if err != nil {        rlog.Info("read from local store error, try to use bak file", map[string]interface{}{            rlog.LogKeyUnderlayError: err,        })        data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))    }    if err != nil {        rlog.Info("read from local store bak file error", map[string]interface{}{            rlog.LogKeyUnderlayError: err,        })        return    }    datas := make(map[MessageQueueKey]int64)    wrapper := OffsetSerializeWrapper{        OffsetTable: datas,    }    err = jsoniter.Unmarshal(data, &wrapper)    if err != nil {        rlog.Warning("unmarshal local offset error", map[string]interface{}{            "local_path":             local.path,            rlog.LogKeyUnderlayError: err.Error(),        })        return    }    if datas != nil {        local.OffsetTable = datas    }}
  • load方法通过utils.FileReadAll(local.path)读取data(文件不存在时直接返回,读取出错时会尝试读取bak文件),然后通过jsoniter.Unmarshal(data, &wrapper)将数据组装到local.OffsetTable

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {    switch t {    case _ReadFromMemory, _ReadMemoryThenStore:        off := readFromMemory(local.OffsetTable, mq)        if off >= 0 || (off == -1 && t == _ReadFromMemory) {            return off        }        fallthrough    case _ReadFromStore:        local.load()        return readFromMemory(local.OffsetTable, mq)    default:    }    return -1}
  • read方法根据readType决定是直接从内存读取(readFromMemory),还是先执行load()从文件加载后再从内存读取

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {    local.mutex.Lock()    defer local.mutex.Unlock()    rlog.Debug("update offset", map[string]interface{}{        rlog.LogKeyMessageQueue: mq,        "new_offset":            offset,    })    key := MessageQueueKey(*mq)    localOffset, exist := local.OffsetTable[key]    if !exist {        local.OffsetTable[key] = offset        return    }    if increaseOnly {        if localOffset < offset {            local.OffsetTable[key] = offset        }    } else {        local.OffsetTable[key] = offset    }}
  • update方法更新local.OffsetTable[key];当increaseOnly为true且key已存在时,仅在新offset大于已有值时才更新

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {    if len(mqs) == 0 {        return    }    local.mutex.Lock()    defer local.mutex.Unlock()    wrapper := OffsetSerializeWrapper{        OffsetTable: local.OffsetTable,    }    data, _ := jsoniter.Marshal(wrapper)    utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))}
  • persist方法先将OffsetTable序列化为JSON,再执行utils.WriteToFile(local.path, data)写入文件

小结

OffsetStore定义了persist、remove、read、update方法;localFileOffsetStore定义了group、path、OffsetTable、mutex属性

doc

  • offset_store