本文主要研究一下kingbus的command.go

Close

kingbus/mysql/command.go

//Close the Connfunc (c *Conn) Close() {    if c.closed.Load() == true {        return    }    c.closed.Store(true)    c.Conn.Close()    c.cancel()    c.Conn = nil}
  • Close方法先判断连接是否已经关闭,若已关闭则直接返回;否则将closed标记置为true,然后依次执行c.Conn.Close()、c.cancel(),最后将c.Conn置为nil

handleQuery

kingbus/mysql/command.go

func (c *Conn) handleQuery(sql string) (err error) {    defer func() {        if e := recover(); e != nil {            if myerr, ok := e.(error); ok {                const size = 4096                buf := make([]byte, size)                buf = buf[:runtime.Stack(buf, false)]                log.Log.Errorf("Conn handleQuery error,err:%s,sql:%s,stack:%s", myerr, sql, string(buf))                err = myerr            }            return        }    }()    log.Log.Infof("handleQuery sql:%s", sql)    sqlParser := parser.New()    stmt, err := sqlParser.ParseOneStmt(sql, "", "")    if err != nil {        return err    }    switch v := stmt.(type) {    case *ast.ShowStmt:        return c.handleShow(v)    case *ast.SelectStmt:        return c.handleSelect(v)    case *ast.SetStmt:        return c.handleSet(v)    case *ast.KillStmt:        return c.handleKill(v)    //todo get metrics of slave    default:        return ErrSQLNotSupport    }    return}
  • handleQuery方法通过sqlParser来解析stmt,然后根据stmt的type来执行不同的方法;对于ShowStmt执行c.handleShow(v),对于SelectStmt执行c.handleSelect(v),对于SetStmt执行c.handleSet(v),对于KillStmt执行c.handleKill(v)

writeOK

kingbus/mysql/resp.go

func (c *Conn) writeOK(r *gomysql.Result) error {    if r == nil {        r = &gomysql.Result{}    }    r.Status |= c.status    data := make([]byte, 4, 32)    data = append(data, gomysql.OK_HEADER)    data = append(data, gomysql.PutLengthEncodedInt(r.AffectedRows)...)    data = append(data, gomysql.PutLengthEncodedInt(r.InsertId)...)    if c.capability&gomysql.CLIENT_PROTOCOL_41 > 0 {        data = append(data, byte(r.Status), byte(r.Status>>8))        data = append(data, 0, 0)    }    return c.WritePacket(data)}
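  • writeOK方法用于向客户端写回OK包:先写入OK_HEADER、AffectedRows、InsertId,若客户端支持CLIENT_PROTOCOL_41则再写入状态位;除了响应ping命令之外,handleRegisterSlave在注册成功后也通过它返回结果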

handleBinlogDumpGtid

kingbus/mysql/command.go

//todo kill the slave with same uuidfunc (c *Conn) handleBinlogDumpGtid(ctx context.Context, data []byte) error {    var (        err             error        heartbeatPeriod time.Duration    )    slaveGtidExecuted, slaveServerID, err := c.parseMysqlGtidDumpPacket(data)    if err != nil {        return err    }    //UnregisterSlave    slaveUUID := c.userVariables[SlaveUUID].(string)    defer c.binlogServer.UnregisterSlave(slaveUUID)    err = c.binlogServer.CheckGtidSet(gomysql.MySQLFlavor, slaveGtidExecuted)    if err != nil {        log.Log.Errorf("CheckGtidSet error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)        return err    }    //get the previousGtidEvent raft index    preGtidEventIndex, err := c.binlogServer.GetMySQLDumpAt(slaveGtidExecuted)    if err != nil {        log.Log.Errorf("GetMySQLDumpAt error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)        return err    }    fde, err := c.binlogServer.GetFde(preGtidEventIndex)    if err != nil {        log.Log.Errorf("handleBinlogDumpGtid:GetFde error,err:%s, gtidSet: %s,flavor:%s",            err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)        return err    }    //1.send fake rotate event    masterServerID := binary.LittleEndian.Uint32(fde[5:])    fileName, err := c.binlogServer.GetNextBinlogFile(preGtidEventIndex)    if err != nil {        log.Log.Errorf("handleBinlogDumpGtid:GetNextBinlogFile error,err:%s, gtidSet: %s,flavor:%s",            err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)        return err    }    err = c.sendFakeRotateEvent(masterServerID, fileName)    if err != nil {        log.Log.Errorf("handleBinlogDumpGtid:sendFakeRotateEvent error,err:%s, serverId: %d,fileName:%s",            err, masterServerID, fileName)        return err    }    //2.send fde    err = c.sendFormatDescriptionEvent(fde)    if err != nil {        log.Log.Errorf("handleBinlogDumpGtid:sendFormatDescriptionEvent error,err:%s, fde:%v",            err, fde)        return err    }    //3.send event    
eventC := make(chan *storagepb.BinlogEvent, 2000)    errorC := make(chan error, 1)    err = c.binlogServer.DumpBinlogAt(ctx, preGtidEventIndex, slaveGtidExecuted, eventC, errorC)    if err != nil {        log.Log.Errorf("DumpBinlogAt error,err:%s,preGtidEventIndex:%d,slaveGtidExecuted:%v",            err, preGtidEventIndex, slaveGtidExecuted)        return err    }    //4.new metrics    slaveEps := metrics.NewMeter()    slaveThroughput := metrics.NewMeter()    metrics.Register(fmt.Sprintf("slave_eps_%d", slaveServerID), slaveEps)    metrics.Register(fmt.Sprintf("slave_thoughput_%d", slaveServerID), slaveThroughput)    if period, ok := c.userVariables[MasterHeartbeatPeriod]; ok {        heartbeatPeriod = time.Duration(period.(int64))    } else {        heartbeatPeriod = MaxHeartbeatPeriod    }    timer := time.NewTimer(heartbeatPeriod)    for {        select {        case event := <-eventC:            //event is not divided or the first divided event            //WriteEvent need write a ok_header(one byte),after the header size            if event.DividedCount == 0 || (0 < event.DividedCount && event.DividedSeqNum == 0) {                err = c.WriteEvent(event.Data, true)                if err != nil {                    log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)                    return err                }            } else {                err = c.WriteEvent(event.Data, false)                if err != nil {                    log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)                    return err                }                //event is divided,and the last packet size is MaxPayloadLen                //need send a empty packet                //https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html                if event.DividedSeqNum == event.DividedCount-1 && len(event.Data) == MaxPayloadLen {                    err = c.WriteEvent(nil, false)                    if err != nil {                        
log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)                        return err                    }                }            }            slaveEps.Mark(1)            slaveThroughput.Mark(int64(len(event.Data)))            //reset heartbeat period            resetTime(timer, heartbeatPeriod)        case err = <-errorC:            log.Log.Errorf("binlog server DumpBinlogAt error,err:%s", err)            return err        case <-ctx.Done():            log.Log.Errorf("handleBinlogDumpGtid:ctx done,quit")            return ctx.Err()        case <-timer.C:            //kingbus send the heartbeat log event which received by syncer to slave            log.Log.Debugf("send a heartbeat log event to slave")            err = c.sendHeartbeatEvent(masterServerID)            if err != nil {                return err            }            //reset heartbeat period            resetTime(timer, heartbeatPeriod)        }    }    return nil}
  • handleBinlogDumpGtid方法先执行c.sendFakeRotateEvent,再执行c.sendFormatDescriptionEvent,接着执行c.binlogServer.DumpBinlogAt,然后通过select来响应相应事件

handleRegisterSlave

kingbus/mysql/command.go

func (c *Conn) handleRegisterSlave(data []byte) error {    var s Slave    pos := 0    s.ServerID = int32(binary.LittleEndian.Uint32(data[pos:]))    pos += 4    hostNameLen := int(data[pos])    pos++    s.HostName = string(data[pos : pos+hostNameLen])    pos += hostNameLen    userLen := int(data[pos])    pos++    s.User = string(data[pos : pos+userLen])    pos += userLen    passwordLen := int(data[pos])    pos++    s.Password = string(data[pos : pos+passwordLen])    pos += passwordLen    s.Port = int16(binary.LittleEndian.Uint16(data[pos:]))    pos += 2    s.Rank = binary.LittleEndian.Uint32(data[pos:])    pos += 4    s.MasterID = binary.LittleEndian.Uint32(data[pos:])    s.State = REGISTERED    //kill the zombie dump thread with same uuid    c.killZombieDumpThreads()    if uuid, ok := c.userVariables[SlaveUUID]; ok {        s.UUID = uuid.(string)    } else {        s.UUID = ""    }    s.ConnectTime = time.Now()    s.Conn = c    err := c.binlogServer.RegisterSlave(&s)    if err != nil {        return err    }    log.Log.Infof("handleRegisterSlave:slave info:%v", s)    return c.writeOK(nil)}
  • handleRegisterSlave方法用于处理COM_REGISTER_SLAVE命令,这里先执行c.killZombieDumpThreads(),然后执行c.binlogServer.RegisterSlave(&s)

小结

kingbus的command.go提供了Close、handleQuery、handleBinlogDumpGtid、handleRegisterSlave等方法,而writeOK方法则定义在resp.go中,用于向客户端返回OK包

doc

  • command