本文主要研究一下kingbus的binlog_progress.go

BinlogProgress

kingbus/server/binlog_progress.go

// BinlogProgress is the progress of receiving binlog.
//
// It tracks the GTID of the transaction currently being received, the set of
// GTIDs already executed, and the bookkeeping used to decide when that set is
// written to the backing store.
type BinlogProgress struct {
	// currentGtid is the GTID that updateProcess adds to executedGtidSet once
	// the transaction boundary parser reports it is outside a transaction.
	currentGtid *atomic.String
	// lastSaveGtid is the last GTID that was merged into executedGtidSet; it
	// prevents the same GTID from being merged twice.
	lastSaveGtid string

	//for heartbeat event
	lastBinlogFile *atomic.String
	// lastFilePosition holds the LogPos of the most recently decoded event header.
	lastFilePosition *atomic.Uint32
	// executedGtidSetStr is the string form of executedGtidSet, refreshed
	// whenever executedGtidSet changes.
	executedGtidSetStr *atomic.String

	// trxBoundaryParser classifies each event and tracks whether the stream is
	// currently inside a transaction.
	trxBoundaryParser *mysql.TransactionBoundaryParser

	// persistentTime is when the progress was last written to store.
	persistentTime time.Time
	// persistentAppliedIndex is the raft index recorded by the last write to store.
	persistentAppliedIndex uint64
	// executedGtidSet is the set of executed GTIDs, loaded from store at
	// construction and extended by updateProcess.
	executedGtidSet gomysql.GTIDSet
	// store loads the initial GTID set and persists the binlog progress.
	store storage.Storage
}
  • BinlogProgress定义了currentGtid、lastSaveGtid、lastBinlogFile、lastFilePosition、executedGtidSetStr、trxBoundaryParser、persistentTime、persistentAppliedIndex、executedGtidSet、store属性

newBinlogProgress

kingbus/server/binlog_progress.go

func newBinlogProgress(store storage.Storage) (*BinlogProgress, error) {    var err error    p := new(BinlogProgress)    p.trxBoundaryParser = new(mysql.TransactionBoundaryParser)    p.trxBoundaryParser.Reset()    p.currentGtid = atomic.NewString("")    p.lastBinlogFile = atomic.NewString("")    p.lastFilePosition = atomic.NewUint32(0)    p.persistentAppliedIndex = 0    p.persistentTime = time.Unix(0, 0)    //get executed gtid_set    //This value may be old, but resetBinlogProgress will update it to the latest    p.executedGtidSet, err = store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey)    if err != nil {        log.Log.Errorf("newBinlogProgress:get executedGtidSet error,err:%s", err)        return nil, err    }    p.executedGtidSetStr = atomic.NewString(p.executedGtidSet.String())    p.store = store    return p, nil}
  • newBinlogProgress方法创建了BinlogProgress及mysql.TransactionBoundaryParser,之后通过store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey)获取executedGtidSet

updateProcess

kingbus/server/binlog_progress.go

//updateProcess update and save executedGtid setfunc (s *BinlogProgress) updateProcess(raftIndex uint64, eventRawData []byte) error {    var err error    //parse event header    h := new(replication.EventHeader)    err = h.Decode(eventRawData)    if err != nil {        log.Log.Errorf("Decode error,err:%s,buf:%v", err, eventRawData)        return err    }    //set the heartbeat info    s.lastFilePosition.Store(h.LogPos)    //remove header    eventRawData = eventRawData[replication.EventHeaderSize:]    eventLen := int(h.EventSize) - replication.EventHeaderSize    if len(eventRawData) != eventLen {        return fmt.Errorf("invalid data size %d in event %s, less event length %d",            len(eventRawData), h.EventType, eventLen)    }    //remove crc32    eventRawData = eventRawData[:len(eventRawData)-replication.BinlogChecksumLength]    //the eventRawData maybe the first divided packet, but must not be query event    //so don't worry    eventBoundaryType, err := s.trxBoundaryParser.GetEventBoundaryType(h, eventRawData)    if err != nil {        log.Log.Errorf("GetEventBoundaryType error,err:%s,header:%v",            err, *h)        return err    }    //ignore updateState error, maybe a partial trx    err = s.trxBoundaryParser.UpdateState(eventBoundaryType)    if err != nil {        log.Log.Warnf("trxBoundaryParser UpdateState error,err:%s,header:%v", err, *h)        s.trxBoundaryParser.Reset()        s.currentGtid.Store("")        return nil    }    currentGtidStr := s.currentGtid.Load()    if s.trxBoundaryParser.IsNotInsideTransaction() &&        len(currentGtidStr) != 0 && s.lastSaveGtid != currentGtidStr {        log.Log.Debugf("current gtid is :%s,add into executedGtidSet:%s",            currentGtidStr, s.executedGtidSet.String())        //update executedGtidSet        err = s.executedGtidSet.Update(currentGtidStr)        if err != nil {            return err        }        s.lastSaveGtid = currentGtidStr        
s.executedGtidSetStr.Store(s.executedGtidSet.String())        //save the raftIndex and executedGtidSet at the same time        if raftIndex-s.persistentAppliedIndex > persistentCount ||            time.Now().Sub(s.persistentTime) > persistentTimeInterval {            err = s.store.SetBinlogProgress(raftIndex, s.executedGtidSet)            if err != nil {                log.Log.Errorf("SetGtidSet error,err:%s,key:%s,value:%s",                    err, storage.ExecutedGtidSetKey, s.executedGtidSet.String())                return err            }            s.persistentAppliedIndex = raftIndex            s.persistentTime = time.Now()        }    }    return nil}
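  • updateProcess方法会先将eventRawData解析为replication.EventHeader,并把h.LogPos存入lastFilePosition;接着去掉header和crc32,通过s.trxBoundaryParser.GetEventBoundaryType(h, eventRawData)获取eventBoundaryType,再通过s.trxBoundaryParser.UpdateState(eventBoundaryType)更新事务边界状态(更新失败时重置parser和currentGtid并返回nil);当不在事务中、currentGtidStr非空且与lastSaveGtid不同时,通过s.executedGtidSet.Update(currentGtidStr)将currentGtidStr合并到executedGtidSet,并同步更新lastSaveGtid和executedGtidSetStr;最后当raftIndex与persistentAppliedIndex之差超过persistentCount,或距上次持久化的时间超过persistentTimeInterval时,通过s.store.SetBinlogProgress(raftIndex, s.executedGtidSet)持久化binlog进度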

小结

kingbus的binlog_progress.go提供了newBinlogProgress、updateProcess方法:前者用于创建BinlogProgress并从store加载executedGtidSet,后者用于更新并持久化binlogProgress

doc

  • binlog_progress