手撸golang etcd raft协议之5

缘起

最近阅读 [云原生分布式存储基石：etcd深入解析]（杜军，2019.1）
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错，以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性？Raft算法把这个问题分解成了领导者选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变更(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可完成：RequestVote RPC由候选人(Candidate)在选举过程中发起；AppendEntries RPC由领导人(Leader)发起，目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议，实现高可用、分布式、强一致的kv存储

子目标(Day 5)

  • 重新设计RPC接口，将原有精简的两个接口拆分为更易于理解和实现的四个接口（尽信书则不如无书 -_-||）
  • 根据新RPC接口重写Follower状态的实现

设计

  • IRaftRPC：将原有精简的两个接口拆分为更易于理解和实现的四个接口
  • IRaftLSM：增加部分包内支持接口
  • iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
  • ILogStore：改造以适配新拆分的RPC接口
  • tBoltDBStore：基于boltdb实现日志的暂存、提交和应用
  • tFollowerState：根据新拆分的RPC接口，重写Follower状态的实现（部分未实现）

IRaftRPC.go

将原有精简的两个接口拆分为更易于理解和实现的四个接口。尽信书则不如无书 -_-||

package rpcimport "learning/gooop/etcd/raft/model"type IRaftRPC interface {    // leader to follower    Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error    // leader to follower    AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error    // leader to follower    CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error    // candidate to follower    RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error}type HeartbeatCmd struct {    LeaderID string    Term int64}type HeartbeatRet struct {    Code HBCode    Term int64}type HBCode intconst (    HBOk HBCode = iota    HBTermMismatch HBCode = iota)type RequestVoteCmd struct {    CandidateID  string    Term         int64    LastLogIndex int64    LastLogTerm int64}type RequestVoteRet struct {    Code RVCode    Term        int64}type RVCode intconst (    RVOk RVCode = iota    RVLogMismatch RVCode = iota    RVTermMismatch RVCode = iota    RVVotedAnother RVCode = iota)type AppendLogCmd struct {    LeaderID     string    Term         int64    Entry *model.LogEntry}type AppendLogRet struct {    Code ALCode    Term    int64    PrevLogIndex int64    PrevLogTerm int64}type ALCode intconst (    ALOk ALCode = iota    ALTermMismatch ALCode = iota    ALIndexMismatch ALCode = iota    ALInternalError ALCode = iota)type CommitLogCmd struct {    LeaderID     string    Term         int64    Index int64}type CommitLogRet struct {    Code CLCode}type CLCode intconst (    CLOk CLCode = iota    CLLogNotFound CLCode = iota    CLInternalError CLCode = iota)

IRaftLSM.go

增加部分包内支持接口

package lsmimport (    "learning/gooop/etcd/raft/config"    "learning/gooop/etcd/raft/rpc"    "learning/gooop/etcd/raft/store")// IRaftLSM raft无限状态自动机type IRaftLSM interface {    rpc.IRaftRPC    State() IRaftState    config() config.IRaftConfig    store() store.ILogStore    handleStateChanged(state IRaftState)}

iEventDrivenModel.go

抽取并实现事件驱动型的逻辑编排

package lsmtype tEventHandleFunc func(e string, args... interface{})type iEventDrivenModel interface {    hook(e string, handleFunc tEventHandleFunc)    raise(e string, args... interface{})}type tEventDrivenModel struct {    items map[string][]tEventHandleFunc}func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) {    arr, ok := me.items[e]    if ok {        me.items[e] = append(arr, handler)    } else {        me.items[e] = []tEventHandleFunc{handler }    }}func (me *tEventDrivenModel) raise(e string, args... interface{}) {    if handlers, ok := me.items[e];ok {        for _,it := range handlers {            it(e, args...)        }    }}

ILogStore.go

改造以适配新拆分的RPC接口

package storeimport "learning/gooop/etcd/raft/model"type ILogStore interface {    LastAppendedTerm() int64    LastAppendedIndex() int64    LastCommittedTerm() int64    LastCommittedIndex() int64    Append(entry *model.LogEntry) error    Commit(index int64) error    GetLog(index int64) (error, *model.LogEntry)}

tBoltDBStore.go

基于boltdb实现日志的暂存、提交和应用

package storeimport (    "bytes"    "encoding/binary"    "errors"    "github.com/boltdb/bolt"    "learning/gooop/etcd/raft/model")type tBoltDBStore struct {    file  string    lastAppendedTerm  int64    lastAppendedIndex int64    lastCommittedTerm  int64    lastCommittedIndex int64    db bolt.DB}func NewBoltStore(file string) (error, ILogStore) {    db, err := bolt.Open(file, 0600, nil)    if err != nil {        return err, nil    }    store := new(tBoltDBStore)    err = db.Update(func(tx *bolt.Tx) error {        b, e := tx.CreateBucketIfNotExists(gMetaBucket)        if e != nil {            return e        }        v := b.Get(gKeyCommittedTerm)        if v == nil {            e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm))            if e != nil {                return e            }            store.lastCommittedTerm = gDefaultTerm        } else {            store.lastCommittedTerm = bytesToInt64(v)        }        v = b.Get(gKeyCommittedIndex)        if v == nil {            e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex))            if e != nil {                return e            }            store.lastCommittedIndex = gDefaultIndex        } else {            store.lastCommittedIndex = bytesToInt64(v)        }        b, e = tx.CreateBucketIfNotExists(gDataBucket)        if e != nil {            return e        }        e = tx.DeleteBucket(gUnstableBucket)        if e != nil {            return e        }        _, e = tx.CreateBucket(gUnstableBucket)        if e != nil {            return e        }        _, e = tx.CreateBucketIfNotExists(gCommittedBucket)        if e != nil {            return e        }        return nil    })    if err != nil {        return err, nil    }    return nil, store}func int64ToBytes(i int64) []byte {    buf := bytes.NewBuffer(make([]byte, 8))    _ = binary.Write(buf, binary.BigEndian, i)    return buf.Bytes()}func bytesToInt64(data []byte) int64 {    var i int64    buf := bytes.NewBuffer(data)    _ = 
binary.Read(buf, binary.BigEndian, &i)    return i}func (me *tBoltDBStore) LastCommittedTerm() int64 {    return me.lastCommittedTerm}func (me *tBoltDBStore) LastCommittedIndex() int64 {    return me.lastCommittedIndex}func (me *tBoltDBStore) LastAppendedTerm() int64 {    return me.lastAppendedTerm}func (me *tBoltDBStore) LastAppendedIndex() int64 {    return me.lastAppendedIndex}func (me *tBoltDBStore) Append(entry *model.LogEntry) error {    cmd := gCmdFactory.OfTag(entry.Tag)    cmd.Unmarshal(entry.Command)    e, entryData := entry.Marshal()    if e != nil {        return e    }    return me.db.Update(func(tx *bolt.Tx) error {        // save log to unstable        b := tx.Bucket(gUnstableBucket)        e = b.Put(int64ToBytes(entry.Index), entryData)        if e != nil {            return e        }        return nil    })}func (me *tBoltDBStore) Commit(index int64) error {    return me.db.Update(func(tx *bolt.Tx) error {        // read unstable log        ub := tx.Bucket(gUnstableBucket)        k := int64ToBytes(index)        data := ub.Get(k)        if data == nil {            return gErrorCommitLogNotFound        }        entry := new(model.LogEntry)        e := entry.Unmarshal(data)        if e != nil {            return e        }        // apply cmd        cmd := gCmdFactory.OfTag(entry.Tag)        cmd.Unmarshal(entry.Command)        e = cmd.Apply(tx)        if e != nil {            return e        }        // save to committed log        cb := tx.Bucket(gCommittedBucket)        e = cb.Put(k, data)        if e != nil {            return e        }        // update committed.index, committed.term        mb := tx.Bucket(gMetaBucket)        e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))        if e != nil {            return e        }        e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))        if e != nil {            return e        }        // del unstable.index        e = ub.Delete(k)        if e != nil {            return e        }        
me.lastCommittedIndex = entry.Index        me.lastCommittedTerm = entry.Term        return nil    })}func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {    ret := []*model.LogEntry{ nil }    e :=  me.db.View(func(tx *bolt.Tx) error {        k := int64ToBytes(index)        v := tx.Bucket(gCommittedBucket).Get(k)        if v == nil {            return nil        }        entry := new(model.LogEntry)        e := entry.Unmarshal(v)        if e != nil {            return e        }        ret[0] = entry        return nil    })    return e, ret[0]}var gMetaBucket = []byte("meta")var gUnstableBucket = []byte("unstable")var gCommittedBucket = []byte("committed")var gDataBucket = []byte("data")var gKeyCommittedIndex = []byte("committed.index")var gKeyCommittedTerm = []byte("committed.term")var gDefaultTerm int64 = 0var gDefaultIndex int64 = 0var gErrorCommitLogNotFound = errors.New("committing log not found")

