聊聊kingbus的resp.go

序本文主要研究一下kingbus的resp.go writeOKkingbus/mysql/resp.go func (c *Conn) writeOK(r *gomysql.Result) error { if r == nil { r = &gomysql.Result{} } r.Status |= c.status data := make([]byte, 4, 32) data = append(data, gomysql.OK_HEADER) data = append(data, gomysql.PutLengthEncodedInt(r.AffectedRows)...) data = append(data, gomysql.PutLengthEncodedInt(r.InsertId)...) if c.capability&gomysql.CLIENT_PROTOCOL_41 > 0 { data = append(data, byte(r.Status), byte(r.Status>>8)) data = append(data, 0, 0) } return c.WritePacket(data)}writeOK方法写入gomysql.OK_HEADERwriteErrorkingbus/mysql/resp.go func (c *Conn) writeError(e error) error { var m *gomysql.MyError var ok bool if m, ok = e.(*gomysql.MyError); !ok { m = gomysql.NewError(gomysql.ER_UNKNOWN_ERROR, e.Error()) } data := make([]byte, 4, 16+len(m.Message)) data = append(data, gomysql.ERR_HEADER) data = append(data, byte(m.Code), byte(m.Code>>8)) if c.capability&gomysql.CLIENT_PROTOCOL_41 > 0 { data = append(data, '#') data = append(data, m.State...) } data = append(data, m.Message...) return c.WritePacket(data)}writeError方法写入gomysql.ERR_HEADERwriteEOFkingbus/mysql/resp.go ...

June 24, 2020 · 2 min · jiezi

聊聊kingbus的binlog_progress.go

序本文主要研究一下kingbus的binlog_progress.go BinlogProgresskingbus/server/binlog_progress.go //BinlogProgress is the progress of receiving binlogtype BinlogProgress struct { currentGtid *atomic.String lastSaveGtid string //for heartbeat event lastBinlogFile *atomic.String lastFilePosition *atomic.Uint32 executedGtidSetStr *atomic.String trxBoundaryParser *mysql.TransactionBoundaryParser persistentTime time.Time persistentAppliedIndex uint64 executedGtidSet gomysql.GTIDSet store storage.Storage}BinlogProgress定义了currentGtid、lastSaveGtid、lastBinlogFile、lastFilePosition、executedGtidSetStr、trxBoundaryParser、persistentTime、persistentAppliedIndex、executedGtidSet、store属性newBinlogProgresskingbus/server/binlog_progress.go func newBinlogProgress(store storage.Storage) (*BinlogProgress, error) { var err error p := new(BinlogProgress) p.trxBoundaryParser = new(mysql.TransactionBoundaryParser) p.trxBoundaryParser.Reset() p.currentGtid = atomic.NewString("") p.lastBinlogFile = atomic.NewString("") p.lastFilePosition = atomic.NewUint32(0) p.persistentAppliedIndex = 0 p.persistentTime = time.Unix(0, 0) //get executed gtid_set //This value may be old, but resetBinlogProgress will update it to the latest p.executedGtidSet, err = store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey) if err != nil { log.Log.Errorf("newBinlogProgress:get executedGtidSet error,err:%s", err) return nil, err } p.executedGtidSetStr = atomic.NewString(p.executedGtidSet.String()) p.store = store return p, nil}newBinlogProgress方法创建了BinlogProgress及mysql.TransactionBoundaryParser,之后通过store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey)获取executedGtidSetupdateProcesskingbus/server/binlog_progress.go ...

June 22, 2020 · 2 min · jiezi

聊聊kingbus的DumpBinlogAt

