乐趣区

关于golang:手撸golang-etcd-raft协议之5

手撸 golang etcd raft 协议之 5

缘起

最近阅读《云原生分布式存储基石:etcd 深入解析》(杜军, 2019.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop

raft 分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft 算法把问题分解成了领导人选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft 算法的基本操作只需 2 种 RPC 即可完成。RequestVote RPC 是在选举过程中由候选人(Candidate)发起的,AppendEntries RPC 是由领导人发起的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

指标

  • 依据 raft 协议,实现高可用、分布式、强一致的 kv 存储

子目标(Day 5)

  • 重新设计 RPC 接口,将原有复杂的两个接口分解为更易于理解和实现的四个接口 (尽信书则不如无书 -_-||)
  • 依据新 RPC 接口重写 Follower 状态的实现

设计

  • IRaftRPC: 将原有复杂的两个接口分解为更易于理解和实现的四个接口
  • IRaftLSM: 增加部分包内支持接口
  • iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
  • ILogStore:改造以适配新拆分的 RPC 接口
  • tBoltDBStore:基于 boltdb 实现日志暂存、提交和应用
  • tFollowerState:根据新拆分的 RPC 接口,重写 Follower 状态的实现(未完成)

IRaftRPC.go

将原有复杂的两个接口分解为更易于理解和实现的四个接口。尽信书则不如无书 -_-||

package rpc

import "learning/gooop/etcd/raft/model"

// IRaftRPC is the RPC surface of a raft node. The two classic raft RPCs are
// split into four narrower calls: three sent by the leader (Heartbeat,
// AppendLog, CommitLog) and one sent by a candidate (RequestVote).
// The follower implementation reports protocol outcomes through the Code
// field of each *Ret argument.
type IRaftRPC interface {
	// Heartbeat is sent by the leader to a follower to assert its leadership.
	Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error

	// AppendLog is sent by the leader to a follower to replicate one log entry.
	AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error

	// CommitLog is sent by the leader to a follower to commit the entry at cmd.Index.
	CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error

	// RequestVote is sent by a candidate to a follower to ask for its vote.
	RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
}


// HeartbeatCmd is the request of IRaftRPC.Heartbeat.
type HeartbeatCmd struct {
	LeaderID string // ID of the leader sending the heartbeat
	Term     int64  // the leader's current term
}

// HeartbeatRet is the reply of IRaftRPC.Heartbeat.
type HeartbeatRet struct {
	Code HBCode // outcome of the call
	Term int64  // the receiver's current term
}

// HBCode enumerates the outcomes of a Heartbeat call.
type HBCode int

const (
	// HBOk means the heartbeat was accepted.
	HBOk HBCode = iota
	// HBTermMismatch means the sender's term is older than the receiver's.
	HBTermMismatch
)

// RequestVoteCmd is the request of IRaftRPC.RequestVote.
type RequestVoteCmd struct {
	CandidateID  string // ID of the candidate asking for a vote
	Term         int64  // the candidate's term
	LastLogIndex int64  // index of the candidate's last log entry
	LastLogTerm  int64  // term of the candidate's last log entry
}

// RequestVoteRet is the reply of IRaftRPC.RequestVote.
type RequestVoteRet struct {
	Code RVCode // outcome of the call
	Term int64  // the receiver's current term
}

// RVCode enumerates the outcomes of a RequestVote call.
type RVCode int

const (
	// RVOk means the vote was granted.
	RVOk RVCode = iota
	// RVLogMismatch presumably means the vote was refused because of the
	// candidate's LastLogIndex/LastLogTerm (RequestVote is not implemented yet).
	RVLogMismatch
	// RVTermMismatch presumably means the candidate's term is too old.
	RVTermMismatch
	// RVVotedAnother presumably means the receiver already voted for another candidate.
	RVVotedAnother
)

// AppendLogCmd is the request of IRaftRPC.AppendLog: the leader asks the
// follower to append a single log entry.
type AppendLogCmd struct {
	LeaderID string          // ID of the sending leader
	Term     int64           // the leader's current term
	Entry    *model.LogEntry // the entry to append
}

// AppendLogRet is the reply of IRaftRPC.AppendLog.
type AppendLogRet struct {
	Code         ALCode // outcome of the call
	Term         int64  // the receiver's current term
	PrevLogIndex int64  // on ALIndexMismatch: index of the receiver's last committed entry
	PrevLogTerm  int64  // on ALIndexMismatch: term of the receiver's last committed entry
}

// ALCode enumerates the outcomes of an AppendLog call.
type ALCode int

const (
	// ALOk means the entry was appended (or was already present).
	ALOk ALCode = iota
	// ALTermMismatch means the sender's term is older than the receiver's.
	ALTermMismatch
	// ALIndexMismatch means the entry does not follow the receiver's log;
	// PrevLogIndex/PrevLogTerm in the reply tell the leader where it ends.
	ALIndexMismatch
	// ALInternalError means the receiver failed to read or write its log store.
	ALInternalError
)

// CommitLogCmd is the request of IRaftRPC.CommitLog.
type CommitLogCmd struct {
	LeaderID string // ID of the sending leader
	// Term is compared by the follower implementation with the term of its
	// last appended entry.
	Term  int64
	Index int64 // index of the log entry to commit
}

// CommitLogRet is the reply of IRaftRPC.CommitLog.
type CommitLogRet struct {
	Code CLCode // outcome of the call
}

// CLCode enumerates the outcomes of a CommitLog call.
type CLCode int

const (
	// CLOk means the entry was committed.
	CLOk CLCode = iota
	// CLLogNotFound means the requested entry is not the receiver's last appended one.
	CLLogNotFound
	// CLInternalError means committing to the log store failed.
	CLInternalError
)

IRaftLSM.go

增加部分包内支持接口

package lsm

import (
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/store"
)

// IRaftLSM is the raft node's (finite) state machine. It exposes the raft
// RPCs by embedding rpc.IRaftRPC, reports its current IRaftState, and gives
// the state implementations package-internal access to the configuration,
// the log store and state transitions.
type IRaftLSM interface {
	rpc.IRaftRPC

	// State returns the state the machine is currently in (e.g. follower).
	State() IRaftState

	// config returns the raft configuration.
	config() config.IRaftConfig
	// store returns the log store backing this node.
	store() store.ILogStore
	// handleStateChanged presumably switches the machine to state; intended
	// to be called by the current state (e.g. on leader timeout) — confirm
	// against the implementation.
	handleStateChanged(state IRaftState)
}

iEventDrivenModel.go

抽取并实现事件驱动型的逻辑编排

package lsm

// tEventHandleFunc handles a named event; args are whatever was passed to raise.
type tEventHandleFunc func(e string, args ...interface{})

// iEventDrivenModel is a small synchronous publish/subscribe helper:
// handlers are registered per event name with hook and invoked by raise.
type iEventDrivenModel interface {
	hook(e string, handleFunc tEventHandleFunc)
	raise(e string, args ...interface{})
}

// Compile-time check that tEventDrivenModel implements iEventDrivenModel.
var _ iEventDrivenModel = (*tEventDrivenModel)(nil)

// tEventDrivenModel implements iEventDrivenModel. Its zero value is ready to
// use, so it can be embedded directly (as tFollowerState does) without a
// constructor.
//
// It has no locking: do not hook concurrently with hook/raise. Register
// handlers before events can be raised from other goroutines.
type tEventDrivenModel struct {
	items map[string][]tEventHandleFunc
}

// hook registers handler for event e. Handlers of the same event run in
// registration order.
func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) {
	// Allocate lazily: embedders never initialise the map, and writing to a
	// nil map panics.
	if me.items == nil {
		me.items = make(map[string][]tEventHandleFunc)
	}
	// Appending to a missing key starts from a nil slice, so no lookup is needed.
	me.items[e] = append(me.items[e], handler)
}

