共计 4235 个字符,预计需要花费 11 分钟才能阅读完成。
手撸 golang etcd raft 协定之 2
缘起
最近浏览 [云原生分布式存储基石:etcd 深刻解析] (杜军 , 2019.1)
本系列笔记拟采纳 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop
raft 分布式一致性算法
分布式存储系统通常会通过保护多个副原本进行容错,以进步零碎的可用性。这就引出了分布式存储系统的外围问题——如何保障多个正本的一致性?Raft 算法把问题分解成了首领选举(leader election)、日志复制(log replication)、安全性(safety)和成员关系变动(membership changes)这几个子问题。Raft 算法的基本操作只需 2 种 RPC 即可实现。RequestVote RPC 是在选举过程中通过旧的 Leader 触发的,AppendEntries RPC 是领导人触发的,目标是向其余节点复制日志条目和发送心跳(heartbeat)。
指标
- 依据 raft 协定,实现高可用分布式强统一的 kv 存储
子目标 (Day 2)
- 定义 raft rpc 接口
- 定义 raft lsm 无限状态自动机接口(状态模式)
设计
- config/IRaftConfig.go: 集群配置接口。简略起见,应用动态配置模式定义节点数量和地址。
- config/IRaftNodeConfig.go: 节点配置接口
- roles/roles.go:raft 三种角色常量
- timeout/timeout.go:超时工夫常量
- rpc/IRaftRPC.go: raft 协定的根本 RPC 接口及参数定义。简略起见,拟采纳 net/rpc 实现之。
- rpc/IRaftRPCServer.go: 反对 raft 协定的服务器接口。简略起见,拟采纳 net/rpc 实现之。
- lsm/IRaftLSM.go: raft 无限状态机接口
- lsm/IRaftState.go: 状态接口
- lsm/tRaftStateBase.go: 根本状态数据
- lsm/tFollowerState: follower 状态的实现,未实现
config/IRaftConfig.go
集群配置接口。简略起见,应用动态配置模式定义节点数量和地址。
package config
type IRaftConfig interface {ID() string
Nodes() []IRaftNodeConfig
}
config/IRaftNodeConfig.go
节点配置接口
package config
type IRaftNodeConfig interface {ID() string
Endpoint() string}
roles/roles.go
raft 三种角色常量
package roles
type RaftRole int
const Follower RaftRole = 1
const Candidate RaftRole = 2
const Leader RaftRole = 3
timeout/timeout.go
超时工夫常量
package timeout
import "time"
const HeartbeatInterval = 150 * time.Millisecond
const HeartbeatTimeout = 5 * HeartbeatInterval
const ElectionTimeout = HeartbeatTimeout
rpc/IRaftRPC.go
raft 协定的根本 RPC 接口及参数定义。简略起见,拟采纳 net/rpc 实现之。
package rpc
type IRaftRPC interface {RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
AppendEntries(cmd *AppendEntriesCmd, ret *AppendEntriesRet) error
}
type RequestVoteCmd struct {
CandidateID string
Term int
LastLogIndex int
LastLogTerm int
}
type RequestVoteRet struct {
Term int
VoteGranted bool
}
type AppendEntriesCmd struct {
Term int
LeaderID string
PrevLogTerm int
PrevLogIndex int
LeaderCommit int
Entries []*LogEntry}
type LogEntry struct {
Tag int
Content []byte}
type AppendEntriesRet struct {
Term int
Success bool
}
rpc/IRaftRPCServer.go
反对 raft 协定的服务器接口。简略起见,拟采纳 net/rpc 实现之。
package rpc
type IRaftRPCServer interface {BeginServeTCP(port int, r IRaftRPC)
}
lsm/IRaftLSM.go
raft 无限状态机接口
package lsm
import "learning/gooop/etcd/raft/rpc"
// IRaftLSM raft 无限状态自动机
type IRaftLSM interface {
rpc.IRaftRPC
State() IRaftState}
lsm/IRaftState.go
状态接口
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
)
type IRaftState interface {
rpc.IRaftRPC
Role() roles.RaftRole
Start()}
lsm/tRaftStateBase.go
根本状态数据
package lsm
import (
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/roles"
)
//
type tRaftStateBase struct {
// 以后角色
role roles.RaftRole
// 以后任期号
term int
// leader.id
leaderID string
// 集群配置
cfg config.IRaftConfig
}
func newRaftStateBase(term int, cfg config.IRaftConfig) *tRaftStateBase {it := new(tRaftStateBase)
it.init(term, cfg)
return it
}
// init initialize self, with term and config specified
func (me *tRaftStateBase) init(term int, cfg config.IRaftConfig) {
me.cfg = cfg
me.role = roles.Follower
me.term = term
me.leaderID = ""
}
func (me *tRaftStateBase) Role() roles.RaftRole {return me.role}
lsm/tFollowerState
follower 状态的实现,未实现
package lsm
import (
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
type tFollowerState struct {
tRaftStateBase
mInitOnce sync.Once
mStartOnce sync.Once
mEventMap map[tFollowerEvent][]tFollowerEventHandler}
type tFollowerEvent int
const evStart tFollowerEvent = 1
type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})
func newFollowerState(term int, cfg config.IRaftConfig) *tFollowerState {it := new(tFollowerState)
it.init(term, cfg)
// todo: to implement IRaftState
return it
}
func (me *tFollowerState) init(term int, cfg config.IRaftConfig) {me.mInitOnce.Do(func() {me.tRaftStateBase = *newRaftStateBase(term, cfg)
// init event map
me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)
me.registerEventHandlers()})
}
func (me *tFollowerState) registerEventHandlers() {me.mEventMap[evStart] = []tFollowerEventHandler{me.afterStartThenBeginWatchLeaderTimeout,}
}
func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {if handlers, ok := me.mEventMap[e]; ok {
for _, it := range handlers {it(e, args...)
}
}
}
func (me *tFollowerState) Start() {me.mStartOnce.Do(func() {me.raise(evStart)
})
}
func (me *tFollowerState) afterStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args ...interface{}) {go func() {
iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
for range time.Tick(iCheckingTimeoutInterval) {// todo: watch leader.AppendEntries rpc timeout}
}()}
(未完待续)
正文完