序本文主要研究一下kingbus的DumpBinlogAt DumpBinlogAtkingbus/server/binlog_server.go //DumpBinlogAt implements dump binlog event by slave executed gtid setfunc (s *BinlogServer) DumpBinlogAt(ctx context.Context, startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet, eventC chan<- *storagepb.BinlogEvent, errorC chan<- error) error { var inExcludeGroup = false //new a binlog event reader from startRaftIndex, then send event to slave one by one reader, err := s.store.NewEntryReaderAt(startRaftIndex) if err != nil { log.Log.Errorf("NewEntryReaderAt error,err:%s,raftIndex:%d", err, startRaftIndex) return err } nextRaftIndex := reader.NextRaftIndex() log.Log.Infof("DumpBinlogAt:raftIndex:%d,slaveGtids:%s", nextRaftIndex, slaveGtids.String()) go func() { for { //the next read raftIndex must be little than AppliedIndex if nextRaftIndex <= s.kingbusInfo.AppliedIndex() { raftEntry, err := reader.GetNext() if err != nil { log.Log.Errorf("reader.GetNext error,err:%s,nextRaftIndex:%d,AppliedIndex:%d", err, nextRaftIndex, s.kingbusInfo.AppliedIndex()) select { case errorC <- err: default: } return //need quit } nextRaftIndex = reader.NextRaftIndex() //this entry is not binlog event if utils.IsBinlogEvent(raftEntry) == false { continue } event := utils.DecodeBinlogEvent(raftEntry) //filter the event in slave gtids,if the event has send to slave inExcludeGroup = s.skipEvent(event, slaveGtids, inExcludeGroup) if inExcludeGroup { continue } select { case eventC <- event: case <-ctx.Done(): log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err()) select { case errorC <- ctx.Err(): default: } return //need quit } } else { select { case <-s.broadcast.Receive(): break case <-ctx.Done(): log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err()) select { case errorC <- ctx.Err(): default: } return //need quit } } } }() return 
nil}DumpBinlogAt方法通过s.store.NewEntryReaderAt(startRaftIndex)获取reader,然后获取nextRaftIndex,之后通过utils.DecodeBinlogEvent(raftEntry)获取event,然后通过s.skipEvent(event, slaveGtids, inExcludeGroup)来判断是否该skip,之后将event写入到eventCNewEntryReaderAtkingbus/storage/disk_storage.go ...

June 21, 2020 · 2 min · jiezi

聊聊kingbus的command.go

序本文主要研究一下kingbus的command.go Closekingbus/mysql/command.go //Close the Connfunc (c *Conn) Close() { if c.closed.Load() == true { return } c.closed.Store(true) c.Conn.Close() c.cancel() c.Conn = nil}Close方法执行c.closed.Store(true)、c.Conn.Close()、c.cancel()handleQuerykingbus/mysql/command.go func (c *Conn) handleQuery(sql string) (err error) { defer func() { if e := recover(); e != nil { if myerr, ok := e.(error); ok { const size = 4096 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] log.Log.Errorf("Conn handleQuery error,err:%s,sql:%s,stack:%s", myerr, sql, string(buf)) err = myerr } return } }() log.Log.Infof("handleQuery sql:%s", sql) sqlParser := parser.New() stmt, err := sqlParser.ParseOneStmt(sql, "", "") if err != nil { return err } switch v := stmt.(type) { case *ast.ShowStmt: return c.handleShow(v) case *ast.SelectStmt: return c.handleSelect(v) case *ast.SetStmt: return c.handleSet(v) case *ast.KillStmt: return c.handleKill(v) //todo get metrics of slave default: return ErrSQLNotSupport } return}handleQuery方法通过sqlParser来解析stmt,然后根据stmt的type来执行不同的方法;对于ShowStmt执行c.handleShow(v),对于SelectStmt执行c.handleSelect(v),对于SetStmt执行c.handleSet(v),对于KillStmt执行c.handleKill(v)writeOKkingbus/mysql/resp.go ...

June 20, 2020 · 4 min · jiezi

聊聊kingbus的startMasterServer

