手撸golang etcd raft协议之4

缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了首领选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变动(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可实现。RequestVote RPC是在选举过程中由候选人(Candidate)触发的,AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 4)

  • 使用boltdb存储操作日志和kv键值数据

    • unstable存储桶:已收到未提交的日志,重启后清空
    • committed存储桶:已提交的日志
    • data存储桶:kv键值数据
    • meta存储桶:记录末次提交的index和term

设计

  • model/LogEntry: 日志条目
  • ICmd:操作指令接口
  • ICmdFactory:操作指令工厂
  • ILogStore:日志存储接口
  • tCmdBase:指令基类
  • PutCmd:put指令
  • DelCmd:del指令
  • tBoltDBStore:基于boltdb实现日志暂存,提交和应用

LogEntry.go

日志条目

// Package model defines the LogEntry type.
package model

import "encoding/json"

// LogEntry is one log record. It is encoded to and decoded from JSON by
// Marshal and Unmarshal.
type LogEntry struct {
	// Tag identifies the kind of command carried in Command.
	Tag int
	// Term and Index are the term and index of this entry.
	Term  int64
	Index int64
	// PrevTerm and PrevIndex presumably describe the entry preceding this
	// one -- NOTE(review): nothing in this file reads them; confirm.
	PrevTerm  int64
	PrevIndex int64
	// Command is the serialized command payload.
	Command []byte
}

// Marshal encodes the entry as JSON.
//
// Note the unconventional result order: the error comes first. On failure
// the returned slice is nil.
func (me *LogEntry) Marshal() (error, []byte) {
	data, err := json.Marshal(me)
	if err != nil {
		return err, nil
	}
	return nil, data
}

// Unmarshal decodes JSON data into the receiver and returns any decode error.
func (me *LogEntry) Unmarshal(data []byte) error {
	return json.Unmarshal(data, me)
}

ICmd.go

操作指令接口

package storeimport "github.com/boltdb/bolt"type ICmd interface {    Marshal() []byte    Unmarshal(data []byte)    Apply(tx *bolt.Tx) error}

ICmdFactory.go

操作指令工厂

package store

import "fmt"

// ICmdFactory converts between integer command tags and ICmd values.
type ICmdFactory interface {
	// OfTag returns a new, empty command for the given tag.
	OfTag(tag int) ICmd

	// Tag returns the tag that identifies cmd's concrete type.
	Tag(cmd ICmd) int
}

// tDefaultCmdFactory is the ICmdFactory used by this package. It carries no
// state; the underlying int only exists to give the methods a receiver.
type tDefaultCmdFactory int

// Tags of the known command types.
const (
	gPutCmdTag = 1
	gDelCmdTag = 2
)

// OfTag returns a freshly allocated *PutCmd or *DelCmd for the matching tag.
// It panics on any other tag.
func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
	switch tag {
	case gPutCmdTag:
		return new(PutCmd)
	case gDelCmdTag:
		return new(DelCmd)
	default:
		panic(fmt.Sprintf("unknown tag: %d", tag))
	}
}

// Tag returns the tag for cmd's concrete type. It panics when cmd is neither
// a *PutCmd nor a *DelCmd (including a nil interface value).
func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
	switch cmd.(type) {
	case *PutCmd:
		return gPutCmdTag
	case *DelCmd:
		return gDelCmdTag
	default:
		panic(fmt.Sprintf("unknown cmd: %v", cmd))
	}
}

// gCmdFactory is the package-wide command factory instance.
var gCmdFactory = new(tDefaultCmdFactory)

ILogStore.go

日志存储接口

package storeimport "learning/gooop/etcd/raft/model"type ILogStore interface {    Term() int64    Index() int64    Append(entry *model.LogEntry) error    Commit(index int64) error}

tCmdBase.go

指令基类

package store

import "encoding/json"

// tCmdBase is the base struct embedded by the command types.
//
// NOTE: both methods have a *tCmdBase receiver, so json only ever sees this
// field-less struct. When a type embeds tCmdBase and relies on the promoted
// methods, Marshal yields "{}" and Unmarshal populates nothing of the outer
// struct. Embedding types need their own Marshal/Unmarshal to serialize
// their fields.
type tCmdBase struct {
}

