序
本文主要研究一下kingbus的DumpBinlogAt
DumpBinlogAt
kingbus/server/binlog_server.go
//DumpBinlogAt implements dump binlog event by slave executed gtid setfunc (s *BinlogServer) DumpBinlogAt(ctx context.Context, startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet, eventC chan<- *storagepb.BinlogEvent, errorC chan<- error) error { var inExcludeGroup = false //new a binlog event reader from startRaftIndex, then send event to slave one by one reader, err := s.store.NewEntryReaderAt(startRaftIndex) if err != nil { log.Log.Errorf("NewEntryReaderAt error,err:%s,raftIndex:%d", err, startRaftIndex) return err } nextRaftIndex := reader.NextRaftIndex() log.Log.Infof("DumpBinlogAt:raftIndex:%d,slaveGtids:%s", nextRaftIndex, slaveGtids.String()) go func() { for { //the next read raftIndex must be little than AppliedIndex if nextRaftIndex <= s.kingbusInfo.AppliedIndex() { raftEntry, err := reader.GetNext() if err != nil { log.Log.Errorf("reader.GetNext error,err:%s,nextRaftIndex:%d,AppliedIndex:%d", err, nextRaftIndex, s.kingbusInfo.AppliedIndex()) select { case errorC <- err: default: } return //need quit } nextRaftIndex = reader.NextRaftIndex() //this entry is not binlog event if utils.IsBinlogEvent(raftEntry) == false { continue } event := utils.DecodeBinlogEvent(raftEntry) //filter the event in slave gtids,if the event has send to slave inExcludeGroup = s.skipEvent(event, slaveGtids, inExcludeGroup) if inExcludeGroup { continue } select { case eventC <- event: case <-ctx.Done(): log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err()) select { case errorC <- ctx.Err(): default: } return //need quit } } else { select { case <-s.broadcast.Receive(): break case <-ctx.Done(): log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err()) select { case errorC <- ctx.Err(): default: } return //need quit } } } }() return nil}
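- DumpBinlogAt方法先通过s.store.NewEntryReaderAt(startRaftIndex)获取reader,并通过reader.NextRaftIndex()得到nextRaftIndex,然后启动一个goroutine循环处理:当nextRaftIndex小于等于s.kingbusInfo.AppliedIndex()时,通过reader.GetNext()读取raftEntry,不是binlog event的entry直接跳过,否则通过utils.DecodeBinlogEvent(raftEntry)获取event,再通过s.skipEvent(event, slaveGtids, inExcludeGroup)来判断是否该skip,不需要skip的event写入到eventC;当nextRaftIndex大于AppliedIndex时,则阻塞等待s.broadcast.Receive()的通知;reader.GetNext()出错或者ctx被取消时,会以非阻塞的方式把error写入errorC,然后退出goroutine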
NewEntryReaderAt
kingbus/storage/disk_storage.go
//NewEntryReaderAt create a DiskEntryReader at raftIndexfunc (s *DiskStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error) { err := s.checkRaftIndex(raftIndex) if err != nil { log.Log.Errorf("checkRaftIndex error,err:%s,raftIndex:%d", err, raftIndex) return nil, err } reader := new(DiskEntryReader) reader.indexReadAt = raftIndex reader.store = s return reader, nil}
- NewEntryReaderAt方法先通过s.checkRaftIndex(raftIndex)校验一下index,然后创建DiskEntryReader,设置indexReadAt为raftIndex
skipEvent
kingbus/server/binlog_server.go
//skipEvent filter the event has been executed by slavefunc (s *BinlogServer) skipEvent(event *storagepb.BinlogEvent, slaveGtids *gomysql.MysqlGTIDSet, inExcludeGroup bool) bool { switch replication.EventType(event.Type) { case replication.GTID_EVENT: //remove header eventBody := event.Data[replication.EventHeaderSize:] //remove crc32 eventBody = eventBody[:len(eventBody)-replication.BinlogChecksumLength] gtidEvent := &replication.GTIDEvent{} if err := gtidEvent.Decode(eventBody); err != nil { log.Log.Errorf("Decode gtid event error,err:%s", err) return true } u, err := uuid.FromBytes(gtidEvent.SID) if err != nil { log.Log.Errorf("FromBytes error,err:%s,sid:%v", err, gtidEvent.SID) return true } gtidStr := fmt.Sprintf("%s:%d", u.String(), gtidEvent.GNO) currentGtidset, err := gomysql.ParseMysqlGTIDSet(gtidStr) if err != nil { log.Log.Errorf("ParseMysqlGTIDSet error,err:%s,gtid:%s", err, gtidStr) return true } return slaveGtids.Contain(currentGtidset) case replication.ROTATE_EVENT: return false } return inExcludeGroup}
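- skipEvent方法根据replication.EventType(event.Type)类型来判断:对于replication.GTID_EVENT,先去掉event header和crc32,解析出GTIDEvent并构造currentGtidset,然后通过slaveGtids.Contain(currentGtidset)判断该event是否已经被slave执行过,解析过程中任何一步出错都返回true;对于replication.ROTATE_EVENT返回false;其他类型的event则直接返回传入的inExcludeGroup,即沿用上一个event的判断结果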
小结
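DumpBinlogAt方法通过s.store.NewEntryReaderAt(startRaftIndex)获取reader,然后获取nextRaftIndex,之后在一个goroutine里循环通过reader.GetNext()读取raftEntry(仅当nextRaftIndex不超过AppliedIndex时读取,否则等待s.broadcast.Receive()的通知),通过utils.DecodeBinlogEvent(raftEntry)获取event,然后通过s.skipEvent(event, slaveGtids, inExcludeGroup)来判断是否该skip,之后将event写入到eventC;读取出错或者ctx被取消时将error写入errorC并退出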
doc
- binlog_server