手撸golang etcd raft协议之5
缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了领袖选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可完成。RequestVote RPC是在选举过程中由候选人(Candidate)触发的,AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 5)
- 重新设计RPC接口,将原有浓缩的两个接口分解为更易于理解和实现的四个接口( 尽信书则不如无书 -_-|| )
- 根据新RPC接口重写Follower状态的实现
设计
- IRaftRPC: 将原有浓缩的两个接口分解为更易于理解和实现的四个接口
- IRaftLSM: 添加部分包内支持接口
- iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
- ILogStore:改造适配新分解的RPC接口
- tBoltDBStore:基于boltdb实现日志暂存,提交和应用
- tFollowerState:根据新分解的RPC接口,重写Follower状态的实现(未完成)
IRaftRPC.go
将原有浓缩的两个接口分解为更易于理解和实现的四个接口。尽信书则不如无书-_-||
package rpcimport "learning/gooop/etcd/raft/model"type IRaftRPC interface { // leader to follower Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error // leader to follower AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error // leader to follower CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error // candidate to follower RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error}type HeartbeatCmd struct { LeaderID string Term int64}type HeartbeatRet struct { Code HBCode Term int64}type HBCode intconst ( HBOk HBCode = iota HBTermMismatch HBCode = iota)type RequestVoteCmd struct { CandidateID string Term int64 LastLogIndex int64 LastLogTerm int64}type RequestVoteRet struct { Code RVCode Term int64}type RVCode intconst ( RVOk RVCode = iota RVLogMismatch RVCode = iota RVTermMismatch RVCode = iota RVVotedAnother RVCode = iota)type AppendLogCmd struct { LeaderID string Term int64 Entry *model.LogEntry}type AppendLogRet struct { Code ALCode Term int64 PrevLogIndex int64 PrevLogTerm int64}type ALCode intconst ( ALOk ALCode = iota ALTermMismatch ALCode = iota ALIndexMismatch ALCode = iota ALInternalError ALCode = iota)type CommitLogCmd struct { LeaderID string Term int64 Index int64}type CommitLogRet struct { Code CLCode}type CLCode intconst ( CLOk CLCode = iota CLLogNotFound CLCode = iota CLInternalError CLCode = iota)
IRaftLSM.go
添加部分包内支持接口
package lsmimport ( "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/rpc" "learning/gooop/etcd/raft/store")// IRaftLSM raft无限状态自动机type IRaftLSM interface { rpc.IRaftRPC State() IRaftState config() config.IRaftConfig store() store.ILogStore handleStateChanged(state IRaftState)}
iEventDrivenModel.go
抽取并实现事件驱动型的逻辑编排
package lsmtype tEventHandleFunc func(e string, args... interface{})type iEventDrivenModel interface { hook(e string, handleFunc tEventHandleFunc) raise(e string, args... interface{})}type tEventDrivenModel struct { items map[string][]tEventHandleFunc}func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) { arr, ok := me.items[e] if ok { me.items[e] = append(arr, handler) } else { me.items[e] = []tEventHandleFunc{handler } }}func (me *tEventDrivenModel) raise(e string, args... interface{}) { if handlers, ok := me.items[e];ok { for _,it := range handlers { it(e, args...) } }}
ILogStore.go
改造适配新分解的RPC接口
package storeimport "learning/gooop/etcd/raft/model"type ILogStore interface { LastAppendedTerm() int64 LastAppendedIndex() int64 LastCommittedTerm() int64 LastCommittedIndex() int64 Append(entry *model.LogEntry) error Commit(index int64) error GetLog(index int64) (error, *model.LogEntry)}
tBoltDBStore.go
基于boltdb实现日志暂存,提交和应用
package storeimport ( "bytes" "encoding/binary" "errors" "github.com/boltdb/bolt" "learning/gooop/etcd/raft/model")type tBoltDBStore struct { file string lastAppendedTerm int64 lastAppendedIndex int64 lastCommittedTerm int64 lastCommittedIndex int64 db bolt.DB}func NewBoltStore(file string) (error, ILogStore) { db, err := bolt.Open(file, 0600, nil) if err != nil { return err, nil } store := new(tBoltDBStore) err = db.Update(func(tx *bolt.Tx) error { b, e := tx.CreateBucketIfNotExists(gMetaBucket) if e != nil { return e } v := b.Get(gKeyCommittedTerm) if v == nil { e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm)) if e != nil { return e } store.lastCommittedTerm = gDefaultTerm } else { store.lastCommittedTerm = bytesToInt64(v) } v = b.Get(gKeyCommittedIndex) if v == nil { e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex)) if e != nil { return e } store.lastCommittedIndex = gDefaultIndex } else { store.lastCommittedIndex = bytesToInt64(v) } b, e = tx.CreateBucketIfNotExists(gDataBucket) if e != nil { return e } e = tx.DeleteBucket(gUnstableBucket) if e != nil { return e } _, e = tx.CreateBucket(gUnstableBucket) if e != nil { return e } _, e = tx.CreateBucketIfNotExists(gCommittedBucket) if e != nil { return e } return nil }) if err != nil { return err, nil } return nil, store}func int64ToBytes(i int64) []byte { buf := bytes.NewBuffer(make([]byte, 8)) _ = binary.Write(buf, binary.BigEndian, i) return buf.Bytes()}func bytesToInt64(data []byte) int64 { var i int64 buf := bytes.NewBuffer(data) _ = binary.Read(buf, binary.BigEndian, &i) return i}func (me *tBoltDBStore) LastCommittedTerm() int64 { return me.lastCommittedTerm}func (me *tBoltDBStore) LastCommittedIndex() int64 { return me.lastCommittedIndex}func (me *tBoltDBStore) LastAppendedTerm() int64 { return me.lastAppendedTerm}func (me *tBoltDBStore) LastAppendedIndex() int64 { return me.lastAppendedIndex}func (me *tBoltDBStore) Append(entry *model.LogEntry) error { cmd := gCmdFactory.OfTag(entry.Tag) 
cmd.Unmarshal(entry.Command) e, entryData := entry.Marshal() if e != nil { return e } return me.db.Update(func(tx *bolt.Tx) error { // save log to unstable b := tx.Bucket(gUnstableBucket) e = b.Put(int64ToBytes(entry.Index), entryData) if e != nil { return e } return nil })}func (me *tBoltDBStore) Commit(index int64) error { return me.db.Update(func(tx *bolt.Tx) error { // read unstable log ub := tx.Bucket(gUnstableBucket) k := int64ToBytes(index) data := ub.Get(k) if data == nil { return gErrorCommitLogNotFound } entry := new(model.LogEntry) e := entry.Unmarshal(data) if e != nil { return e } // apply cmd cmd := gCmdFactory.OfTag(entry.Tag) cmd.Unmarshal(entry.Command) e = cmd.Apply(tx) if e != nil { return e } // save to committed log cb := tx.Bucket(gCommittedBucket) e = cb.Put(k, data) if e != nil { return e } // update committed.index, committed.term mb := tx.Bucket(gMetaBucket) e = mb.Put(gKeyCommittedIndex, int64ToBytes(index)) if e != nil { return e } e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)) if e != nil { return e } // del unstable.index e = ub.Delete(k) if e != nil { return e } me.lastCommittedIndex = entry.Index me.lastCommittedTerm = entry.Term return nil })}func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) { ret := []*model.LogEntry{ nil } e := me.db.View(func(tx *bolt.Tx) error { k := int64ToBytes(index) v := tx.Bucket(gCommittedBucket).Get(k) if v == nil { return nil } entry := new(model.LogEntry) e := entry.Unmarshal(v) if e != nil { return e } ret[0] = entry return nil }) return e, ret[0]}var gMetaBucket = []byte("meta")var gUnstableBucket = []byte("unstable")var gCommittedBucket = []byte("committed")var gDataBucket = []byte("data")var gKeyCommittedIndex = []byte("committed.index")var gKeyCommittedTerm = []byte("committed.term")var gDefaultTerm int64 = 0var gDefaultIndex int64 = 0var gErrorCommitLogNotFound = errors.New("committing log not found")
tFollowerState.go
根据新分解的RPC接口,重写Follower状态的实现(未完成)
package lsmimport ( "learning/gooop/etcd/raft/roles" "learning/gooop/etcd/raft/rpc" "learning/gooop/etcd/raft/timeout" "sync" "time")// tFollowerState presents a follower nodetype tFollowerState struct { tEventDrivenModel context IRaftLSM mInitOnce sync.Once mStartOnce sync.Once mDisposeOnce sync.Once // updated when init, set term == store.lastCommittedTerm // updated when leader.heartbeat mTerm int64 // updated when leader.heartbeat mLeaderHeartbeatClock int64 mVotedLeaderID string mVotedTimestamp int64}const feStart string = "follower.Start"const feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"func newFollowerState(ctx IRaftLSM) IRaftState { it := new(tFollowerState) it.init(ctx) return it}func (me *tFollowerState) init(ctx IRaftLSM) { me.mInitOnce.Do(func() { me.context = ctx me.mTerm = ctx.store().LastCommittedTerm() me.mLeaderHeartbeatClock = 0 me.initEventHandlers() })}func (me *tFollowerState) initEventHandlers() { me.hook(feStart, me.whenStartThenBeginWatchLeaderTimeout) me.hook(feLeaderHeartbeatTimeout, me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)}func (me *tFollowerState) Start() { me.mStartOnce.Do(func() { me.raise(feStart) })}func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) { go func() { iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3 iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond) for range time.Tick(iCheckingTimeoutInterval) { now := time.Now().UnixNano() if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos { me.raise(feLeaderHeartbeatTimeout) return } } }()}func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) { panic("implements me")}func (me *tFollowerState) Role() roles.RaftRole { return roles.Follower}// Heartbeat leader to followerfunc (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { if cmd.Term < me.mTerm { // invalid leader ret.Code = 
rpc.HBTermMismatch ret.Term = me.mTerm return nil } else if cmd.Term > me.mTerm { // new leader me.mTerm = cmd.Term } // update heartbeat clock and return me.mLeaderHeartbeatClock = time.Now().UnixNano() ret.Code = rpc.HBOk return nil}// AppendLog leader to followerfunc (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { ret.Term = me.mTerm if cmd.Term < me.mTerm { // invalid leader ret.Code = rpc.ALTermMismatch return nil } store := me.context.store() entry := cmd.Entry // check log: expecting appending action follows previous committing action if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() { // check log e, log := store.GetLog(entry.Index) if e != nil { ret.Code = rpc.ALInternalError return nil } if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm { // bad log ret.Code = rpc.ALIndexMismatch ret.PrevLogIndex = store.LastCommittedIndex() ret.PrevLogTerm = store.LastCommittedTerm() return nil } // good log, but old, just ignore it ret.Code = rpc.ALOk return nil } // good log e := store.Append(entry) if e != nil { ret.Code = rpc.ALInternalError return nil } else { ret.Code = rpc.ALOk return nil }}// CommitLog leader to followerfunc (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error { store := me.context.store() if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() { // bad index ret.Code = rpc.CLLogNotFound return nil } e := store.Commit(cmd.Index) if e != nil { ret.Code = rpc.CLInternalError return nil } ret.Code = rpc.CLOk return nil}// RequestVote candidate to followerfunc (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { panic("implements me")}
(未完待续)