// raise synchronously calls every handler registered for e, in registration
// order, passing e and args through. An event without handlers is a no-op.
func (me *tEventDrivenModel) raise(e string, args ...interface{}) {
	for _, handler := range me.items[e] {
		handler(e, args...)
	}
}

ILogStore.go

改造以适配新拆分的 RPC 接口

package store

import "learning/gooop/etcd/raft/model"

// ILogStore is the raft log storage. Entries are first appended and later
// committed by index; the store reports the newest appended and committed
// positions.
type ILogStore interface {
	// LastAppendedTerm and LastAppendedIndex describe the most recently
	// appended entry.
	LastAppendedTerm() int64
	LastAppendedIndex() int64

	// LastCommittedTerm and LastCommittedIndex describe the most recently
	// committed entry.
	LastCommittedTerm() int64
	LastCommittedIndex() int64

	// Append stores entry as appended (not yet committed).
	Append(entry *model.LogEntry) error
	// Commit commits the previously appended entry at index.
	Commit(index int64) error
	// GetLog looks up the entry at index. Note the error-first result order.
	// The bolt implementation returns (nil, nil) when no committed entry exists.
	GetLog(index int64) (error, *model.LogEntry)
}

tBoltDBStore.go

基于 boltdb 实现日志暂存、提交和应用