序本文主要研究一下kingbus的startMasterServer startMasterServerkingbus/server/server.go func (s *KingbusServer) startMasterServer(args *config.BinlogServerConfig) error { master, err := NewBinlogServer(args, s, s.store, s.applyBroadcast) if err != nil { log.Log.Errorf("NewBinlogServer error,err:%s,args:%v", err, *args) return err } s.master = master s.master.Start() log.Log.Infof("startMasterServer success,args:%v", *args) return nil}startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法NewBinlogServerkingbus/server/binlog_server.go //NewBinlogServer create a binlog serverfunc NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error) { var err error s := new(BinlogServer) s.started = atomic.NewBool(false) s.cfg = cfg s.listener, err = net.Listen("tcp", s.cfg.Addr) if err != nil { log.Log.Errorf("Listen error,err:%s,addr:%s", err, s.cfg.Addr) return nil, err } s.store = store s.broadcast = broadcast s.kingbusInfo = ki s.slaves = make(map[string]*mysql.Slave) s.errch = make(chan error, 1) return s, nil}NewBinlogServer方法通过new方法创建BinlogServer,之后设置其listener、store、broadcast等属性BinlogServerkingbus/server/binlog_server.go ...

June 20, 2020 · 3 min · jiezi

聊聊kingbus的binlog_server_handler.go

序本文主要研究一下kingbus的binlog_server_handler.go StartBinlogServerkingbus/api/binlog_server_handler.go //StartBinlogServer implements start a binlog serverfunc (h *BinlogServerHandler) StartBinlogServer(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() var args config.BinlogServerConfig var err error defer func() { if err != nil { log.Log.Errorf("StartBinlogServer error,err: %s", err) echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } }() err = echoCtx.Bind(&args) if err != nil { return err } kingbusIP := h.svr.GetIP() //check args err = args.Check(kingbusIP) if err != nil { return err } //start syncer server err = h.svr.StartServer(config.BinlogServerType, &args) if err != nil { log.Log.Errorf("start server error,err:%s,args:%v", err, args) return err } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))}StartBinlogServer方法主要是通过h.svr.StartServer(config.BinlogServerType, &args)来启动binlog serverStartServerkingbus/server/server.go ...

June 18, 2020 · 3 min · jiezi

聊聊kingbus的binlog_syncer_handler.go

序本文主要研究一下kingbus的binlog_syncer_handler.go StartBinlogSyncerkingbus/api/binlog_syncer_handler.go //StartBinlogSyncer implements start a binlog syncerfunc (h *BinlogSyncerHandler) StartBinlogSyncer(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() var args config.SyncerArgs var err error var syncerID int defer func() { if err != nil { log.Log.Errorf("StartBinlogSyncer error,err: %s", err) echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } }() err = echoCtx.Bind(&args) if err != nil { return err } //check args err = args.Check() if err != nil { return err } //forward to leader if h.svr.IsLeader() == false { req, err := json.Marshal(args) if err != nil { return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } resp, err := h.sendToLeader("PUT", "/binlog/syncer/start", req) if err != nil { log.Log.Errorf("sendToLeader error,err:%s,args:%v", err, args) return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } if resp.Message != "success" { return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(resp.Message)) } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data)) } //start syncer server err = h.svr.StartServer(config.SyncerServerType, &args) if err != nil { log.Log.Errorf("StartServer error,err:%s,args:%v", err, args) return err } //propose start syncer info err = h.ProposeSyncerArgs(&args) if err != nil { log.Log.Errorf("ProposeSyncerArgs error,err:%s,args:%v", err, args) return err } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerID))}StartBinlogSyncer方法先执行echoCtx.Bind(&args),然后针对h.svr.IsLeader()为false的通过h.sendToLeader("PUT", "/binlog/syncer/start", req)将请求转发给leader;为true的则执行h.svr.StartServer(config.SyncerServerType, &args)启动syncer server,然后执行h.ProposeSyncerArgs(&args)启动propose syncerStopBinlogSyncerkingbus/api/binlog_syncer_handler.go ...

June 17, 2020 · 3 min · jiezi