手撸 golang etcd raft 协议之 5
缘起
最近阅读 [云原生分布式存储基石:etcd 深入解析] (杜军, 2019.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop
raft 分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft 算法把问题分解成了领导者选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft 算法的基本操作只需 2 种 RPC 即可实现:RequestVote RPC 是在选举过程中由候选人(Candidate)触发的;AppendEntries RPC 是由领导人(Leader)触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 依据 raft 协议,实现高可用、分布式、强一致的 kv 存储
子目标(Day 5)
- 重新设计 RPC 接口,将原有较为笼统的两个接口拆分为更易于理解和实现的四个接口 (尽信书则不如无书 -_-||)
- 依据新 RPC 接口重写 Follower 状态的实现
设计
- IRaftRPC: 将原有较为笼统的两个接口拆分为更易于理解和实现的四个接口
- IRaftLSM: 增加若干包内(非导出)支持方法
- iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
- ILogStore:改造以适配新拆分的 RPC 接口
- tBoltDBStore:基于 boltdb 实现日志暂存、提交和应用
- tFollowerState:根据新拆分的 RPC 接口,重写 Follower 状态的实现(尚未完成)
IRaftRPC.go
将原有较为笼统的两个接口拆分为更易于理解和实现的四个接口。尽信书则不如无书 -_-||
package rpc
import "learning/gooop/etcd/raft/model"
// IRaftRPC is the set of RPCs exchanged between raft nodes. The classic
// AppendEntries RPC is split here into Heartbeat, AppendLog and CommitLog.
type IRaftRPC interface {
	// Heartbeat is sent by the leader to a follower.
	Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error

	// AppendLog is sent by the leader to replicate a single log entry to a follower.
	AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error

	// CommitLog is sent by the leader to ask a follower to commit an entry by index.
	CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error

	// RequestVote is sent by a candidate to a follower to ask for its vote.
	RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
}
// HeartbeatCmd is the leader's heartbeat request.
type HeartbeatCmd struct {
	LeaderID string
	Term     int64
}

// HeartbeatRet is a follower's reply to HeartbeatCmd.
type HeartbeatRet struct {
	Code HBCode
	Term int64
}

// HBCode is the result code of a Heartbeat call.
type HBCode int

const (
	// HBOk: the heartbeat was accepted.
	HBOk HBCode = iota
	// HBTermMismatch: the heartbeat's term was rejected by the follower.
	HBTermMismatch
)
// RequestVoteCmd is a candidate's vote request, describing the candidate's
// term and its last log position.
type RequestVoteCmd struct {
	CandidateID  string
	Term         int64
	LastLogIndex int64
	LastLogTerm  int64
}

// RequestVoteRet is a node's reply to RequestVoteCmd.
type RequestVoteRet struct {
	Code RVCode
	Term int64
}

// RVCode is the result code of a RequestVote call.
type RVCode int

const (
	// RVOk: the vote was granted.
	RVOk RVCode = iota
	// RVLogMismatch: refused because of the candidate's log position.
	RVLogMismatch
	// RVTermMismatch: refused because of the candidate's term.
	RVTermMismatch
	// RVVotedAnother: refused because the vote was already given to another node.
	RVVotedAnother
)
// AppendLogCmd asks a follower to append one log entry.
type AppendLogCmd struct {
	LeaderID string
	Term     int64
	Entry    *model.LogEntry
}

// AppendLogRet is a follower's reply to AppendLogCmd. On ALIndexMismatch the
// follower implementation reports its last committed index/term in
// PrevLogIndex/PrevLogTerm.
type AppendLogRet struct {
	Code         ALCode
	Term         int64
	PrevLogIndex int64
	PrevLogTerm  int64
}

// ALCode is the result code of an AppendLog call.
type ALCode int

const (
	// ALOk: the entry was appended (or was already present).
	ALOk ALCode = iota
	// ALTermMismatch: the request's term was rejected.
	ALTermMismatch
	// ALIndexMismatch: the entry does not follow the follower's log.
	ALIndexMismatch
	// ALInternalError: the follower failed to read or write its log store.
	ALInternalError
)
// CommitLogCmd asks a follower to commit the entry at Index.
type CommitLogCmd struct {
	LeaderID string
	Term     int64
	Index    int64
}

// CommitLogRet is a follower's reply to CommitLogCmd.
type CommitLogRet struct {
	Code CLCode
}

// CLCode is the result code of a CommitLog call.
type CLCode int

const (
	// CLOk: the entry was committed.
	CLOk CLCode = iota
	// CLLogNotFound: no matching entry was available to commit.
	CLLogNotFound
	// CLInternalError: the follower's log store failed to commit.
	CLInternalError
)
IRaftLSM.go
增加若干包内(非导出)支持方法
package lsm
import (
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/store"
)
// IRaftLSM is the raft node's state machine. It serves the raft RPCs and
// exposes package-internal accessors that the role states (e.g. the
// follower) use to reach configuration and the log store.
type IRaftLSM interface {
	rpc.IRaftRPC

	// State returns the machine's current role state.
	State() IRaftState

	// config returns the raft configuration.
	config() config.IRaftConfig

	// store returns the log store.
	store() store.ILogStore

	// handleStateChanged presumably switches the machine to state — confirm
	// against the implementation.
	handleStateChanged(state IRaftState)
}
iEventDrivenModel.go
抽取并实现事件驱动型的逻辑编排
package lsm
// tEventHandleFunc is a callback invoked with the event name and the
// arguments that were passed to raise.
type tEventHandleFunc func(e string, args ...interface{})

// iEventDrivenModel lets a component register handlers for named events
// (hook) and fire them (raise).
type iEventDrivenModel interface {
	hook(e string, handleFunc tEventHandleFunc)
	raise(e string, args ...interface{})
}

// tEventDrivenModel is a minimal synchronous event dispatcher. Its zero value
// is ready to use, so it can be embedded by value in a struct created with new().
//
// The handler map is not guarded by a lock: register all handlers before
// events can be raised from other goroutines.
type tEventDrivenModel struct {
	items map[string][]tEventHandleFunc
}

// hook appends handler to the handlers registered for event e.
func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) {
	// An embedded zero-value model has a nil map, and assigning into a nil
	// map panics, so create it on first use.
	if me.items == nil {
		me.items = make(map[string][]tEventHandleFunc)
	}
	// append on a missing key's nil slice allocates a fresh one.
	me.items[e] = append(me.items[e], handler)
}

