关于golang:手撸golang-etcd-raft协议之4

55次阅读

共计 5216 个字符,预计需要花费 14 分钟才能阅读完成。

手撸 golang etcd raft 协定之 4

缘起

最近浏览 [云原生分布式存储基石:etcd 深刻解析] (杜军 , 2019.1)
本系列笔记拟采纳 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop

raft 分布式一致性算法

 分布式存储系统通常会通过保护多个副原本进行容错,以进步零碎的可用性。这就引出了分布式存储系统的外围问题——如何保障多个正本的一致性?Raft 算法把问题分解成了首领选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变动(membership changes)这几个子问题。Raft 算法的基本操作只需 2 种 RPC 即可实现。RequestVote RPC 是在选举过程中通过旧的 Leader 触发的,AppendEntries RPC 是领导人触发的,目标是向其余节点复制日志条目和发送心跳(heartbeat)。

指标

  • 依据 raft 协定,实现高可用分布式强统一的 kv 存储

子目标(Day 4)

  • 应用 boltdb 存储操作日志和 kv 键值数据

    • unstable 存储桶:已收到未提交的日志,重启后清空
    • committed 存储桶:已提交的日志
    • data 存储桶:kv 键值数据
    • meta 存储桶:记录末次提交的 index 和 term

设计

  • model/LogEntry: 日志条目
  • ICmd:操作指令接口
  • ICmdFactory:操作指令工厂
  • ILogStore:日志存储接口
  • tCmdBase:指令基类
  • PutCmd:put 指令
  • DelCmd:del 指令
  • tBoltDBStore:基于 boltdb 实现日志暂存,提交和利用

LogEntry.go

日志条目

package model

import "encoding/json"

type LogEntry struct {
    Tag       int
    Term      int64
    Index     int64
    PrevTerm  int64
    PrevIndex int64
    Command   []byte}

func (me *LogEntry) Marshal() (error, []byte) {j, e := json.Marshal(me)
    if e != nil {return e, nil}
    return nil, j
}

func (me *LogEntry) Unmarshal(data []byte) error {return json.Unmarshal(data, me)
}

ICmd.go

操作指令接口

package store

import "github.com/boltdb/bolt"

type ICmd interface {Marshal() []byte
    Unmarshal(data []byte)
    Apply(tx *bolt.Tx) error
}

ICmdFactory.go

操作指令工厂

package store

import "fmt"

type ICmdFactory interface {OfTag(tag int) ICmd
    Tag(cmd ICmd) int
}

type tDefaultCmdFactory int

const gPutCmdTag = 1
const gDelCmdTag = 2

func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
    switch tag {
    case gPutCmdTag:
        return new(PutCmd)

    case gDelCmdTag:
        return new(DelCmd)
    }

    panic(fmt.Sprintf("unknown tag: %d", tag))
}

func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {if _, ok := cmd.(*PutCmd); ok {return gPutCmdTag}

    if _, ok := cmd.(*DelCmd); ok {return gDelCmdTag}

    panic(fmt.Sprintf("unknown cmd: %v", cmd))
}

var gCmdFactory = new(tDefaultCmdFactory)

ILogStore.go

日志存储接口

package store

import "learning/gooop/etcd/raft/model"

type ILogStore interface {Term() int64
    Index() int64
    Append(entry *model.LogEntry) error
    Commit(index int64) error
}

tCmdBase.go

指令基类

package store

import "encoding/json"

type tCmdBase struct {
}

func (me *tCmdBase) Marshal() []byte {j, e := json.Marshal(me)
    if e != nil {return nil}
    return j
}

func (me *tCmdBase) Unmarshal(data []byte) {_ = json.Unmarshal(data, me)
}

PutCmd.go

put 指令

package store

import "github.com/boltdb/bolt"

type PutCmd struct {
    tCmdBase

    Key   string
    Value []byte}

func (me *PutCmd) Apply(tx *bolt.Tx) error {b := tx.Bucket(gDataBucket)
    return b.Put([]byte(me.Key), me.Value)
}

DelCmd.go

del 指令

package store

import "github.com/boltdb/bolt"

type DelCmd struct {
    tCmdBase

    Key string
}

