共计 5216 个字符,预计需要花费 14 分钟才能阅读完成。
手撸 golang etcd raft 协议之 4
缘起
最近阅读 [云原生分布式存储基石:etcd 深入解析] (杜军 , 2019.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop
raft 分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft 算法把问题分解成了领导人选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变化(membership changes)这几个子问题。Raft 算法的基本操作只需 2 种 RPC 即可完成。RequestVote RPC 是在选举过程中由候选人(Candidate)触发的,AppendEntries RPC 是由领导人(Leader)触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据 raft 协议,实现高可用、分布式、强一致的 kv 存储
子目标(Day 4)
- 使用 boltdb 存储操作日志和 kv 键值数据
- unstable 存储桶:已收到未提交的日志,重启后清空
- committed 存储桶:已提交的日志
- data 存储桶:kv 键值数据
- meta 存储桶:记录末次提交的 index 和 term
设计
- model/LogEntry: 日志条目
- ICmd:操作指令接口
- ICmdFactory:操作指令工厂
- ILogStore:日志存储接口
- tCmdBase:指令基类
- PutCmd:put 指令
- DelCmd:del 指令
- tBoltDBStore:基于 boltdb 实现日志暂存、提交和应用
LogEntry.go
日志条目
package model
import "encoding/json"
// LogEntry is one record of the operation log. It is serialized as JSON
// using the Go field names as keys.
type LogEntry struct {
	// Tag selects the command type that Command decodes into.
	Tag int
	// Term is the term this entry belongs to.
	Term int64
	// Index is the position of this entry in the log.
	Index int64
	// PrevTerm is presumably the term of the preceding entry — TODO confirm
	// against the replication code, which is not part of this file.
	PrevTerm int64
	// PrevIndex is presumably the index of the preceding entry — TODO confirm.
	PrevIndex int64
	// Command holds the serialized command payload.
	Command []byte
}

// Marshal encodes the entry as JSON.
//
// Note the result order: the error comes first and the encoded bytes second.
// On failure the byte slice is nil.
func (me *LogEntry) Marshal() (error, []byte) {
	encoded, err := json.Marshal(me)
	if err == nil {
		return nil, encoded
	}
	return err, nil
}

// Unmarshal decodes JSON data into the receiver and reports any decoding
// error.
func (me *LogEntry) Unmarshal(data []byte) error {
	err := json.Unmarshal(data, me)
	return err
}
ICmd.go
操作指令接口
package store
import "github.com/boltdb/bolt"
// ICmd is a storage operation that can be serialized into a log entry and
// later executed inside a bolt transaction.
type ICmd interface {
	// Marshal returns the serialized form of the command.
	Marshal() []byte

	// Unmarshal restores the command from data. It has no error result, so
	// a decoding failure is not reported to the caller.
	Unmarshal(data []byte)

	// Apply executes the command using the given transaction.
	Apply(tx *bolt.Tx) error
}
ICmdFactory.go
操作指令工厂
package store
import "fmt"
// ICmdFactory maps between numeric command tags and ICmd implementations.
type ICmdFactory interface {
	// OfTag returns a new, empty command for the given tag.
	OfTag(tag int) ICmd

	// Tag returns the numeric tag that identifies the type of cmd.
	Tag(cmd ICmd) int
}
// tDefaultCmdFactory is the built-in command factory. It carries no state;
// the underlying int is only there to give the methods a receiver type.
type tDefaultCmdFactory int

// Tags stored in a log entry to identify the command type.
const (
	gPutCmdTag = 1 // PutCmd
	gDelCmdTag = 2 // DelCmd
)
// OfTag returns a freshly allocated command for tag: a *PutCmd for
// gPutCmdTag and a *DelCmd for gDelCmdTag.
//
// It panics on any other tag, because the ICmdFactory signature leaves no
// room for an error result.
func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
	if tag == gPutCmdTag {
		return new(PutCmd)
	}
	if tag == gDelCmdTag {
		return new(DelCmd)
	}
	panic(fmt.Sprintf("unknown tag: %d", tag))
}
// Tag returns the numeric tag for cmd based on its dynamic type:
// gPutCmdTag for *PutCmd and gDelCmdTag for *DelCmd.
//
// It panics for any other type, including a nil interface value.
func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
	switch cmd.(type) {
	case *PutCmd:
		return gPutCmdTag
	case *DelCmd:
		return gDelCmdTag
	default:
		panic(fmt.Sprintf("unknown cmd: %v", cmd))
	}
}
// gCmdFactory is the package-wide command factory instance.
var gCmdFactory = new(tDefaultCmdFactory)
ILogStore.go
日志存储接口
package store
import "learning/gooop/etcd/raft/model"
// ILogStore is the persistence interface for the operation log.
type ILogStore interface {
	// Term returns the term currently tracked by the store.
	Term() int64

	// Index returns the log index currently tracked by the store.
	Index() int64

	// Append stores a received entry that has not been committed yet.
	Append(entry *model.LogEntry) error

	// Commit commits the previously appended entry with the given index.
	Commit(index int64) error
}
tCmdBase.go
指令基类
package store
import "encoding/json"
// tCmdBase supplies default Marshal/Unmarshal methods for commands that
// embed it.
//
// NOTE(review): both methods operate on *tCmdBase itself, which has no
// fields. A type that embeds tCmdBase and relies on the promoted methods
// therefore always encodes as "{}" and never has its own fields populated on
// decode; such a type must define its own Marshal/Unmarshal.
type tCmdBase struct{}

