序
本文主要研究一下 kingbus 的 binlog_syncer_handler.go
StartBinlogSyncer
kingbus/api/binlog_syncer_handler.go
//StartBinlogSyncer implements start a binlog syncer
func (h *BinlogSyncerHandler) StartBinlogSyncer(echoCtx echo.Context) error {h.l.Lock()
defer h.l.Unlock()
var args config.SyncerArgs
var err error
var syncerID int
defer func() {
if err != nil {log.Log.Errorf("StartBinlogSyncer error,err: %s", err)
echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
}
}()
err = echoCtx.Bind(&args)
if err != nil {return err}
//check args
err = args.Check()
if err != nil {return err}
//forward to leader
if h.svr.IsLeader() == false {req, err := json.Marshal(args)
if err != nil {return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
}
resp, err := h.sendToLeader("PUT", "/binlog/syncer/start", req)
if err != nil {log.Log.Errorf("sendToLeader error,err:%s,args:%v", err, args)
return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
}
if resp.Message != "success" {return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(resp.Message))
}
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
}
//start syncer server
err = h.svr.StartServer(config.SyncerServerType, &args)
if err != nil {log.Log.Errorf("StartServer error,err:%s,args:%v", err, args)
return err
}
//propose start syncer info
err = h.ProposeSyncerArgs(&args)
if err != nil {log.Log.Errorf("ProposeSyncerArgs error,err:%s,args:%v", err, args)
return err
}
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerID))
}
- StartBinlogSyncer 方法先执行 echoCtx.Bind(&args),然后针对 h.svr.IsLeader()为 false 的通过 h.sendToLeader(“PUT”, “/binlog/syncer/start”, req)将请求转发给 leader;为 true 的则执行 h.svr.StartServer(config.SyncerServerType, &args)启动 syncer server,然后执行 h.ProposeSyncerArgs(&args)启动 propose syncer
StopBinlogSyncer
kingbus/api/binlog_syncer_handler.go
//StopBinlogSyncer implements stop binlog syncer
func (h *BinlogSyncerHandler) StopBinlogSyncer(echoCtx echo.Context) error {h.l.Lock()
defer h.l.Unlock()
//forward to leader
if h.svr.IsLeader() == false {resp, err := h.sendToLeader("PUT", "/binlog/syncer/stop", nil)
if err != nil {log.Log.Errorf("sendToLeader error,err:%s", err)
return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
}
if resp.Message != "success" {return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(resp.Message))
}
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
}
h.svr.StopServer(config.SyncerServerType)
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}
- StopBinlogSyncer 方法对于 h.svr.IsLeader()为 false 的通过 h.sendToLeader(“PUT”, “/binlog/syncer/stop”, nil)将请求转发给 leader;为 true 的则执行 h.svr.StopServer(config.SyncerServerType)
GetBinlogSyncerStatus
kingbus/api/binlog_syncer_handler.go
//GetBinlogSyncerStatus implements get binlog syncer status in the runtime state
func (h *BinlogSyncerHandler) GetBinlogSyncerStatus(echoCtx echo.Context) error {h.l.Lock()
defer h.l.Unlock()
//forward to leader
if h.svr.IsLeader() == false {resp, err := h.sendToLeader("GET", "/binlog/syncer/status", nil)
if err != nil {log.Log.Errorf("sendToLeader error,err:%s", err)
return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
}
if resp.Message != "success" {return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(resp.Message))
}
return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
}
status := h.svr.GetServerStatus(config.SyncerServerType)
if syncerStatus, ok := status.(*config.SyncerStatus); ok {return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerStatus))
}
return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
}
- GetBinlogSyncerStatus 方法对于 h.svr.IsLeader()为 false 的通过 h.sendToLeader(“GET”, “/binlog/syncer/status”, nil)方法转发给 leader;为 true 的则执行 h.svr.GetServerStatus(config.SyncerServerType)获取 status
sendToLeader
kingbus/api/membership_handler.go
func (h *MembershipHandler) sendToLeader(method string, req []byte) (*utils.Resp, error) {leaderID := h.svr.Leader()
leader := h.cluster.Member(leaderID)
if leader == nil {return nil, ErrNoLeader}
if len(leader.AdminURLs) != 1 {log.Log.Errorf("leader admin url is not 1,leader:%v", *leader)
return nil, ErrNoLeader
}
leaderURL, err := url.Parse(leader.AdminURLs[0])
if err != nil {return nil, err}
url := leaderURL.Scheme + "://" + leaderURL.Host + "/members"
resp, err := utils.SendRequest(method, url, req)
if err != nil {log.Log.Errorf("sendToLeader:SendRequest error,err:%s,url:%s", err, url)
return nil, err
}
return resp, nil
}
- sendToLeader 方法通过 h.svr.Leader()获取 leaderId,然后执行 h.cluster.Member(leaderID)获取 leader,之后构造 leaderURL,然后执行 utils.SendRequest(method, url, req)发送请求
SendRequest
kingbus/utils/http_utils.go
//SendRequest send PUT request to leader
func SendRequest(method string, leaderURL string, data []byte) (*Resp, error) {client := &http.Client{}
req, err := http.NewRequest(method, leaderURL, bytes.NewBuffer(data))
if err != nil {return nil, err}
req.Header.Set("Content-Type", "application/json;charset=utf-8")
resp, err := client.Do(req)
if err != nil {return nil, err}
defer resp.Body.Close()
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {return nil, err}
r := new(Resp)
err = json.Unmarshal(respBody, r)
if err != nil {return nil, err}
return r, nil
}
- SendRequest 通过 http.NewRequest 构建 request,其 contentType 为
application/json;charset=utf-8
,之后通过 client.Do(req) 发送请求,然后通过 ioutil.ReadAll(resp.Body)读取 respBody,在通过 json.Unmarshal(respBody, r)解析 json 为 Resp 类型
小结
kingbus 的 binlog_syncer_handler.go 提供了 StartBinlogSyncer、StopBinlogSyncer、GetBinlogSyncerStatus 方法
doc
- binlog_syncer_handler