序
本文主要研究一下kingbus的command.go
Close
kingbus/mysql/command.go
//Close the Connfunc (c *Conn) Close() { if c.closed.Load() == true { return } c.closed.Store(true) c.Conn.Close() c.cancel() c.Conn = nil}
- Close方法执行c.closed.Store(true)、c.Conn.Close()、c.cancel()
handleQuery
kingbus/mysql/command.go
func (c *Conn) handleQuery(sql string) (err error) { defer func() { if e := recover(); e != nil { if myerr, ok := e.(error); ok { const size = 4096 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] log.Log.Errorf("Conn handleQuery error,err:%s,sql:%s,stack:%s", myerr, sql, string(buf)) err = myerr } return } }() log.Log.Infof("handleQuery sql:%s", sql) sqlParser := parser.New() stmt, err := sqlParser.ParseOneStmt(sql, "", "") if err != nil { return err } switch v := stmt.(type) { case *ast.ShowStmt: return c.handleShow(v) case *ast.SelectStmt: return c.handleSelect(v) case *ast.SetStmt: return c.handleSet(v) case *ast.KillStmt: return c.handleKill(v) //todo get metrics of slave default: return ErrSQLNotSupport } return}
- handleQuery方法通过sqlParser来解析stmt,然后根据stmt的type来执行不同的方法;对于ShowStmt执行c.handleShow(v),对于SelectStmt执行c.handleSelect(v),对于SetStmt执行c.handleSet(v),对于KillStmt执行c.handleKill(v)
writeOK
kingbus/mysql/resp.go
func (c *Conn) writeOK(r *gomysql.Result) error { if r == nil { r = &gomysql.Result{} } r.Status |= c.status data := make([]byte, 4, 32) data = append(data, gomysql.OK_HEADER) data = append(data, gomysql.PutLengthEncodedInt(r.AffectedRows)...) data = append(data, gomysql.PutLengthEncodedInt(r.InsertId)...) if c.capability&gomysql.CLIENT_PROTOCOL_41 > 0 { data = append(data, byte(r.Status), byte(r.Status>>8)) data = append(data, 0, 0) } return c.WritePacket(data)}
- writeOK方法用于响应ping命令
handleBinlogDumpGtid
kingbus/mysql/command.go
//todo kill the slave with same uuidfunc (c *Conn) handleBinlogDumpGtid(ctx context.Context, data []byte) error { var ( err error heartbeatPeriod time.Duration ) slaveGtidExecuted, slaveServerID, err := c.parseMysqlGtidDumpPacket(data) if err != nil { return err } //UnregisterSlave slaveUUID := c.userVariables[SlaveUUID].(string) defer c.binlogServer.UnregisterSlave(slaveUUID) err = c.binlogServer.CheckGtidSet(gomysql.MySQLFlavor, slaveGtidExecuted) if err != nil { log.Log.Errorf("CheckGtidSet error,err:%s,slaveGtids:%v", err, slaveGtidExecuted) return err } //get the previousGtidEvent raft index preGtidEventIndex, err := c.binlogServer.GetMySQLDumpAt(slaveGtidExecuted) if err != nil { log.Log.Errorf("GetMySQLDumpAt error,err:%s,slaveGtids:%v", err, slaveGtidExecuted) return err } fde, err := c.binlogServer.GetFde(preGtidEventIndex) if err != nil { log.Log.Errorf("handleBinlogDumpGtid:GetFde error,err:%s, gtidSet: %s,flavor:%s", err, slaveGtidExecuted.String(), gomysql.MySQLFlavor) return err } //1.send fake rotate event masterServerID := binary.LittleEndian.Uint32(fde[5:]) fileName, err := c.binlogServer.GetNextBinlogFile(preGtidEventIndex) if err != nil { log.Log.Errorf("handleBinlogDumpGtid:GetNextBinlogFile error,err:%s, gtidSet: %s,flavor:%s", err, slaveGtidExecuted.String(), gomysql.MySQLFlavor) return err } err = c.sendFakeRotateEvent(masterServerID, fileName) if err != nil { log.Log.Errorf("handleBinlogDumpGtid:sendFakeRotateEvent error,err:%s, serverId: %d,fileName:%s", err, masterServerID, fileName) return err } //2.send fde err = c.sendFormatDescriptionEvent(fde) if err != nil { log.Log.Errorf("handleBinlogDumpGtid:sendFormatDescriptionEvent error,err:%s, fde:%v", err, fde) return err } //3.send event eventC := make(chan *storagepb.BinlogEvent, 2000) errorC := make(chan error, 1) err = c.binlogServer.DumpBinlogAt(ctx, preGtidEventIndex, slaveGtidExecuted, eventC, errorC) if err != nil { log.Log.Errorf("DumpBinlogAt error,err:%s,preGtidEventIndex:%d,slaveGtidExecuted:%v", err, preGtidEventIndex, slaveGtidExecuted) return err } //4.new metrics slaveEps := metrics.NewMeter() slaveThroughput := metrics.NewMeter() metrics.Register(fmt.Sprintf("slave_eps_%d", slaveServerID), slaveEps) metrics.Register(fmt.Sprintf("slave_thoughput_%d", slaveServerID), slaveThroughput) if period, ok := c.userVariables[MasterHeartbeatPeriod]; ok { heartbeatPeriod = time.Duration(period.(int64)) } else { heartbeatPeriod = MaxHeartbeatPeriod } timer := time.NewTimer(heartbeatPeriod) for { select { case event := <-eventC: //event is not divided or the first divided event //WriteEvent need write a ok_header(one byte),after the header size if event.DividedCount == 0 || (0 < event.DividedCount && event.DividedSeqNum == 0) { err = c.WriteEvent(event.Data, true) if err != nil { log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event) return err } } else { err = c.WriteEvent(event.Data, false) if err != nil { log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event) return err } //event is divided,and the last packet size is MaxPayloadLen //need send a empty packet //https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html if event.DividedSeqNum == event.DividedCount-1 && len(event.Data) == MaxPayloadLen { err = c.WriteEvent(nil, false) if err != nil { log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event) return err } } } slaveEps.Mark(1) slaveThroughput.Mark(int64(len(event.Data))) //reset heartbeat period resetTime(timer, heartbeatPeriod) case err = <-errorC: log.Log.Errorf("binlog server DumpBinlogAt error,err:%s", err) return err case <-ctx.Done(): log.Log.Errorf("handleBinlogDumpGtid:ctx done,quit") return ctx.Err() case <-timer.C: //kingbus send the heartbeat log event which received by syncer to slave log.Log.Debugf("send a heartbeat log event to slave") err = c.sendHeartbeatEvent(masterServerID) if err != nil { return err } //reset heartbeat period resetTime(timer, heartbeatPeriod) } } return nil}
- handleBinlogDumpGtid方法先执行c.sendFakeRotateEvent,再执行c.sendFormatDescriptionEvent,接着执行c.binlogServer.DumpBinlogAt,然后通过select来响应相应事件
handleRegisterSlave
kingbus/mysql/command.go
func (c *Conn) handleRegisterSlave(data []byte) error { var s Slave pos := 0 s.ServerID = int32(binary.LittleEndian.Uint32(data[pos:])) pos += 4 hostNameLen := int(data[pos]) pos++ s.HostName = string(data[pos : pos+hostNameLen]) pos += hostNameLen userLen := int(data[pos]) pos++ s.User = string(data[pos : pos+userLen]) pos += userLen passwordLen := int(data[pos]) pos++ s.Password = string(data[pos : pos+passwordLen]) pos += passwordLen s.Port = int16(binary.LittleEndian.Uint16(data[pos:])) pos += 2 s.Rank = binary.LittleEndian.Uint32(data[pos:]) pos += 4 s.MasterID = binary.LittleEndian.Uint32(data[pos:]) s.State = REGISTERED //kill the zombie dump thread with same uuid c.killZombieDumpThreads() if uuid, ok := c.userVariables[SlaveUUID]; ok { s.UUID = uuid.(string) } else { s.UUID = "" } s.ConnectTime = time.Now() s.Conn = c err := c.binlogServer.RegisterSlave(&s) if err != nil { return err } log.Log.Infof("handleRegisterSlave:slave info:%v", s) return c.writeOK(nil)}
- handleRegisterSlave方法用于处理COM_REGISTER_SLAVE命令,这里先执行c.killZombieDumpThreads(),然后执行c.binlogServer.RegisterSlave(&s)
小结
kingbus的command.go提供了Close、handleQuery、writeOK、handleBinlogDumpGtid、handleRegisterSlave等方法
doc
- command