手撸golang etcd raft协议之4
缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了领导人选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可完成。RequestVote RPC是在选举过程中由候选人(Candidate)触发的,AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 4)
使用boltdb存储操作日志和kv键值数据
- unstable存储桶:已收到未提交的日志,重启后清空
- committed存储桶:已提交的日志
- data存储桶:kv键值数据
- meta存储桶:记录末次提交的index和term
设计
- model/LogEntry: 日志条目
- ICmd:操作指令接口
- ICmdFactory:操作指令工厂
- ILogStore:日志存储接口
- tCmdBase:指令基类
- PutCmd:put指令
- DelCmd:del指令
- tBoltDBStore:基于boltdb实现日志暂存,提交和应用
LogEntry.go
日志条目
package model

import "encoding/json"

// LogEntry is one record of the replicated log. It is persisted and
// exchanged as JSON (see Marshal / Unmarshal).
type LogEntry struct {
	// Tag identifies the command type; the store passes it to the command
	// factory to obtain the matching command object.
	Tag int
	// Term and Index locate this entry in the log.
	Term  int64
	Index int64
	// PrevTerm and PrevIndex are not read by the code shown alongside this
	// type; presumably they describe the entry preceding this one (TODO confirm).
	PrevTerm  int64
	PrevIndex int64
	// Command is the serialized command payload handed to the command's
	// Unmarshal method.
	Command []byte
}

// Marshal encodes the entry as JSON.
//
// The result order is (error, data) rather than the usual (data, error);
// existing callers rely on that order, so it is kept as is. On failure the
// returned slice is nil.
func (me *LogEntry) Marshal() (error, []byte) {
	encoded, err := json.Marshal(me)
	if err != nil {
		return err, nil
	}
	return nil, encoded
}

// Unmarshal decodes JSON data into the receiver and reports any decoding error.
func (me *LogEntry) Unmarshal(data []byte) error {
	err := json.Unmarshal(data, me)
	return err
}
ICmd.go
操作指令接口
package storeimport "github.com/boltdb/bolt"type ICmd interface { Marshal() []byte Unmarshal(data []byte) Apply(tx *bolt.Tx) error}
ICmdFactory.go
操作指令工厂
package store

import "fmt"

// ICmdFactory converts between numeric command tags and command objects.
type ICmdFactory interface {
	// OfTag returns a new, empty command for the given tag.
	OfTag(tag int) ICmd
	// Tag returns the numeric tag that corresponds to cmd's concrete type.
	Tag(cmd ICmd) int
}

// tDefaultCmdFactory is the built-in factory that knows PutCmd and DelCmd.
type tDefaultCmdFactory int

// Tags of the built-in commands.
const (
	gPutCmdTag = 1
	gDelCmdTag = 2
)

// OfTag returns a fresh *PutCmd or *DelCmd for the matching tag and panics
// for any other value.
func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
	switch tag {
	case gPutCmdTag:
		return &PutCmd{}
	case gDelCmdTag:
		return &DelCmd{}
	default:
		panic(fmt.Sprintf("unknown tag: %d", tag))
	}
}

// Tag returns the tag for a *PutCmd or *DelCmd and panics for any other
// command (including a nil interface value).
func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
	switch cmd.(type) {
	case *PutCmd:
		return gPutCmdTag
	case *DelCmd:
		return gDelCmdTag
	default:
		panic(fmt.Sprintf("unknown cmd: %v", cmd))
	}
}

// gCmdFactory is the package-wide factory instance.
var gCmdFactory = new(tDefaultCmdFactory)
ILogStore.go
日志存储接口
package storeimport "learning/gooop/etcd/raft/model"type ILogStore interface { Term() int64 Index() int64 Append(entry *model.LogEntry) error Commit(index int64) error}
tCmdBase.go
指令基类
package store

import "encoding/json"

// tCmdBase is meant to be embedded by concrete commands to share JSON
// (de)serialization.
//
// NOTE(review): Go embedding has no virtual dispatch. Inside these methods
// the receiver is the *tCmdBase itself, which has no fields, so a type that
// merely embeds tCmdBase gets "{}" from Marshal and nothing populated by
// Unmarshal unless it declares its own methods.
type tCmdBase struct {
}