// raise invokes every handler hooked on e, in registration order, on the
// caller's goroutine. Events with no handlers are ignored.
func (me *tEventDrivenModel) raise(e string, args ...interface{}) {
	for _, handler := range me.items[e] {
		handler(e, args...)
	}
}
ILogStore.go
改造以适配新拆分的 RPC 接口
package store
import "learning/gooop/etcd/raft/model"
// ILogStore is the raft log storage used by the role states.
//
// Entries are first appended (staged) and later committed by index; the
// store reports the index/term of the last appended and last committed entry.
type ILogStore interface {
	// LastAppendedTerm returns the term of the last appended entry.
	LastAppendedTerm() int64
	// LastAppendedIndex returns the index of the last appended entry.
	LastAppendedIndex() int64
	// LastCommittedTerm returns the term of the last committed entry.
	LastCommittedTerm() int64
	// LastCommittedIndex returns the index of the last committed entry.
	LastCommittedIndex() int64

	// Append stages entry in the log without committing it.
	Append(entry *model.LogEntry) error
	// Commit commits the previously appended entry at index.
	Commit(index int64) error
	// GetLog looks up the entry at index. Note the (error, value) result
	// order. The boltdb implementation returns (nil, nil) when no committed
	// entry exists at index.
	GetLog(index int64) (error, *model.LogEntry)
}
tBoltDBStore.go
基于 boltdb 实现日志暂存、提交和应用
package store
import (
"bytes"
"encoding/binary"
"errors"
"github.com/boltdb/bolt"
"learning/gooop/etcd/raft/model"
)
// tBoltDBStore is an ILogStore backed by a boltdb file.
//
// Append stages entries in the "unstable" bucket; Commit applies an entry's
// command and moves it to the "committed" bucket. The last committed
// index/term are persisted in the "meta" bucket, while the last appended
// index/term are kept in memory only. db is the handle returned by bolt.Open;
// bolt.DB holds locks and must not be copied, hence the pointer.
type tBoltDBStore struct {
	file               string
	lastAppendedTerm   int64
	lastAppendedIndex  int64
	lastCommittedTerm  int64
	lastCommittedIndex int64
	db                 *bolt.DB
}