func (me *DelCmd) Apply(tx *bolt.Tx) error {b := tx.Bucket(gDataBucket)
    return b.Delete([]byte(me.Key))
}

tBoltDBStore.go

基于 boltdb 实现日志暂存,提交和利用

package store

import (
    "bytes"
    "encoding/binary"
    "errors"
    "github.com/boltdb/bolt"
    "learning/gooop/etcd/raft/model"
)

type tBoltDBStore struct {
    file  string
    term  int64
    index int64

    db bolt.DB
}

func NewBoltStore(file string) (error, ILogStore) {db, err := bolt.Open(file, 0600, nil)
    if err != nil {return err, nil}

    store := new(tBoltDBStore)
    err = db.Update(func(tx *bolt.Tx) error {b, e := tx.CreateBucketIfNotExists(gMetaBucket)
        if e != nil {return e}

        v := b.Get(gKeyCommittedTerm)
        if v == nil {e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm))
            if e != nil {return e}
            store.term = gDefaultTerm

        } else {store.term = bytesToInt64(v)
        }

        v = b.Get(gKeyCommittedIndex)
        if v == nil {e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex))
            if e != nil {return e}
            store.index = gDefaultIndex

        } else {store.index = bytesToInt64(v)
        }

        b, e = tx.CreateBucketIfNotExists(gDataBucket)
        if e != nil {return e}

        e = tx.DeleteBucket(gUnstableBucket)
        if e != nil {return e}
        _, e = tx.CreateBucket(gUnstableBucket)
        if e != nil {return e}

        _, e = tx.CreateBucketIfNotExists(gCommittedBucket)
        if e != nil {return e}

        return nil
    })

    if err != nil {return err, nil}

    return nil, store
}

func int64ToBytes(i int64) []byte {buf := bytes.NewBuffer(make([]byte, 8))
    _ = binary.Write(buf, binary.BigEndian, i)
    return buf.Bytes()}

func bytesToInt64(data []byte) int64 {
    var i int64
    buf := bytes.NewBuffer(data)
    _ = binary.Read(buf, binary.BigEndian, &i)
    return i
}

func (me *tBoltDBStore) Term() int64 {return me.term}

func (me *tBoltDBStore) Index() int64 {return me.index}

func (me *tBoltDBStore) Append(entry *model.LogEntry) error {cmd := gCmdFactory.OfTag(entry.Tag)
    cmd.Unmarshal(entry.Command)

    e, entryData := entry.Marshal()
    if e != nil {return e}

    return me.db.Update(func(tx *bolt.Tx) error {
        // save log to unstable
        b := tx.Bucket(gUnstableBucket)
        e = b.Put(int64ToBytes(entry.Index), entryData)
        if e != nil {return e}

        me.index = entry.Index
        me.term = entry.Term

        return nil
    })
}

func (me *tBoltDBStore) Commit(index int64) error {return me.db.Update(func(tx *bolt.Tx) error {
        // read unstable log
        ub := tx.Bucket(gUnstableBucket)
        k := int64ToBytes(index)
        data := ub.Get(k)
        if data == nil {return gErrorCommitLogNotFound}

        entry := new(model.LogEntry)
        e := entry.Unmarshal(data)
        if e != nil {return e}

        // apply cmd
        cmd := gCmdFactory.OfTag(entry.Tag)
        cmd.Unmarshal(entry.Command)
        e = cmd.Apply(tx)
        if e != nil {return e}

        // save to committed log
        cb := tx.Bucket(gCommittedBucket)
        e = cb.Put(k, data)
        if e != nil {return e}

        // update committed.index, committed.term
        mb := tx.Bucket(gMetaBucket)
        e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))
        if e != nil {return e}

        e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))
        if e != nil {return e}

        // del unstable.index
        e = ub.Delete(k)
        if e != nil {return e}

        return nil
    })
}

var gMetaBucket = []byte("meta")
var gUnstableBucket = []byte("unstable")
var gCommittedBucket = []byte("committed")
var gDataBucket = []byte("data")

var gKeyCommittedIndex = []byte("committed.index")
var gKeyCommittedTerm = []byte("committed.term")

var gDefaultTerm int64 = 0
var gDefaultIndex int64 = 0

var gErrorCommitLogNotFound = errors.New("committing log not found")

(未完待续)

正文完
 0