tFollowerState.go

根据新拆分的RPC接口，重写Follower状态的实现（部分未实现）

package lsmimport (    "learning/gooop/etcd/raft/roles"    "learning/gooop/etcd/raft/rpc"    "learning/gooop/etcd/raft/timeout"    "sync"    "time")// tFollowerState presents a follower nodetype tFollowerState struct {    tEventDrivenModel    context IRaftLSM    mInitOnce  sync.Once    mStartOnce sync.Once    mDisposeOnce sync.Once    // updated when init, set term == store.lastCommittedTerm    // updated when leader.heartbeat    mTerm int64    // updated when leader.heartbeat    mLeaderHeartbeatClock int64    mVotedLeaderID string    mVotedTimestamp int64}const feStart string = "follower.Start"const feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"func newFollowerState(ctx IRaftLSM) IRaftState {    it := new(tFollowerState)    it.init(ctx)    return it}func (me *tFollowerState) init(ctx IRaftLSM) {    me.mInitOnce.Do(func() {        me.context = ctx        me.mTerm = ctx.store().LastCommittedTerm()        me.mLeaderHeartbeatClock = 0        me.initEventHandlers()    })}func (me *tFollowerState) initEventHandlers() {    me.hook(feStart,        me.whenStartThenBeginWatchLeaderTimeout)    me.hook(feLeaderHeartbeatTimeout,        me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)}func (me *tFollowerState) Start() {    me.mStartOnce.Do(func() {        me.raise(feStart)    })}func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {    go func() {        iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3        iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)        for range time.Tick(iCheckingTimeoutInterval) {            now := time.Now().UnixNano()            if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos {                me.raise(feLeaderHeartbeatTimeout)                return            }        }    }()}func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {    panic("implements me")}func (me 
*tFollowerState) Role() roles.RaftRole {    return roles.Follower}// Heartbeat leader to followerfunc (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {    if cmd.Term < me.mTerm {        // invalid leader        ret.Code = rpc.HBTermMismatch        ret.Term = me.mTerm        return nil    } else if cmd.Term > me.mTerm {        // new leader        me.mTerm = cmd.Term    }    // update heartbeat clock and return    me.mLeaderHeartbeatClock = time.Now().UnixNano()    ret.Code = rpc.HBOk    return nil}// AppendLog leader to followerfunc (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {    ret.Term = me.mTerm    if cmd.Term < me.mTerm {        // invalid leader        ret.Code = rpc.ALTermMismatch        return nil    }    store := me.context.store()    entry := cmd.Entry    // check log: expecting appending action follows previous committing action    if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {        // check log        e, log := store.GetLog(entry.Index)        if e != nil {            ret.Code = rpc.ALInternalError            return nil        }        if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {            // bad log            ret.Code = rpc.ALIndexMismatch            ret.PrevLogIndex = store.LastCommittedIndex()            ret.PrevLogTerm = store.LastCommittedTerm()            return nil        }        // good log, but old, just ignore it        ret.Code = rpc.ALOk        return nil    }    // good log    e := store.Append(entry)    if e != nil {        ret.Code = rpc.ALInternalError        return nil    } else {        ret.Code = rpc.ALOk        return nil    }}// CommitLog leader to followerfunc (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {    store := me.context.store()    if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {    
    // bad index        ret.Code = rpc.CLLogNotFound        return nil    }    e := store.Commit(cmd.Index)    if e != nil {        ret.Code = rpc.CLInternalError        return nil    }    ret.Code = rpc.CLOk    return nil}// RequestVote candidate to followerfunc (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {    panic("implements me")}

(未完待续)