// NewBoltStore opens (creating if needed) the boltdb file at path file and
// returns a log store over it.
//
// It ensures the meta, data and committed buckets exist and loads the last
// committed index/term from meta, initializing them to gDefaultIndex/gDefaultTerm
// on a fresh file. Any previously staged but uncommitted entries are discarded
// by recreating the unstable bucket. Because nothing uncommitted survives, the
// last appended index/term start equal to the last committed ones.
//
// On failure the database is closed and (err, nil) is returned.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	store := &tBoltDBStore{file: file, db: db}
	err = db.Update(func(tx *bolt.Tx) error {
		meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}

		if v := meta.Get(gKeyCommittedTerm); v != nil {
			store.lastCommittedTerm = bytesToInt64(v)
		} else {
			if e = meta.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm)); e != nil {
				return e
			}
			store.lastCommittedTerm = gDefaultTerm
		}

		if v := meta.Get(gKeyCommittedIndex); v != nil {
			store.lastCommittedIndex = bytesToInt64(v)
		} else {
			if e = meta.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex)); e != nil {
				return e
			}
			store.lastCommittedIndex = gDefaultIndex
		}

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// Drop entries staged before a restart. On a fresh file the bucket does
		// not exist yet and DeleteBucket reports ErrBucketNotFound, which is fine.
		if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		if _, e = tx.CreateBucketIfNotExists(gCommittedBucket); e != nil {
			return e
		}
		return nil
	})
	if err != nil {
		// Release the file handle and lock; the Update error is the one to report.
		_ = db.Close()
		return err, nil
	}

	// The unstable bucket was emptied, so the append cursor starts at the commit cursor.
	store.lastAppendedIndex = store.lastCommittedIndex
	store.lastAppendedTerm = store.lastCommittedTerm
	return nil, store
}
// int64ToBytes encodes i as exactly 8 big-endian bytes.
//
// The previous version wrote into bytes.NewBuffer(make([]byte, 8)), which
// starts with 8 zero bytes of content, so it produced 16 bytes whose first 8
// were always zero (and bytesToInt64 therefore always decoded 0).
// Big-endian keeps bolt's byte-wise key order equal to numeric order for
// non-negative values.
func int64ToBytes(i int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(i))
	return buf
}
// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// Extra trailing bytes are ignored; input shorter than 8 bytes (including nil)
// decodes to 0.
func bytesToInt64(data []byte) int64 {
	var decoded int64
	if err := binary.Read(bytes.NewReader(data), binary.BigEndian, &decoded); err != nil {
		// Only possible for short input: nothing was decoded.
		return 0
	}
	return decoded
}
// LastCommittedTerm returns the term of the most recently committed entry.
func (me *tBoltDBStore) LastCommittedTerm() int64 {
	return me.lastCommittedTerm
}

// LastCommittedIndex returns the index of the most recently committed entry.
func (me *tBoltDBStore) LastCommittedIndex() int64 {
	return me.lastCommittedIndex
}

