序
本文主要研究一下kingbus的startMasterServer
startMasterServer
kingbus/server/server.go
func (s *KingbusServer) startMasterServer(args *config.BinlogServerConfig) error { master, err := NewBinlogServer(args, s, s.store, s.applyBroadcast) if err != nil { log.Log.Errorf("NewBinlogServer error,err:%s,args:%v", err, *args) return err } s.master = master s.master.Start() log.Log.Infof("startMasterServer success,args:%v", *args) return nil}
- startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法
NewBinlogServer
kingbus/server/binlog_server.go
//NewBinlogServer create a binlog serverfunc NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error) { var err error s := new(BinlogServer) s.started = atomic.NewBool(false) s.cfg = cfg s.listener, err = net.Listen("tcp", s.cfg.Addr) if err != nil { log.Log.Errorf("Listen error,err:%s,addr:%s", err, s.cfg.Addr) return nil, err } s.store = store s.broadcast = broadcast s.kingbusInfo = ki s.slaves = make(map[string]*mysql.Slave) s.errch = make(chan error, 1) return s, nil}
- NewBinlogServer方法通过内置函数new创建BinlogServer,接着用net.Listen在cfg.Addr上监听tcp连接(失败则记录日志并返回错误),之后设置其store、broadcast、kingbusInfo、slaves、errch等属性
BinlogServer
kingbus/server/binlog_server.go
// BinlogServer is a binlog server, it sends binlog events to slaves.
//
// The generic process:
//  1. authentication
//     SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'
//     SET @master_binlog_checksum='NONE'
//     SET @master_heartbeat_period=%d
//  2. COM_REGISTER_SLAVE
//  3. semi-sync:
//     SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';
//     SET @rpl_semi_sync_slave = 1
//  4. COM_BINLOG_DUMP_GTID
type BinlogServer struct {
	// started is set to true by Start and is the accept loop's run condition.
	started *atomic.Bool
	// cfg is the configuration passed to NewBinlogServer (Addr, User, Password are read from it).
	cfg *config.BinlogServerConfig
	// listener is the TCP listener opened on cfg.Addr by NewBinlogServer.
	listener net.Listener
	// errch is a buffered (capacity 1) channel; an error received on it stops the server.
	errch chan error

	// l is a read/write lock; presumably it guards slaves — TODO confirm against its users.
	l sync.RWMutex
	// slaves holds the registered slaves; the key is the slave uuid.
	slaves map[string]*mysql.Slave

	// broadcast, kingbusInfo and store are the dependencies handed to NewBinlogServer.
	broadcast   *utils.Broadcast
	kingbusInfo KingbusInfo
	store       storage.Storage
}
- BinlogServer定义了started(atomic.Bool)、cfg(BinlogServerConfig)、listener(net.Listener)、errch(chan error)、l(sync.RWMutex)、slaves(key为uuid的Slave映射)、broadcast、kingbusInfo、store属性
Start
kingbus/server/binlog_server.go
//Start implements binlog server startfunc (s *BinlogServer) Start() { s.started.Store(true) go func() { for s.started.Load() { select { case err := <-s.errch: log.Log.Errorf("binlog server Run error,err:%s", err) s.Stop() return default: conn, err := s.listener.Accept() if err != nil { log.Log.Infof("BinlogServer.Start:Accept error,err:%s", err) continue } go s.onConn(conn) } } }()}
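- Start方法先执行s.started.Store(true),然后启动一个goroutine,在s.started为true时循环:先通过select非阻塞地检查s.errch,收到错误则记录日志、执行s.Stop()并退出;否则在default分支执行s.listener.Accept(),Accept失败时记录日志并继续循环,成功则为该连接异步执行s.onConn(conn)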
onConn
kingbus/server/binlog_server.go
func (s *BinlogServer) onConn(c net.Conn) { mysqlConn, err := mysql.NewConn(c, s, s.cfg.User, s.cfg.Password) if err != nil { log.Log.Errorf("onConn error,err:%s", err) return } mysqlConn.Run()}
- onConn方法通过mysql.NewConn创建mysqlConn,然后执行mysqlConn.Run方法
NewConn
kingbus/mysql/conn.go
//NewConn create a Connfunc NewConn(conn net.Conn, s BinlogServer, user string, password string) (*Conn, error) { c := new(Conn) c.user = user c.BaseConn = NewBaseConn(conn) c.connectionID = baseConnID.Add(1) c.salt, _ = gomysql.RandomBuf(20) c.closed = atomic.NewBool(false) masterInfo, err := s.GetMasterInfo() if err != nil { c.BaseConn.Close() log.Log.Errorf("NewConn:GetMasterInfo error,err:%s", err) return nil, err } err = c.handshake(masterInfo.Version, password) if err != nil { c.BaseConn.Close() log.Log.Errorf("NewConn:handshake error,err:%s", err) return nil, err } c.ctx, c.cancel = context.WithCancel(context.Background()) c.userVariables = make(map[string]interface{}) c.binlogServer = s return c, nil}
- NewConn方法通过s.GetMasterInfo()获取master信息,然后执行c.handshake(masterInfo.Version, password)
handshake
kingbus/mysql/conn.go
//handshake implements the handshake protocol in mysqlfunc (c *Conn) handshake(serverVersion, password string) error { if err := c.writeInitialHandshake(serverVersion); err != nil { return err } if err := c.readHandshakeResponse(password); err != nil { c.writeError(err) return err } if err := c.writeOK(nil); err != nil { return err } c.ResetSequence() return nil}
- handshake方法实现的是mysql的handshake协议
Run
kingbus/mysql/conn.go
//Run implements handle client request in Connfunc (c *Conn) Run() { defer func() { r := recover() if err, ok := r.(error); ok { const size = 4096 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] log.Log.Errorf("Conn Run error,err:%s,stack:%s", err, string(buf)) } c.Close() log.Log.Debugf("close client connection") }() for { select { case <-c.ctx.Done(): log.Log.Debugf("BinlogServer closed, close connection") return default: data, err := c.ReadPacket() if err != nil { log.Log.Errorf("ReadPacket error,err:%s", err) return } if err := c.dispatch(c.ctx, data); err != nil { log.Log.Errorf("dispatch error,err:%s,data:%v", err.Error(), data) //if the error is canceled, means the connection was killed by cmd //don't need send error message if err != context.Canceled && c.closed.Load() == false { c.writeError(err) } return } //if the connection is closed, return from loop if c.closed.Load() == true { log.Log.Infof("connection status is closed,need return") return } c.Sequence = 0 } }}
- Run方法在循环中通过c.ReadPacket()读取请求,然后通过c.dispatch(c.ctx, data)进行分发;当ctx被取消、读取或分发出错、或连接已关闭时退出循环,并在defer中recover并关闭连接
dispatch
kingbus/mysql/command.go
func (c *Conn) dispatch(ctx context.Context, data []byte) error { cmd := data[0] data = data[1:] switch cmd { case gomysql.COM_QUIT: c.Close() log.Log.Debugf("close client connection") return nil case gomysql.COM_QUERY: return c.handleQuery(utils.BytesToString(data)) case gomysql.COM_PING: return c.writeOK(nil) case gomysql.COM_BINLOG_DUMP_GTID: return c.handleBinlogDumpGtid(ctx, data) case gomysql.COM_REGISTER_SLAVE: return c.handleRegisterSlave(data) default: log.Log.Errorf("master not support this cmd:%v", data) return c.writeError(ErrSQLNotSupport) } return nil}
- dispatch根据不同的cmd来执行conn的不同方法
小结
startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法;master的Start方法先执行s.started.Store(true),然后在goroutine中循环:通过select检查s.errch,没有错误时在default分支执行s.listener.Accept(),之后为每个连接异步执行s.onConn(conn);onConn方法通过mysql.NewConn创建mysqlConn(其中会完成handshake),然后执行mysqlConn.Run方法,由Run读取请求并交给dispatch按cmd分发
doc
- server