共计 13715 个字符,预计需要花费 35 分钟才能阅读完成。
手撸 golang etcd raft 协议之 6
缘起
最近阅读《云原生分布式存储基石:etcd 深入解析》(杜军, 2019.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop
raft 分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft 算法把这个问题分解成了四个子问题:1. 首领选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)、4. 成员关系变动(membership changes)。
目标
- 依据 raft 协议,实现高可用、分布式、强一致的 kv 存储
子目标(Day 6)
- 大幅重构,提升代码的可理解性 / 可维护性:
  - 基于事件驱动的逻辑编排,重构 Follower 和 Candidate 状态下的实现
  - 将字段状态的管理实行读写分离。没看错,代码也是可以“读写分离”的 ^_^
设计
- random:为各种超时时间增加随机性
- tFollowerState:基于事件驱动重构 Follower 状态的逻辑编排,各字段实行读写分离管理
- tCandidateState:基于事件驱动重构 Candidate 状态的逻辑编排,各字段实行读写分离管理
random.go
为各种超时时间增加随机性
package lsm | |
import (
	"math/rand"
	"sync"
	"time"
)
// fnRandomizeInt64 stretches v by a random 0-30% jitter, returning a value in
// [v, v*1.3] for v >= 0. It is used to add randomness to timeouts.
// Safe for concurrent use.
func fnRandomizeInt64(v int64) int64 {
	gRandMu.Lock()
	pct := gRand.Int63n(31) // 0..30 inclusive, so the v*1.3 upper bound is reachable
	gRandMu.Unlock()

	// Equivalent to v + v*pct/100 (same truncation), but split into whole
	// hundreds plus remainder so v*pct cannot overflow int64 for large v.
	return v + v/100*pct + v%100*pct/100
}

// fnRandomizeDuration stretches v by a random 0-30% jitter, returning a
// duration in [v, v*1.3] for v >= 0. Safe for concurrent use.
func fnRandomizeDuration(v time.Duration) time.Duration {
	return time.Duration(fnRandomizeInt64(int64(v)))
}

// gRand is the process-wide jitter source, seeded from the clock.
// A *rand.Rand built on rand.NewSource is NOT safe for concurrent use, and the
// helpers above are called from several goroutines, so every access to gRand
// must hold gRandMu.
var gRand = rand.New(rand.NewSource(time.Now().UnixNano()))

