共计 11723 个字符,预计需要花费 30 分钟才能阅读完成。
手撸 golang etcd raft 协议之 8
缘起
最近阅读 [云原生分布式存储基石:etcd 深入解析] (杜军, 2019.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop
raft 分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft 算法把这个问题分解成了四个子问题:1. 首领选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)、4. 成员关系变化(membership changes)。
目标
- 根据 raft 协议,实现高可用、分布式、强一致的 kv 存储
子目标(Day 8)
- 简化 rpc 连接管理器 tRaftClientService 的实现
- 将 IRaftLSM 的内部支持接口剥离到 iRaftStateContext 接口
- 实现 Candidate 状态的处理逻辑
设计
- IRaftLSM:raft 有限状态机接口
- iRaftStateContext:提供状态模式下的上下文支持
- tCandidateState:Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
- tRaftClient:管理到指定 raft 节点的 rpc 连接
- tRaftClientService:管理当前节点到其他 raft 节点的 rpc 连接
IRaftLSM.go
raft 有限状态机接口
package lsm | |
import ("learning/gooop/etcd/raft/rpc") | |
// IRaftLSM raft 无限状态自动机 | |
type IRaftLSM interface { | |
rpc.IRaftRPC | |
iRaftStateContext | |
State() IRaftState} |
iRaftStateContext.go
提供状态模式下的上下文支持
package lsm | |
import ( | |
"learning/gooop/etcd/raft/config" | |
"learning/gooop/etcd/raft/rpc/client" | |
"learning/gooop/etcd/raft/store" | |
) | |
type iRaftStateContext interface {Config() config.IRaftConfig | |
Store() store.ILogStore | |
HandleStateChanged(state IRaftState) | |
RaftClientService() client.IRaftClientService} |
tCandidateState.go
Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
package lsm | |
import ( | |
"learning/gooop/etcd/raft/roles" | |
"learning/gooop/etcd/raft/rpc" | |
"learning/gooop/etcd/raft/timeout" | |
"sync" | |
"time" | |
) | |
// tCandidateState presents a candidate node | |
type tCandidateState struct { | |
tEventDrivenModel | |
context iRaftStateContext | |
mInitOnce sync.Once | |
mStartOnce sync.Once | |
// update: init / ceAskingForVote | |
mTerm int64 | |
// update: ceInit / ceAskingForVote / ceVoteToCandidate | |
mVotedTerm int64 | |
// update: ceInit / ceAskingForVote / ceVoteToCandidate | |
mVotedCandidateID string | |
// update: ceInit / ceAskingForVote / ceVoteToCandidate | |
mVotedTimestamp int64 | |
// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing | |
mTicketCount map[string]bool | |
mTicketMutex *sync.Mutex | |
// update: ceInit / ceDisposing | |
mDisposedFlag bool | |
} | |
// Events raised and handled inside tCandidateState. Each entry states where
// the event is raised and what it carries in args.
const (
	// ceInit is raised by init(); args: empty.
	ceInit = "candidate.init"

	// ceStart is meant to be raised by Start(); it is handled by
	// whenStartThenAskForVote. args: empty.
	ceStart = "candidate.Start"

	// ceElectionTimeout is raised by the watcher started in
	// whenAskingForVoteThenWatchElectionTimeout(); args: empty.
	ceElectionTimeout = "candidate.ElectionTimeout"

	// ceLeaderAnnounced is raised by Heartbeat() / AppendLog() when they
	// accept the sender as leader (CommitLog does not raise it); args: empty.
	ceLeaderAnnounced = "candidate.LeaderAnnounced"

	// ceVoteToCandidate is raised by RequestVote() when it grants a vote to
	// a higher-term candidate; args[0]: *rpc.RequestVoteCmd.
	ceVoteToCandidate = "candidate.VoteToCandidate"

	// ceDisposing is raised by whenLeaderAnnouncedThenSwitchToFollower() and
	// whenWinningTheVoteThenSwitchToLeader(); args: empty.
	ceDisposing = "candidate.Disposing"

	// ceAskingForVote is raised by beginAskForVote() at the start of every
	// vote round; args: empty.
	ceAskingForVote = "candidate.AskingForVote"

	// ceReceiveTicket is raised by handleRequestVoteOK(); args[0]: the ID
	// (string) of the peer that granted its vote.
	ceReceiveTicket = "candidate.ReceiveTicket"

	// ceWinningTheVote is raised by whenReceiveTicketThenCheckTicketCount()
	// once a majority of the configured nodes voted for us; args: empty.
	ceWinningTheVote = "candidate.ceWinningTheVote"
)
func newCandidateState(ctx iRaftStateContext, term int64) IRaftState {it := new(tCandidateState) | |
it.init(ctx, term) | |
return it | |
} | |
func (me *tCandidateState) init(ctx iRaftStateContext, term int64) {me.mInitOnce.Do(func() { | |
me.context = ctx | |
me.mTerm = term | |
me.initEventHandlers() | |
me.raise(ceInit) | |
}) | |
} | |
func (me *tCandidateState) initEventHandlers() { | |
// write only logic | |
me.hookEventsForTerm() | |
me.hookEventsForVotedTerm() | |
me.hookEventsForVotedCandidateID() | |
me.hookEventsForVotedTimestamp() | |
me.hookEventsForTicketCount() | |
me.hookEventsForDisposedFlag() | |
// read only logic | |
me.hook(ceStart, | |
me.whenStartThenAskForVote) | |
me.hook(ceAskingForVote, | |
me.whenAskingForVoteThenWatchElectionTimeout) | |
me.hook(ceReceiveTicket, | |
me.whenReceiveTicketThenCheckTicketCount) | |
me.hook(ceElectionTimeout, | |
me.whenElectionTimeoutThenAskForVoteAgain) | |
me.hook(ceWinningTheVote, | |
me.whenWinningTheVoteThenSwitchToLeader) | |
me.hook(ceLeaderAnnounced, | |
me.whenLeaderAnnouncedThenSwitchToFollower) | |
} | |
// hookEventsForTerm maintains field: mTerm | |
// update: ceElectionTimeout | |
func (me *tCandidateState) hookEventsForTerm() {me.hook(ceAskingForVote, func(e string, args ...interface{}) {me.mTerm++}) | |
} | |
// hookEventsForVotedTerm maintains field: mVotedTerm | |
// update: ceInit / ceElectionTimeout / ceVoteToCandidate | |
func (me *tCandidateState) hookEventsForVotedTerm() {me.hook(ceInit, func(e string, args ...interface{}) { | |
// initially, vote to itself | |
me.mVotedTerm = me.mTerm | |
}) | |
me.hook(ceAskingForVote, func(e string, args ...interface{}) { | |
// when timeout, reset to itself | |
me.mVotedTerm = me.mTerm | |
}) | |
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) { | |
// after vote to candidate | |
cmd := args[0].(*rpc.RequestVoteCmd) | |
me.mVotedTerm = cmd.Term | |
}) | |
} | |
// hookEventsForVotedCandidateID maintains field: mVotedCandidateID | |
// update: ceInit / ceElectionTimeout / ceVoteToCandidate | |
func (me *tCandidateState) hookEventsForVotedCandidateID() {me.hook(ceInit, func(e string, args ...interface{}) { | |
// initially, vote to itself | |
me.mVotedCandidateID = me.context.Config().ID() | |
}) | |
me.hook(ceAskingForVote, func(e string, args ...interface{}) {me.mVotedCandidateID = me.context.Config().ID()}) | |
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) { | |
// after vote to candidate | |
cmd := args[0].(*rpc.RequestVoteCmd) | |
me.mVotedCandidateID = cmd.CandidateID | |
}) | |
} | |
// hookEventsForVotedTimestamp maintains field: mVotedTimestamp
// update: ceInit / ceAskingForVote / ceVoteToCandidate
//
// Every vote cast, whether for this node or for another candidate, is stamped
// with the current wall-clock time in Unix nanoseconds.
func (me *tCandidateState) hookEventsForVotedTimestamp() {
	stamp := func(_ string, _ ...interface{}) {
		me.mVotedTimestamp = time.Now().UnixNano()
	}
	me.hook(ceInit, stamp)
	me.hook(ceAskingForVote, stamp)
	me.hook(ceVoteToCandidate, stamp)
}
// hookEventsForTicketCount maintains fields: mTicketCount / mTicketMutex
// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
func (me *tCandidateState) hookEventsForTicketCount() {
	// selfBallot returns a ticket set that holds only this node's own vote.
	selfBallot := func() map[string]bool {
		return map[string]bool{me.context.Config().ID(): true}
	}

	// ceInit creates the mutex itself, so this handler runs unlocked.
	me.hook(ceInit, func(_ string, _ ...interface{}) {
		me.mTicketMutex = new(sync.Mutex)
		me.mTicketCount = selfBallot()
	})

	// A new round drops earlier tickets; the candidate votes for itself again.
	me.hook(ceAskingForVote, func(_ string, _ ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount = selfBallot()
	})

	// args[0] is the ID of the peer that granted its vote.
	me.hook(ceReceiveTicket, func(_ string, args ...interface{}) {
		voter := args[0].(string)
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount[voter] = true
	})

	// A disposed candidate holds no tickets at all.
	me.hook(ceDisposing, func(_ string, _ ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount = map[string]bool{}
	})
}
func (me *tCandidateState) hookEventsForDisposedFlag() {me.hook(ceInit, func(e string, args ...interface{}) {me.mDisposedFlag = false}) | |
me.hook(ceDisposing, func(e string, args ...interface{}) {me.mDisposedFlag = true}) | |
} | |
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { | |
// check term | |
if cmd.Term <= me.mTerm { | |
// bad leader | |
ret.Code = rpc.HBTermMismatch | |
return nil | |
} | |
// new leader | |
me.raise(ceLeaderAnnounced) | |
// return ok | |
ret.Code = rpc.HBOk | |
return nil | |
} | |
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { | |
// check term | |
if cmd.Term <= me.mTerm { | |
// bad leader | |
ret.Code = rpc.ALTermMismatch | |
return nil | |
} | |
// new leader | |
me.raise(ceLeaderAnnounced) | |
// ignore and return | |
ret.Code = rpc.ALInternalError | |
return nil | |
} | |
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error { | |
// ignore and return | |
ret.Code = rpc.CLInternalError | |
return nil | |
} | |
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { | |
// check voted term | |
if cmd.Term < me.mVotedTerm { | |
ret.Code = rpc.RVTermMismatch | |
return nil | |
} | |
if cmd.Term == me.mVotedTerm { | |
if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID { | |
// already vote another | |
ret.Code = rpc.RVVotedAnother | |
return nil | |
} else { | |
// already voted | |
ret.Code = rpc.RVOk | |
return nil | |
} | |
} | |
if cmd.Term > me.mVotedTerm { | |
// new term, check log | |
if cmd.LastLogIndex >= me.context.Store().LastCommittedIndex() { | |
// good log | |
me.raise(ceVoteToCandidate, cmd) | |
ret.Code = rpc.RVOk | |
} else { | |
// bad log | |
ret.Code = rpc.RVLogMismatch | |
} | |
return nil | |
} | |
// should not reaches here | |
ret.Code = rpc.RVTermMismatch | |
return nil | |
} | |
func (me *tCandidateState) Role() roles.RaftRole {return roles.Candidate} | |
func (me *tCandidateState) Start() {me.mStartOnce.Do(func() {me.raise(feStart) | |
}) | |
} | |
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {me.raise(ceDisposing) | |
me.context.HandleStateChanged(newFollowerState(me.context)) | |
} | |
func (me *tCandidateState) whenElectionTimeoutThenAskForVoteAgain(_ string, _ ...interface{}) {me.beginAskForVote() | |
} | |
func (me *tCandidateState) whenStartThenAskForVote(_ string, _ ...interface{}) {me.beginAskForVote() | |
} | |
func (me *tCandidateState) beginAskForVote() { | |
// raise ceAskingForVote | |
me.raise(ceAskingForVote) | |
// for each node, call node.RequestVote | |
cmd := new(rpc.RequestVoteCmd) | |
cmd.CandidateID = me.context.Config().ID() | |
cmd.Term = me.mTerm | |
store := me.context.Store() | |
cmd.LastLogIndex = store.LastCommittedIndex() | |
cmd.LastLogTerm = store.LastCommittedTerm() | |
term := me.mTerm | |
for _,node := range me.context.Config().Nodes() {if node.ID() == me.context.Config().ID() {continue} | |
peerID := node.ID() | |
go func() {ret := new(rpc.RequestVoteRet) | |
err := me.context.RaftClientService().Using(peerID, func(client rpc.IRaftRPC) error {return client.RequestVote(cmd, ret) | |
}) | |
if err == nil && ret.Code == rpc.RVOk {me.handleRequestVoteOK(peerID, term) | |
} | |
}()} | |
} | |
// whenAskingForVoteThenWatchElectionTimeout arms the election timer of the
// round that has just started.
//
// After a random election timeout it checks whether that round is still the
// current one, i.e. the state is not disposed and mTerm is unchanged. If so,
// and fewer than a majority of the configured nodes have voted for this node,
// it raises ceElectionTimeout to start another round.
func (me *tCandidateState) whenAskingForVoteThenWatchElectionTimeout(_ string, _ ...interface{}) {
	round := me.mTerm
	time.AfterFunc(timeout.RandElectionTimeout(), func() {
		stale := me.mDisposedFlag || me.mTerm != round
		if stale {
			return
		}

		tickets := me.getTicketCount()
		majority := len(me.context.Config().Nodes())/2 + 1
		if tickets < majority {
			me.raise(ceElectionTimeout)
		}
	})
}
func (me *tCandidateState) handleRequestVoteOK(peerID string, term int64) { | |
if me.mDisposedFlag || me.mTerm != term {return} | |
me.raise(ceReceiveTicket, peerID) | |
} | |
func (me *tCandidateState) whenReceiveTicketThenCheckTicketCount(_ string, _ ...interface{}) {tc := me.getTicketCount() | |
if tc >= len(me.context.Config().Nodes())/2 + 1 { | |
// win the vote | |
me.raise(ceWinningTheVote) | |
} | |
} | |
func (me *tCandidateState) getTicketCount() int {me.mTicketMutex.Lock() | |
defer me.mTicketMutex.Unlock() | |
return len(me.mTicketCount) | |
} | |
func (me *tCandidateState) whenWinningTheVoteThenSwitchToLeader(_ string, _ ...interface{}) {me.raise(ceDisposing) | |
me.context.HandleStateChanged(newLeaderState(me.context, me.mTerm)) | |
} |
tRaftClient.go
管理到指定 raft 节点的 rpc 连接
package client | |
import ( | |
"learning/gooop/etcd/raft/config" | |
rrpc "learning/gooop/etcd/raft/rpc" | |
"net/rpc" | |
) | |
type tRaftClient struct { | |
cfg config.IRaftNodeConfig | |
conn *rpc.Client | |
state iClientState | |
} | |
// newRaftClient creates and starts a client for the node described by cfg.
// conn may be nil when no connection could be established.
func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
	c := &tRaftClient{}
	c.init(cfg, conn)
	return c
}
func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) { | |
me.cfg = cfg | |
me.conn = conn | |
if conn == nil {me.state = newBrokenState(me) | |
} else {me.state = newConnectedState(me) | |
} | |
me.state.Start()} | |
// Config returns the configuration of the target node.
func (me *tRaftClient) Config() config.IRaftNodeConfig {
	return me.cfg
}

