关于golang:手撸golang-etcd-raft协议之3

54次阅读

共计 6410 个字符,预计需要花费 17 分钟才能阅读完成。

手撸 golang etcd raft 协议之 3

缘起

最近阅读 [云原生分布式存储基石:etcd 深入解析] (杜军 , 2019.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop

raft 分布式一致性算法

 分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft 算法把问题分解成了首领选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft 算法的基本操作只需 2 种 RPC 即可完成。RequestVote RPC 是在选举过程中由候选人(Candidate)触发的,AppendEntries RPC 是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据 raft 协议,实现高可用、分布式、强一致的 kv 存储

子目标(Day 3)

  • 继续完善 raft 状态机之 Follower 状态的处理逻辑
  • 继续完善 raft 状态机之 Candidate 状态的处理逻辑

设计

  • tFollowerState:

    • 监视 Leader 心跳是否超时
    • 如果 Leader 心跳超时,则切换到 Candidate 状态,竞选新 leader
    • 添加 RequestVote 和 AppendEntries 两个 RPC 接口的响应
  • tCandidateState:

    • 进入此状态,立刻向其他节点发起竞选请求
    • 如竞选超时,则重新发起竞选
    • 如收到新 Leader 心跳,则切换回 Follower
    • 如收到 N/2+1 张票,则切换到 Leader,并广播之

tFollowerState.go

继续完善 raft 状态机之 Follower 状态的处理逻辑

package lsm

import (
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/roles"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "sync"
    "time"
)

// tFollowerState is the Follower-role state of the raft state machine.
// It embeds tRaftStateBase and adds the bookkeeping used by the follower's
// event handlers and its RequestVote / AppendEntries methods.
type tFollowerState struct {
	tRaftStateBase

	// mInitOnce guards init; mStartOnce guards Start.
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// mVotedLeaderID is the CandidateID recorded by RequestVote when a vote
	// is granted; the empty string means no vote has been recorded.
	mVotedLeaderID string

	// mLeaderHeartbeatClock is the time.Now().UnixNano() value stored by the
	// most recent accepted AppendEntries call.
	mLeaderHeartbeatClock int64

	// mStateChangedHandler receives the next state when this state hands over.
	mStateChangedHandler StateChangedHandleFunc

	// mEventMap lists, per follower event, the handlers run by raise.
	mEventMap map[tFollowerEvent][]tFollowerEventHandler
}


// JobFunc is a callback that takes no arguments and returns nothing.
type JobFunc func()

// tFollowerEvent identifies an internal event of the follower state.
type tFollowerEvent int

// Follower events, numbered from zero in declaration order.
const (
	// evFollowerStart is raised by Start.
	evFollowerStart tFollowerEvent = iota
	// evFollowerLeaderHeartbeatTimeout is raised by the heartbeat watcher
	// when the leader's heartbeat is overdue.
	evFollowerLeaderHeartbeatTimeout
)

// tFollowerEventHandler is the signature of a follower event handler: it
// receives the raised event plus any arguments passed to raise.
type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})

// newFollowerState allocates a follower state, initializes it with the given
// term, configuration and state-changed callback, and returns it as an
// IRaftState.
func newFollowerState(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) IRaftState {
	state := &tFollowerState{}
	state.init(term, cfg, handler)
	return state
}

// init performs one-time setup of the follower state; calls after the first
// are ignored by mInitOnce. It installs a fresh tRaftStateBase built from
// term and cfg, marks the role as roles.Follower, stores the state-changed
// callback, and builds the event map.
func (me *tFollowerState) init(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) {
	setup := func() {
		base := newRaftStateBase(term, cfg)
		me.tRaftStateBase = *base
		me.role = roles.Follower
		me.mStateChangedHandler = handler

		// The map must exist before registerEventHandlers writes into it.
		me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)
		me.registerEventHandlers()
	}
	me.mInitOnce.Do(setup)
}

// raise runs every handler registered for event e, in registration order,
// forwarding e and args to each. An event with no handlers is a no-op.
func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {
	// A missing key yields a nil slice, so the loop body never runs.
	for _, handle := range me.mEventMap[e] {
		handle(e, args...)
	}
}

// registerEventHandlers fills mEventMap with the handler list of each
// follower event. mEventMap must already be allocated.
func (me *tFollowerState) registerEventHandlers() {
	onStart := []tFollowerEventHandler{
		me.whenStartThenBeginWatchLeaderTimeout,
	}
	me.mEventMap[evFollowerStart] = onStart

	onHeartbeatTimeout := []tFollowerEventHandler{
		me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState,
	}
	me.mEventMap[evFollowerLeaderHeartbeatTimeout] = onHeartbeatTimeout
}


// Start raises evFollowerStart. Only the first call has any effect; later
// calls are swallowed by mStartOnce.
func (me *tFollowerState) Start() {
	fire := func() {
		me.raise(evFollowerStart)
	}
	me.mStartOnce.Do(fire)
}