// gRandMu serialises access to gRand.
var gRandMu sync.Mutex
tFollowerState.go
基于事件驱动重构 Follower 状态的逻辑编排,各字段实行读写分离管理
package lsm | |
import ( | |
"learning/gooop/etcd/raft/roles" | |
"learning/gooop/etcd/raft/rpc" | |
"learning/gooop/etcd/raft/timeout" | |
"sync" | |
"time" | |
) | |
// tFollowerState presents a follower node | |
type tFollowerState struct { | |
tEventDrivenModel | |
context IRaftLSM | |
mInitOnce sync.Once | |
mStartOnce sync.Once | |
// update: feInit / feLeaderHeartbeat | |
mTerm int64 | |
// update: feInit / feLeaderHeartbeat | |
mLeaderHeartbeatTimestamp int64 | |
// update: feLeaderHeartbeat | |
mLeaderID string | |
// update: feCandidateRequestVote / feVoteToCandidate | |
mLastVotedTerm int64 | |
// update: feCandidateRequestVote / feVoteToCandidate | |
mLastVotedCandidateID string | |
// update: feCandidateRequestVote / feVoteToCandidate | |
mLastVotedTimestamp int64 | |
// update: feInit / feDisposing | |
mDiseposedFlag bool | |
} | |
// Event IDs raised on a tFollowerState's own event model. Every ID carries the
// "follower." prefix so traces show which state owns the event; previously two
// IDs were mislabeled ("candidate.RequestVote" and a swapped name).
const (
	// feInit is the one-time initialisation event (expected trigger: init()); no args.
	feInit = "follower.init"
	// feStart is raised by Start(); no args.
	feStart = "follower.Start"
	// feLeaderHeartbeat is raised by Heartbeat() for a non-stale leader;
	// args: *rpc.HeartbeatCmd.
	feLeaderHeartbeat = "follower.LeaderHeartbeat"
	// feLeaderHeartbeatTimeout is raised by whenStartThenBeginWatchLeaderTimeout(); no args.
	feLeaderHeartbeatTimeout = "follower.LeaderHeartbeatTimeout"
	// feCandidateRequestVote is raised by RequestVote() before any vote check;
	// args: *rpc.RequestVoteCmd.
	feCandidateRequestVote = "follower.CandidateRequestVote"
	// feVoteToCandidate is raised by RequestVote() once the vote is granted;
	// args: *rpc.RequestVoteCmd.
	feVoteToCandidate = "follower.VoteToCandidate"
	// feDisposing is raised by whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(); no args.
	feDisposing = "follower.Disposing"
)
func newFollowerState(ctx IRaftLSM) IRaftState {it := new(tFollowerState) | |
it.init(ctx) | |
return it | |
} | |
func (me *tFollowerState) init(ctx IRaftLSM) {me.mInitOnce.Do(func() { | |
me.context = ctx | |
me.initEventHandlers()}) | |
} | |
func (me *tFollowerState) initEventHandlers() { | |
// write only logic | |
me.hookEventsForTerm() | |
me.hookEventsForLeaderHeartbeatTimestamp() | |
me.hookEventsForLeaderID() | |
me.hookEventsForLastVotedTerm() | |
me.hookEventsForLastVotedCandicateID() | |
me.hookEventsForLastVotedTimestamp() | |
me.hookEventsForDisposedFlag() | |
// read only logic | |
me.hook(feStart, | |
me.whenStartThenBeginWatchLeaderTimeout) | |
me.hook(feLeaderHeartbeatTimeout, | |
me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState) | |
} | |
// hookEventsForTerm maintains field: mTerm | |
// update : feInit / feLeaderHeartbeat | |
func (me *tFollowerState) hookEventsForTerm() {me.hook(feInit, func(e string, args ...interface{}) {me.mTerm = me.context.store().LastCommittedTerm()}) | |
me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {cmd := args[0].(*rpc.HeartbeatCmd) | |
me.mTerm = cmd.Term | |
}) | |
} | |
// hookEventsForLeaderHeartbeatClock maintains field: mLeaderHeartbeatClock | |
// update : feLeaderHeartbeat / feLeaderHeartbeatTimeout | |
func (me *tFollowerState) hookEventsForLeaderHeartbeatTimestamp() {me.hook(feInit, func(e string, args ...interface{}) {me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()}) | |
me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()}) | |
me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {me.mLeaderHeartbeatTimestamp = 0}) | |
} | |
// hookEventsForLeaderID maintains field: mLeaderID | |
// update : feLeaderHeartbeat / feLeaderHeartbeatTimeout | |
func (me *tFollowerState) hookEventsForLeaderID() {me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {cmd := args[0].(*rpc.HeartbeatCmd) | |
me.mLeaderID = cmd.LeaderID | |
}) | |
me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {me.mLeaderID = ""}) | |
} | |
// hookEventsForLastVotedTerm maintains field: mLastVotedTerm | |
// update : feCandidateRequestVote / feVoteToCandidate | |
func (me *tFollowerState) hookEventsForLastVotedTerm() {me.hook(feCandidateRequestVote, func(e string, args ...interface{}) { | |
// before voting, check whether last vote timeout | |
now := time.Now().UnixNano() | |
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) { | |
// timeout, reset to empty | |
me.mLastVotedTerm = 0 | |
me.mLastVotedCandidateID = "" | |
me.mLastVotedTimestamp = 0 | |
} | |
}) | |
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {cmd := args[0].(*rpc.RequestVoteCmd) | |
me.mLastVotedTerm = cmd.Term | |
}) | |
} | |
// hookEventsForLastVotedCandicateID maintains field: mLastVotedCandidateID | |
// update : feCandidateRequestVote / feVoteToCandidate | |
func (me *tFollowerState) hookEventsForLastVotedCandicateID() {me.hook(feCandidateRequestVote, func(e string, args ...interface{}) { | |
// before voting, check whether last vote timeout | |
now := time.Now().UnixNano() | |
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) { | |
// timeout, reset to empty | |
me.mLastVotedTerm = 0 | |
me.mLastVotedCandidateID = "" | |
me.mLastVotedTimestamp = 0 | |
} | |
}) | |
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {cmd := args[0].(*rpc.RequestVoteCmd) | |
me.mLastVotedCandidateID = cmd.CandidateID | |
}) | |
} | |
// hookEventsForLastVotedTimestamp maintains field: mLastVotedTimestamp | |
// update : feCandidateRequestVote / feVoteToCandidate | |
func (me *tFollowerState) hookEventsForLastVotedTimestamp() {me.hook(feCandidateRequestVote, func(e string, args ...interface{}) { | |
// before voting, check whether last vote timeout | |
now := time.Now().UnixNano() | |
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) { | |
// timeout, reset to empty | |
me.mLastVotedTerm = 0 | |
me.mLastVotedCandidateID = "" | |
me.mLastVotedTimestamp = 0 | |
} | |
}) | |
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {me.mLastVotedTimestamp = time.Now().UnixNano()}) | |
} | |
// hookEventsForDisposedFlag maintains field: mDisposedFlag | |
// update: feInit / feDisposing | |
func (me *tFollowerState) hookEventsForDisposedFlag() {me.hook(feInit, func(e string, args ...interface{}) {me.mDiseposedFlag = false}) | |
me.hook(feDisposing, func(e string, args ...interface{}) {me.mDiseposedFlag = true}) | |
} | |
func (me *tFollowerState) Start() {me.mStartOnce.Do(func() {me.raise(feStart) | |
}) | |
} | |
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {go func() {iCheckingTimeoutInterval := fnRandomizeDuration(timeout.HeartbeatTimeout / 3) | |
for range time.Tick(iCheckingTimeoutInterval) { | |
if me.mDiseposedFlag {return} | |
now := time.Now().UnixNano() | |
iHeartbeatTimeoutNanos := fnRandomizeInt64(int64(timeout.HeartbeatTimeout / time.Nanosecond)) | |
if now - me.mLeaderHeartbeatTimestamp >= iHeartbeatTimeoutNanos {me.raise(feLeaderHeartbeatTimeout) | |
return | |
} | |
} | |
}()} | |
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {me.raise(feDisposing) | |
me.context.handleStateChanged(newCandidateState(me.context, me.mTerm + 1)) | |
} | |
func (me *tFollowerState) Role() roles.RaftRole {return roles.Follower} | |
// Heartbeat leader to follower | |
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { | |
// check term | |
if cmd.Term < me.mTerm { | |
// invalid leader | |
ret.Code = rpc.HBTermMismatch | |
ret.Term = me.mTerm | |
return nil | |
} | |
// raise LeaderHeartbeat | |
me.raise(feLeaderHeartbeat, cmd) | |
// return | |
ret.Code = rpc.HBOk | |
return nil | |
} | |
// AppendLog leader to follower | |
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { | |
ret.Term = me.mTerm | |
if cmd.Term < me.mTerm { | |
// invalid leader | |
ret.Code = rpc.ALTermMismatch | |
return nil | |
} | |
store := me.context.store() | |
entry := cmd.Entry | |
// check log: expecting appending action follows previous committing action | |
if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() { | |
// check log | |
e, log := store.GetLog(entry.Index) | |
if e != nil { | |
ret.Code = rpc.ALInternalError | |
return nil | |
} | |
if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm { | |
// bad log | |
ret.Code = rpc.ALIndexMismatch | |
ret.PrevLogIndex = store.LastCommittedIndex() | |
ret.PrevLogTerm = store.LastCommittedTerm() | |
return nil | |
} | |
// good log, but old, just ignore it | |
ret.Code = rpc.ALOk | |
return nil | |
} | |
// good log | |
e := store.Append(entry) | |
if e != nil { | |
ret.Code = rpc.ALInternalError | |
return nil | |
} else { | |
ret.Code = rpc.ALOk | |
return nil | |
} | |
} | |
// CommitLog leader to follower | |
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {store := me.context.store() | |
if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() { | |
// bad index | |
ret.Code = rpc.CLLogNotFound | |
return nil | |
} | |
e := store.Commit(cmd.Index) | |
if e != nil { | |
ret.Code = rpc.CLInternalError | |
return nil | |
} | |
ret.Code = rpc.CLOk | |
return nil | |
} | |
// RequestVote candidate to follower | |
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { | |
// before voting | |
me.raise(feCandidateRequestVote, cmd) | |
// check term | |
if cmd.Term <= me.mTerm { | |
ret.Term = me.mTerm | |
ret.Code = rpc.RVTermMismatch | |
return nil | |
} | |
// check if already voted another | |
if me.mLastVotedTerm >= cmd.Term && me.mLastVotedCandidateID != "" && me.mLastVotedCandidateID != cmd.CandidateID { | |
ret.Code = rpc.RVVotedAnother | |
return nil | |
} | |
// check log index | |
if cmd.LastLogIndex < me.context.store().LastCommittedIndex() { | |
ret.Code = rpc.RVLogMismatch | |
return nil | |
} | |
// vote ok | |
me.raise(feVoteToCandidate, cmd) | |
ret.Term = cmd.Term | |
ret.Code = rpc.RVOk | |
return nil | |
} |
tCandidateState.go
基于事件驱动重构 Candidate 状态的逻辑编排,各字段实行读写分离管理
package lsm | |
import ( | |
"learning/gooop/etcd/raft/roles" | |
"learning/gooop/etcd/raft/rpc" | |
"sync" | |
"time" | |
) | |
// tCandidateState presents a candidate node | |
type tCandidateState struct { | |
tEventDrivenModel | |
context IRaftLSM | |
mInitOnce sync.Once | |
mStartOnce sync.Once | |
// update: init / ceElectionTimeout | |
mTerm int64 | |
// update: ceInit / ceElectionTimeout / ceVoteToCandidate | |
mVotedTerm int64 | |
// update: ceInit / ceElectionTimeout / ceVoteToCandidate | |
mVotedCandidateID string | |
// update: ceInit / ceElectionTimeout / ceVoteToCandidate | |
mVotedTimestamp int64 | |
} | |
// Event IDs raised on a tCandidateState's own event model.
const (
	// ceInit is raised by init(); no args.
	ceInit = "candidate.init"
	// ceStart is the candidate's start event (intended trigger: Start()); no args.
	ceStart = "candidate.Start"
	// ceElectionTimeout is meant to be raised by an election-timeout watcher
	// (not visible in this file); no args.
	ceElectionTimeout = "candidate.ElectionTimeout"
	// ceLeaderAnnounced is raised by Heartbeat() / AppendLog() when a
	// legitimate leader is seen; no args.
	ceLeaderAnnounced = "candidate.LeaderAnnounced"
	// ceVoteToCandidate records a vote granted to another candidate;
	// args: *rpc.RequestVoteCmd.
	ceVoteToCandidate = "candidate.VoteToCandidate"
	// ceDisposing is raised by whenLeaderAnnouncedThenSwitchToFollower(); no args.
	ceDisposing = "candidate.Disposing"
)
func newCandidateState(ctx IRaftLSM, term int64) IRaftState {it := new(tCandidateState) | |
it.init(ctx, term) | |
return it | |
} | |
func (me *tCandidateState) init(ctx IRaftLSM, term int64) {me.mInitOnce.Do(func() { | |
me.context = ctx | |
me.mTerm = term | |
me.initEventHandlers() | |
me.raise(ceInit) | |
}) | |
} | |
func (me *tCandidateState) initEventHandlers() { | |
// write only logic | |
me.hookEventsForTerm() | |
me.hookEventsForVotedTerm() | |
me.hookEventsForVotedCandidateID() | |
me.hookEventsForVotedTimestamp() | |
// read only logic | |
me.hook(ceLeaderAnnounced, | |
me.whenLeaderAnnouncedThenSwitchToFollower) | |
me.hook(ceElectionTimeout, | |
me.whenElectionTimeoutThenRequestVoteAgain) | |
} | |
// hookEventsForTerm maintains field: mTerm | |
// update: ceElectionTimeout | |
func (me *tCandidateState) hookEventsForTerm() {me.hook(ceElectionTimeout, func(e string, args ...interface{}) { | |
// when election timeout, term++ and request vote again | |
me.mTerm++ | |
}) | |
} | |
// hookEventsForVotedTerm maintains field: mVotedTerm | |
// update: ceInit / ceElectionTimeout / ceVoteToCandidate | |
func (me *tCandidateState) hookEventsForVotedTerm() {me.hook(ceInit, func(e string, args ...interface{}) { | |
// initially, vote to itself | |
me.mVotedTerm = me.mTerm | |
}) | |
me.hook(ceElectionTimeout, func(e string, args ...interface{}) { | |
// when timeout, reset to itself | |
me.mVotedTerm = me.mTerm | |
}) | |
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) { | |
// after vote to candidate | |
cmd := args[0].(*rpc.RequestVoteCmd) | |
me.mVotedTerm = cmd.Term | |
}) | |
} | |
// hookEventsForVotedCandidateID maintains field: mVotedCandidateID | |
// update: ceInit / ceElectionTimeout / ceVoteToCandidate | |
func (me *tCandidateState) hookEventsForVotedCandidateID() {me.hook(ceInit, func(e string, args ...interface{}) { | |
// initially, vote to itself | |
me.mVotedCandidateID = me.context.config().ID() | |
}) | |
me.hook(ceElectionTimeout, func(e string, args ...interface{}) { | |
// when timeout, reset to itself | |
me.mVotedCandidateID = me.context.config().ID() | |
}) | |
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) { | |
// after vote to candidate | |
cmd := args[0].(*rpc.RequestVoteCmd) | |
me.mVotedCandidateID = cmd.CandidateID | |
}) | |
} | |
func (me *tCandidateState) hookEventsForVotedTimestamp() {me.hook(ceInit, func(e string, args ...interface{}) { | |
// initially, vote to itself | |
me.mVotedTimestamp = time.Now().UnixNano() | |
}) | |
me.hook(ceElectionTimeout, func(e string, args ...interface{}) { | |
// when timeout, reset to itself | |
me.mVotedTimestamp = time.Now().UnixNano() | |
}) | |
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) { | |
// after vote to candidate | |
me.mVotedTimestamp = time.Now().UnixNano() | |
}) | |
} | |
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { | |
// check term | |
if cmd.Term <= me.mTerm { | |
// bad leader | |
ret.Code = rpc.HBTermMismatch | |
return nil | |
} | |
// new leader | |
me.raise(ceLeaderAnnounced) | |
// return ok | |
ret.Code = rpc.HBOk | |
return nil | |
} | |
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { | |
// check term | |
if cmd.Term <= me.mTerm { | |
// bad leader | |
ret.Code = rpc.ALTermMismatch | |
return nil | |
} | |
// new leader | |
me.raise(ceLeaderAnnounced) | |
// ignore and return | |
ret.Code = rpc.ALInternalError | |
return nil | |
} | |
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error { | |
// ignore and return | |
ret.Code = rpc.CLInternalError | |
return nil | |
} | |
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { | |
// todo: fixme | |
panic("implements me") | |
} | |
func (me *tCandidateState) Role() roles.RaftRole {return roles.Candidate} | |
func (me *tCandidateState) Start() {me.mStartOnce.Do(func() {me.raise(feStart) | |
}) | |
} | |
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {me.raise(ceDisposing) | |
me.context.handleStateChanged(newFollowerState(me.context)) | |
} | |
func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ string, _ ...interface{}) { | |
// todo: fixme | |
panic("implements me") | |
} |
(未完待续)
正文完