// Marshal returns the JSON encoding of the receiver, or nil when encoding
// fails.
func (me *tCmdBase) Marshal() []byte {
	encoded, err := json.Marshal(me)
	if err == nil {
		return encoded
	}
	return nil
}

// Unmarshal decodes JSON data into the receiver. The decoding error is
// dropped on purpose: the method has no error result.
func (me *tCmdBase) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}
PutCmd.go
put指令
package store

import (
	"encoding/json"

	"github.com/boltdb/bolt"
)

// PutCmd stores Value under Key in the data bucket.
type PutCmd struct {
	tCmdBase
	Key   string
	Value []byte
}

// Marshal returns the JSON encoding of the command, or nil when encoding
// fails.
//
// It is declared on *PutCmd on purpose: the Marshal promoted from the
// embedded tCmdBase runs with the empty base struct as its receiver and
// therefore never serializes Key or Value.
func (me *PutCmd) Marshal() []byte {
	data, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return data
}

// Unmarshal loads Key and Value from JSON data.
//
// Like Marshal, it shadows the method promoted from tCmdBase, which cannot
// reach the fields of PutCmd. The decoding error is dropped because the
// ICmd interface gives Unmarshal no error result.
func (me *PutCmd) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

// Apply writes Key/Value into the data bucket within tx. It returns
// bolt.ErrBucketNotFound when the data bucket does not exist instead of
// dereferencing a nil bucket.
func (me *PutCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		return bolt.ErrBucketNotFound
	}
	return b.Put([]byte(me.Key), me.Value)
}
DelCmd.go
del指令
package store

import (
	"encoding/json"

	"github.com/boltdb/bolt"
)

// DelCmd removes Key from the data bucket.
type DelCmd struct {
	tCmdBase
	Key string
}

// Marshal returns the JSON encoding of the command, or nil when encoding
// fails.
//
// It is declared on *DelCmd on purpose: the Marshal promoted from the
// embedded tCmdBase runs with the empty base struct as its receiver and
// therefore never serializes Key.
func (me *DelCmd) Marshal() []byte {
	data, err := json.Marshal(me)
	if err != nil {
		return nil
	}
	return data
}

// Unmarshal loads Key from JSON data.
//
// Like Marshal, it shadows the method promoted from tCmdBase, which cannot
// reach the fields of DelCmd. The decoding error is dropped because the
// ICmd interface gives Unmarshal no error result.
func (me *DelCmd) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

// Apply deletes Key from the data bucket within tx. It returns
// bolt.ErrBucketNotFound when the data bucket does not exist instead of
// dereferencing a nil bucket.
func (me *DelCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		return bolt.ErrBucketNotFound
	}
	return b.Delete([]byte(me.Key))
}
tBoltDBStore.go
基于boltdb实现日志暂存,提交和应用
package store

import (
	"encoding/binary"
	"errors"

	"github.com/boltdb/bolt"

	"learning/gooop/etcd/raft/model"
)

// tBoltDBStore is the boltdb-backed ILogStore. Logs are staged in the
// "unstable" bucket by Append and moved to the "committed" bucket by Commit,
// which also applies the command to the "data" bucket and records the
// committed index/term in the "meta" bucket.
type tBoltDBStore struct {
	file  string
	term  int64
	index int64
	db    *bolt.DB
}

var _ ILogStore = (*tBoltDBStore)(nil)

// NewBoltStore opens (or creates) the bolt database at file and prepares
// all buckets.
//
// The committed term/index are loaded from the meta bucket, or initialized
// to their defaults on first use. The unstable bucket is always recreated
// empty, so uncommitted logs do not survive a restart.
//
// The result order is (error, store); it is kept for existing callers. If
// initialization fails the database handle is closed before returning.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	// Keep the handle: Append and Commit run their transactions on it.
	store := &tBoltDBStore{file: file, db: db}

	err = db.Update(func(tx *bolt.Tx) error {
		mb, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}
		if store.term, e = loadOrInitInt64(mb, gKeyCommittedTerm, gDefaultTerm); e != nil {
			return e
		}
		if store.index, e = loadOrInitInt64(mb, gKeyCommittedIndex, gDefaultIndex); e != nil {
			return e
		}

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// Drop whatever was left unstable by the previous run. On a fresh
		// database the bucket does not exist yet, which is not an error.
		if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		// Best effort: the initialization error is the one worth reporting.
		_ = db.Close()
		return err, nil
	}

	return nil, store
}

