本文主要研究一下kingbus的binlog_server_handler.go

StartBinlogServer

kingbus/api/binlog_server_handler.go

//StartBinlogServer implements start a binlog serverfunc (h *BinlogServerHandler) StartBinlogServer(echoCtx echo.Context) error {    h.l.Lock()    defer h.l.Unlock()    var args config.BinlogServerConfig    var err error    defer func() {        if err != nil {            log.Log.Errorf("StartBinlogServer error,err: %s", err)            echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))        }    }()    err = echoCtx.Bind(&args)    if err != nil {        return err    }    kingbusIP := h.svr.GetIP()    //check args    err = args.Check(kingbusIP)    if err != nil {        return err    }    //start syncer server    err = h.svr.StartServer(config.BinlogServerType, &args)    if err != nil {        log.Log.Errorf("start server error,err:%s,args:%v", err, args)        return err    }    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))}
  • StartBinlogServer方法主要是通过h.svr.StartServer(config.BinlogServerType, &args)来启动binlog server

StartServer

kingbus/server/server.go

//StartServer start sub servers:syncer server or binlog master serverfunc (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error {    var err error    switch svrType {    case config.SyncerServerType:        if s.IsSyncerStarted() {            return ErrStarted        }        syncerArgs, ok := args.(*config.SyncerArgs)        if !ok {            log.Log.Errorf("StartServer args is illegal,args:%v", args)            return ErrArgs        }        err = s.startSyncerServer(syncerArgs)        if err != nil {            log.Log.Errorf("startSyncerServer error,err:%s,args:%v", err, *syncerArgs)            return ErrArgs        }        //start to propose binlog event to raft cluster        s.StartProposeBinlog(s.syncer.ctx)        log.Log.Debugf("start syncer,and propose!!!")        return nil    case config.BinlogServerType:        if s.IsBinlogServerStarted() {            return ErrStarted        }        masterArgs, ok := args.(*config.BinlogServerConfig)        if !ok {            log.Log.Errorf("StartServer args is illegal,args:%v", args)            return ErrArgs        }        err = s.startMasterServer(masterArgs)        if err != nil {            log.Log.Errorf("startMasterServer error,err:%s,args:%v", err, *masterArgs)            return ErrArgs        }        return nil    default:        log.Log.Fatalf("StartServer:server type not support,serverType:%v", svrType)    }    return nil}
  • StartServer方法根据svrType参数来启动不同的server,若是config.SyncerServerType则启动的是SyncerServer,若是config.BinlogServerType则启动的是BinlogServer

StopBinlogServer

kingbus/api/binlog_server_handler.go

//StopBinlogServer implements stop binlog serverfunc (h *BinlogServerHandler) StopBinlogServer(echoCtx echo.Context) error {    h.l.Lock()    defer h.l.Unlock()    h.svr.StopServer(config.BinlogServerType)    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))}
  • StopBinlogServer方法通过h.svr.StopServer(config.BinlogServerType)来关闭binlog server

StopServer

kingbus/server/server.go

//StopServer stop sub serverfunc (s *KingbusServer) StopServer(svrType config.SubServerType) {    switch svrType {    case config.SyncerServerType:        if s.IsSyncerStarted() {            s.syncer.Stop()        }    case config.BinlogServerType:        if s.IsBinlogServerStarted() {            s.master.Stop()        }    default:        log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)    }}
  • StopServer方法根据svrType参数来关闭不同的server,若是config.SyncerServerType则关闭的是SyncerServer,若是config.BinlogServerType则关闭的是BinlogServer

GetBinlogServerStatus

kingbus/api/binlog_server_handler.go

//GetBinlogServerStatus implements get binlog server status in the runtime statefunc (h *BinlogServerHandler) GetBinlogServerStatus(echoCtx echo.Context) error {    h.l.Lock()    defer h.l.Unlock()    status := h.svr.GetServerStatus(config.BinlogServerType)    if masterStatus, ok := status.(*config.BinlogServerStatus); ok {        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(masterStatus))    }    return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))}
  • GetBinlogServerStatus方法通过h.svr.GetServerStatus(config.BinlogServerType)来获取server status

GetServerStatus

//GetServerStatus get the sub server statusfunc (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{} {    switch svrType {    case config.SyncerServerType:        var syncerStatus config.SyncerStatus        if s.IsSyncerStarted() {            cfg := s.syncer.cfg            syncerStatus.MysqlAddr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)            syncerStatus.MysqlUser = cfg.User            syncerStatus.MysqlPassword = cfg.Password            syncerStatus.SemiSync = cfg.SemiSyncEnabled            syncerStatus.Status = config.ServerRunningStatus            syncerStatus.CurrentGtid = s.CurrentGtidStr()            syncerStatus.LastBinlogFile = s.LastBinlogFile()            syncerStatus.LastFilePosition = s.LastFilePosition()            syncerStatus.ExecutedGtidSet = s.ExecutedGtidSetStr()            purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)            if err != nil {                log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)            }            syncerStatus.PurgedGtidSet = purgedGtids.String()        } else {            syncerStatus.Status = config.ServerStoppedStatus        }        return &syncerStatus    case config.BinlogServerType:        var status config.BinlogServerStatus        if s.IsBinlogServerStarted() {            cfg := s.master.cfg            status.Addr = cfg.Addr            status.User = cfg.User            status.Password = cfg.Password            status.Slaves = make([]*mysql.Slave, 0, 2)            slaves := s.master.GetSlaves()            for _, s := range slaves {                status.Slaves = append(status.Slaves, s)            }            status.CurrentGtid = s.CurrentGtidStr()            status.LastBinlogFile = s.LastBinlogFile()            status.LastFilePosition = s.LastFilePosition()            status.ExecutedGtidSet = s.ExecutedGtidSetStr()            purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)            if err != nil {                log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)            }            status.PurgedGtidSet = purgedGtids.String()            status.Status = config.ServerRunningStatus        } else {            status.Status = config.ServerStoppedStatus        }        return &status    default:        log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)    }    return nil}
  • GetServerStatus方法根据svrType参数来获取不同的status,若是config.SyncerServerType则获取的是SyncerStatus,若是config.BinlogServerType则获取的是BinlogServerStatus

小结

kingbus的binlog_server_handler提供了StartBinlogServer、StopBinlogServer、GetBinlogServerStatus,他们均委托给了server.go的对应方法

doc

  • binlog_server_handler