// GetConn returns the current net/rpc connection, which may be nil.
func (me *tRaftClient) GetConn() *rpc.Client {
	return me.conn
}

// SetConn replaces the current net/rpc connection.
func (me *tRaftClient) SetConn(conn *rpc.Client) {
	me.conn = conn
}
func (me *tRaftClient) HandleStateChanged(state iClientState) { | |
me.state = state | |
state.Start()} | |
func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {return me.state.Heartbeat(cmd, ret) | |
} | |
func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {return me.state.AppendLog(cmd, ret) | |
} | |
func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {return me.state.CommitLog(cmd, ret) | |
} | |
func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {return me.state.RequestVote(cmd, ret) | |
} | |
func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {return me.state.Ping(cmd, ret) | |
} |
tRaftClientService.go
管理当前节点到其他 raft 节点的 rpc 连接
package client | |
import ( | |
"errors" | |
"learning/gooop/etcd/raft/config" | |
"learning/gooop/etcd/raft/rpc" | |
netrpc "net/rpc" | |
) | |
type tRaftClientService struct { | |
cfg config.IRaftConfig | |
clients map[string]IRaftClient | |
} | |
func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {it := new(tRaftClientService) | |
it.init(cfg) | |
return it | |
} | |
func (me *tRaftClientService) init(cfg config.IRaftConfig) { | |
me.cfg = cfg | |
me.clients = make(map[string]IRaftClient) | |
for _,nc := range me.cfg.Nodes() {me.clients[nc.ID()] = me.createRaftClient(nc) | |
} | |
} | |
// createRaftClient dials nodeCfg.Endpoint() over TCP with net/rpc and wraps
// the result in a client. A failed dial is not fatal: the client is created
// with a nil connection.
func (me *tRaftClientService) createRaftClient(nodeCfg config.IRaftNodeConfig) IRaftClient {
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		conn = nil
	}
	return newRaftClient(nodeCfg, conn)
}
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {it, ok := me.clients[peerID] | |
if ok {return action(it) | |
} else {return gErrorUnknownRaftPeer} | |
} | |
var gErrorUnknownRaftPeer = errors.New("unknown raft peer") |
(未完待续)
正文完