package store

import (
    "bytes"
    "encoding/binary"
    "errors"
    "github.com/boltdb/bolt"
    "learning/gooop/etcd/raft/model"
)

// tBoltDBStore is an ILogStore backed by a bolt database file. Appended
// entries live in the "unstable" bucket until Commit applies their command,
// moves them to the "committed" bucket and records the last committed
// index/term in the "meta" bucket.
type tBoltDBStore struct {
	file string

	// in-memory view of the newest appended / committed entries
	lastAppendedTerm   int64
	lastAppendedIndex  int64
	lastCommittedTerm  int64
	lastCommittedIndex int64

	// db is the open bolt handle. It must be a pointer: bolt.Open returns
	// *bolt.DB, and a bolt.DB holds locks that must not be copied.
	db *bolt.DB
}

// NewBoltStore opens (creating if needed) the bolt database at file and
// prepares the buckets the store uses. Entries left in the unstable bucket by
// a previous run are discarded, so on return the last appended entry is the
// last committed one. Results are error-first, like the rest of this package.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	store := &tBoltDBStore{file: file, db: db}
	err = db.Update(func(tx *bolt.Tx) error {
		meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}

		term, e := getOrPutInt64(meta, gKeyCommittedTerm, gDefaultTerm)
		if e != nil {
			return e
		}
		index, e := getOrPutInt64(meta, gKeyCommittedIndex, gDefaultIndex)
		if e != nil {
			return e
		}
		store.lastCommittedTerm = term
		store.lastCommittedIndex = index

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// Reset the unstable bucket. On a fresh database it does not exist
		// yet, and DeleteBucket then reports bolt.ErrBucketNotFound.
		if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		// Release the file handle and lock on a failed initialisation.
		_ = db.Close()
		return err, nil
	}

	// The unstable bucket was just emptied, so nothing newer than the last
	// committed entry has been appended.
	store.lastAppendedIndex = store.lastCommittedIndex
	store.lastAppendedTerm = store.lastCommittedTerm
	return nil, store
}

// getOrPutInt64 returns the int64 stored under key in b, storing def first
// when the key is absent.
func getOrPutInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	if v := b.Get(key); v != nil {
		return bytesToInt64(v), nil
	}
	if err := b.Put(key, int64ToBytes(def)); err != nil {
		return 0, err
	}
	return def, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes, the layout
// bytesToInt64 decodes.
//
// The buffer must start empty: bytes.NewBuffer(make([]byte, 8)) would start
// with eight zero bytes of *content*, and binary.Write would append after
// them. That produced 16 bytes that always decoded to 0.
// NOTE: databases written with that old 16-byte encoding are not readable
// with this one.
func int64ToBytes(i int64) []byte {
	buf := bytes.NewBuffer(make([]byte, 0, 8)) // length 0, capacity 8
	// Writing a fixed-size int64 into a bytes.Buffer cannot fail.
	_ = binary.Write(buf, binary.BigEndian, i)
	return buf.Bytes()
}

// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// Input shorter than 8 bytes decodes as 0; the short-read error is
// deliberately not reported.
func bytesToInt64(data []byte) int64 {
	var out int64
	if err := binary.Read(bytes.NewReader(data), binary.BigEndian, &out); err != nil {
		return 0
	}
	return out
}


// LastCommittedTerm returns the cached term of the most recently committed entry.
func (me *tBoltDBStore) LastCommittedTerm() int64 {
	return me.lastCommittedTerm
}