// Marshal returns the JSON encoding of the receiver, or nil if encoding
// fails.
func (me *tCmdBase) Marshal() []byte {
	if encoded, err := json.Marshal(me); err == nil {
		return encoded
	}
	return nil
}

// Unmarshal decodes JSON data into the receiver. The decoding error is
// deliberately discarded because the ICmd signature has no error result.
func (me *tCmdBase) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}
PutCmd.go
put 指令
package store
import (
	"encoding/json"

	"github.com/boltdb/bolt"
)
// PutCmd stores Value under Key in the data bucket.
type PutCmd struct {
	tCmdBase
	Key   string
	Value []byte
}

// Marshal returns the JSON encoding of the command, or nil if encoding
// fails.
//
// It is defined on *PutCmd on purpose: the Marshal promoted from tCmdBase
// runs with a *tCmdBase receiver, so it would encode an empty object and
// drop Key and Value.
func (me *PutCmd) Marshal() []byte {
	j, e := json.Marshal(me)
	if e != nil {
		return nil
	}
	return j
}

// Unmarshal decodes JSON data into the command. The decoding error is
// discarded because the ICmd signature has no error result.
//
// Like Marshal, it overrides the method promoted from tCmdBase so that Key
// and Value are actually populated.
func (me *PutCmd) Unmarshal(data []byte) {
	_ = json.Unmarshal(data, me)
}

// Apply writes Key/Value into the data bucket using tx. It returns
// bolt.ErrBucketNotFound if the data bucket does not exist, instead of
// dereferencing a nil bucket.
func (me *PutCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		return bolt.ErrBucketNotFound
	}
	return b.Put([]byte(me.Key), me.Value)
}
DelCmd.go
del 指令
package store
import "github.com/boltdb/bolt"
// DelCmd removes Key from the data bucket.
//
// NOTE(review): Marshal and Unmarshal are promoted from the embedded
// tCmdBase and run with a *tCmdBase receiver, so encoding/json never sees
// Key: the command encodes as "{}" and Key stays empty after decoding.
// DelCmd needs its own Marshal/Unmarshal operating on *DelCmd, which
// requires importing encoding/json in this file.
type DelCmd struct {
	tCmdBase
	Key string
}

// Apply deletes Key from the data bucket using tx. It returns
// bolt.ErrBucketNotFound if the data bucket does not exist, instead of
// dereferencing a nil bucket.
func (me *DelCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		return bolt.ErrBucketNotFound
	}
	return b.Delete([]byte(me.Key))
}
tBoltDBStore.go
基于 boltdb 实现日志暂存、提交和应用
package store
import (
"bytes"
"encoding/binary"
"errors"
"github.com/boltdb/bolt"
"learning/gooop/etcd/raft/model"
)
// tBoltDBStore is the boltdb-backed ILogStore implementation.
type tBoltDBStore struct {
	// file is the path of the bolt database file.
	file string
	// term and index are loaded from the meta bucket on open.
	term  int64
	index int64
	// db is the open database handle. It is a pointer because bolt.DB must
	// not be copied.
	db *bolt.DB
}

// NewBoltStore opens (or creates) the bolt database at file and prepares it
// for use as a log store:
//
//   - the meta bucket is created if missing, and committed term/index are
//     loaded from it (or initialised to the defaults);
//   - the data and committed buckets are created if missing;
//   - the unstable bucket is dropped and recreated, so uncommitted entries
//     do not survive a restart.
//
// Note the result order: the error comes first and the store second. On
// failure the store is nil and the database handle is closed.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	// Keep the handle and path: Append and Commit run their transactions
	// through store.db.
	store := &tBoltDBStore{file: file, db: db}

	err = db.Update(func(tx *bolt.Tx) error {
		mb, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}
		if store.term, e = loadOrInitInt64(mb, gKeyCommittedTerm, gDefaultTerm); e != nil {
			return e
		}
		if store.index, e = loadOrInitInt64(mb, gKeyCommittedIndex, gDefaultIndex); e != nil {
			return e
		}

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// On a brand-new database the unstable bucket does not exist yet;
		// that is not an error, there is simply nothing to clear.
		if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		// Do not leak the file handle and lock when initialisation fails.
		_ = db.Close()
		return err, nil
	}
	return nil, store
}

