本文主要研究一下kingbus的startMasterServer

startMasterServer

kingbus/server/server.go

func (s *KingbusServer) startMasterServer(args *config.BinlogServerConfig) error {    master, err := NewBinlogServer(args, s, s.store, s.applyBroadcast)    if err != nil {        log.Log.Errorf("NewBinlogServer error,err:%s,args:%v", err, *args)        return err    }    s.master = master    s.master.Start()    log.Log.Infof("startMasterServer success,args:%v", *args)    return nil}
  • startMasterServer方法先执行NewBinlogServer创建master并赋值给s.master,然后执行master的Start方法;若NewBinlogServer返回错误,则记录日志并直接返回该错误

NewBinlogServer

kingbus/server/binlog_server.go

//NewBinlogServer create a binlog serverfunc NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error) {    var err error    s := new(BinlogServer)    s.started = atomic.NewBool(false)    s.cfg = cfg    s.listener, err = net.Listen("tcp", s.cfg.Addr)    if err != nil {        log.Log.Errorf("Listen error,err:%s,addr:%s", err, s.cfg.Addr)        return nil, err    }    s.store = store    s.broadcast = broadcast    s.kingbusInfo = ki    s.slaves = make(map[string]*mysql.Slave)    s.errch = make(chan error, 1)    return s, nil}
  • NewBinlogServer方法通过new方法创建BinlogServer,之后设置其listener、store、broadcast等属性

BinlogServer

kingbus/server/binlog_server.go

//BinlogServer is a binlog server,send binlog event to slave.//The generic process://1.authentication//SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'//SET @master_binlog_checksum='NONE'//SET @master_heartbeat_period=%d//2.COM_REGISTER_SLAVE//3.semi-sync://SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';//SET @rpl_semi_sync_slave = 1//4.COM_BINLOG_DUMP_GTIDtype BinlogServer struct {    started *atomic.Bool    cfg     *config.BinlogServerConfig    listener net.Listener    errch    chan error    l      sync.RWMutex    slaves map[string]*mysql.Slave //key is uuid    broadcast   *utils.Broadcast    kingbusInfo KingbusInfo    store       storage.Storage}
  • BinlogServer定义了started、cfg、listener、errch、l、slaves、broadcast、kingbusInfo、store属性,其中slaves是以uuid为key的map

Start

kingbus/server/binlog_server.go

//Start implements binlog server startfunc (s *BinlogServer) Start() {    s.started.Store(true)    go func() {        for s.started.Load() {            select {            case err := <-s.errch:                log.Log.Errorf("binlog server Run error,err:%s", err)                s.Stop()                return            default:                conn, err := s.listener.Accept()                if err != nil {                    log.Log.Infof("BinlogServer.Start:Accept error,err:%s", err)                    continue                }                go s.onConn(conn)            }        }    }()}
  • Start方法先执行s.started.Store(true),然后启动一个goroutine循环处理:通过select非阻塞地检查s.errch,若收到错误则记录日志、执行s.Stop()并退出;否则在default分支中执行s.listener.Accept(),Accept成功后通过go s.onConn(conn)处理连接

onConn

kingbus/server/binlog_server.go

func (s *BinlogServer) onConn(c net.Conn) {    mysqlConn, err := mysql.NewConn(c, s, s.cfg.User, s.cfg.Password)    if err != nil {        log.Log.Errorf("onConn error,err:%s", err)        return    }    mysqlConn.Run()}
  • onConn方法通过mysql.NewConn创建mysqlConn,然后执行mysqlConn.Run方法

NewConn

kingbus/mysql/conn.go