// Marshal returns the JSON encoding of the base struct, or nil if encoding
// fails.
func (me *tCmdBase) Marshal() []byte {
	data, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return data
}

// Unmarshal decodes data into the base struct. The method has no error
// result, so a decode failure is discarded.
func (me *tCmdBase) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

PutCmd.go

put指令

package store

import (
	"encoding/json"

	"github.com/boltdb/bolt"
)

// PutCmd stores Value under Key in the data bucket.
type PutCmd struct {
	tCmdBase
	Key   string
	Value []byte
}

// Marshal returns the JSON encoding of the command, or nil if encoding fails.
//
// It is defined on *PutCmd on purpose: the Marshal promoted from tCmdBase
// has a *tCmdBase receiver and would encode only the empty base struct,
// dropping Key and Value.
func (me *PutCmd) Marshal() []byte {
	data, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return data
}

// Unmarshal decodes JSON data into Key and Value. ICmd.Unmarshal has no
// error result, so a decode failure is discarded and the fields keep their
// previous values.
func (me *PutCmd) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

// Apply writes Key/Value into the data bucket within tx. It returns
// bolt.ErrBucketNotFound when the data bucket does not exist.
func (me *PutCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		return bolt.ErrBucketNotFound
	}
	return b.Put([]byte(me.Key), me.Value)
}

DelCmd.go

del指令

package store

import (
	"encoding/json"

	"github.com/boltdb/bolt"
)

// DelCmd removes Key from the data bucket.
type DelCmd struct {
	tCmdBase
	Key string
}

// Marshal returns the JSON encoding of the command, or nil if encoding fails.
//
// It is defined on *DelCmd on purpose: the Marshal promoted from tCmdBase
// has a *tCmdBase receiver and would encode only the empty base struct,
// dropping Key.
func (me *DelCmd) Marshal() []byte {
	data, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return data
}

// Unmarshal decodes JSON data into Key. ICmd.Unmarshal has no error result,
// so a decode failure is discarded and Key keeps its previous value.
func (me *DelCmd) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

// Apply deletes Key from the data bucket within tx. It returns
// bolt.ErrBucketNotFound when the data bucket does not exist.
func (me *DelCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		return bolt.ErrBucketNotFound
	}
	return b.Delete([]byte(me.Key))
}

tBoltDBStore.go

基于boltdb实现日志暂存,提交和应用

package store

import (
	"bytes"
	"encoding/binary"
	"errors"

	"github.com/boltdb/bolt"

	"learning/gooop/etcd/raft/model"
)

// tBoltDBStore is the boltdb-backed implementation of ILogStore.
type tBoltDBStore struct {
	file  string
	term  int64
	index int64
	// db is the handle returned by bolt.Open. It is held by pointer: a
	// bolt.DB value must not be copied, and the zero value is not an open
	// database.
	db *bolt.DB
}

// NewBoltStore opens (or creates) the bolt database at file and prepares its
// buckets. Note the unconventional result order: the error comes first.
//
// If initialization fails the database is closed again before returning, so
// the file is not left open.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	store := &tBoltDBStore{file: file, db: db}
	if err = db.Update(store.initBuckets); err != nil {
		_ = db.Close() // best effort; the init error is the one worth reporting
		return err, nil
	}
	return nil, store
}

// initBuckets creates the meta, data and committed buckets when missing,
// loads the committed term/index into the store (writing the defaults on
// first use), and recreates the unstable bucket empty.
func (me *tBoltDBStore) initBuckets(tx *bolt.Tx) error {
	meta, err := tx.CreateBucketIfNotExists(gMetaBucket)
	if err != nil {
		return err
	}
	if me.term, err = loadOrInitInt64(meta, gKeyCommittedTerm, gDefaultTerm); err != nil {
		return err
	}
	if me.index, err = loadOrInitInt64(meta, gKeyCommittedIndex, gDefaultIndex); err != nil {
		return err
	}

	if _, err = tx.CreateBucketIfNotExists(gDataBucket); err != nil {
		return err
	}

	// Drop whatever was left in the unstable bucket. On a fresh database the
	// bucket does not exist yet, which is not an error here.
	err = tx.DeleteBucket(gUnstableBucket)
	if err != nil && !errors.Is(err, bolt.ErrBucketNotFound) {
		return err
	}
	if _, err = tx.CreateBucket(gUnstableBucket); err != nil {
		return err
	}

	_, err = tx.CreateBucketIfNotExists(gCommittedBucket)
	return err
}