// loadOrInitInt64 returns the int64 stored under key in b. When the key is
// absent it stores def under key and returns def.
func loadOrInitInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	if v := b.Get(key); v != nil {
		return bytesToInt64(v), nil
	}
	if e := b.Put(key, int64ToBytes(def)); e != nil {
		return 0, e
	}
	return def, nil
}

// bucketOf returns the named top-level bucket, or bolt.ErrBucketNotFound
// when it does not exist.
func bucketOf(tx *bolt.Tx, name []byte) (*bolt.Bucket, error) {
	b := tx.Bucket(name)
	if b == nil {
		return nil, bolt.ErrBucketNotFound
	}
	return b, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes.
func int64ToBytes(i int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(i))
	return buf
}

// bytesToInt64 decodes a big-endian int64 and returns 0 when data holds
// fewer than 8 bytes.
//
// The trailing 8 bytes are used so that values written by the previous
// encoder, which emitted 8 zero bytes followed by the number, still decode
// to the number that was stored.
func bytesToInt64(data []byte) int64 {
	if len(data) < 8 {
		return 0
	}
	return int64(binary.BigEndian.Uint64(data[len(data)-8:]))
}

// Term returns the term currently tracked by the store: the committed term
// loaded at open, or the term of the last successfully appended entry.
func (me *tBoltDBStore) Term() int64 {
	return me.term
}

// Index returns the index currently tracked by the store: the committed
// index loaded at open, or the index of the last successfully appended entry.
func (me *tBoltDBStore) Index() int64 {
	return me.index
}

// Append saves entry into the unstable bucket, keyed by its index, and then
// advances the tracked term/index. It panics if entry.Tag is unknown to the
// command factory.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// OfTag panics on an unknown tag, so a bad entry never reaches the disk.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		// save log to unstable
		ub, e := bucketOf(tx, gUnstableBucket)
		if e != nil {
			return e
		}
		return ub.Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Only move forward once the transaction has really been committed.
	me.index = entry.Index
	me.term = entry.Term
	return nil
}

// Commit applies the unstable entry with the given index and moves it to
// the committed bucket, all in one transaction. It returns
// gErrorCommitLogNotFound when no unstable entry has that index.
func (me *tBoltDBStore) Commit(index int64) error {
	return me.db.Update(func(tx *bolt.Tx) error {
		// read unstable log
		ub, e := bucketOf(tx, gUnstableBucket)
		if e != nil {
			return e
		}
		k := int64ToBytes(index)
		data := ub.Get(k)
		if data == nil {
			return gErrorCommitLogNotFound
		}

		entry := new(model.LogEntry)
		if e = entry.Unmarshal(data); e != nil {
			return e
		}

		// apply cmd
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if e = cmd.Apply(tx); e != nil {
			return e
		}

		// save to committed log
		cb, e := bucketOf(tx, gCommittedBucket)
		if e != nil {
			return e
		}
		if e = cb.Put(k, data); e != nil {
			return e
		}

		// update committed.index, committed.term
		mb, e := bucketOf(tx, gMetaBucket)
		if e != nil {
			return e
		}
		if e = mb.Put(gKeyCommittedIndex, int64ToBytes(index)); e != nil {
			return e
		}
		if e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); e != nil {
			return e
		}

		// del unstable.index
		return ub.Delete(k)
	})
}

// Bucket names, meta keys, defaults and sentinel errors used by the store.
var (
	gMetaBucket      = []byte("meta")
	gUnstableBucket  = []byte("unstable")
	gCommittedBucket = []byte("committed")
	gDataBucket      = []byte("data")

	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")

	gDefaultTerm  int64 = 0
	gDefaultIndex int64 = 0

	gErrorCommitLogNotFound = errors.New("committing log not found")
)
(未完待续)