// LastAppendedTerm returns the term of the most recently appended entry.
func (me *tBoltDBStore) LastAppendedTerm() int64 {
	return me.lastAppendedTerm
}

// LastAppendedIndex returns the index of the most recently appended entry.
func (me *tBoltDBStore) LastAppendedIndex() int64 {
	return me.lastAppendedIndex
}
// Append stages entry in the unstable bucket, keyed by its index, and after the
// write succeeds records it as the last appended entry. It does not apply or
// commit the entry (see Commit).
//
// The last appended index/term must advance here: the follower's CommitLog
// only commits when the request matches LastAppendedIndex/LastAppendedTerm.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// NOTE(review): the decoded command is discarded and any result of
	// Unmarshal is ignored; presumably meant as validation — confirm.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(gUnstableBucket).Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Advance the in-memory cursor only once the write transaction committed.
	me.lastAppendedIndex = entry.Index
	me.lastAppendedTerm = entry.Term
	return nil
}
// Commit commits the staged entry at index in a single bolt transaction. It:
//  1. decodes the entry from the unstable bucket,
//  2. applies its command via cmd.Apply(tx),
//  3. stores the raw entry under the same key in the committed bucket,
//  4. persists the new committed index/term in the meta bucket,
//  5. deletes the unstable copy.
//
// It returns gErrorCommitLogNotFound when nothing is staged at index. The
// in-memory committed index/term change only after the transaction has
// committed, so a failed commit cannot leave them ahead of what is on disk.
func (me *tBoltDBStore) Commit(index int64) error {
	var committed *model.LogEntry

	err := me.db.Update(func(tx *bolt.Tx) error {
		ub := tx.Bucket(gUnstableBucket)
		k := int64ToBytes(index)
		data := ub.Get(k)
		if data == nil {
			return gErrorCommitLogNotFound
		}

		entry := new(model.LogEntry)
		if e := entry.Unmarshal(data); e != nil {
			return e
		}

		// apply the command inside the same transaction
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if e := cmd.Apply(tx); e != nil {
			return e
		}

		if e := tx.Bucket(gCommittedBucket).Put(k, data); e != nil {
			return e
		}

		mb := tx.Bucket(gMetaBucket)
		if e := mb.Put(gKeyCommittedIndex, int64ToBytes(index)); e != nil {
			return e
		}
		if e := mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); e != nil {
			return e
		}

		if e := ub.Delete(k); e != nil {
			return e
		}

		committed = entry
		return nil
	})
	if err != nil {
		return err
	}

	me.lastCommittedIndex = committed.Index
	me.lastCommittedTerm = committed.Term
	return nil
}
// GetLog looks up the committed entry at index. It returns (nil, nil) when
// no committed entry exists there; the unstable bucket is not consulted.
// A decoding failure is returned as the error with a nil entry.
func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {
	var found *model.LogEntry

	err := me.db.View(func(tx *bolt.Tx) error {
		raw := tx.Bucket(gCommittedBucket).Get(int64ToBytes(index))
		if raw == nil {
			return nil
		}

		decoded := new(model.LogEntry)
		if err := decoded.Unmarshal(raw); err != nil {
			return err
		}
		found = decoded
		return nil
	})

	return err, found
}
// Bucket names used by tBoltDBStore.
var (
	gMetaBucket      = []byte("meta")
	gUnstableBucket  = []byte("unstable")
	gCommittedBucket = []byte("committed")
	gDataBucket      = []byte("data")
)

// Keys stored in gMetaBucket.
var (
	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")
)

// Initial committed term/index written to a fresh store.
var (
	gDefaultTerm  int64 = 0
	gDefaultIndex int64 = 0
)