// LastCommittedIndex returns the cached index of the most recently committed entry.
func (me *tBoltDBStore) LastCommittedIndex() int64 {
	return me.lastCommittedIndex
}

// LastAppendedTerm returns the cached term of the most recently appended entry.
func (me *tBoltDBStore) LastAppendedTerm() int64 {
	return me.lastAppendedTerm
}

// LastAppendedIndex returns the cached index of the most recently appended entry.
func (me *tBoltDBStore) LastAppendedIndex() int64 {
	return me.lastAppendedIndex
}

// Append stores entry in the unstable bucket, keyed by its index, and then
// records it as the last appended entry. It stays unstable until Commit is
// called for its index.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// NOTE(review): the decoded command is discarded and Unmarshal's result
	// (if any) is not checked. This looks like a sanity check of
	// entry.Tag/entry.Command but rejects nothing — confirm intent.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(gUnstableBucket).Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Record the newest appended entry only after the transaction committed.
	// The follower's CommitLog compares cmd.Index/cmd.Term against these
	// values, so leaving them unset made every commit request fail.
	me.lastAppendedIndex = entry.Index
	me.lastAppendedTerm = entry.Term
	return nil
}

// Commit commits the unstable entry at index in one bolt transaction. It:
//   - applies the entry's command,
//   - copies the entry to the committed bucket,
//   - stores its index/term as the last committed position in the meta bucket,
//   - removes it from the unstable bucket.
//
// Any failure rolls the whole transaction back. It returns
// gErrorCommitLogNotFound when no unstable entry exists at index.
func (me *tBoltDBStore) Commit(index int64) error {
	var committed *model.LogEntry

	e := me.db.Update(func(tx *bolt.Tx) error {
		ub := tx.Bucket(gUnstableBucket)
		key := int64ToBytes(index)
		data := ub.Get(key)
		if data == nil {
			return gErrorCommitLogNotFound
		}

		entry := new(model.LogEntry)
		if err := entry.Unmarshal(data); err != nil {
			return err
		}

		// Apply the command inside the same transaction.
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if err := cmd.Apply(tx); err != nil {
			return err
		}

		if err := tx.Bucket(gCommittedBucket).Put(key, data); err != nil {
			return err
		}

		mb := tx.Bucket(gMetaBucket)
		if err := mb.Put(gKeyCommittedIndex, int64ToBytes(index)); err != nil {
			return err
		}
		if err := mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); err != nil {
			return err
		}

		if err := ub.Delete(key); err != nil {
			return err
		}

		committed = entry
		return nil
	})
	if e != nil {
		return e
	}

	// Update the in-memory view only after bolt committed. Otherwise a failed
	// commit could leave the cached position ahead of what is on disk.
	me.lastCommittedIndex = committed.Index
	me.lastCommittedTerm = committed.Term
	return nil
}


// GetLog returns the committed entry at index. When no committed entry
// exists there it returns (nil, nil); unstable (appended but uncommitted)
// entries are not searched.
func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {
	var found *model.LogEntry
	err := me.db.View(func(tx *bolt.Tx) error {
		raw := tx.Bucket(gCommittedBucket).Get(int64ToBytes(index))
		if raw == nil {
			return nil
		}
		// NOTE(review): bolt values are only valid inside the transaction;
		// this assumes Unmarshal copies whatever it keeps — confirm.
		entry := new(model.LogEntry)
		if err := entry.Unmarshal(raw); err != nil {
			return err
		}
		found = entry
		return nil
	})
	return err, found
}

// Bucket names used by tBoltDBStore.
var (
	gMetaBucket      = []byte("meta")      // last committed index/term
	gUnstableBucket  = []byte("unstable")  // appended, not yet committed entries
	gCommittedBucket = []byte("committed") // committed entries
	gDataBucket      = []byte("data")      // presumably state written by applied commands — confirm
)

// Keys stored in gMetaBucket.
var (
	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")
)

// Meta values written for a fresh database (both zero).
var (
	gDefaultTerm  int64
	gDefaultIndex int64
)

// gErrorCommitLogNotFound is returned by Commit when no unstable entry exists
// at the requested index.
var gErrorCommitLogNotFound = errors.New("committing log not found")

tFollowerState.go

根据新拆分的 RPC 接口,重写 Follower 状态的实现(未完成)

package lsm

