手撸golang etcd raft协定之3
缘起
最近浏览 [云原生分布式存储基石:etcd深刻解析] (杜军 , 2019.1)
本系列笔记拟采纳golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过保护多个副原本进行容错,以进步零碎的可用性。这就引出了分布式存储系统的外围问题——如何保障多个正本的一致性?Raft算法把问题分解成了首领选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变动(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可实现。RequestVote RPC是在选举过程中通过旧的Leader触发的,AppendEntries RPC是领导人触发的,目标是向其余节点复制日志条目和发送心跳(heartbeat)。
指标
- 依据raft协定,实现高可用分布式强统一的kv存储
子目标(Day 3)
- 持续欠缺raft状态机之Follower状态的解决逻辑
- 持续欠缺raft状态机之Candidate状态的解决逻辑
设计
tFollowerState:
- 监督Leader心跳是否超时
- 如果Leader心跳超时,则切换到Candidate状态,竞选新leader
- 增加RequestVote和AppendEntries两个RPC接口的响应
tCandidateState:
- 进入此状态,立刻向其余节点发动竞选申请
- 如竞选超时,则从新发动竞选
- 如收到新Leader心跳,则切换回Follower
- 如收到N/2+1张票,则切换到Leader,并播送之
tFollowerState.go
持续欠缺raft状态机之Follower状态的解决逻辑
package lsmimport ( "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/roles" "learning/gooop/etcd/raft/rpc" "learning/gooop/etcd/raft/timeout" "sync" "time")type tFollowerState struct { tRaftStateBase mInitOnce sync.Once mStartOnce sync.Once mVotedLeaderID string mLeaderHeartbeatClock int64 mStateChangedHandler StateChangedHandleFunc mEventMap map[tFollowerEvent][]tFollowerEventHandler}type JobFunc func()type tFollowerEvent intconst ( evFollowerStart tFollowerEvent = iota evFollowerLeaderHeartbeatTimeout tFollowerEvent = iota)type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})func newFollowerState(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) IRaftState { it := new(tFollowerState) it.init(term, cfg, handler) return it}func (me *tFollowerState) init(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) { me.mInitOnce.Do(func() { me.tRaftStateBase = *newRaftStateBase(term, cfg) me.role = roles.Follower me.mStateChangedHandler = handler // init event map me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler) me.registerEventHandlers() })}func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) { if handlers, ok := me.mEventMap[e]; ok { for _, it := range handlers { it(e, args...) } }}func (me *tFollowerState) registerEventHandlers() { me.mEventMap[evFollowerStart] = []tFollowerEventHandler{ me.whenStartThenBeginWatchLeaderTimeout, } me.mEventMap[evFollowerLeaderHeartbeatTimeout] = []tFollowerEventHandler { me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState, }}func (me *tFollowerState) Start() { me.mStartOnce.Do(func() { me.raise(evFollowerStart) })}func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args... interface{}) { go func() { iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3 iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond) for range time.Tick(iCheckingTimeoutInterval) { now := time.Now().UnixNano() if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos { me.raise(evFollowerLeaderHeartbeatTimeout) return } } }()}func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ tFollowerEvent, args... interface{}) { fn := me.mStateChangedHandler if fn == nil { return } state := newCandidateState(me.cfg, me.term, me.mStateChangedHandler) fn(state)}func (me *tFollowerState) Role() roles.RaftRole { return roles.Follower}func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { if cmd.Term <= me.term { ret.Term = me.term ret.VoteGranted = false return nil } if me.mVotedLeaderID != "" && me.mVotedLeaderID != cmd.CandidateID { ret.Term = me.term ret.VoteGranted = false return nil } me.mVotedLeaderID = cmd.CandidateID ret.Term = cmd.Term ret.VoteGranted = true return nil}func (me *tFollowerState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error { if cmd.Term < me.term { ret.Term = me.term ret.Success = false return nil } me.term = cmd.Term me.leaderID = cmd.LeaderID me.mLeaderHeartbeatClock = time.Now().UnixNano() if len(cmd.Entries) <= 0 { // just heartbeat package ret.Term = cmd.Term ret.Success = true return nil } // todo: append logs return nil}func (me *tFollowerState) StateChangedHandler(handler StateChangedHandleFunc) { me.mStateChangedHandler = handler}
tCandidateState.go
持续欠缺raft状态机之Candidate状态的解决逻辑
package lsmimport ( "errors" "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/rpc" "sync")type tCandidateState struct { tRaftStateBase mInitOnce sync.Once mStartOnce sync.Once mStateChangedHandler StateChangedHandleFunc mEventMap map[tCandidateEvent][]tCandidateEventHandler}func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { return gErrorCandidateWontReplyRequestVote}func (me *tCandidateState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error { return gErrorCandidateWontReplyAppendEntries}func (me *tCandidateState) StateChangedHandler(handler StateChangedHandleFunc) { me.mStateChangedHandler = handler}type tCandidateEvent intconst ( evCandidateStart tCandidateEvent = iota evCandidateElectionTimeout tCandidateEvent = iota evCandidateGotEnoughVotes tCandidateEvent = iota)type tCandidateEventHandler func(e tCandidateEvent, args ...interface{})func newCandidateState(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) IRaftState { it := new(tCandidateState) it.init(cfg, term, handler) return it}func (me *tCandidateState) init(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) { me.mInitOnce.Do(func() { me.cfg = cfg me.term = term me.mStateChangedHandler = handler // init event map me.mEventMap = make(map[tCandidateEvent][]tCandidateEventHandler) me.registerEventHandlers() })}func (me *tCandidateState) registerEventHandlers() { me.mEventMap[evCandidateStart] = []tCandidateEventHandler{ me.whenStartThenRequestVote, me.whenStartThenWatchElectionTimeout, } me.mEventMap[evCandidateElectionTimeout] = []tCandidateEventHandler{ me.whenElectionTimeoutThenRequestVoteAgain, } me.mEventMap[evCandidateGotEnoughVotes] = []tCandidateEventHandler{ me.whenGotEnoughVotesThenSwitchToLeader, }}func (me *tCandidateState) raise(e tCandidateEvent, args ...interface{}) { if handlers, ok := me.mEventMap[e]; ok { for _, it := range handlers { it(e, args...) } }}func (me *tCandidateState) Start() { me.mStartOnce.Do(func() { me.raise(evCandidateStart) })}func (me *tCandidateState) whenStartThenRequestVote(_ tCandidateEvent, _... interface{}) { // todo: fixme panic("implements me")}func (me *tCandidateState) whenStartThenWatchElectionTimeout(_ tCandidateEvent, _... interface{}) { // todo: fixme panic("implements me")}func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ tCandidateEvent, _... interface{}) { // todo: fixme panic("implements me")}func (me *tCandidateState) whenGotEnoughVotesThenSwitchToLeader(_ tCandidateEvent, _... interface{}) { // todo: fixme panic("implements me")}var gErrorCandidateWontReplyRequestVote = errors.New("candidate won't reply RequestVote RPC")var gErrorCandidateWontReplyAppendEntries = errors.New("candidate won't reply AppendEntries RPC")
(未完待续)