// whenStartThenBeginWatchLeaderTimeout handles evFollowerStart by launching a
// goroutine that polls mLeaderHeartbeatClock every HeartbeatTimeout/3. Once
// no heartbeat has been recorded for at least timeout.HeartbeatTimeout it
// raises evFollowerLeaderHeartbeatTimeout and the goroutine exits.
//
// NOTE(review): mLeaderHeartbeatClock is also written by AppendEntries with no
// synchronization visible in this file; confirm whether callers serialize
// access or whether the field should become atomic.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args ...interface{}) {
	// Open the timeout window at start-up. With the clock left at its zero
	// value the very first poll would always see an expired heartbeat and
	// the follower would turn candidate before any leader could reach it.
	if me.mLeaderHeartbeatClock == 0 {
		me.mLeaderHeartbeatClock = time.Now().UnixNano()
	}

	go func() {
		checkInterval := timeout.HeartbeatTimeout / 3
		timeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)
		if checkInterval <= 0 {
			// time.NewTicker panics on a non-positive interval; without a
			// usable interval there is nothing to watch.
			return
		}

		// An explicit ticker is stopped when the watcher exits, whereas the
		// ticker behind time.Tick can never be stopped.
		ticker := time.NewTicker(checkInterval)
		defer ticker.Stop()

		for range ticker.C {
			elapsed := time.Now().UnixNano() - me.mLeaderHeartbeatClock
			if elapsed >= timeoutNanos {
				me.raise(evFollowerLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}


// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState handles
// evFollowerLeaderHeartbeatTimeout: it builds a candidate state from this
// follower's cfg, term and state-changed callback, and hands it to that
// callback. Nothing happens when no callback is installed.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ tFollowerEvent, args ...interface{}) {
	notify := me.mStateChangedHandler
	if notify != nil {
		next := newCandidateState(me.cfg, me.term, me.mStateChangedHandler)
		notify(next)
	}
}

// Role reports the raft role of this state, which is always roles.Follower.
func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}

// RequestVote answers a candidate's vote request.
//
// The vote is refused when the candidate's term is older than this node's
// term, or when this node has already voted for a different candidate in the
// candidate's term. A request carrying a newer term makes this node adopt
// that term and forget the vote it cast in the previous one, so a vote given
// in an earlier term never blocks voting in a later term.
//
// On return ret.Term holds this node's (possibly updated) term and
// ret.VoteGranted the decision. The returned error is always nil.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	// Stale candidate: reply with the current term so it can catch up.
	if cmd.Term < me.term {
		ret.Term = me.term
		ret.VoteGranted = false
		return nil
	}

	// A vote is only meaningful within the term it was cast in.
	if cmd.Term > me.term {
		me.term = cmd.Term
		me.mVotedLeaderID = ""
	}

	// At most one candidate may receive this node's vote per term.
	if me.mVotedLeaderID != "" && me.mVotedLeaderID != cmd.CandidateID {
		ret.Term = me.term
		ret.VoteGranted = false
		return nil
	}

	me.mVotedLeaderID = cmd.CandidateID
	ret.Term = me.term
	ret.VoteGranted = true
	return nil
}

// AppendEntries handles a leader's AppendEntries call, which doubles as the
// heartbeat.
//
// A call from an older term is rejected with this node's term. Otherwise the
// node adopts the caller's term and leader id and records the current time
// as the latest heartbeat. The reply is always populated: ret.Term is the
// accepted term, and ret.Success is true only for a pure heartbeat (no
// entries), because appending log entries is not implemented yet.
//
// The returned error is always nil.
func (me *tFollowerState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
	if cmd.Term < me.term {
		ret.Term = me.term
		ret.Success = false
		return nil
	}

	me.term = cmd.Term
	me.leaderID = cmd.LeaderID
	me.mLeaderHeartbeatClock = time.Now().UnixNano()

	// Fill in the reply on every path so the caller never reads a
	// half-initialized result.
	ret.Term = cmd.Term

	// todo: append logs
	// Until log replication exists, only a heartbeat can be acknowledged;
	// a call carrying entries is reported as not applied.
	ret.Success = len(cmd.Entries) == 0
	return nil
}


// StateChangedHandler replaces the callback stored in mStateChangedHandler.
func (me *tFollowerState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}

tCandidateState.go

继续完善 raft 状态机之 Candidate 状态的处理逻辑

package lsm

import (
    "errors"
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc"
    "sync"
)

// tCandidateState is the Candidate-role state of the raft state machine.
// It embeds tRaftStateBase and adds the one-time guards, the state-changed
// callback and the event map used by the candidate's handlers.
type tCandidateState struct {
	tRaftStateBase

	// mInitOnce guards init; mStartOnce guards Start.
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// mStateChangedHandler is the callback installed by init or
	// StateChangedHandler.
	mStateChangedHandler StateChangedHandleFunc

	// mEventMap lists, per candidate event, the handlers run by raise.
	mEventMap map[tCandidateEvent][]tCandidateEventHandler
}