import (
    "learning/gooop/etcd/raft/roles"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "sync"
    "time"
)

// tFollowerState is the follower role of the raft state machine. It serves
// leader/candidate RPCs and, through its embedded event model, watches for a
// missing leader heartbeat.
type tFollowerState struct {
	tEventDrivenModel

	// context is the owning state machine (config, log store, transitions).
	context IRaftLSM

	// one-shot guards for init, Start and (presumably) a dispose step not shown here
	mInitOnce    sync.Once
	mStartOnce   sync.Once
	mDisposeOnce sync.Once

	// mTerm is the follower's current term. It starts at the store's last
	// committed term and is raised when a leader RPC carries a newer term
	// (see Heartbeat).
	mTerm int64

	// mLeaderHeartbeatClock is the UnixNano time of the last accepted leader
	// heartbeat.
	// NOTE(review): Heartbeat writes it and the watcher goroutine reads it
	// without synchronisation — consider sync/atomic.
	mLeaderHeartbeatClock int64

	// presumably the candidate voted for and when; unused while RequestVote
	// is unimplemented
	mVotedLeaderID  string
	mVotedTimestamp int64
}

// Event names used inside tFollowerState's event model.
const (
	// feStart is raised once by Start.
	feStart string = "follower.Start"
	// feLeaderHeartbeatTimeout is raised when no leader heartbeat has been
	// seen for timeout.HeartbeatTimeout.
	feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"
)

// newFollowerState creates a follower state bound to ctx and initialises it.
func newFollowerState(ctx IRaftLSM) IRaftState {
	state := &tFollowerState{}
	state.init(ctx)
	return state
}

// init binds the state to ctx and sets its starting values, at most once.
// The term starts at the store's last committed term, and the heartbeat
// clock at 0 (no heartbeat seen yet). Event handlers are wired last.
func (me *tFollowerState) init(ctx IRaftLSM) {
	me.mInitOnce.Do(func() {
		me.mLeaderHeartbeatClock = 0
		me.context = ctx
		me.mTerm = ctx.store().LastCommittedTerm()
		me.initEventHandlers()
	})
}


// initEventHandlers wires the follower's events to their handlers:
//   - feStart starts the leader-heartbeat watcher;
//   - feLeaderHeartbeatTimeout switches to the candidate state.
func (me *tFollowerState) initEventHandlers() {
	bindings := []struct {
		event   string
		handler tEventHandleFunc
	}{
		{feStart, me.whenStartThenBeginWatchLeaderTimeout},
		{feLeaderHeartbeatTimeout, me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState},
	}
	for _, b := range bindings {
		me.hook(b.event, b.handler)
	}
}

// Start raises feStart; repeated calls have no further effect.
func (me *tFollowerState) Start() {
	raiseStart := func() { me.raise(feStart) }
	me.mStartOnce.Do(raiseStart)
}

// whenStartThenBeginWatchLeaderTimeout handles feStart. It starts a goroutine
// that checks the leader-heartbeat clock every timeout.HeartbeatTimeout/3.
// Once no heartbeat has been recorded for at least timeout.HeartbeatTimeout,
// it raises feLeaderHeartbeatTimeout (from that goroutine) and exits.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
	// Open the timeout window now. After init the clock is 0, so the first
	// check would otherwise fire at once and turn a freshly started follower
	// into a candidate before any leader could send a heartbeat.
	// NOTE(review): the clock is shared with Heartbeat without
	// synchronisation — consider sync/atomic.
	me.mLeaderHeartbeatClock = time.Now().UnixNano()

	go func() {
		checkInterval := timeout.HeartbeatTimeout / 3
		timeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)

		// Use a Ticker rather than time.Tick so it is stopped when the
		// goroutine exits. Before Go 1.23, time.Tick's ticker is never released.
		ticker := time.NewTicker(checkInterval)
		defer ticker.Stop()

		for range ticker.C {
			if time.Now().UnixNano()-me.mLeaderHeartbeatClock >= timeoutNanos {
				me.raise(feLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}

// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState handles
// feLeaderHeartbeatTimeout. It is meant to move the node to the candidate
// state but is not implemented yet: it always panics.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
	// TODO: switch to the candidate state (presumably via me.context.handleStateChanged).
	panic("implements me")
}

// Role reports that this state is the follower role.
func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}


