手撸golang etcd raft协议之3

缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了领导人选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可完成。RequestVote RPC是在选举过程中由Candidate触发的,AppendEntries RPC是由Leader触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用、分布式、强一致的kv存储

子目标(Day 3)

  • 继续完善raft状态机之Follower状态的处理逻辑
  • 继续完善raft状态机之Candidate状态的处理逻辑

设计

  • tFollowerState:

    • 监视Leader心跳是否超时
    • 如果Leader心跳超时,则切换到Candidate状态,竞选新leader
    • 增加RequestVote和AppendEntries两个RPC接口的响应
  • tCandidateState:

    • 进入此状态,立刻向其他节点发起竞选请求
    • 如竞选超时,则重新发起竞选
    • 如收到新Leader心跳,则切换回Follower
    • 如收到N/2+1张票,则切换到Leader,并广播之

tFollowerState.go

继续完善raft状态机之Follower状态的处理逻辑

package lsm

import (
	"sync"
	"sync/atomic"
	"time"

	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)

// tFollowerState is the Follower role of the raft state machine.
//
// It answers the RequestVote and AppendEntries RPCs and, once started,
// watches the leader heartbeat; when the heartbeat times out it hands a
// freshly created candidate state to the registered state-changed handler.
type tFollowerState struct {
	// mLeaderHeartbeatClock is the UnixNano timestamp of the last accepted
	// AppendEntries call. It is written by the RPC path and read by the
	// watchdog goroutine, so it is only touched through sync/atomic. It is
	// kept as the first field so that it is 64-bit aligned on 32-bit
	// platforms as well.
	mLeaderHeartbeatClock int64

	tRaftStateBase

	mInitOnce  sync.Once
	mStartOnce sync.Once

	// mVotedLeaderID is the candidate that received our most recent vote,
	// and mVotedTerm is the term that vote was cast for.
	mVotedLeaderID string
	mVotedTerm     int

	mStateChangedHandler StateChangedHandleFunc
	mEventMap            map[tFollowerEvent][]tFollowerEventHandler
}

// JobFunc is a parameterless unit of work.
type JobFunc func()

// tFollowerEvent enumerates the internal events of the follower state.
type tFollowerEvent int

const (
	// evFollowerStart is raised once, by Start.
	evFollowerStart tFollowerEvent = iota
	// evFollowerLeaderHeartbeatTimeout is raised by the watchdog goroutine
	// when no heartbeat has been recorded within timeout.HeartbeatTimeout.
	evFollowerLeaderHeartbeatTimeout
)

// tFollowerEventHandler reacts to a follower event.
type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})

// newFollowerState creates an initialized (but not yet started) follower.
func newFollowerState(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) IRaftState {
	it := new(tFollowerState)
	it.init(term, cfg, handler)
	return it
}

// init sets up the base state, the role, the state-changed handler and the
// event table. It runs at most once per instance.
func (me *tFollowerState) init(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) {
	me.mInitOnce.Do(func() {
		me.tRaftStateBase = *newRaftStateBase(term, cfg)
		me.role = roles.Follower
		me.mStateChangedHandler = handler

		// init event map
		me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)
		me.registerEventHandlers()
	})
}

// raise synchronously invokes every handler registered for e, in
// registration order.
func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {
	for _, handle := range me.mEventMap[e] {
		handle(e, args...)
	}
}

// registerEventHandlers wires each follower event to its handlers.
func (me *tFollowerState) registerEventHandlers() {
	me.mEventMap[evFollowerStart] = []tFollowerEventHandler{
		me.whenStartThenBeginWatchLeaderTimeout,
	}
	me.mEventMap[evFollowerLeaderHeartbeatTimeout] = []tFollowerEventHandler{
		me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState,
	}
}

// Start raises evFollowerStart; repeated calls are no-ops.
func (me *tFollowerState) Start() {
	me.mStartOnce.Do(func() {
		me.raise(evFollowerStart)
	})
}