//NewConn create a Connfunc NewConn(conn net.Conn, s BinlogServer, user string, password string) (*Conn, error) {    c := new(Conn)    c.user = user    c.BaseConn = NewBaseConn(conn)    c.connectionID = baseConnID.Add(1)    c.salt, _ = gomysql.RandomBuf(20)    c.closed = atomic.NewBool(false)    masterInfo, err := s.GetMasterInfo()    if err != nil {        c.BaseConn.Close()        log.Log.Errorf("NewConn:GetMasterInfo error,err:%s", err)        return nil, err    }    err = c.handshake(masterInfo.Version, password)    if err != nil {        c.BaseConn.Close()        log.Log.Errorf("NewConn:handshake error,err:%s", err)        return nil, err    }    c.ctx, c.cancel = context.WithCancel(context.Background())    c.userVariables = make(map[string]interface{})    c.binlogServer = s    return c, nil}
  • NewConn方法通过s.GetMasterInfo()获取master信息,然后执行c.handshake(masterInfo.Version, password)

handshake

kingbus/mysql/conn.go

//handshake implements the handshake protocol in mysqlfunc (c *Conn) handshake(serverVersion, password string) error {    if err := c.writeInitialHandshake(serverVersion); err != nil {        return err    }    if err := c.readHandshakeResponse(password); err != nil {        c.writeError(err)        return err    }    if err := c.writeOK(nil); err != nil {        return err    }    c.ResetSequence()    return nil}
  • handshake方法实现的是mysql的handshake协议

Run

kingbus/mysql/conn.go

//Run implements handle client request in Connfunc (c *Conn) Run() {    defer func() {        r := recover()        if err, ok := r.(error); ok {            const size = 4096            buf := make([]byte, size)            buf = buf[:runtime.Stack(buf, false)]            log.Log.Errorf("Conn Run error,err:%s,stack:%s", err, string(buf))        }        c.Close()        log.Log.Debugf("close client connection")    }()    for {        select {        case <-c.ctx.Done():            log.Log.Debugf("BinlogServer closed, close connection")            return        default:            data, err := c.ReadPacket()            if err != nil {                log.Log.Errorf("ReadPacket error,err:%s", err)                return            }            if err := c.dispatch(c.ctx, data); err != nil {                log.Log.Errorf("dispatch error,err:%s,data:%v", err.Error(), data)                //if the error is canceled, means the connection was killed by cmd                //don't need send error message                if err != context.Canceled && c.closed.Load() == false {                    c.writeError(err)                }                return            }            //if the connection is closed, return from loop            if c.closed.Load() == true {                log.Log.Infof("connection status is closed,need return")                return            }            c.Sequence = 0        }    }}

  • Run方法循环通过c.ReadPacket()读取请求,然后通过c.dispatch(c.ctx, data)进行分发;当ctx被取消、读取或分发出错、或连接已关闭时退出循环,并在defer中关闭连接

dispatch

kingbus/mysql/command.go

func (c *Conn) dispatch(ctx context.Context, data []byte) error {    cmd := data[0]    data = data[1:]    switch cmd {    case gomysql.COM_QUIT:        c.Close()        log.Log.Debugf("close client connection")        return nil    case gomysql.COM_QUERY:        return c.handleQuery(utils.BytesToString(data))    case gomysql.COM_PING:        return c.writeOK(nil)    case gomysql.COM_BINLOG_DUMP_GTID:        return c.handleBinlogDumpGtid(ctx, data)    case gomysql.COM_REGISTER_SLAVE:        return c.handleRegisterSlave(data)    default:        log.Log.Errorf("master not support this cmd:%v", data)        return c.writeError(ErrSQLNotSupport)    }    return nil}
  • dispatch根据不同的cmd来执行conn的不同方法

小结

startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法;master的Start方法先执行s.started.Store(true),然后在goroutine中循环:通过select非阻塞地检查s.errch,没有错误时在default分支执行s.listener.Accept(),之后通过go s.onConn(conn)处理连接;onConn方法通过mysql.NewConn创建mysqlConn(其中会完成handshake),然后执行mysqlConn.Run方法,Run方法读取请求并通过dispatch按cmd分发

doc

  • server