Hand-rolling the etcd raft protocol in golang, part 7

Background

Recently I have been reading [云原生分布式存储基石:etcd深入解析] (杜军, 2019.1).
This series of notes works through the book as golang exercises.
gitee: https://gitee.com/ioly/learning.gooop

The raft distributed consensus algorithm

Distributed storage systems usually tolerate failures by maintaining multiple replicas, which improves system availability. This raises the core problem of distributed storage: how do we keep those replicas consistent? The Raft algorithm decomposes the problem into four subproblems: 1. leader election, 2. log replication, 3. safety, and 4. membership changes.
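
This installment only deals with the plumbing underneath those subproblems: the RPC surface the nodes use to talk to each other. For orientation, here is a sketch of how the repository's rpc.IRaftRPC interface maps onto the first two subproblems. Only the method set is confirmed by the code later in this post; the Cmd/Ret structs below are empty placeholders, their real fields live in the repository.

```go
package rpc

// Placeholder command/result types; the real structs carry term numbers,
// log entries, vote payloads, etc.
type (
    HeartbeatCmd   struct{}
    HeartbeatRet   struct{}
    AppendLogCmd   struct{}
    AppendLogRet   struct{}
    CommitLogCmd   struct{}
    CommitLogRet   struct{}
    RequestVoteCmd struct{}
    RequestVoteRet struct{}
)

// IRaftRPC is the inter-node RPC surface exercised by the client code below.
// Leader election maps onto RequestVote, log replication onto AppendLog and
// CommitLog, and Heartbeat keeps followers from starting needless elections.
type IRaftRPC interface {
    RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
    AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error
    CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error
    Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error
}
```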

Goal

  • Implement a highly available, strongly consistent distributed KV store based on the raft protocol

Subgoal (Day 7)

  • Implement RPC communication between the raft nodes

    • Define the IRaftClientService service, which manages long-lived TCP connections to all nodes
    • Define the IRaftClient interface, which encapsulates the RPC calls between nodes

      • state pattern: distinguishes the connected state from the broken state
      • event-driven logic orchestration
      • read/write-separated field management

Design

  • model/IEventDirvenModel: base class for event-driven logic orchestration
  • IRaftClientService: manages all inter-node RPC connections (a usage sketch follows this list)
  • IRaftClient: manages the RPC connection between the current node and a single peer node
  • iClientState: state-pattern interface for an RPC connection state
  • iStateContext: context interface that the connection states use to reach back into the connection
  • tRaftClient: concrete implementation of the IRaftClient interface; it also implements the iStateContext interface
  • tConnectedState: manages an RPC connection in the connected state

    • Pings periodically to detect connection health
    • event-driven logic orchestration
    • read/write-separated field management
  • tBrokenState: manages an RPC connection in the broken state

    • Dials periodically to attempt reconnection
    • event-driven logic orchestration
    • read/write-separated field management
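
To make the relationship between these pieces concrete, the sketch below shows a hypothetical caller (not part of the series' code) pushing a heartbeat to one peer through the client service. cfg and peerID are assumed to come from the node's configuration, and the HeartbeatCmd fields are left empty because they are defined in the rpc package rather than in this post.

```go
package client

import (
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc"
)

// sendHeartbeat is a hypothetical helper showing how a leader-side component
// might use IRaftClientService: the service resolves (or lazily dials) the
// connection, and the IRaftClient behind it transparently switches between
// the connected and broken states.
func sendHeartbeat(cfg config.IRaftConfig, peerID string) error {
    // in real code the service would be created once and shared
    service := NewRaftClientService(cfg)
    return service.Using(peerID, func(peer rpc.IRaftRPC) error {
        cmd := &rpc.HeartbeatCmd{} // real fields (term, leader ID, ...) omitted here
        ret := &rpc.HeartbeatRet{}
        return peer.Heartbeat(cmd, ret)
    })
}
```

The caller never sees the connected/broken state machine; it simply gets an error back when the peer is unreachable.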

model/IEventDirvenModel.go

Base class for event-driven logic orchestration

```go
package model

type TEventHandleFunc func(e string, args ...interface{})

type IEventDrivenModel interface {
    Hook(e string, handleFunc TEventHandleFunc)
    Raise(e string, args ...interface{})
}

type TEventDrivenModel struct {
    items map[string][]TEventHandleFunc
}

// Hook registers a handler for event e; handlers fire in registration order.
func (me *TEventDrivenModel) Hook(e string, handler TEventHandleFunc) {
    if me.items == nil {
        // lazy init, so embedding structs need no explicit constructor
        me.items = make(map[string][]TEventHandleFunc)
    }

    arr, ok := me.items[e]
    if ok {
        me.items[e] = append(arr, handler)
    } else {
        me.items[e] = []TEventHandleFunc{handler}
    }
}

// Raise fires all handlers registered for event e.
func (me *TEventDrivenModel) Raise(e string, args ...interface{}) {
    if handlers, ok := me.items[e]; ok {
        for _, it := range handlers {
            it(e, args...)
        }
    }
}
```
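
A tiny, hypothetical demonstration of how this base type is meant to be used; the event name and handlers below are made up for illustration:

```go
package model

import "fmt"

func ExampleTEventDrivenModel() {
    m := &TEventDrivenModel{}

    const evConnected = "demo.Connected"
    m.Hook(evConnected, func(e string, args ...interface{}) {
        fmt.Println("logging:", e, args[0])
    })
    m.Hook(evConnected, func(e string, args ...interface{}) {
        fmt.Println("metrics:", e)
    })

    // all handlers registered for the event fire, in registration order
    m.Raise(evConnected, "peer-1")
    // Output:
    // logging: demo.Connected peer-1
    // metrics: demo.Connected
}
```

The states below embed TEventDrivenModel and rely on this ordering: in initEventHandlers the write-only hooks for mDisposedFlag are registered before the read-only handlers, so the flag is already up to date by the time the latter run.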

IRaftClientService.go

Manages all inter-node RPC connections

```go
package client

import (
    "errors"
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc"
    netrpc "net/rpc"
    "sync"
)

type tRaftClientService struct {
    cfg     config.IRaftConfig
    rwmutex *sync.RWMutex
    clients map[string]IRaftClient
}

func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
    it := new(tRaftClientService)
    it.init(cfg)
    return it
}

func (me *tRaftClientService) init(cfg config.IRaftConfig) {
    me.cfg = cfg
    me.rwmutex = new(sync.RWMutex)
    me.clients = make(map[string]IRaftClient)
}

func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
    // fast path: check whether the client already exists
    me.rwmutex.RLock()
    it, ok := me.clients[peerID]
    var nodeCfg config.IRaftNodeConfig
    if !ok {
        for _, n := range me.cfg.Nodes() {
            if n.ID() == peerID {
                nodeCfg = n
                break
            }
        }
    }
    me.rwmutex.RUnlock()

    if ok {
        return action(it)
    }
    if nodeCfg == nil {
        return errors.New("unknown peer: " + peerID)
    }

    // dial to peer
    conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
    if err != nil {
        return err
    }

    // to create a new client
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    // recheck: another goroutine may have created the client while we were dialing
    if exists, ok := me.clients[peerID]; ok {
        _ = conn.Close()
        return action(exists)
    }

    // create, cache and use the new client
    newClient := newRaftClient(nodeCfg, conn)
    me.clients[peerID] = newClient
    return action(newClient)
}
```
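
Note the double-checked locking in Using: the read lock covers the fast path, and after upgrading to the write lock the map is rechecked so that two goroutines racing to dial the same peer end up sharing a single client. The IRaftClientService interface itself lives elsewhere in the repository and is not reproduced in this installment; judging from NewRaftClientService and the Using call above, a minimal declaration would look roughly like this assumed sketch:

```go
package client

import "learning/gooop/etcd/raft/rpc"

// Assumed shape of the interface returned by NewRaftClientService; the real
// definition is elsewhere in the repository and may carry more methods.
type IRaftClientService interface {
    // Using resolves (or lazily dials) the client for peerID and passes it to action.
    Using(peerID string, action func(client rpc.IRaftRPC) error) error
}
```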

IRaftClient.go

Manages the RPC connection between the current node and a single peer node

```go
package client

import "learning/gooop/etcd/raft/rpc"

type IRaftClient interface {
    rpc.IRaftRPC
    iStateContext

    Ping(cmd *PingCmd, ret *PingRet) error
}

type PingCmd struct {
    SenderID  string
    Timestamp int64
}

type PingRet struct {
    SenderID  string
    Timestamp int64
}
```

iClientState.go

State-pattern interface for an RPC connection state

```go
package client

import "learning/gooop/etcd/raft/rpc"

type iClientState interface {
    rpc.IRaftRPC

    Start()
    Ping(cmd *PingCmd, ret *PingRet) error
}
```

iStateContext.go

Context interface that the connection states use to reach back into the connection (state pattern)

```go
package client

import (
    "learning/gooop/etcd/raft/config"
    "net/rpc"
)

type iStateContext interface {
    Config() config.IRaftNodeConfig
    GetConn() *rpc.Client
    SetConn(client *rpc.Client)
    HandleStateChanged(state iClientState)
}
```

tRaftClient.go

Concrete implementation of the IRaftClient interface; it also implements the iStateContext interface.

```go
package client

import (
    "learning/gooop/etcd/raft/config"
    "net/rpc"

    rrpc "learning/gooop/etcd/raft/rpc"
)

type tRaftClient struct {
    cfg   config.IRaftNodeConfig
    conn  *rpc.Client
    state iClientState
}

func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
    it := new(tRaftClient)
    it.init(cfg, conn)
    return it
}

func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
    me.cfg = cfg
    me.conn = conn
    // the connection handed in by the service has just been dialed,
    // so the client starts life in the connected state
    me.HandleStateChanged(newConnectedState(me))
}

func (me *tRaftClient) Config() config.IRaftNodeConfig {
    return me.cfg
}

func (me *tRaftClient) GetConn() *rpc.Client {
    return me.conn
}

func (me *tRaftClient) SetConn(conn *rpc.Client) {
    me.conn = conn
}

func (me *tRaftClient) HandleStateChanged(state iClientState) {
    me.state = state
    state.Start()
}

// the RPC methods simply delegate to the current state

func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
    return me.state.Heartbeat(cmd, ret)
}

func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
    return me.state.AppendLog(cmd, ret)
}

func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
    return me.state.CommitLog(cmd, ret)
}

func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
    return me.state.RequestVote(cmd, ret)
}

func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
    return me.state.Ping(cmd, ret)
}
```

tConnectedState.go

Manages an RPC connection in the connected state

  • Pings periodically to detect connection health
  • event-driven logic orchestration
  • read/write-separated field management

```go
package client

import (
    "learning/gooop/etcd/raft/model"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "sync"
    "time"
)

type tConnectedState struct {
    model.TEventDrivenModel

    context iStateContext

    mInitOnce  sync.Once
    mStartOnce sync.Once

    // update: ceInit, ceDisposing
    mDisposedFlag bool
}

// trigger: init()
// args: empty
const ceInit = "connected.init"

// trigger: Start()
// args: empty
const ceStart = "connected.Start"

// trigger: whenPingFailedThenSwitchToBrokenState()
// args: empty
const ceDisposing = "connected.Disposing"

// trigger: whenStartThenBeginPing()
// args: empty
const cePingFailed = "connected.PingFailed"

func newConnectedState(ctx iStateContext) iClientState {
    it := new(tConnectedState)
    it.init(ctx)
    return it
}

func (me *tConnectedState) init(ctx iStateContext) {
    me.mInitOnce.Do(func() {
        me.context = ctx
        me.initEventHandlers()
        me.Raise(ceInit)
    })
}

func (me *tConnectedState) initEventHandlers() {
    // write only logic
    me.hookEventsForDisposedFlag()

    // read only logic
    me.Hook(ceStart,
        me.whenStartThenBeginPing)
    me.Hook(cePingFailed,
        me.whenPingFailedThenSwitchToBrokenState)
    me.Hook(ceDisposing,
        me.whenDisposingThenCloseConn)
}

func (me *tConnectedState) Start() {
    me.mStartOnce.Do(func() {
        me.Raise(ceStart)
    })
}

// hookEventsForDisposedFlag is the only writer of mDisposedFlag
func (me *tConnectedState) hookEventsForDisposedFlag() {
    me.Hook(ceInit, func(e string, args ...interface{}) {
        me.mDisposedFlag = false
    })
    me.Hook(ceDisposing, func(e string, args ...interface{}) {
        me.mDisposedFlag = true
    })
}

func (me *tConnectedState) whenStartThenBeginPing(_ string, _ ...interface{}) {
    go func() {
        cmd := &PingCmd{
            SenderID:  me.context.Config().ID(),
            Timestamp: time.Now().UnixNano(),
        }
        ret := &PingRet{}
        for range time.Tick(timeout.ClientPingInterval) {
            if me.mDisposedFlag {
                return
            }

            cmd.Timestamp = time.Now().UnixNano()
            err := me.Ping(cmd, ret)
            if err != nil {
                me.Raise(cePingFailed)
            }
        }
    }()
}

func (me *tConnectedState) whenPingFailedThenSwitchToBrokenState(_ string, _ ...interface{}) {
    me.Raise(ceDisposing)
    me.context.HandleStateChanged(newBrokenState(me.context))
}

func (me *tConnectedState) whenDisposingThenCloseConn(_ string, _ ...interface{}) {
    it := me.context.GetConn()
    if it != nil {
        it.Close()
    }
    me.context.SetConn(nil)
}

func (me *tConnectedState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.Heartbeat", cmd, ret)
}

func (me *tConnectedState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.AppendLog", cmd, ret)
}

func (me *tConnectedState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.CommitLog", cmd, ret)
}

func (me *tConnectedState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.RequestVote", cmd, ret)
}

func (me *tConnectedState) Ping(cmd *PingCmd, ret *PingRet) error {
    return me.context.GetConn().Call("TRaftRPCServer.Ping", cmd, ret)
}
```
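
All of the Call strings above assume that the peer has registered an RPC service object named TRaftRPCServer with Go's net/rpc package. That server is not part of this installment; the sketch below (a hypothetical server package, not the repository's actual code) shows the minimum the Ping endpoint has to look like for those calls to succeed. The real server would expose Heartbeat, AppendLog, CommitLog and RequestVote the same way.

```go
package server // hypothetical package name; the real server lives elsewhere in the repository

import (
    "net"
    "net/rpc"
    "time"
)

// TRaftRPCServer is a sketch of the service object that the client's
// Call("TRaftRPCServer.Ping", ...) expects; only Ping is shown here.
type TRaftRPCServer struct{}

// PingCmd / PingRet mirror the structs defined in the client package.
type PingCmd struct {
    SenderID  string
    Timestamp int64
}

type PingRet struct {
    SenderID  string
    Timestamp int64
}

// Ping echoes the sender ID and stamps the reply with the local time.
func (t *TRaftRPCServer) Ping(cmd *PingCmd, ret *PingRet) error {
    ret.SenderID = cmd.SenderID
    ret.Timestamp = time.Now().UnixNano()
    return nil
}

// serve registers the service under the expected name and accepts TCP
// connections; endpoint is assumed to match config.IRaftNodeConfig.Endpoint().
func serve(endpoint string) error {
    srv := rpc.NewServer()
    if err := srv.RegisterName("TRaftRPCServer", new(TRaftRPCServer)); err != nil {
        return err
    }
    l, err := net.Listen("tcp", endpoint)
    if err != nil {
        return err
    }
    go srv.Accept(l)
    return nil
}
```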

tBrokenState.go

Manages an RPC connection in the broken state

  • Dials periodically to attempt reconnection
  • event-driven logic orchestration
  • read/write-separated field management

```go
package client

import (
    "errors"
    "learning/gooop/etcd/raft/model"
    rrpc "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "net/rpc"
    "sync"
    "time"
)

type tBrokenState struct {
    model.TEventDrivenModel

    context iStateContext

    mInitOnce  sync.Once
    mStartOnce sync.Once

    // update: beInit, beDisposing
    mDisposedFlag bool
}

// trigger: init()
// args: empty
const beInit = "broken.init"

// trigger: Start()
// args: empty
const beStart = "broken.Start"

// trigger: whenStartThenBeginDial()
// args: *rpc.Client
const beDialOK = "broken.DialOK"

// trigger: whenDialOKThenSwitchToConnectedState()
// args: empty
const beDisposing = "broken.Disposing"

func newBrokenState(ctx iStateContext) iClientState {
    it := new(tBrokenState)
    it.init(ctx)
    return it
}

func (me *tBrokenState) init(ctx iStateContext) {
    me.mInitOnce.Do(func() {
        me.context = ctx
        me.initEventHandlers()
        me.Raise(beInit)
    })
}

func (me *tBrokenState) initEventHandlers() {
    // write only logic
    me.hookEventsForDisposedFlag()

    // read only logic
    me.Hook(beStart,
        me.whenStartThenBeginDial)
    me.Hook(beDialOK,
        me.whenDialOKThenSetConn)
    me.Hook(beDialOK,
        me.whenDialOKThenSwitchToConnectedState)
}

// hookEventsForDisposedFlag is the only writer of mDisposedFlag
func (me *tBrokenState) hookEventsForDisposedFlag() {
    me.Hook(beInit, func(e string, args ...interface{}) {
        me.mDisposedFlag = false
    })
    me.Hook(beDisposing, func(e string, args ...interface{}) {
        me.mDisposedFlag = true
    })
}

func (me *tBrokenState) Start() {
    me.mStartOnce.Do(func() {
        me.Raise(beStart)
    })
}

func (me *tBrokenState) whenStartThenBeginDial(_ string, _ ...interface{}) {
    go func() {
        for !me.mDisposedFlag {
            conn, err := rpc.Dial("tcp", me.context.Config().Endpoint())
            if err == nil {
                me.Raise(beDialOK, conn)
                break
            } else {
                time.Sleep(timeout.ClientRedialInterval)
            }
        }
    }()
}

func (me *tBrokenState) whenDialOKThenSetConn(_ string, args ...interface{}) {
    conn := args[0].(*rpc.Client)
    me.context.SetConn(conn)
}

func (me *tBrokenState) whenDialOKThenSwitchToConnectedState(_ string, _ ...interface{}) {
    me.Raise(beDisposing)
    me.context.HandleStateChanged(newConnectedState(me.context))
}

// while the connection is broken, every RPC fails fast

func (me *tBrokenState) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
    return gErrorConnectionBroken
}

func (me *tBrokenState) Ping(cmd *PingCmd, ret *PingRet) error {
    return gErrorConnectionBroken
}

var gErrorConnectionBroken = errors.New("peer connection broken")
```
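
Both states rely on two durations from the timeout package, which is not shown in this installment either. The sketch below is its assumed shape; the concrete values are placeholders, not the repository's actual numbers.

```go
package timeout

import "time"

// Assumed contents of learning/gooop/etcd/raft/timeout, as far as this
// installment uses it. The values are illustrative placeholders.
const (
    // ClientPingInterval: how often tConnectedState pings its peer.
    ClientPingInterval = 100 * time.Millisecond

    // ClientRedialInterval: how long tBrokenState waits between dial attempts.
    ClientRedialInterval = 1 * time.Second
)
```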

(To be continued)