// whenStartThenBeginWatchLeaderTimeout launches the heartbeat watchdog.
//
// The watchdog polls three times per heartbeat timeout and raises
// evFollowerLeaderHeartbeatTimeout (then exits) as soon as the last recorded
// heartbeat is at least timeout.HeartbeatTimeout old.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(_ tFollowerEvent, _ ...interface{}) {
	// Start the countdown now. Without this the clock is still zero on the
	// first tick, and the follower would report a timeout before any leader
	// had a chance to send a heartbeat.
	atomic.StoreInt64(&me.mLeaderHeartbeatClock, time.Now().UnixNano())

	go func() {
		checkInterval := timeout.HeartbeatTimeout / 3
		if checkInterval <= 0 {
			// time.NewTicker panics on a non-positive interval.
			checkInterval = time.Millisecond
		}
		timeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)

		// Unlike time.Tick, a Ticker can be stopped once we are done.
		ticker := time.NewTicker(checkInterval)
		defer ticker.Stop()

		for range ticker.C {
			lastBeat := atomic.LoadInt64(&me.mLeaderHeartbeatClock)
			if time.Now().UnixNano()-lastBeat >= timeoutNanos {
				me.raise(evFollowerLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}

// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState builds a candidate
// state from the current config and term and passes it to the state-changed
// handler. Nothing happens when no handler is registered.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ tFollowerEvent, _ ...interface{}) {
	fn := me.mStateChangedHandler
	if fn == nil {
		return
	}

	state := newCandidateState(me.cfg, me.term, fn)
	fn(state)
}

// Role reports roles.Follower.
func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}

// RequestVote handles a candidate's vote request.
//
// The vote is denied when the candidate's term is not newer than ours, or
// when we already voted for a different candidate in the requested term
// (or in a later one). Otherwise the vote is granted and remembered
// together with its term. It always returns a nil error.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	deny := func() error {
		ret.Term = me.term
		ret.VoteGranted = false
		return nil
	}

	if cmd.Term <= me.term {
		return deny()
	}

	// A vote only binds within the term it was cast for. Previously a single
	// vote blocked every other candidate forever, even in later terms.
	if me.mVotedLeaderID != "" {
		staleTerm := cmd.Term < me.mVotedTerm
		otherCandidateSameTerm := cmd.Term == me.mVotedTerm && me.mVotedLeaderID != cmd.CandidateID
		if staleTerm || otherCandidateSameTerm {
			return deny()
		}
	}

	me.mVotedLeaderID = cmd.CandidateID
	me.mVotedTerm = cmd.Term
	ret.Term = cmd.Term
	ret.VoteGranted = true
	return nil
}

// AppendEntries handles a leader's heartbeat / log replication request.
//
// Requests from an older term are rejected. Otherwise the leader's term and
// ID are adopted and the heartbeat clock is refreshed. An empty entry list
// is a pure heartbeat and is acknowledged with Success = true. It always
// returns a nil error.
func (me *tFollowerState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
	if cmd.Term < me.term {
		ret.Term = me.term
		ret.Success = false
		return nil
	}

	me.term = cmd.Term
	me.leaderID = cmd.LeaderID
	atomic.StoreInt64(&me.mLeaderHeartbeatClock, time.Now().UnixNano())

	if len(cmd.Entries) <= 0 {
		// just heartbeat package
		ret.Term = cmd.Term
		ret.Success = true
		return nil
	}

	// todo: append logs
	// Log replication is not implemented yet, so answer explicitly with our
	// term and Success = false instead of leaving ret untouched.
	ret.Term = me.term
	ret.Success = false
	return nil
}

// StateChangedHandler replaces the callback that receives the next state
// when this follower decides to switch roles.
func (me *tFollowerState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}

tCandidateState.go

继续完善raft状态机之Candidate状态的处理逻辑

package lsm

import (
	"errors"
	"sync"

	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc"
)