// Heartbeat handles a leader's heartbeat (leader to follower).
//
// A heartbeat from an older term is rejected with HBTermMismatch. Otherwise
// the follower:
//   - adopts the leader's term if it is newer,
//   - restarts its leader-timeout window,
//   - replies HBOk.
//
// ret.Term always carries the follower's term after the call, consistent
// with AppendLog.
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	if cmd.Term < me.mTerm {
		// heartbeat from a stale leader
		ret.Code = rpc.HBTermMismatch
		ret.Term = me.mTerm
		return nil
	}
	if cmd.Term > me.mTerm {
		// a leader of a newer term: adopt its term
		me.mTerm = cmd.Term
	}

	// Restart the window checked by the leader-timeout watcher.
	me.mLeaderHeartbeatClock = time.Now().UnixNano()
	ret.Code = rpc.HBOk
	ret.Term = me.mTerm
	return nil
}

// AppendLog handles a leader's request to append one log entry (leader to
// follower).
//
// Outcomes:
//   - older leader term: ALTermMismatch; otherwise a newer term is adopted,
//     as in Heartbeat;
//   - the entry directly follows the last committed entry (PrevIndex/PrevTerm
//     match): it is appended and ALOk is returned;
//   - the entry is already committed with the same PrevIndex/PrevTerm: ALOk
//     with no change;
//   - otherwise: ALIndexMismatch, with PrevLogIndex/PrevLogTerm set to the
//     follower's last committed position;
//   - a missing entry or a store failure: ALInternalError.
//
// ret.Term always carries the follower's term.
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	if cmd.Term < me.mTerm {
		// request from a stale leader
		ret.Term = me.mTerm
		ret.Code = rpc.ALTermMismatch
		return nil
	}
	if cmd.Term > me.mTerm {
		// a leader of a newer term: adopt its term (same rule as Heartbeat)
		me.mTerm = cmd.Term
	}
	ret.Term = me.mTerm

	entry := cmd.Entry
	if entry == nil {
		// A malformed request must not panic the RPC handler. There is no
		// dedicated code for it; ALInternalError is the closest fit.
		ret.Code = rpc.ALInternalError
		return nil
	}

	logs := me.context.store()

	// Expected case: the entry directly follows our last committed entry.
	if entry.PrevIndex == logs.LastCommittedIndex() && entry.PrevTerm == logs.LastCommittedTerm() {
		if err := logs.Append(entry); err != nil {
			ret.Code = rpc.ALInternalError
			return nil
		}
		ret.Code = rpc.ALOk
		return nil
	}

	// Otherwise the entry may be one we have already committed (a resend).
	e, committed := logs.GetLog(entry.Index)
	if e != nil {
		ret.Code = rpc.ALInternalError
		return nil
	}
	if committed == nil || committed.PrevIndex != entry.PrevIndex || committed.PrevTerm != entry.PrevTerm {
		// Unknown position: tell the leader where our committed log ends.
		ret.Code = rpc.ALIndexMismatch
		ret.PrevLogIndex = logs.LastCommittedIndex()
		ret.PrevLogTerm = logs.LastCommittedTerm()
		return nil
	}

	// Already committed and consistent: nothing to do.
	ret.Code = rpc.ALOk
	return nil
}

// CommitLog handles a leader's request to commit the entry at cmd.Index
// (leader to follower).
//
// Only the most recently appended entry can be committed: cmd.Index and
// cmd.Term must equal the store's last appended index and term, otherwise
// CLLogNotFound is returned. A store failure yields CLInternalError.
// NOTE(review): unlike Heartbeat/AppendLog, the leader's term is not checked
// against me.mTerm here.
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	logs := me.context.store()

	code := rpc.CLOk
	if cmd.Index != logs.LastAppendedIndex() || cmd.Term != logs.LastAppendedTerm() {
		code = rpc.CLLogNotFound
	} else if err := logs.Commit(cmd.Index); err != nil {
		code = rpc.CLInternalError
	}

	ret.Code = code
	return nil
}

// RequestVote handles a candidate's vote request (candidate to follower).
// It is not implemented yet and always panics.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	// TODO: grant or refuse the vote and fill ret (see rpc.RVCode).
	panic("implements me")
}

(未完待续)

退出移动版