// loadOrInitInt64 returns the int64 stored under key in b. When the key is
// absent it stores def under key and returns def.
func loadOrInitInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	if v := b.Get(key); v != nil {
		return bytesToInt64(v), nil
	}
	if err := b.Put(key, int64ToBytes(def)); err != nil {
		return 0, err
	}
	return def, nil
}

// bucketOf returns the named bucket, or bolt.ErrBucketNotFound when it does
// not exist, so callers never dereference a nil bucket.
func bucketOf(tx *bolt.Tx, name []byte) (*bolt.Bucket, error) {
	b := tx.Bucket(name)
	if b == nil {
		return nil, bolt.ErrBucketNotFound
	}
	return b, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes.
func int64ToBytes(i int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(i))
	return buf
}

// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// It returns 0 when data holds fewer than 8 bytes.
func bytesToInt64(data []byte) int64 {
	var i int64
	// On a short read binary.Read fails before touching i, leaving it 0.
	_ = binary.Read(bytes.NewReader(data), binary.BigEndian, &i)
	return i
}

// Term returns the term tracked in memory: the committed term loaded at
// open, or the term of the last successfully appended entry.
func (me *tBoltDBStore) Term() int64 {
	return me.term
}

// Index returns the index tracked in memory: the committed index loaded at
// open, or the index of the last successfully appended entry.
func (me *tBoltDBStore) Index() int64 {
	return me.index
}

// Append saves entry into the unstable bucket, keyed by its index, and then
// records the entry's index and term in memory.
//
// The command factory panics if entry.Tag is unknown.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// Decode the command up front; the result is not used further, but an
	// unknown tag is rejected (by panic) before anything is written.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		// save log to unstable
		b, err := bucketOf(tx, gUnstableBucket)
		if err != nil {
			return err
		}
		return b.Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Only advance the in-memory position once the transaction has really
	// been committed.
	me.index = entry.Index
	me.term = entry.Term
	return nil
}

// Commit moves the unstable entry with the given index to the committed
// bucket, applies its command to the data bucket, and records the committed
// index and term in the meta bucket -- all in one transaction.
//
// It returns gErrorCommitLogNotFound when no unstable entry has that index.
func (me *tBoltDBStore) Commit(index int64) error {
	return me.db.Update(func(tx *bolt.Tx) error {
		// read unstable log
		ub, e := bucketOf(tx, gUnstableBucket)
		if e != nil {
			return e
		}
		k := int64ToBytes(index)
		data := ub.Get(k)
		if data == nil {
			return gErrorCommitLogNotFound
		}

		entry := new(model.LogEntry)
		if e = entry.Unmarshal(data); e != nil {
			return e
		}

		// apply cmd
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if e = cmd.Apply(tx); e != nil {
			return e
		}

		// save to committed log
		cb, e := bucketOf(tx, gCommittedBucket)
		if e != nil {
			return e
		}
		if e = cb.Put(k, data); e != nil {
			return e
		}

		// update committed.index, committed.term
		mb, e := bucketOf(tx, gMetaBucket)
		if e != nil {
			return e
		}
		if e = mb.Put(gKeyCommittedIndex, int64ToBytes(index)); e != nil {
			return e
		}
		if e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); e != nil {
			return e
		}

		// del unstable.index
		return ub.Delete(k)
	})
}

// Bucket names, meta keys, defaults and errors used by the store.
var (
	gMetaBucket      = []byte("meta")
	gUnstableBucket  = []byte("unstable")
	gCommittedBucket = []byte("committed")
	gDataBucket      = []byte("data")

	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")

	gDefaultTerm  int64 = 0
	gDefaultIndex int64 = 0

	gErrorCommitLogNotFound = errors.New("committing log not found")
)

(未完待续)