共计 5332 个字符,预计需要花费 14 分钟才能阅读完成。
序
本文主要研究一下 kingbus 的 startMasterServer
startMasterServer
kingbus/server/server.go
// startMasterServer creates the binlog master server from args, stores it in
// s.master and starts it. It returns the error reported by NewBinlogServer,
// or nil once the server has been started.
func (s *KingbusServer) startMasterServer(args *config.BinlogServerConfig) error {
	binlogSrv, err := NewBinlogServer(args, s, s.store, s.applyBroadcast)
	if err != nil {
		log.Log.Errorf("NewBinlogServer error,err:%s,args:%v", err, *args)
		return err
	}

	// Publish the server on s before starting it, then start it.
	s.master = binlogSrv
	s.master.Start()

	log.Log.Infof("startMasterServer success,args:%v", *args)
	return nil
}
- startMasterServer 方法先执行 NewBinlogServer 创建 master,然后执行 master 的 Start 方法
NewBinlogServer
kingbus/server/binlog_server.go
// NewBinlogServer creates a BinlogServer and opens its TCP listener on
// cfg.Addr. This function only opens the listener; it does not accept
// connections itself. On a listen failure the error is logged and returned
// together with a nil server.
func NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error) {
	srv := new(BinlogServer)
	srv.started = atomic.NewBool(false)
	srv.cfg = cfg

	ln, listenErr := net.Listen("tcp", srv.cfg.Addr)
	if listenErr != nil {
		log.Log.Errorf("Listen error,err:%s,addr:%s", listenErr, srv.cfg.Addr)
		return nil, listenErr
	}
	srv.listener = ln

	// Collaborators handed in by the caller.
	srv.kingbusInfo = ki
	srv.store = store
	srv.broadcast = broadcast

	// Internal state: slave table and a one-slot error channel.
	srv.slaves = make(map[string]*mysql.Slave)
	srv.errch = make(chan error, 1)

	return srv, nil
}
- NewBinlogServer 方法通过 new 方法创建 BinlogServer,之后设置其 listener、store、broadcast 等属性
BinlogServer
kingbus/server/binlog_server.go
// BinlogServer is a binlog server that sends binlog events to slaves.
// The generic process of a slave connection:
//  1. authentication
//     SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'
//     SET @master_binlog_checksum='NONE'
//     SET @master_heartbeat_period=%d
//  2. COM_REGISTER_SLAVE
//  3. semi-sync:
//     SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';
//     SET @rpl_semi_sync_slave = 1
//  4. COM_BINLOG_DUMP_GTID
type BinlogServer struct {
// started is set to true by Start; the accept loop keeps running while it
// loads true.
started *atomic.Bool
// cfg is the server configuration. Addr is the listen address; User and
// Password are handed to mysql.NewConn for every accepted connection.
cfg *config.BinlogServerConfig
// listener is the TCP listener opened by NewBinlogServer.
listener net.Listener
// errch is created with capacity 1; an error received from it makes the
// accept loop log it, call Stop and exit.
errch chan error
// l is a read/write lock. NOTE(review): presumably it guards slaves —
// confirm against the methods that use it.
l sync.RWMutex
// slaves maps a slave uuid to its *mysql.Slave.
slaves map[string]*mysql.Slave //key is uuid
// broadcast is the *utils.Broadcast passed to NewBinlogServer.
broadcast *utils.Broadcast
// kingbusInfo is the KingbusInfo passed to NewBinlogServer.
kingbusInfo KingbusInfo
// store is the storage.Storage passed to NewBinlogServer.
store storage.Storage
}
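- BinlogServer 定义了 started、cfg、listener、errch、l、slaves、broadcast、kingbusInfo、store 属性,类型分别为 *atomic.Bool、*config.BinlogServerConfig、net.Listener、chan error、sync.RWMutex、map[string]*mysql.Slave、*utils.Broadcast、KingbusInfo、storage.Storage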
Start
kingbus/server/binlog_server.go
// Start marks the server as started and launches the accept loop in a new
// goroutine. While started is true, each iteration first polls errch without
// blocking: if an error is pending it is logged, Stop is called and the loop
// ends. Otherwise the loop blocks in Accept and serves the new connection in
// its own goroutine via onConn.
func (s *BinlogServer) Start() {
	s.started.Store(true)

	acceptLoop := func() {
		for s.started.Load() {
			// Non-blocking check for a pending server error.
			select {
			case runErr := <-s.errch:
				log.Log.Errorf("binlog server Run error,err:%s", runErr)
				s.Stop()
				return
			default:
			}

			conn, acceptErr := s.listener.Accept()
			if acceptErr != nil {
				// NOTE(review): a persistent Accept error is retried
				// immediately with no delay — confirm this is intended.
				log.Log.Infof("BinlogServer.Start:Accept error,err:%s", acceptErr)
				continue
			}
			go s.onConn(conn)
		}
	}
	go acceptLoop()
}
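- Start 方法先执行 s.started.Store(true),然后启动一个 goroutine,在 s.started 为 true 时循环:通过 select 先非阻塞地检查 s.errch,收到错误则记录日志、执行 s.Stop() 并退出;否则进入 default 分支执行 s.listener.Accept(),Accept 出错则记录日志并继续循环,成功则另起一个 goroutine 执行 s.onConn(conn)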
onConn
kingbus/server/binlog_server.go
// onConn serves one accepted network connection: it builds a mysql connection
// with the configured user and password and, if that succeeds, runs it.
// A failure from mysql.NewConn is logged and the connection is not served.
func (s *BinlogServer) onConn(c net.Conn) {
	user, password := s.cfg.User, s.cfg.Password

	client, err := mysql.NewConn(c, s, user, password)
	if err == nil {
		client.Run()
		return
	}
	log.Log.Errorf("onConn error,err:%s", err)
}
- onConn 方法通过 mysql.NewConn 创建 mysqlConn,然后执行 mysqlConn.Run 方法
NewConn
kingbus/mysql/conn.go
// NewConn wraps conn in a Conn and performs the MySQL handshake with the
// client, using the version from s.GetMasterInfo() as the advertised server
// version and password to check the client's handshake response.
//
// On any failure (salt generation, GetMasterInfo, handshake) the underlying
// connection is closed, the error is logged and (nil, err) is returned.
func NewConn(conn net.Conn, s BinlogServer, user string, password string) (*Conn, error) {
	c := new(Conn)
	c.user = user
	c.BaseConn = NewBaseConn(conn)
	c.connectionID = baseConnID.Add(1)
	c.closed = atomic.NewBool(false)

	// The salt is used by the handshake; do not continue with an unusable
	// salt if the random source fails.
	salt, saltErr := gomysql.RandomBuf(20)
	if saltErr != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:RandomBuf error,err:%s", saltErr)
		return nil, saltErr
	}
	c.salt = salt

	masterInfo, err := s.GetMasterInfo()
	if err != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:GetMasterInfo error,err:%s", err)
		return nil, err
	}

	err = c.handshake(masterInfo.Version, password)
	if err != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:handshake error,err:%s", err)
		return nil, err
	}

	// Per-connection state that is only needed once the handshake succeeded.
	c.ctx, c.cancel = context.WithCancel(context.Background())
	c.userVariables = make(map[string]interface{})
	c.binlogServer = s
	return c, nil
}
- NewConn 方法通过 s.GetMasterInfo() 获取 master 信息,然后执行 c.handshake(masterInfo.Version, password)
handshake
kingbus/mysql/conn.go
// handshake runs the server side of the MySQL handshake: it writes the
// initial handshake packet advertising serverVersion, reads the client's
// response and checks it against password, replies OK and resets the packet
// sequence. The first error encountered is returned.
func (c *Conn) handshake(serverVersion, password string) error {
	err := c.writeInitialHandshake(serverVersion)
	if err != nil {
		return err
	}

	err = c.readHandshakeResponse(password)
	if err != nil {
		// Report the failure to the client; the result of writeError is
		// ignored and the original error is what the caller gets.
		c.writeError(err)
		return err
	}

	err = c.writeOK(nil)
	if err != nil {
		return err
	}

	c.ResetSequence()
	return nil
}
- handshake 方法实现的是 mysql 的 handshake 协议
Run
kingbus/mysql/conn.go
// Run serves the client connection: it repeatedly reads a packet and hands it
// to dispatch until c.ctx is done, reading or dispatching fails, or the
// connection is marked closed. The connection is always closed on return.
//
// A panic raised while serving is recovered and logged together with a stack
// trace (truncated to 4096 bytes), whatever the type of the panic value.
func (c *Conn) Run() {
	defer func() {
		// recover returns whatever was passed to panic, which is not
		// necessarily an error; log every non-nil value so that no panic is
		// swallowed silently.
		if r := recover(); r != nil {
			const size = 4096
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			log.Log.Errorf("Conn Run error,err:%v,stack:%s", r, string(buf))
		}
		c.Close()
		log.Log.Debugf("close client connection")
	}()

	for {
		select {
		case <-c.ctx.Done():
			log.Log.Debugf("BinlogServer closed, close connection")
			return
		default:
			data, err := c.ReadPacket()
			if err != nil {
				log.Log.Errorf("ReadPacket error,err:%s", err)
				return
			}

			if err := c.dispatch(c.ctx, data); err != nil {
				log.Log.Errorf("dispatch error,err:%s,data:%v", err.Error(), data)
				// If the error is context.Canceled the connection was killed
				// by a command, and if the connection is already closed there
				// is nobody to answer: skip the error packet in both cases.
				if err != context.Canceled && !c.closed.Load() {
					c.writeError(err)
				}
				return
			}

			// If the connection was closed while dispatching, leave the loop.
			if c.closed.Load() {
				log.Log.Infof("connection status is closed,need return")
				return
			}

			// Start the next command with a fresh sequence number.
			c.Sequence = 0
		}
	}
}
- Run 方法在循环中通过 c.ReadPacket() 读取请求,然后通过 c.dispatch(c.ctx, data) 进行分发;当 c.ctx 被取消、读取或分发出错、或者连接已关闭时退出循环,并在 defer 中执行 c.Close()
dispatch
kingbus/mysql/command.go
// dispatch routes one client packet to its handler. The first byte of data is
// the command; the remaining bytes are the command payload.
//
// An empty packet carries no command byte and is rejected with
// gomysql.ErrMalformPacket instead of panicking on data[0]. A command that is
// not supported is logged and answered with ErrSQLNotSupport; in that case the
// result of writing the error packet is returned.
func (c *Conn) dispatch(ctx context.Context, data []byte) error {
	if len(data) == 0 {
		return gomysql.ErrMalformPacket
	}

	cmd, payload := data[0], data[1:]
	switch cmd {
	case gomysql.COM_QUIT:
		c.Close()
		log.Log.Debugf("close client connection")
		return nil
	case gomysql.COM_QUERY:
		return c.handleQuery(utils.BytesToString(payload))
	case gomysql.COM_PING:
		return c.writeOK(nil)
	case gomysql.COM_BINLOG_DUMP_GTID:
		return c.handleBinlogDumpGtid(ctx, payload)
	case gomysql.COM_REGISTER_SLAVE:
		return c.handleRegisterSlave(payload)
	default:
		// Log the command byte itself as well as its payload, so the
		// unsupported command can be identified.
		log.Log.Errorf("master not support this cmd:%v,data:%v", cmd, payload)
		return c.writeError(ErrSQLNotSupport)
	}
}
- dispatch 根据不同的 cmd 来执行 conn 的不同方法
小结
startMasterServer 方法先执行 NewBinlogServer 创建 master,然后执行 master 的 Start 方法;master 的 Start 方法先执行 s.started.Store(true),然后启动一个 goroutine 循环:通过 select 先检查 s.errch(收到错误则执行 s.Stop() 并退出),否则在 default 分支执行 s.listener.Accept(),并为每个连接另起 goroutine 执行 s.onConn(conn);onConn 方法通过 mysql.NewConn 创建 mysqlConn,然后执行 mysqlConn.Run 方法
doc
- server
正文完