共计 13715 个字符,预计需要花费 35 分钟才能阅读完成。
手撸 golang etcd raft 协定之 6
缘起
最近浏览 [云原生分布式存储基石:etcd 深刻解析] (杜军 , 2019.1)
本系列笔记拟采纳 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop
raft 分布式一致性算法
分布式存储系统通常会通过保护多个副原本进行容错,以进步零碎的可用性。这就引出了分布式存储系统的外围问题——如何保障多个正本的一致性?Raft 算法把问题分解成了四个子问题:1. 首领选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)4. 成员关系变动(membership changes)这几个子问题。
指标
- 依据 raft 协定,实现高可用分布式强统一的 kv 存储
子目标(Day 6)
-
大幅重构,晋升代码的可了解 / 可管理性:
- 基于事件驱动的逻辑编排,重构 Follower 和 Candidate 状态下的实现
-
将字段状态的治理,履行读写拆散。没看错,代码也是能够 ” 读写拆散 ” 的 ^_^
设计
- random:为各种超时工夫增加随机性
- tFollowerState:基于事件驱动重构 Follower 状态的逻辑编排,各字段施行读写拆散治理
- tCandidateState:基于事件驱动重构 Candidate 状态的逻辑编排,各字段施行读写拆散治理
random.go
为各种超时工夫增加随机性
package lsm
import (
"math/rand"
"time"
)
// fnRandomizeInt64 returns int64 value from v to v*1.3
func fnRandomizeInt64(v int64) int64 {return v + v * gRand.Int63n(30) / 100
}
// fnRandomizeDuration returns duration value from v to v*1.3
func fnRandomizeDuration(v time.Duration) time.Duration {i := int64(v)
return time.Duration(fnRandomizeInt64(i))
}
var gRand = rand.New(rand.NewSource(time.Now().UnixNano()))
tFollowerState.go
基于事件驱动重构 Follower 状态的逻辑编排,各字段施行读写拆散治理
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tFollowerState presents a follower node
type tFollowerState struct {
tEventDrivenModel
context IRaftLSM
mInitOnce sync.Once
mStartOnce sync.Once
// update: feInit / feLeaderHeartbeat
mTerm int64
// update: feInit / feLeaderHeartbeat
mLeaderHeartbeatTimestamp int64
// update: feLeaderHeartbeat
mLeaderID string
// update: feCandidateRequestVote / feVoteToCandidate
mLastVotedTerm int64
// update: feCandidateRequestVote / feVoteToCandidate
mLastVotedCandidateID string
// update: feCandidateRequestVote / feVoteToCandidate
mLastVotedTimestamp int64
// update: feInit / feDisposing
mDiseposedFlag bool
}
// trigger: init()
// args: empty
const feInit = "follower.init"
// trigger: Start()
// args: empty
const feStart = "follower.Start"
// trigger: Heartbeat()
// args: rpc.HeartbeatCmd
const feLeaderHeartbeat = "follower.LeaderHeartbeat"
// trigger: whenStartThenBeginWatchLeaderTimeout()
// args: empty
const feLeaderHeartbeatTimeout = "follower.LeaderHeartbeatTimeout"
// trigger: RequestVote()
// args: rpc.RequestVoteCmd
const feCandidateRequestVote = "candidate.RequestVote"
// trigger: RequestVote()
// args: rpc.RequestVoteCmd
const feVoteToCandidate = "follower.CandidateRequestVote"
// trigger: whenLeaderHeartbeatTimeoutThenSwitchToCandidateState
const feDisposing = "follower.Disposing"
func newFollowerState(ctx IRaftLSM) IRaftState {it := new(tFollowerState)
it.init(ctx)
return it
}
func (me *tFollowerState) init(ctx IRaftLSM) {me.mInitOnce.Do(func() {
me.context = ctx
me.initEventHandlers()})
}
func (me *tFollowerState) initEventHandlers() {
// write only logic
me.hookEventsForTerm()
me.hookEventsForLeaderHeartbeatTimestamp()
me.hookEventsForLeaderID()
me.hookEventsForLastVotedTerm()
me.hookEventsForLastVotedCandicateID()
me.hookEventsForLastVotedTimestamp()
me.hookEventsForDisposedFlag()
// read only logic
me.hook(feStart,
me.whenStartThenBeginWatchLeaderTimeout)
me.hook(feLeaderHeartbeatTimeout,
me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
}
// hookEventsForTerm maintains field: mTerm
// update : feInit / feLeaderHeartbeat
func (me *tFollowerState) hookEventsForTerm() {me.hook(feInit, func(e string, args ...interface{}) {me.mTerm = me.context.store().LastCommittedTerm()})
me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {cmd := args[0].(*rpc.HeartbeatCmd)
me.mTerm = cmd.Term
})
}
// hookEventsForLeaderHeartbeatClock maintains field: mLeaderHeartbeatClock
// update : feLeaderHeartbeat / feLeaderHeartbeatTimeout
func (me *tFollowerState) hookEventsForLeaderHeartbeatTimestamp() {me.hook(feInit, func(e string, args ...interface{}) {me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()})
me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()})
me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {me.mLeaderHeartbeatTimestamp = 0})
}
// hookEventsForLeaderID maintains field: mLeaderID
// update : feLeaderHeartbeat / feLeaderHeartbeatTimeout
func (me *tFollowerState) hookEventsForLeaderID() {me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {cmd := args[0].(*rpc.HeartbeatCmd)
me.mLeaderID = cmd.LeaderID
})
me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {me.mLeaderID = ""})
}
// hookEventsForLastVotedTerm maintains field: mLastVotedTerm
// update : feCandidateRequestVote / feVoteToCandidate
func (me *tFollowerState) hookEventsForLastVotedTerm() {me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
// before voting, check whether last vote timeout
now := time.Now().UnixNano()
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
// timeout, reset to empty
me.mLastVotedTerm = 0
me.mLastVotedCandidateID = ""
me.mLastVotedTimestamp = 0
}
})
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {cmd := args[0].(*rpc.RequestVoteCmd)
me.mLastVotedTerm = cmd.Term
})
}
// hookEventsForLastVotedCandicateID maintains field: mLastVotedCandidateID
// update : feCandidateRequestVote / feVoteToCandidate
func (me *tFollowerState) hookEventsForLastVotedCandicateID() {me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
// before voting, check whether last vote timeout
now := time.Now().UnixNano()
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
// timeout, reset to empty
me.mLastVotedTerm = 0
me.mLastVotedCandidateID = ""
me.mLastVotedTimestamp = 0
}
})
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {cmd := args[0].(*rpc.RequestVoteCmd)
me.mLastVotedCandidateID = cmd.CandidateID
})
}
// hookEventsForLastVotedTimestamp maintains field: mLastVotedTimestamp
// update : feCandidateRequestVote / feVoteToCandidate
func (me *tFollowerState) hookEventsForLastVotedTimestamp() {me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
// before voting, check whether last vote timeout
now := time.Now().UnixNano()
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
// timeout, reset to empty
me.mLastVotedTerm = 0
me.mLastVotedCandidateID = ""
me.mLastVotedTimestamp = 0
}
})
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {me.mLastVotedTimestamp = time.Now().UnixNano()})
}
// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: feInit / feDisposing
func (me *tFollowerState) hookEventsForDisposedFlag() {me.hook(feInit, func(e string, args ...interface{}) {me.mDiseposedFlag = false})
me.hook(feDisposing, func(e string, args ...interface{}) {me.mDiseposedFlag = true})
}
func (me *tFollowerState) Start() {me.mStartOnce.Do(func() {me.raise(feStart)
})
}
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {go func() {iCheckingTimeoutInterval := fnRandomizeDuration(timeout.HeartbeatTimeout / 3)
for range time.Tick(iCheckingTimeoutInterval) {
if me.mDiseposedFlag {return}
now := time.Now().UnixNano()
iHeartbeatTimeoutNanos := fnRandomizeInt64(int64(timeout.HeartbeatTimeout / time.Nanosecond))
if now - me.mLeaderHeartbeatTimestamp >= iHeartbeatTimeoutNanos {me.raise(feLeaderHeartbeatTimeout)
return
}
}
}()}
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {me.raise(feDisposing)
me.context.handleStateChanged(newCandidateState(me.context, me.mTerm + 1))
}
func (me *tFollowerState) Role() roles.RaftRole {return roles.Follower}
// Heartbeat leader to follower
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
// check term
if cmd.Term < me.mTerm {
// invalid leader
ret.Code = rpc.HBTermMismatch
ret.Term = me.mTerm
return nil
}
// raise LeaderHeartbeat
me.raise(feLeaderHeartbeat, cmd)
// return
ret.Code = rpc.HBOk
return nil
}
// AppendLog leader to follower
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
ret.Term = me.mTerm
if cmd.Term < me.mTerm {
// invalid leader
ret.Code = rpc.ALTermMismatch
return nil
}
store := me.context.store()
entry := cmd.Entry
// check log: expecting appending action follows previous committing action
if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
// check log
e, log := store.GetLog(entry.Index)
if e != nil {
ret.Code = rpc.ALInternalError
return nil
}
if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {
// bad log
ret.Code = rpc.ALIndexMismatch
ret.PrevLogIndex = store.LastCommittedIndex()
ret.PrevLogTerm = store.LastCommittedTerm()
return nil
}
// good log, but old, just ignore it
ret.Code = rpc.ALOk
return nil
}
// good log
e := store.Append(entry)
if e != nil {
ret.Code = rpc.ALInternalError
return nil
} else {
ret.Code = rpc.ALOk
return nil
}
}
// CommitLog leader to follower
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {store := me.context.store()
if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
// bad index
ret.Code = rpc.CLLogNotFound
return nil
}
e := store.Commit(cmd.Index)
if e != nil {
ret.Code = rpc.CLInternalError
return nil
}
ret.Code = rpc.CLOk
return nil
}
// RequestVote candidate to follower
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
// before voting
me.raise(feCandidateRequestVote, cmd)
// check term
if cmd.Term <= me.mTerm {
ret.Term = me.mTerm
ret.Code = rpc.RVTermMismatch
return nil
}
// check if already voted another
if me.mLastVotedTerm >= cmd.Term && me.mLastVotedCandidateID != "" && me.mLastVotedCandidateID != cmd.CandidateID {
ret.Code = rpc.RVVotedAnother
return nil
}
// check log index
if cmd.LastLogIndex < me.context.store().LastCommittedIndex() {
ret.Code = rpc.RVLogMismatch
return nil
}
// vote ok
me.raise(feVoteToCandidate, cmd)
ret.Term = cmd.Term
ret.Code = rpc.RVOk
return nil
}
tCandidateState.go
基于事件驱动重构 Candidate 状态的逻辑编排,各字段施行读写拆散治理
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"sync"
"time"
)
// tCandidateState presents a candidate node
type tCandidateState struct {
tEventDrivenModel
context IRaftLSM
mInitOnce sync.Once
mStartOnce sync.Once
// update: init / ceElectionTimeout
mTerm int64
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
mVotedTerm int64
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
mVotedCandidateID string
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
mVotedTimestamp int64
}
// trigger: init()
// args: empty
const ceInit = "candidate.init"
// trigger: Start()
// args: empty
const ceStart = "candidate.Start"
// trigger: whenStartThenWatchElectionTimeout()
// args: empty
const ceElectionTimeout = "candidate.ElectionTimeout"
// trigger: Heartbeat() / AppendLog() / CommitLog()
// args: empty
const ceLeaderAnnounced = "candidate.LeaderAnnounced"
// trigger: RequestVote()
// args: *rpc.RequestVoteCmd
const ceVoteToCandidate = "candidate.VoteToCandidate"
// trigger: whenLeaderHeartbeatThenSwitchToFollower()
// args: empty
const ceDisposing = "candidate.Disposing"
func newCandidateState(ctx IRaftLSM, term int64) IRaftState {it := new(tCandidateState)
it.init(ctx, term)
return it
}
func (me *tCandidateState) init(ctx IRaftLSM, term int64) {me.mInitOnce.Do(func() {
me.context = ctx
me.mTerm = term
me.initEventHandlers()
me.raise(ceInit)
})
}
func (me *tCandidateState) initEventHandlers() {
// write only logic
me.hookEventsForTerm()
me.hookEventsForVotedTerm()
me.hookEventsForVotedCandidateID()
me.hookEventsForVotedTimestamp()
// read only logic
me.hook(ceLeaderAnnounced,
me.whenLeaderAnnouncedThenSwitchToFollower)
me.hook(ceElectionTimeout,
me.whenElectionTimeoutThenRequestVoteAgain)
}
// hookEventsForTerm maintains field: mTerm
// update: ceElectionTimeout
func (me *tCandidateState) hookEventsForTerm() {me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when election timeout, term++ and request vote again
me.mTerm++
})
}
// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTerm() {me.hook(ceInit, func(e string, args ...interface{}) {
// initially, vote to itself
me.mVotedTerm = me.mTerm
})
me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when timeout, reset to itself
me.mVotedTerm = me.mTerm
})
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
// after vote to candidate
cmd := args[0].(*rpc.RequestVoteCmd)
me.mVotedTerm = cmd.Term
})
}
// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedCandidateID() {me.hook(ceInit, func(e string, args ...interface{}) {
// initially, vote to itself
me.mVotedCandidateID = me.context.config().ID()
})
me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when timeout, reset to itself
me.mVotedCandidateID = me.context.config().ID()
})
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
// after vote to candidate
cmd := args[0].(*rpc.RequestVoteCmd)
me.mVotedCandidateID = cmd.CandidateID
})
}
func (me *tCandidateState) hookEventsForVotedTimestamp() {me.hook(ceInit, func(e string, args ...interface{}) {
// initially, vote to itself
me.mVotedTimestamp = time.Now().UnixNano()
})
me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when timeout, reset to itself
me.mVotedTimestamp = time.Now().UnixNano()
})
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
// after vote to candidate
me.mVotedTimestamp = time.Now().UnixNano()
})
}
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
// check term
if cmd.Term <= me.mTerm {
// bad leader
ret.Code = rpc.HBTermMismatch
return nil
}
// new leader
me.raise(ceLeaderAnnounced)
// return ok
ret.Code = rpc.HBOk
return nil
}
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
// check term
if cmd.Term <= me.mTerm {
// bad leader
ret.Code = rpc.ALTermMismatch
return nil
}
// new leader
me.raise(ceLeaderAnnounced)
// ignore and return
ret.Code = rpc.ALInternalError
return nil
}
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
// ignore and return
ret.Code = rpc.CLInternalError
return nil
}
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
// todo: fixme
panic("implements me")
}
func (me *tCandidateState) Role() roles.RaftRole {return roles.Candidate}
func (me *tCandidateState) Start() {me.mStartOnce.Do(func() {me.raise(feStart)
})
}
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {me.raise(ceDisposing)
me.context.handleStateChanged(newFollowerState(me.context))
}
func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ string, _ ...interface{}) {
// todo: fixme
panic("implements me")
}
(未完待续)
正文完