手撸golang etcd raft协定之2

缘起

最近浏览 [云原生分布式存储基石:etcd深刻解析] (杜军 , 2019.1)
本系列笔记拟采纳golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过保护多个副原本进行容错,以进步零碎的可用性。这就引出了分布式存储系统的外围问题——如何保障多个正本的一致性?Raft算法把问题分解成了首领选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变动(membership changes)这几个子问题。Raft算法的基本操作只需2种RPC即可实现。RequestVote RPC是在选举过程中通过旧的Leader触发的,AppendEntries RPC是领导人触发的,目标是向其余节点复制日志条目和发送心跳(heartbeat)。

指标

  • 依据raft协定,实现高可用分布式强统一的kv存储

子目标(Day 2)

  • 定义raft rpc接口
  • 定义raft lsm无限状态自动机接口(状态模式)

设计

  • config/IRaftConfig.go: 集群配置接口。简略起见, 应用动态配置模式定义节点数量和地址。
  • config/IRaftNodeConfig.go: 节点配置接口
  • roles/roles.go:raft三种角色常量
  • timeout/timeout.go:超时工夫常量
  • rpc/IRaftRPC.go: raft协定的根本RPC接口及参数定义。简略起见,拟采纳net/rpc实现之。
  • rpc/IRaftRPCServer.go: 反对raft协定的服务器接口。简略起见,拟采纳net/rpc实现之。
  • lsm/IRaftLSM.go: raft无限状态机接口
  • lsm/IRaftState.go: 状态接口
  • lsm/tRaftStateBase.go: 根本状态数据
  • lsm/tFollowerState: follower状态的实现,未实现

config/IRaftConfig.go

集群配置接口。简略起见, 应用动态配置模式定义节点数量和地址。

package configtype IRaftConfig interface {    ID() string    Nodes() []IRaftNodeConfig}

config/IRaftNodeConfig.go

节点配置接口

package configtype IRaftNodeConfig interface {    ID() string    Endpoint() string}

roles/roles.go

raft三种角色常量

package rolestype RaftRole intconst Follower RaftRole = 1const Candidate RaftRole = 2const Leader RaftRole = 3

timeout/timeout.go

超时工夫常量

package timeoutimport "time"const HeartbeatInterval = 150 * time.Millisecondconst HeartbeatTimeout = 5 * HeartbeatIntervalconst ElectionTimeout = HeartbeatTimeout

rpc/IRaftRPC.go

raft协定的根本RPC接口及参数定义。简略起见,拟采纳net/rpc实现之。

package rpctype IRaftRPC interface {    RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error    AppendEntries(cmd *AppendEntriesCmd, ret *AppendEntriesRet) error}type RequestVoteCmd struct {    CandidateID  string    Term         int    LastLogIndex int    LastLogTerm  int}type RequestVoteRet struct {    Term        int    VoteGranted bool}type AppendEntriesCmd struct {    Term         int    LeaderID     string    PrevLogTerm  int    PrevLogIndex int    LeaderCommit int    Entries      []*LogEntry}type LogEntry struct {    Tag     int    Content []byte}type AppendEntriesRet struct {    Term    int    Success bool}

rpc/IRaftRPCServer.go

反对raft协定的服务器接口。简略起见,拟采纳net/rpc实现之。

package rpctype IRaftRPCServer interface {    BeginServeTCP(port int, r IRaftRPC)}

lsm/IRaftLSM.go

raft无限状态机接口

package lsmimport "learning/gooop/etcd/raft/rpc"// IRaftLSM raft无限状态自动机type IRaftLSM interface {    rpc.IRaftRPC    State() IRaftState}

lsm/IRaftState.go

状态接口

package lsmimport (    "learning/gooop/etcd/raft/roles"    "learning/gooop/etcd/raft/rpc")type IRaftState interface {    rpc.IRaftRPC    Role() roles.RaftRole    Start()}

lsm/tRaftStateBase.go

根本状态数据

package lsmimport (    "learning/gooop/etcd/raft/config"    "learning/gooop/etcd/raft/roles")//type tRaftStateBase struct {    // 以后角色    role roles.RaftRole    // 以后任期号    term int    // leader.id    leaderID string    // 集群配置    cfg config.IRaftConfig}func newRaftStateBase(term int, cfg config.IRaftConfig) *tRaftStateBase {    it := new(tRaftStateBase)    it.init(term, cfg)    return it}// init initialize self, with term and config specifiedfunc (me *tRaftStateBase) init(term int, cfg config.IRaftConfig) {    me.cfg = cfg    me.role = roles.Follower    me.term = term    me.leaderID = ""}func (me *tRaftStateBase) Role() roles.RaftRole {    return me.role}

lsm/tFollowerState

follower状态的实现,未实现

package lsmimport (    "learning/gooop/etcd/raft/config"    "learning/gooop/etcd/raft/timeout"    "sync"    "time")type tFollowerState struct {    tRaftStateBase    mInitOnce  sync.Once    mStartOnce sync.Once    mEventMap  map[tFollowerEvent][]tFollowerEventHandler}type tFollowerEvent intconst evStart tFollowerEvent = 1type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})func newFollowerState(term int, cfg config.IRaftConfig) *tFollowerState {    it := new(tFollowerState)    it.init(term, cfg)    // todo: to implement IRaftState    return it}func (me *tFollowerState) init(term int, cfg config.IRaftConfig) {    me.mInitOnce.Do(func() {        me.tRaftStateBase = *newRaftStateBase(term, cfg)        // init event map        me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)        me.registerEventHandlers()    })}func (me *tFollowerState) registerEventHandlers() {    me.mEventMap[evStart] = []tFollowerEventHandler{        me.afterStartThenBeginWatchLeaderTimeout,    }}func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {    if handlers, ok := me.mEventMap[e]; ok {        for _, it := range handlers {            it(e, args...)        }    }}func (me *tFollowerState) Start() {    me.mStartOnce.Do(func() {        me.raise(evStart)    })}func (me *tFollowerState) afterStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args ...interface{}) {    go func() {        iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3        for range time.Tick(iCheckingTimeoutInterval) {            // todo: watch leader.AppendEntries rpc timeout        }    }()}

(未完待续)