// loadOrInitInt64 returns the int64 stored under key in b. If the key is
// absent it stores def under key and returns def.
func loadOrInitInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	if v := b.Get(key); v != nil {
		return bytesToInt64(v), nil
	}
	if e := b.Put(key, int64ToBytes(def)); e != nil {
		return 0, e
	}
	return def, nil
}
// int64ToBytes encodes i as exactly 8 big-endian bytes.
//
// The previous implementation wrote into bytes.NewBuffer(make([]byte, 8)),
// which starts with 8 zero bytes of content, so it returned 16 bytes and
// bytesToInt64 always decoded the leading zeros. Values written by that
// version are 16 bytes long and are not rewritten here.
func int64ToBytes(i int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(i))
	return buf
}
// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// Bytes after the eighth are ignored.
func bytesToInt64(data []byte) int64 {
	var value int64
	// With fewer than 8 bytes binary.Read fails; the error is deliberately
	// dropped and the zero value is returned.
	_ = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
	return value
}
// Term returns the term held in memory by the store.
func (me *tBoltDBStore) Term() int64 {
	return me.term
}
// Index returns the log index held in memory by the store.
func (me *tBoltDBStore) Index() int64 {
	return me.index
}
// Append stores entry in the unstable bucket, keyed by its index, and then
// records the entry's index and term as the store's current position.
//
// The in-memory index/term are only advanced after the transaction has
// committed, so a failed write leaves the store's position untouched.
//
// It returns bolt.ErrBucketNotFound if the unstable bucket is missing, and
// it panics (via the command factory) if entry.Tag is not a known tag.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// Resolving the tag up front rejects entries with an unknown command
	// type before anything is written.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		// save log to unstable
		b := tx.Bucket(gUnstableBucket)
		if b == nil {
			return bolt.ErrBucketNotFound
		}
		return b.Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	me.index = entry.Index
	me.term = entry.Term
	return nil
}
// Commit promotes the unstable entry with the given index to committed
// state. Everything happens inside one bolt update transaction:
//
//  1. the entry is read from the unstable bucket and decoded;
//  2. its command is rebuilt from the tag and applied;
//  3. the raw entry is copied into the committed bucket;
//  4. committed index and term are written to the meta bucket;
//  5. the entry is removed from the unstable bucket.
//
// It returns gErrorCommitLogNotFound when no unstable entry exists for
// index, and otherwise the first error hit by any step.
func (me *tBoltDBStore) Commit(index int64) error {
	commit := func(tx *bolt.Tx) error {
		unstable := tx.Bucket(gUnstableBucket)
		key := int64ToBytes(index)

		// Look up the pending entry.
		raw := unstable.Get(key)
		if raw == nil {
			return gErrorCommitLogNotFound
		}
		var entry model.LogEntry
		if err := entry.Unmarshal(raw); err != nil {
			return err
		}

		// Rebuild and execute the command carried by the entry.
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if err := cmd.Apply(tx); err != nil {
			return err
		}

		// Record the entry in the committed log.
		if err := tx.Bucket(gCommittedBucket).Put(key, raw); err != nil {
			return err
		}

		// Persist the new committed index and term.
		meta := tx.Bucket(gMetaBucket)
		if err := meta.Put(gKeyCommittedIndex, int64ToBytes(index)); err != nil {
			return err
		}
		if err := meta.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); err != nil {
			return err
		}

		// The entry is no longer pending.
		return unstable.Delete(key)
	}
	return me.db.Update(commit)
}
// Bucket names used inside the bolt database.
var (
	gMetaBucket      = []byte("meta")      // committed index/term
	gUnstableBucket  = []byte("unstable")  // appended, not yet committed entries
	gCommittedBucket = []byte("committed") // committed entries
	gDataBucket      = []byte("data")      // key/value data
)

// Keys inside the meta bucket.
var (
	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")
)

// Values written to the meta bucket when a key is missing.
var (
	gDefaultTerm  = int64(0)
	gDefaultIndex = int64(0)
)

// gErrorCommitLogNotFound is returned by Commit when the unstable bucket has
// no entry for the requested index.
var gErrorCommitLogNotFound = errors.New("committing log not found")
(未完待续)
正文完