// Errors returned by the candidate state, which currently refuses to answer
// either raft RPC.
var (
	gErrorCandidateWontReplyRequestVote   = errors.New("candidate won't reply RequestVote RPC")
	gErrorCandidateWontReplyAppendEntries = errors.New("candidate won't reply AppendEntries RPC")
)

// tCandidateEvent enumerates the internal events of the candidate state.
type tCandidateEvent int

const (
	// evCandidateStart is raised once, by Start.
	evCandidateStart tCandidateEvent = iota
	// evCandidateElectionTimeout is mapped to requesting votes again.
	evCandidateElectionTimeout
	// evCandidateGotEnoughVotes is mapped to switching to the leader state.
	evCandidateGotEnoughVotes
)

// tCandidateEventHandler reacts to a candidate event.
type tCandidateEventHandler func(e tCandidateEvent, args ...interface{})

// tCandidateState is the Candidate role of the raft state machine.
// Its event handlers are still unimplemented placeholders that panic.
type tCandidateState struct {
	tRaftStateBase

	mInitOnce  sync.Once
	mStartOnce sync.Once

	mStateChangedHandler StateChangedHandleFunc
	mEventMap            map[tCandidateEvent][]tCandidateEventHandler
}

// newCandidateState creates an initialized (but not yet started) candidate.
func newCandidateState(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) IRaftState {
	state := &tCandidateState{}
	state.init(cfg, term, handler)
	return state
}

// init stores the config, term and state-changed handler and builds the
// event table. It runs at most once per instance.
func (me *tCandidateState) init(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) {
	setup := func() {
		me.cfg = cfg
		me.term = term
		me.mStateChangedHandler = handler

		// init event map
		me.mEventMap = make(map[tCandidateEvent][]tCandidateEventHandler)
		me.registerEventHandlers()
	}
	me.mInitOnce.Do(setup)
}

// registerEventHandlers wires each candidate event to its handlers.
func (me *tCandidateState) registerEventHandlers() {
	table := me.mEventMap

	table[evCandidateStart] = []tCandidateEventHandler{
		me.whenStartThenRequestVote,
		me.whenStartThenWatchElectionTimeout,
	}
	table[evCandidateElectionTimeout] = []tCandidateEventHandler{
		me.whenElectionTimeoutThenRequestVoteAgain,
	}
	table[evCandidateGotEnoughVotes] = []tCandidateEventHandler{
		me.whenGotEnoughVotesThenSwitchToLeader,
	}
}

// raise synchronously invokes every handler registered for e, in
// registration order; an event without handlers is a no-op.
func (me *tCandidateState) raise(e tCandidateEvent, args ...interface{}) {
	for _, handle := range me.mEventMap[e] {
		handle(e, args...)
	}
}

// Start raises evCandidateStart; repeated calls are no-ops.
func (me *tCandidateState) Start() {
	me.mStartOnce.Do(func() { me.raise(evCandidateStart) })
}

// RequestVote is not answered by a candidate: it leaves ret untouched and
// returns gErrorCandidateWontReplyRequestVote.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	return gErrorCandidateWontReplyRequestVote
}

// AppendEntries is not answered by a candidate: it leaves ret untouched and
// returns gErrorCandidateWontReplyAppendEntries.
func (me *tCandidateState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
	return gErrorCandidateWontReplyAppendEntries
}

// StateChangedHandler replaces the callback stored for state transitions.
func (me *tCandidateState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}

// whenStartThenRequestVote is a placeholder; it panics until implemented.
func (me *tCandidateState) whenStartThenRequestVote(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

// whenStartThenWatchElectionTimeout is a placeholder; it panics until
// implemented.
func (me *tCandidateState) whenStartThenWatchElectionTimeout(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

// whenElectionTimeoutThenRequestVoteAgain is a placeholder; it panics until
// implemented.
func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

// whenGotEnoughVotesThenSwitchToLeader is a placeholder; it panics until
// implemented.
func (me *tCandidateState) whenGotEnoughVotesThenSwitchToLeader(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

(未完待续)