// RequestVote always fails with gErrorCandidateWontReplyRequestVote; cmd and
// ret are left untouched.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	return gErrorCandidateWontReplyRequestVote
}

// AppendEntries always fails with gErrorCandidateWontReplyAppendEntries; cmd
// and ret are left untouched.
func (me *tCandidateState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
	return gErrorCandidateWontReplyAppendEntries
}

// StateChangedHandler replaces the callback stored in mStateChangedHandler.
func (me *tCandidateState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}

// tCandidateEvent identifies an internal event of the candidate state.
type tCandidateEvent int

// Candidate events, numbered from zero in declaration order.
const (
	// evCandidateStart is raised by Start.
	evCandidateStart tCandidateEvent = iota
	// evCandidateElectionTimeout is handled by
	// whenElectionTimeoutThenRequestVoteAgain.
	evCandidateElectionTimeout
	// evCandidateGotEnoughVotes is handled by
	// whenGotEnoughVotesThenSwitchToLeader.
	evCandidateGotEnoughVotes
)

// tCandidateEventHandler is the signature of a candidate event handler: it
// receives the raised event plus any arguments passed to raise.
type tCandidateEventHandler func(e tCandidateEvent, args ...interface{})

// newCandidateState allocates a candidate state, initializes it with the
// given configuration, term and state-changed callback, and returns it as an
// IRaftState.
func newCandidateState(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) IRaftState {
	state := &tCandidateState{}
	state.init(cfg, term, handler)
	return state
}


// init performs one-time setup of the candidate state; calls after the first
// are ignored by mInitOnce. It stores cfg, term and the state-changed
// callback, then builds the event map.
func (me *tCandidateState) init(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) {
	setup := func() {
		me.cfg, me.term = cfg, term
		me.mStateChangedHandler = handler

		// The map must exist before registerEventHandlers writes into it.
		me.mEventMap = make(map[tCandidateEvent][]tCandidateEventHandler)
		me.registerEventHandlers()
	}
	me.mInitOnce.Do(setup)
}


// registerEventHandlers fills mEventMap with the handler list of each
// candidate event. mEventMap must already be allocated.
func (me *tCandidateState) registerEventHandlers() {
	// Start triggers two handlers, run in this order by raise.
	onStart := []tCandidateEventHandler{
		me.whenStartThenRequestVote,
		me.whenStartThenWatchElectionTimeout,
	}
	me.mEventMap[evCandidateStart] = onStart

	onElectionTimeout := []tCandidateEventHandler{
		me.whenElectionTimeoutThenRequestVoteAgain,
	}
	me.mEventMap[evCandidateElectionTimeout] = onElectionTimeout

	onEnoughVotes := []tCandidateEventHandler{
		me.whenGotEnoughVotesThenSwitchToLeader,
	}
	me.mEventMap[evCandidateGotEnoughVotes] = onEnoughVotes
}

// raise runs every handler registered for event e, in registration order,
// forwarding e and args to each. An event with no handlers is a no-op.
func (me *tCandidateState) raise(e tCandidateEvent, args ...interface{}) {
	// A missing key yields a nil slice, so the loop body never runs.
	for _, handle := range me.mEventMap[e] {
		handle(e, args...)
	}
}

// Start raises evCandidateStart. Only the first call has any effect; later
// calls are swallowed by mStartOnce.
func (me *tCandidateState) Start() {
	fire := func() {
		me.raise(evCandidateStart)
	}
	me.mStartOnce.Do(fire)
}

// whenStartThenRequestVote is registered for evCandidateStart.
// It is a placeholder: the body is not written yet and calling it panics.
func (me *tCandidateState) whenStartThenRequestVote(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

// whenStartThenWatchElectionTimeout is registered for evCandidateStart.
// It is a placeholder: the body is not written yet and calling it panics.
func (me *tCandidateState) whenStartThenWatchElectionTimeout(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

// whenElectionTimeoutThenRequestVoteAgain is registered for
// evCandidateElectionTimeout.
// It is a placeholder: the body is not written yet and calling it panics.
func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

// whenGotEnoughVotesThenSwitchToLeader is registered for
// evCandidateGotEnoughVotes.
// It is a placeholder: the body is not written yet and calling it panics.
func (me *tCandidateState) whenGotEnoughVotesThenSwitchToLeader(_ tCandidateEvent, _ ...interface{}) {
	// todo: fixme
	panic("implements me")
}

// Sentinel errors returned by the candidate state's RPC methods.
var (
	// gErrorCandidateWontReplyRequestVote is returned by
	// tCandidateState.RequestVote.
	gErrorCandidateWontReplyRequestVote = errors.New("candidate won't reply RequestVote RPC")

	// gErrorCandidateWontReplyAppendEntries is returned by
	// tCandidateState.AppendEntries.
	gErrorCandidateWontReplyAppendEntries = errors.New("candidate won't reply AppendEntries RPC")
)

(未完待续)

正文完
 0