// gErrorCommitLogNotFound is returned by Commit when no staged entry exists
// at the requested index.
var gErrorCommitLogNotFound = errors.New("committing log not found")
tFollowerState.go
根据新拆分的 RPC 接口,重写 Follower 状态的实现(尚未完成)
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tFollowerState is the follower role of a raft node. The embedded event
// dispatcher sequences its lifecycle: start, watch for leader heartbeats,
// and react to a leader heartbeat timeout.
type tFollowerState struct {
	tEventDrivenModel

	// context is the owning state machine.
	context IRaftLSM

	// One-shot guards for init and Start; mDisposeOnce is unused so far.
	mInitOnce    sync.Once
	mStartOnce   sync.Once
	mDisposeOnce sync.Once

	// mTerm is the follower's current term: seeded from the store's last
	// committed term in init, raised when a heartbeat carries a newer term.
	mTerm int64

	// mLeaderHeartbeatClock is the UnixNano time of the last accepted leader
	// heartbeat; 0 until the first one arrives.
	mLeaderHeartbeatClock int64

	// Vote bookkeeping, not yet used by the follower logic.
	mVotedLeaderID  string
	mVotedTimestamp int64
}
// Internal follower events.
const (
	// feStart is raised once by Start.
	feStart string = "follower.Start"
	// feLeaderHeartbeatTimeout is raised when no leader heartbeat arrived in time.
	feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"
)
// newFollowerState creates a follower bound to ctx and runs its one-time
// initialization. The follower does not watch for leader timeouts until
// Start is called.
func newFollowerState(ctx IRaftLSM) IRaftState {
	follower := &tFollowerState{}
	follower.init(ctx)
	return follower
}
// init binds the follower to ctx, seeds its term from the store's last
// committed term, clears the heartbeat clock and registers the event
// handlers. Only the first call has any effect.
func (me *tFollowerState) init(ctx IRaftLSM) {
	setup := func() {
		me.context = ctx
		me.mTerm = ctx.store().LastCommittedTerm()
		me.mLeaderHeartbeatClock = 0
		me.initEventHandlers()
	}
	me.mInitOnce.Do(setup)
}
// initEventHandlers wires the follower's internal events to their handlers:
// feStart begins watching for leader timeouts, and a leader timeout switches
// towards the candidate state.
func (me *tFollowerState) initEventHandlers() {
	bindings := []struct {
		event   string
		handler tEventHandleFunc
	}{
		{feStart, me.whenStartThenBeginWatchLeaderTimeout},
		{feLeaderHeartbeatTimeout, me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState},
	}
	for _, b := range bindings {
		me.hook(b.event, b.handler)
	}
}
// Start raises feStart at most once, which begins watching for leader
// heartbeat timeouts.
func (me *tFollowerState) Start() {
	fire := func() {
		me.raise(feStart)
	}
	me.mStartOnce.Do(fire)
}
// whenStartThenBeginWatchLeaderTimeout handles feStart. It spawns a goroutine
// that checks every HeartbeatTimeout/3 and raises feLeaderHeartbeatTimeout
// (then exits) once no leader heartbeat has been seen for a full
// HeartbeatTimeout.
//
// The first timeout is measured from when watching starts. The heartbeat
// clock is 0 until the first heartbeat arrives, so measuring from it would
// fire on the very first tick, i.e. after only a third of the timeout.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
	go func() {
		checkInterval := timeout.HeartbeatTimeout / 3
		timeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)
		startedAt := time.Now().UnixNano()

		// Use an explicit ticker so it is stopped when we return; time.Tick
		// would leave the underlying ticker running.
		ticker := time.NewTicker(checkInterval)
		defer ticker.Stop()

		for range ticker.C {
			// NOTE(review): mLeaderHeartbeatClock is written by the Heartbeat RPC
			// handler on another goroutine without synchronization.
			lastSeen := me.mLeaderHeartbeatClock
			if lastSeen < startedAt {
				lastSeen = startedAt
			}
			if time.Now().UnixNano()-lastSeen >= timeoutNanos {
				me.raise(feLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}
// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState handles
// feLeaderHeartbeatTimeout. Not implemented yet: it always panics.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
	panic("implements me")
}
// Role reports that this state is the follower role.
func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}
// Heartbeat handles a leader-to-follower heartbeat.
//
// A heartbeat whose term is older than the follower's is rejected with
// HBTermMismatch. Otherwise the follower adopts the (possibly newer) term,
// records the arrival time for the leader-timeout watcher, and replies HBOk.
// ret.Term always carries the follower's term after processing, so the leader
// sees the follower's term on success as well as on rejection. The returned
// error is always nil; the outcome is reported in ret.Code.
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	if cmd.Term < me.mTerm {
		// stale leader
		ret.Code = rpc.HBTermMismatch
		ret.Term = me.mTerm
		return nil
	}
	if cmd.Term > me.mTerm {
		// a leader of a newer term
		me.mTerm = cmd.Term
	}

	me.mLeaderHeartbeatClock = time.Now().UnixNano()
	ret.Code = rpc.HBOk
	ret.Term = me.mTerm
	return nil
}
// AppendLog handles a leader-to-follower request to append one log entry.
//
// The follower expects each entry to directly follow its last committed entry,
// i.e. entry.PrevIndex/PrevTerm equal the store's last committed index/term.
// Outcomes:
//   - cmd.Term older than the follower's term: ALTermMismatch.
//   - entry follows the last committed entry: it is passed to store.Append;
//     ALOk on success, ALInternalError if Append fails.
//   - otherwise, if the store already holds an entry at entry.Index with the
//     same PrevIndex/PrevTerm, the request is a duplicate: ALOk, nothing written.
//   - if that lookup fails: ALInternalError.
//   - otherwise: ALIndexMismatch, with the last committed index/term reported
//     in ret.PrevLogIndex/PrevLogTerm.
//
// ret.Term is always set to the follower's term. The returned error is always
// nil. cmd.Entry is assumed non-nil.
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	ret.Term = me.mTerm
	if cmd.Term < me.mTerm {
		// stale leader
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	logs := me.context.store()
	entry := cmd.Entry
	followsCommitted := entry.PrevIndex == logs.LastCommittedIndex() &&
		entry.PrevTerm == logs.LastCommittedTerm()

	if followsCommitted {
		if err := logs.Append(entry); err != nil {
			ret.Code = rpc.ALInternalError
		} else {
			ret.Code = rpc.ALOk
		}
		return nil
	}

	err, existing := logs.GetLog(entry.Index)
	switch {
	case err != nil:
		ret.Code = rpc.ALInternalError
	case existing != nil && existing.PrevIndex == entry.PrevIndex && existing.PrevTerm == entry.PrevTerm:
		// an entry we already hold: acknowledge without rewriting it
		ret.Code = rpc.ALOk
	default:
		ret.Code = rpc.ALIndexMismatch
		ret.PrevLogIndex = logs.LastCommittedIndex()
		ret.PrevLogTerm = logs.LastCommittedTerm()
	}
	return nil
}
// CommitLog handles a leader-to-follower commit request.
//
// Only the most recently appended entry can be committed: cmd.Index and
// cmd.Term must equal the store's last appended index/term, otherwise the
// reply is CLLogNotFound. A store failure during the commit yields
// CLInternalError; success yields CLOk. The returned error is always nil.
//
// NOTE(review): cmd.Term is compared against the last appended entry's term
// and is not checked against the follower's current term — confirm intended.
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	logs := me.context.store()
	if cmd.Index == logs.LastAppendedIndex() && cmd.Term == logs.LastAppendedTerm() {
		if err := logs.Commit(cmd.Index); err != nil {
			ret.Code = rpc.CLInternalError
		} else {
			ret.Code = rpc.CLOk
		}
		return nil
	}

	// not the entry we have staged
	ret.Code = rpc.CLLogNotFound
	return nil
}
// RequestVote handles a candidate-to-follower vote request.
// Not implemented yet: it always panics.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	panic("implements me")
}
(未完待续)