关于raft:深入解析raftexample理解raft协议

3次阅读

共计 44876 个字符,预计需要花费 113 分钟才能阅读完成。

说到 raftexample,很多人可能很生疏,我晓得 raft,我也晓得 example,哪来的 raftexample?这里做下简略的介绍,raftexample 是 etcd 外面 raft 模块实现的简略示例,它实现了一个简略的基于 raft 协定的 kvstore 存储集群零碎,并提供了 rest api 以供应用

而 raftexample 仅有以下几个文件,几百行代码,通过浏览,能够帮忙咱们更好的了解 raft 协定,投入产出比超大

演示动画

官网举荐的动画演示地址:http://thesecretlivesofdata.c…

leader 选举

日志同步

概念解析

逻辑时钟

逻辑时钟其实是一个定时器 time.Tick,每隔一段时间触发一次,是推动 raft 选主的外围,在这外面有几个属性先理解下

electionElapsed:逻辑时钟推动计数,follower 和 leader 没推动一次逻辑时钟,这个数值就会 +1;follower 收到 leader 的心跳音讯后会重置为 0;leader 则在每次发送心跳前重置为 0

heartbeatElapsed:leader 的逻辑时钟推动计数,不仅仅会减少 electionElapsed 计数,还会减少 heartbeatElapsed 的计数

heartbeatTimeout:当 leader 的 heartbeatElapsed 计数达到 heartbeatTimeout 预约义的值的时候,就会向各个节点发送一次心跳

electionTimeout:当 leader 的 electionElapsed 的逻辑时钟推动次数超过这个值的时候,如果 leader 同时开启了 checkQuorum 来判断以后集群各节点的存活状态时,这时候 leader 就会进行探活(探活不会发动网络申请,靠本身存储的各个节点状态)

randomizedElectionTimeout:随机生产的一个值,当 follower 的 electionElapsed 计数达到这个值的时候,就会开始发动新一轮选举

所以,能够看进去,逻辑时钟次要是推动 leader 心跳和探活、follower 的选举的

follower 发动选举的条件:electionElapsed >= randomizedElectionTimeout

leader 发送心跳的条件:heartbeatElapsed >= heartbeatTimeout

leader 发动集群节点探活的条件:electionElapsed > electionTimeout

这里就有一个问题了,leader 节点为什么会有 electionElapsed 和 heartbeatElapsed 两个计数,一个不就能够了吗?

其实是因为,当 leader 发动探活或者计数满足探活条件的时候,electionElapsed 就会被置为 0,所以对于 leader 而言,electionElapsed 跟 heartbeatElapsed 的值并不统一,也不同步

raft.Peer

Peer: 集群中的节点,每一个退出集群的利用过程,都是一个节点

type Peer struct {
    ID      uint64
    Context []byte}

ID: 节点的 id,过程初始化的时候,会给集群中的每个节点调配一个 ID

Context: 上下文

raftpb.Message

Message: 这是 raftexample(前面简称 raft 来代替)中的一个重要的构造体,是所有音讯的形象,包含且不限于选举 / 增加数据 / 配置变更,都是一种音讯类型

type Message struct {
    Type MessageType 
    To   uint64      
    From uint64      
    Term uint64      
    LogTerm    uint64   
    Index      uint64   
    Entries    []Entry  
    Commit     uint64   
    Snapshot   Snapshot 
    Reject     bool     
    RejectHint uint64   
    Context    []byte}

Type: 音讯类型,raft 中就是依据不同的音讯类型来实现不同的逻辑解决的

  • MsgHup MessageType = 0 // follower 节点认为 leader 挂了的时候,发动选举
  • MsgBeat MessageType = 1 // leader 才会发送的本地音讯,raft 初始化时会定义一个心跳超时的次数,当逻辑时钟推动的次数超过定义的心跳超时次数后,就会触发这个音讯,而后 leader 会向 follower 发送 MsgHeartbeat 音讯
  • MsgProp MessageType = 2 // 写入数据或批改集群配置的音讯类型
  • MsgApp MessageType = 3 // leader 向 follower 播送追加日志的音讯
  • MsgAppResp MessageType = 4 // follower 响应 leader 的追加日志申请的音讯类型
  • MsgVote MessageType = 5 // candinator 发送选举命令的音讯类型
  • MsgVoteResp MessageType = 6 // 其余节点响应 candinator 选举命令的音讯类型
  • MsgSnap MessageType = 7 // leader 向 follower 发送快照数据的音讯类型
  • MsgHeartbeat MessageType = 8 // leader 向 follower 发送的心跳音讯
  • MsgHeartbeatResp MessageType = 9 // follower 响应的心跳音讯
  • MsgUnreachable MessageType = 10 // 音讯不可达,本地音讯
  • MsgSnapStatus MessageType = 11 // 报告发送给 follower 节点的 Snap 音讯状态,是否发送胜利
  • MsgCheckQuorum MessageType = 12 // 这个也是本地音讯,用于 leader 判断以后存活节点的状态,如果不超过一半节点存活则降为 follower 节点
  • MsgTransferLeader MessageType = 13 // 转移 leader 权
  • MsgTimeoutNow MessageType = 14 // 当发送 MsgTransferLeader 的 follower 的日志与 leader 始终的时候,leader 发送 MsgTimeoutNow 给这个 follower,follower 开始进行选举
  • MsgReadIndex MessageType = 15 // follower 节点向 leader 节点申请获取 commit index 的地位
  • MsgReadIndexResp MessageType = 16 // leader 节点响应 MsgReadIndex
  • MsgPreVote MessageType = 17 // preVote 音讯是 Vote 音讯前可选的音讯,如果开启了 preVote,则 follower 在转成 candidator 前进行一次 preVote 与投票,这时候 Term 任期不会减少,防止因为网络分区造成有余 1 / 2 的分区节点的频繁选主
  • MsgPreVoteResp MessageType = 18 // 其余节点对 MsgPreVote 的响应

To: 音讯是发送给的节点的 ID

From: 音讯发送方节点的 ID

Term: 以后的任期

LogTerm:当 leader 发送给 follower 节点日志的时候,follower 节点回绝的时候,会匹配到一个适合的日志地位尝试让 leader 开始同步,这个日志点的日志对应的 Term 就是 LogTerm

Index: 音讯在日志中的 Index

Entries: 日志记录

Commit: 音讯发送节点的日志 commit 的地位

Snapshot:在传输快照时的快照信息

Reject:节点是否回绝收到的音讯;例如,follower 收到 leader 的 MsgApp 音讯的时候,发现日志不能间接追加,就会回绝掉 leader 的这个日志同步音讯

RejectHint:follower 回绝掉 leader 节点音讯后,计算出来的一个可能匹配 leader 日志的索引地位

Context:上下文

raftpb.Entry

Entry: 就是常说的日志

type Entry struct {
    Term  uint64
    Index uint64
    Type  EntryType
    Data  []byte}

Term:这条日志对应的任期,每个日志都是由 leader 同步过去的,而 leader 都有一个任期,这个就是同步日志时 leader 的任期

Index:索引,也是每条日志的一个标识

Type:日志的类型,EntryNormal 示意惯例日志;EntryConfChange/EntryConfChangeV2 示意配置变更的日志

Data:就是日志存储的数据

简略看两个 demo

leader 和 follower 日志不同步的状况
idx               1 2 3 4 5 6 7 8 9
                                    -----------------
term (Leader)     1 3 3 3 5 5 5 5 5
term (Follower)   1 1 1 1 2 2     

leader 和 follower 日志同步的状况
idx               1 2 3 4 5 6 7 8 9
                                    -----------------
term (Leader)     1 3 3 3 5 5 5 5 5
term (Follower)   1 3 3 3 5 5 5 5 5

leader 和 follower 节点日志的同步就是通过 Entries 外面每个 Entry 的 Index 和 Term 是否雷同来判断的

留神

  1. 前面所有的数据增删改查、配置增删、选举等音讯统称为消息日志或日志
  2. 在日志同步这里,咱们会疏忽 wal 日志和 snapshot 相干的内容

源码解析

构造体介绍

raftNode

type raftNode struct {
  // 接管 kvstore 的存储数据日志
    proposeC    <-chan string            // proposed messages (k,v)
  // 接管 kvHttpApi 的配置变更日志
    confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
  // 日志同步到 kvstore
    commitC     chan<- *commit           // entries committed to log (k,v)
  // 模块间出错的音讯告诉通道
    errorC      chan<- error             // errors from raft session

  // 节点的 id
    id          int      // client ID for raft session
  // 以后集群下的所有节点 ip:ports
    peers       []string // raft peer URLs
  // 以后节点是否接入一个集群,启动时候依据这个判断是重启还是启动一个新的节点
    join        bool     // node is joining an existing cluster
  // wal 日志目录
    waldir      string   // path to WAL directory
  // snapshot 目录
    snapdir     string   // path to snapshot directory
  // 获取 snapshot 的办法
    getSnapshot func() ([]byte, error)

  // 用于集群的状态
    confState     raftpb.ConfState
  // snapshot 的日志的 Index
    snapshotIndex uint64
  // 曾经 applied 的日志的 Index
    appliedIndex  uint64

    // raft backing for the commit/error channel
  // node 的实例,实现了 Node 接口的办法
    node        raft.Node
  // storage 实例
    raftStorage *raft.MemoryStorage
  // wal 实例
    wal         *wal.WAL

  // snapshot 实例,治理 snapshot
    snapshotter      *snap.Snapshotter
  // 与 snapshot 交互的通道,判断 snapshot 实例是否创立实现
    snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready

  // 两次 snapshot 之间的 apply 的日志最小条数
  // 当上一次 snapshot 之后,apply 的日志超过这个数量,就会开启新一轮 snapshot,用于及时开释 wal 和 raftLog 中的存储压力
    snapCount uint64
  // 网络组件
    transport *rafthttp.Transport
  // 通道告诉敞开 serveChannel,敞开网络组件
    stopc     chan struct{} // signals proposal channel closed
  // 敞开 raft node 的 http 服务
    httpstopc chan struct{} // signals http server to shutdown
  // raft node 的 http 敞开胜利后告诉其余模块的通道
    httpdonec chan struct{} // signals http server shutdown complete

  // 日志组件
    logger *zap.Logger
}

raft

type raft struct {
  // 节点 id
    id uint64

  // 以后节点的 Term 任期
    Term uint64
  // 以后节点在选举时,投票给了谁,初始化时为 0,也就是谁都没有投给
    Vote uint64

  // 与 readIndex 申请无关,这里不多做介绍
    readStates []ReadState

    // the log
    raftLog *raftLog

  // 单条音讯最大的 size
    maxMsgSize         uint64
  // 最大的 uncommit 日志数量,当 uncommit 日志数量大于这个值的时候,就不再追加日志了
    maxUncommittedSize uint64
    // TODO(tbg): rename to trk.
  // 集中群各个节点状态,蕴含了节点日志复制状况等,上面会独自介绍一下
    prs tracker.ProgressTracker

  // 状态,follower、candidator、leader 等
    state StateType

    // isLearner is true if the local raft node is a learner.
    isLearner bool

  // 记录了以后节点待发送的音讯,这里的音讯会被及时生产
    msgs []pb.Message

    // the leader id
    lead uint64
    // leadTransferee is id of the leader transfer target when its value is not zero.
    // Follow the procedure defined in raft thesis 3.10.
  // leader 转换对象的 id
    leadTransferee uint64
    // Only one conf change may be pending (in the log, but not yet
    // applied) at a time. This is enforced via pendingConfIndex, which
    // is set to a value >= the log index of the latest pending
    // configuration change (if any). Config changes are only allowed to
    // be proposed if the leader's applied index is greater than this
    // value.
    pendingConfIndex uint64
    // an estimate of the size of the uncommitted tail of the Raft log. Used to
    // prevent unbounded log growth. Only maintained by the leader. Reset on
    // term changes.
  // 未 commit 的日志的数量
    uncommittedSize uint64

    readOnly *readOnly

    // number of ticks since it reached last electionTimeout when it is leader
    // or candidate.
    // number of ticks since it reached last electionTimeout or received a
    // valid message from current leader when it is a follower.
    electionElapsed int

    // number of ticks since it reached last heartbeatTimeout.
    // only leader keeps heartbeatElapsed.
    heartbeatElapsed int

  // 是否开启节点探活
    checkQuorum bool
  // 是否开启 preVote
    preVote     bool

    heartbeatTimeout int
    electionTimeout  int
    // randomizedElectionTimeout is a random number between
    // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
    // when raft changes its state to follower or candidate.
    randomizedElectionTimeout int
  // 是否不容许数据转发给 leader,开启的话,follower 节点收到的数据日志,就间接抛弃掉,而不会转发给 leader
    disableProposalForwarding bool

  // 逻辑时钟办法,leader 对应 tickHeartbeat,follower 和 candidate 对应 tickElection
    tick func()
  // 解决音讯的办法,leader 对应 stepLeader,follower 对应 stepFollower,candidate 对应 stepCandidate
    step stepFunc

    logger Logger

    // pendingReadIndexMessages is used to store messages of type MsgReadIndex
    // that can't be answered as new leader didn't committed any log in
    // current term. Those will be handled as fast as first log is committed in
    // current term.
    pendingReadIndexMessages []pb.Message}

这里有两个点解释下:

checkQuorum:这个开关是用于 leader 判断各个节点的沉闷状态的,leader 给 follower 发送信息的时候都会设置一个沉闷态,然而如果呈现网络分区的状况下,leader 所在的分区小于 1 / 2 节点,那么这个 leader 也就没用了,能够被动将本人降级为 follower 节点

preVote:preVote 也是为了网络分区的状况设置的,当网络分区后,某个分区外面的节点数小于 1 / 2 节点数,则 follower 会频繁的发动选举,发动选举时就会将本人的 Term +1,然而并不会选举胜利,所以会陷入一个循环:发动选举 ->term+1-> 选举失败 -> 发动选举 ->term+1……,当网络分区接触时,这个 Term 可能曾经很大了,合并后真正的 leader 的 Term 可能都没有分区外面的 follower 的 Term 大,就会影响日志的准确性了,所以网络分区中的节点会先发动 preVote,这时候 Term 不变,preVote 胜利后才会真正的发动选举流程

raftLog

raftLog 是节点中治理日志的组件

type raftLog struct {
    // storage contains all stable entries since the last snapshot.
  // storage 组件,外面存储了 snapshot 之后的所有的 stable 日志记录
    storage Storage

    // unstable contains all unstable entries and snapshot.
    // they will be saved into storage.
  // 记录了所有 unstable 的日志记录和 snapshot
    unstable unstable

    // committed is the highest log position that is known to be in
    // stable storage on a quorum of nodes.
  // commit 日志的最大的 Index 的值,这个是统计的 storage 组件外面的
    committed uint64
    // applied is the highest log position that the application has
    // been instructed to apply to its state machine.
    // Invariant: applied <= committed
  // 曾经 applied 日志的最大的 Index 的值
    applied uint64

    logger Logger

    // maxNextEntsSize is the maximum number aggregate byte size of the messages
    // returned from calls to nextEnts.
    maxNextEntsSize uint64
}

// Invariant: applied <= committed

在官网的 applied 字段的正文上,有这么一段话:applied <= committed 恒成立,这是为什么

咱们先简略理解一下日志的生命周期,前面跟踪日志同步的时候再具体介绍

用户提交一个创立数据的申请 -> kvstore 生成一条数据日志 -> 日志存储到 unstable 构造中 -> 日志追加到 storage 外面 -> 日志同步给其余节点 -> 节点同步实现,commit 日志,更新 committed 地位 -> kvstore 存储 -> 更新 applied 地位

启动流程

func main() {cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
    id := flag.Int("id", 1, "node ID")
    kvport := flag.Int("port", 9121, "key-value server port")
    join := flag.Bool("join", false, "join an existing cluster")
    flag.Parse()

  // 创立 proposeC 和 confChangeC 通道,这两个通道用于 kvstore raftNode http 三个模块的交互,所以在里面创立
    proposeC := make(chan string)
    defer close(proposeC)
    confChangeC := make(chan raftpb.ConfChange)
    defer close(confChangeC)

    // raft provides a commit stream for the proposals from the http api
    var kvs *kvstore
    getSnapshot := func() ([]byte, error) {return kvs.getSnapshot() }
  // 启动 raftNode 模块
    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

  // 启动 kvstore 模块
    kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

    // the key-value http handler will propose updates to raft
  // 启动 httpKVApi 模块,用于接管申请
    serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

raftexample 的启动流程是比较简单的,将所有模块启动,并创立了各个模块之间的交互通道,便于各个模块间通信,并且实现了个股模块之间的解耦

  1. raftNode: raftexample 的外围模块,提供了 raft 协定,选主、日志同步等能力
  2. kvstore: raftexample 的存储模块,最终提交的日志,都会存储在这外面,在 raftexample 里,这个存储系统实质是个 map
  3. httpKvApi: 提供的与外界交互的 api,能够通过 httpapi 来实现存储数据的创立批改和删除以及集群节点的减少和删除

咱们这里依照 httpKvApi->kvstore->raftNode 由简入深的节奏解析

httpKVAPI 模块

httpKvApi 是提供了一个 http 服务,通过不同的 method 来操作不同的对象(数据和集群节点),实现了数据和节点的增删改查

启动

func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
    // 定义 http server
  srv := http.Server{Addr: ":" + strconv.Itoa(port),
        Handler: &httpKVAPI{
            store:       kv,
            confChangeC: confChangeC,
        },
    }
    go func() {
    // 启动 http server
        if err := srv.ListenAndServe(); err != nil {log.Fatal(err)
        }
    }()

    // exit when raft goes down
  // 与上层的 raft 通信,如果 raft 挂了,则本模块也要退出
    if err, ok := <-errorC; ok {log.Fatal(err)
    }
}

启动的逻辑比较简单

异步启动了一个 http 服务,并且阻塞监听 error chan,第一防止过程退出,第二档 raft 模块退出后,http 也能够及时退出

申请解决

申请解决对立集中在了 ServeHTTP 这一个办法外面

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    key := r.RequestURI
    defer r.Body.Close()
    switch r.Method {
    // Put 形式,则表明是数据减少或批改
    case http.MethodPut:
        v, err := io.ReadAll(r.Body)
        if err != nil {log.Printf("Failed to read on PUT (%v)\n", err)
            http.Error(w, "Failed on PUT", http.StatusBadRequest)
            return
        }

        h.store.Propose(key, string(v))

        // Optimistic-- no waiting for ack from raft. Value is not yet
        // committed so a subsequent GET on the key may return old value
    // 这里间接返回,而底层则会进行数据一致性的解决等操作,所以如果立刻进行 GET 申请的话,还是有可能获取老的数据
        w.WriteHeader(http.StatusNoContent)
    case http.MethodGet:
    // Get 形式,间接到 kvstore 外面获取数据的 value
        if v, ok := h.store.Lookup(key); ok {w.Write([]byte(v))
        } else {http.Error(w, "Failed to GET", http.StatusNotFound)
        }
    case http.MethodPost:
    // Post 形式是减少集群节点,解析进去 nodeId,并通过 confChangeC 传给 raftNode
        url, err := io.ReadAll(r.Body)
        if err != nil {log.Printf("Failed to read on POST (%v)\n", err)
            http.Error(w, "Failed on POST", http.StatusBadRequest)
            return
        }

        nodeId, err := strconv.ParseUint(key[1:], 0, 64)
        if err != nil {log.Printf("Failed to convert ID for conf change (%v)\n", err)
            http.Error(w, "Failed on POST", http.StatusBadRequest)
            return
        }

        cc := raftpb.ConfChange{
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  nodeId,
            Context: url,
        }
        h.confChangeC <- cc
        // As above, optimistic that raft will apply the conf change
        w.WriteHeader(http.StatusNoContent)
    case http.MethodDelete:
    // Delete 形式则是删除一个集群节点,并通过 confChangeC 传给 raftNode 来解决
        nodeId, err := strconv.ParseUint(key[1:], 0, 64)
        if err != nil {log.Printf("Failed to convert ID for conf change (%v)\n", err)
            http.Error(w, "Failed on DELETE", http.StatusBadRequest)
            return
        }

        cc := raftpb.ConfChange{
            Type:   raftpb.ConfChangeRemoveNode,
            NodeID: nodeId,
        }
        h.confChangeC <- cc

        // As above, optimistic that raft will apply the conf change
        w.WriteHeader(http.StatusNoContent)
    default:
        w.Header().Set("Allow", http.MethodPut)
        w.Header().Add("Allow", http.MethodGet)
        w.Header().Add("Allow", http.MethodPost)
        w.Header().Add("Allow", http.MethodDelete)
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

下面就是 httpKvApi 的所有解决的状况了

  1. Put 申请:增加或批改 kvstore 外面存储的数据,但这里并不是间接批改,而是调用h.store.Propose(即kvstore.Propose)来解决,kvstore.Propose 则是通过 proposeC 将数据给到 raftNode 来保证数据一致性后再提交保留到 kvstore 外面,这里前面会具体解析,也是 raft 的外围
  2. Get 申请:这个办法就比较简单了,间接到 kvstore 外面读取数据,而 Put 办法则是增加和批改数据,然而 Put 不间接批改,所以如果 Put 后立即获取某个 key 的 value,则有可能 raftNode 还没有解决完提交到 kvstore 外面,而导致读取的还是老的数据
  3. Post 申请:增加一个集群节点,这个节点会被 leader 通晓并将此节点退出 follower
  4. Delete 申请:删除一个节点,也是交给 leader 来解决

kvstore 模块

创立 kvstore

func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore {
  // 实例化 kvstore,example 外面存储系统就是一个 map
    s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
  // 加载 snapshot
    snapshot, err := s.loadSnapshot()
    if err != nil {log.Panic(err)
    }
  // 如果 snapshot 不为空,则先从 snapshot 外面把数据恢复
    if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
        if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)
        }
    }
    // read commits from raft into kvStore map until error
  // 开启一个 goroutine,读取从 raftNode 外面提交的 commit 数据
    go s.readCommits(commitC, errorC)
    return s
}

// 加载 snapshot
func (s *kvstore) loadSnapshot() (*raftpb.Snapshot, error) {snapshot, err := s.snapshotter.Load()
  // ErrNoSnapshot 示意没有 snapshot,这里消化掉 err,防止下层模块退出
    if err == snap.ErrNoSnapshot {return nil, nil}
    if err != nil {return nil, err}
  // 找到了,则返回
    return snapshot, nil
}

func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {return s.loadMatching(func(*raftpb.Snapshot) bool {return true})
}

func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
  // 这里返回 snapshot 的文件列表,依照工夫排序,也就是从新到旧排序
    names, err := s.snapNames()
    if err != nil {return nil, err}
    var snap *raftpb.Snapshot
  // 遍历 snapshot 文件,其实也就是读取最新的文件
    for _, name := range names {
    // loadSnap 就不追了,就是读取 snapshot 文件的数据,并反序列化为构造体对象
        if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {return snap, nil}
    }
  // 没找到获取没有适合的 snapshot,就返回 ErrNoSnapshot
    return nil, ErrNoSnapshot
}

// 从 snapshot 外面复原数据到 kvstore,这里的入参是下面 snapshot.Data,也就是[]byte
func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {var store map[string]string
    if err := json.Unmarshal(snapshot, &store); err != nil {return err}
  // 反序列化为 map 后,间接赋值给 kvstore
    s.mu.Lock()
    defer s.mu.Unlock()
    s.kvStore = store
    return nil
}

创立的逻辑:

  1. 首先实例化 kvstore 这个构造体,并实例化存储系统,在 example 外面也就是一个 map
  2. 而后本地寻找是否有 snapshot,并读取最新的 snapshot,反序列化为 snapshot 构造体
  3. 如果找到了 snapshot,则将 snapshot.Data 反序列化为 map,给到 kvstore,也就从 snapshot 外面复原了 kvstore 的存储数据
  4. 最初开启一个 goroutine,通过 commitC 与 raftNode 交互,读取数据并存储

数据存储

下面说了,kvstore 会开启一个通道 chan 与 raftNode 通信,读取数据并存储,这里看下具体的逻辑

func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
    for commit := range commitC {
    // 如果读取到 nil,则从 snapshot 外面再次复原数据
        if commit == nil {
            // signaled to load snapshot
            snapshot, err := s.loadSnapshot()
            if err != nil {log.Panic(err)
            }
            if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
                if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)
                }
            }
            continue
        }

    // 读取数据
        for _, data := range commit.data {
            var dataKv kv
            dec := gob.NewDecoder(bytes.NewBufferString(data))
            if err := dec.Decode(&dataKv); err != nil {log.Fatalf("raftexample: could not decode message (%v)", err)
            }
            s.mu.Lock()
            s.kvStore[dataKv.Key] = dataKv.Val
            s.mu.Unlock()}
    // 敞开 commit 的 chan,以告诉下层模块存储实现
        close(commit.applyDoneC)
    }
  // 如果遇到 error chan,跟 httpkvapi 模块一样,退出以后模块
    if err, ok := <-errorC; ok {log.Fatal(err)
    }
}

commitC 这个通道和解决逻辑就比较简单了

这个 goroutine 就卡在这个 chan,有数据过去就进行解决,并告诉下层模块解决实现

如果 raftNode 遇到异样退出了,则通过 error chan 告诉,kvstore 模块也就退出了

数据查找

func (s *kvstore) Lookup(key string) (string, bool) {s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.kvStore[key]
    return v, ok
}

查找就比较简单了,数据结构就是一个 map,间接从 map 外面读数据就行了

数据提交

后面说过数据存储,那么数据提交和数据存储是什么关系,为什么会有两个办法来解决数据呢?

其实 raft 协定外面咱们能够晓得,当用户提交一个数据创立 / 批改的申请的时候,首先把数据提交过去,而后 raftNode 会把数据告诉到上面的各个节点,当数据有一半节点以上接管到的时候,才会真正存储到存储系统外面

func (s *kvstore) Propose(k string, v string) {
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {log.Fatal(err)
    }
    s.proposeC <- buf.String()}

所以,这里的数据提交,其实就是通过 proposeC 给到 raftNode,raftNode 解决完后,再通过 commitC 给到 kvstore 存储起来

总结 & 备注

  1. 这里的 map+lock 形式,效率会低很多,不如间接用 sync.Map;当然这里只是个 example,也就不须要思考那么多
  2. 数据提交存储流程:当用户通过 httpkvapi 创立一个数据批改 / 创立的申请的时候,kvstore 会先把这个数据通过 proposeC 转发给 raftNode,而后 raftNode 解决实现后才会真正存储到数据库(也就是本身的 map)

raftNode 模块

启动 raftNode

func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
    confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {commitC := make(chan *commit)
    errorC := make(chan error)

    rc := &raftNode{
        proposeC:    proposeC,
        confChangeC: confChangeC,
        commitC:     commitC,
        errorC:      errorC,
        id:          id,
        peers:       peers,
        join:        join,
        waldir:      fmt.Sprintf("raftexample-%d", id),
        snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
        getSnapshot: getSnapshot,
        snapCount:   defaultSnapshotCount,
        stopc:       make(chan struct{}),
        httpstopc:   make(chan struct{}),
        httpdonec:   make(chan struct{}),

        logger: zap.NewExample(),

        snapshotterReady: make(chan *snap.Snapshotter, 1),
        // rest of structure populated after WAL replay
    }
  // 异步启动 raft
    go rc.startRaft()
  // 返回 commitC,errorC rc.snapshotterReady,用于跟其余模块通信
    return commitC, errorC, rc.snapshotterReady
}

func (rc *raftNode) startRaft() {if !fileutil.Exist(rc.snapdir) {if err := os.Mkdir(rc.snapdir, 0750); err != nil {log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
        }
    }
    rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)

    oldwal := wal.Exist(rc.waldir)
    rc.wal = rc.replayWAL()

    // signal replay has finishei
  // 告诉其余模块 snapshotter 初始化实现
    rc.snapshotterReady <- rc.snapshotter

    rpeers := make([]raft.Peer, len(rc.peers))
    for i := range rpeers {rpeers[i] = raft.Peer{ID: uint64(i + 1)}
    }
  // 初始化 raft 的相干配置,用于启动 raft
    c := &raft.Config{ID:                        uint64(rc.id),
        ElectionTick:              10,
        HeartbeatTick:             1,
        Storage:                   rc.raftStorage,
        MaxSizePerMsg:             1024 * 1024,
        MaxInflightMsgs:           256,
        MaxUncommittedEntriesSize: 1 << 30,
    }

  // 依据 oldwal 和 join,判断是新启动的节点还是重启的节点
  // 如果是重启,则能够从 snapshot 外面复原数据
    if oldwal || rc.join {rc.node = raft.RestartNode(c)
    } else {rc.node = raft.StartNode(c, rpeers)
    }

  // 初始化网络组件
    rc.transport = &rafthttp.Transport{
        Logger:      rc.logger,
        ID:          types.ID(rc.id),
        ClusterID:   0x1000,
        Raft:        rc,
        ServerStats: stats.NewServerStats("",""),
        LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
        ErrorC:      make(chan error),
    }

    rc.transport.Start()
  // 启动与各个节点的网络 pipeline 通信通道
    for i := range rc.peers {
        if i+1 != rc.id {rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
        }
    }

    go rc.serveRaft()
    go rc.serveChannels()}

raftNode 启动流程:

  1. 启动 raft,返回 commitC 与 kvstore 通信,errorC 与其余模块通信,以告诉异样及时退出,snapshotterReady chan 与 kvstore 通信以告诉 snapshot 初始化实现
  2. 初始化 snapshotter,回放 wal 日志
  3. 初始化 raft.Config,依据 wal 目录是否存在与 join 配置来判断是新节点还是旧的节点,旧的节点则从 snapshot 外面复原数据即可
  4. 初始化网络组件,并与各个节点进行通信,初始化各个节点的 pipeline 通道
  5. 启动 raft server 服务
  6. 启动 raftNode 的各个通道,与各个模块间进行通信
启动 node
func StartNode(c *Config, peers []Peer) Node {if len(peers) == 0 {panic("no peers given; use RestartNode instead")
    }
  // 初始化 node 节点,并初始化配置
    rn, err := NewRawNode(c)
    if err != nil {panic(err)
    }
  // 将各个节点的信息存储到 raftLog 外面,期待日志同步,而后周知各个节点配置变更的音讯
    err = rn.Bootstrap(peers)
    if err != nil {c.Logger.Warningf("error occurred during starting a new node: %v", err)
    }

  // 实例化 node 节点
    n := newNode(rn)

  // node 就开始在后盾异步运行了,直至退出
    go n.run()
    return &n
}

func NewRawNode(config *Config) (*RawNode, error) {
  // 依据配置,实例化 raft
    r := newRaft(config)
    rn := &RawNode{raft: r,}
  // 初始化 Leader, State, Term Vote CommitIndex 等 raft 本身属性
    rn.prevSoftSt = r.softState()
    rn.prevHardSt = r.hardState()
    return rn, nil
}

func newRaft(c *Config) *raft {if err := c.validate(); err != nil {panic(err.Error())
    }
    raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
    hs, cs, err := c.Storage.InitialState()
    if err != nil {panic(err) // TODO(bdarnell)
    }

    r := &raft{
        id:                        c.ID,
        lead:                      None,
        isLearner:                 false,
        raftLog:                   raftlog,
        maxMsgSize:                c.MaxSizePerMsg,
        maxUncommittedSize:        c.MaxUncommittedEntriesSize,
        prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
        electionTimeout:           c.ElectionTick,
        heartbeatTimeout:          c.HeartbeatTick,
        logger:                    c.Logger,
        checkQuorum:               c.CheckQuorum,
        preVote:                   c.PreVote,
        readOnly:                  newReadOnly(c.ReadOnlyOption),
        disableProposalForwarding: c.DisableProposalForwarding,
    }

  // 初始化了各个节点在以后节点的状态及存储信息等,包含投票信息,日志 commit 节点,是否沉闷等
    cfg, prs, err := confchange.Restore(confchange.Changer{
        Tracker:   r.prs,
        LastIndex: raftlog.lastIndex(),}, cs)
    if err != nil {panic(err)
    }
    assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))

    if !IsEmptyHardState(hs) {r.loadState(hs)
    }
  // 更新 raftLog 的 apply index
    if c.Applied > 0 {raftlog.appliedTo(c.Applied)
    }
  // 启动后,就会变更集群中的 follower 节点
    r.becomeFollower(r.Term, None)

    var nodesStrs []string
    for _, n := range r.prs.VoterNodes() {nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
    }

    r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
        r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
    return r
}

// newNode 就初始化了各个 chan,与各个模块进行通信
func newNode(rn *RawNode) node {
    return node{propc:      make(chan msgWithResult),
        recvc:      make(chan pb.Message),
        confc:      make(chan pb.ConfChangeV2),
        confstatec: make(chan pb.ConfState),
        readyc:     make(chan Ready),
        advancec:   make(chan struct{}),
        // make tickc a buffered chan, so raft node can buffer some ticks when the node
        // is busy processing raft messages. Raft node will resume process buffered
        // ticks when it becomes idle.
        tickc:  make(chan struct{}, 128),
        done:   make(chan struct{}),
        stop:   make(chan struct{}),
        status: make(chan chan Status),
        rn:     rn,
    }
}

Node 节点的启动流程:

  1. 实例化 node 节点,并初始化 softState 和 hardState,也就是 Leader, State, Term Vote CommitIndex 等 raft 本身属性
  2. 实例化 raft,初始化了 raftLog,而后捞取 storage 的 hardState 和配置信息
  3. 初始化了各个节点在以后节点的状态及存储信息等,包含投票信息,日志 commit 节点,是否沉闷等
  4. 依据 storage 捞取的 hardState 更新节点的 hardState
  5. 降级为 follower
启动 serverRaft
func (rc *raftNode) serveRaft() {url, err := url.Parse(rc.peers[rc.id-1])
    if err != nil {log.Fatalf("raftexample: Failed parsing URL (%v)", err)
    }

    ln, err := newStoppableListener(url.Host, rc.httpstopc)
    if err != nil {log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
    }

    err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
    select {
    case <-rc.httpstopc:
    default:
        log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
    }
    close(rc.httpdonec)
}

serverRaft 就是启动了一个 http 服务,用于各个节点间通信;这里跟 httpKvApi 不同,httpKvApi 只有是用于与集群通信,而 raft server 则是各个节点间选主,日志同步等到通信

启动流程介绍完了后,各个对象也都筹备好了,状态也都就绪了,前面就是开始正式的干活了 -leader 选举和日志同步

leader 选举

raftNode 选主是由逻辑时钟推动的,逻辑时钟的逻辑,后面曾经介绍了,这里跟着代码看看具体的实现

leader 会定时给 follower 发送心跳,同时 follower 会有个定时器查看心跳距离,如果心跳距离超过设定的工夫,就会触发选主了

定时器触发
func (rc *raftNode) serveChannels() {ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
  ......
  for {
        select {
        case <-ticker.C:
            rc.node.Tick()
    ......
    }
  }
}

func (n *node) Tick() {
    select {case n.tickc <- struct{}{}:
    case <-n.done:
    default:
        n.rn.raft.logger.Warningf("%x A tick missed to fire. Node blocks too long!", n.rn.raft.id)
    }
}
定时器接管解决

定时器触发是在 serveChannels 外面实现的,而后会通过 tickc 给到node.run 办法解决

func (n *node) run() {
  ......
  switch {
    ......
    case <-n.tickc:
            n.rn.Tick()
    ......
  }
}

func (rn *RawNode) Tick() {rn.raft.tick()
}

func (r *raft) tickElection() {
    r.electionElapsed++

  // 判断心跳是否超时,每次 tick electionElapsed++,如果 electionElapsed >= raft 初始化时的 randomizedElectionTimeout
  // 则认为心跳超时了,这时候就能够进行选主了
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {r.logger.Debugf("error occurred during election: %v", err)
        }
    }
}

// 心跳超时判断
func (r *raft) pastElectionTimeout() bool {return r.electionElapsed >= r.randomizedElectionTimeout}

follower 节点每次逻辑时钟推动,就会给 electionElapsed+1,而在 raft 初始化的时候,会设置一个 randomizedElectionTimeout,当逻辑时钟推动的次数超过 randomizedElectionTimeout 的时候,就会触发 leader 选举流程了

那么什么时候 electionElapsed 会被重置呢

func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
    ......
    case pb.MsgHeartbeat:
        r.electionElapsed = 0
        r.lead = m.From
        r.handleHeartbeat(m)
  ......
}

当节点启动的时候,就会切换成 follower 的状态,当 follower 收到 leader 的 MsgHeartbeat 时,就会重置 electionElapsed

这里当判断 electionElapsed > randomizedElectionTimeout,也就是 leader 心跳超时的时候,就开始发动了选主的投票流程了

发动投票

leader 选举这里咱们先不思考投票音讯的日志解决

func (r *raft) Step(m pb.Message) error {
    ......
    switch m.Type {
    case pb.MsgHup:
    // preVote 是个开关,用于选举前置判断,例如以后节点被网络分区等状况造成的异样,只有 preVote 通过后才会发动投票
    // preVote 和 vote 逻辑差不多,就不多剖析了
        if r.preVote {r.hup(campaignPreElection)
        } else {r.hup(campaignElection)
        }
  ......
  }
}


func (r *raft) hup(t CampaignType) {
  // 如果以后节点是 leader,则疏忽
    if r.state == StateLeader {r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        return
    }
    // 判断以后节点是否能够晋升为 leader
    if !r.promotable() {r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
        return
    }
  // 获取未提交的 raftLog
    ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
    if err != nil {r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
    }
    if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
        return
    }

  // 开始竞选
    r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
    r.campaign(t)
}
func (r *raft) campaign(t CampaignType) {if !r.promotable() {// This path should not be hit (callers are supposed to check), but
        // better safe than sorry.
        r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
    }
    var term uint64
    var voteMsg pb.MessageType
  // 更新 raft 的状态
    if t == campaignPreElection {r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        // PreVote RPCs are sent for the next term before we've incremented r.Term.
        term = r.Term + 1
    } else {r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
  // 给本身投票,并记录投票后果,判断是否能升级
    if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
        // We won the election after voting for ourselves (which must mean that
        // this is a single-node cluster). Advance to the next state.
    // 这里则是 preVote 胜利了,则是发动真正的投票环节
        if t == campaignPreElection {r.campaign(campaignElection)
        } else {
      // 投票胜利了,则晋升为 leader
            r.becomeLeader()}
        return
    }
  
  // 投票后果未满足降职后果,则开始周知其余节点进行投票
    var ids []uint64
    {idMap := r.prs.Voters.IDs()
        ids = make([]uint64, 0, len(idMap))
        for id := range idMap {ids = append(ids, id)
        }
        sort.Slice(ids, func(i, j int) bool {return ids[i] < ids[j] })
    }
    for _, id := range ids {
        if id == r.id {continue}
        r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
            r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

        var ctx []byte
        if t == campaignTransfer {ctx = []byte(t)
        }
    // 将投票的音讯发送给其余节点,这里只是把音讯存储起来,期待其余 goroutine 生产发送
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

这里咱们先注意一下,当节点开启了preVote 后,竞选类型就是campaignPreElection,而后晋升为PreCandidate;否则竞选类型是campaignElection,继而晋升为 Candidate,咱们看下这两个角色别离干了什么

func (r *raft) becomeCandidate() {// TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateLeader {panic("invalid transition [leader -> candidate]")
    }
    r.step = stepCandidate
    r.reset(r.Term + 1)
    r.tick = r.tickElection
    r.Vote = r.id
    r.state = StateCandidate
    r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

func (r *raft) becomePreCandidate() {// TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateLeader {panic("invalid transition [leader -> pre-candidate]")
    }
    // Becoming a pre-candidate changes our step functions and state,
    // but doesn't change anything else. In particular it does not increase
    // r.Term or change r.Vote.
    r.step = stepCandidate
    r.prs.ResetVotes()
    r.tick = r.tickElection
    r.lead = None
    r.state = StatePreCandidate
    r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}

能够看到,PreCandidate 这里的 Term 没有变动,而 Candidate 的 Term + 1,跟咱们匹配上咱们后面说的 preVote的作用了 - 防止网络分区造成的频繁选主

poll的作用是记录某个 id 给本人的投票后果,并判断整体投票后果是否实现

func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
    if v {r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
    } else {r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
    }
    r.prs.RecordVote(id, v)
    granted, rejected, result = r.prs.TallyVotes()
    r.logger.Infof("%d check poll result, votes = %d, rejected = %d, voteResult = %d, record = %v, voters = %v",
        r.id, granted, rejected, result, r.prs.Votes, r.prs.Voters)
    return
}
func (r *raft) send(m pb.Message) {
    if m.From == None {m.From = r.id}
    ......
  // 把音讯追加到 slice 外面,期待其余 goroutine 生产
    r.msgs = append(r.msgs, m)
}

选主流程的逻辑如下

  1. 首先判断本身是否是 leader 或本身是否能够降职
  2. 是否开启了 preVote 环节,开启的话,则先走一遍 preVote 流程,跟 vote 流程差不多
  3. 而后批改本身节点的状态和信息,降职成为 Candidate
  4. 给本身投个票,并计算投票后果是否胜利,如果胜利则间接晋升为 leader,开启 leader 相干的流程
  5. 如果投票还未完结,则开始给其余节点发送投票信息
  6. 这里的投票信息只是寄存在 raft.msgs 这个 slice 外面了,期待其余 goroutine 生产

那么到此,这里的发动投票环节完结,然而投票音讯还没有收回去,也没有接管投票后果,这部分逻辑就还要回归到 raftNode.serveChannelsnode.run 来看了

投票音讯发送
func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready

    r := n.rn.raft

    lead := None

    for {
        if advancec != nil {readyc = nil} else if n.rn.HasReady() {
      // 判断是否有音讯须要解决
            // Populate a Ready. Note that this Ready is not guaranteed to
            // actually be handled. We will arm readyc, but there's no guarantee
            // that we will actually send on it. It's possible that we will
            // service another channel instead, loop around, and then populate
            // the Ready again. We could instead force the previous Ready to be
            // handled first, but it's generally good to emit larger Readys plus
            // it simplifies testing (by emitting less frequently and more
            // predictably).
      // 获取 msg 和 raft 状态,组装成 ready 构造
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }

        .......

        select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }
    // 接管到 follower 的投票后果
    // 这里是网络组件相干的解决,当节点收到其余的网络申请的时候,会通过 recvc 通道传递过去解决
        case m := <-n.recvc:
            // filter out response message from unknown From.
            if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
        // 从新进入 Step 函数解决,留神这里的 m.Type == MsgVoteResp
                r.Step(m)
            }
        ......
    // 在这里,将下面组装好的 ready 构造体,通过 readyc 传给 raftNode.serveChannels 解决
        case readyc <- rd:
      // 更新 raft 状态和删除 msgs
            n.rn.acceptReady(rd)
            advancec = n.advancec
        case <-advancec:
            n.rn.Advance(rd)
            rd = Ready{}
            advancec = nil
        case c := <-n.status:
            c <- getStatus(r)
        case <-n.stop:
            close(n.done)
            return
        }
    }
}
投票后果解决
func (r *raft) Step(m pb.Message) error {
    ......

    switch m.Type {
    .....

    default:
        err := r.step(r, m)
        if err != nil {return err}
    }
    return nil
}

这里从新进入 Step 函数来解决音讯,然而这时候,收到的音讯类型是 MsgVoteResp,间接执行 default 步骤了

同时,此时节点的状态曾经发动投票了,所以节点晋升为 Candidate, r.step 对应的也就是 r.stepCandidate 函数了

func stepCandidate(r *raft, m pb.Message) error {
    ......
    case myVoteRespType:
    // 记录 follower 节点的投票后果,并计算投票的后果(输或赢)gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
        r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
        switch res {
    // 博得选举,则晋升为 leader 节点
        case quorum.VoteWon:
            if r.state == StatePreCandidate {r.campaign(campaignElection)
            } else {r.becomeLeader()
                r.bcastAppend()}
    // 输掉抉择,则降级为 follower
        case quorum.VoteLost:
            // pb.MsgPreVoteResp contains future term of pre-candidate
            // m.Term > r.Term; reuse r.Term
            r.becomeFollower(r.Term, None)
    // 其余状况下,就是票数还有余与判断选举后果,则持续期待其余节点的投票
        }
    ......
    return nil
}

stepCandidate 则将 follower 传递过去的投票后果记录,并从新统计选主的后果,如果获得成功取得超过一半的票数,则晋升为 leader,并告诉到各个节点;如果输掉选主,则降级为 follower;其余状况,就是选主还在持续进行中,还有其余节点没有投票,则持续期待其余节点投票

总结
  1. raft 外面会有一个定时器,定时触发,来推动选主逻辑的运行
  2. 每次定时器触发的时候,会对属性 electionElapsed++,当 electionElapsed 的值超过初始化时创立的 randomizedElectionTimeout,则认为 leader 发送心跳超时,开始触发选主逻辑
  3. follower 节点判断 preVote 的属性是否设置为 true,如果为 true 的话,则执行 preVote 逻辑,这里是为了防止相似于网络分区造成的节点数量有余集群的 1 / 2 造成的频繁选主
  4. follower 节点将本身设置为 Candidate,同时开始给本身投票
  5. 判断投票后果是否足以博得选举,如果能够的话,则晋升为 leader,并公布告诉
  6. 投票后果不足以博得选举的时候,将投票工作分发给其余各个节点
  7. 其余各个节点的投票后果通过 node.recvc 这个通道传给 raftNode,并在run 函数外面解决
  8. 每次收到其余节点的投票后果后,从新执行一下投票的判断,晓得博得选举或输掉选举

日志同步

数据处理申请

数据处理是通过 httpKVAPI 来解决的,用户的 Put method 申请时,就是创立或批改数据

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    key := r.RequestURI
    defer r.Body.Close()
    switch r.Method {
    case http.MethodPut:
        v, err := io.ReadAll(r.Body)
        if err != nil {log.Printf("Failed to read on PUT (%v)\n", err)
            http.Error(w, "Failed on PUT", http.StatusBadRequest)
            return
        }

        h.store.Propose(key, string(v))

        // Optimistic-- no waiting for ack from raft. Value is not yet
        // committed so a subsequent GET on the key may return old value
        w.WriteHeader(http.StatusNoContent)
......
}
func (s *kvstore) Propose(k string, v string) {
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {log.Fatal(err)
    }
    s.proposeC <- buf.String()}

用于创立或批改数据的申请,并不是间接存储到 kvstore 的,而是通过 proposeC 转给 raftNode 来解决的

raftNode 接收数据
func (rc *raftNode) serveChannels() {
  ......
    // send proposals over raft
    go func() {confChangeCount := uint64(0)

        for rc.proposeC != nil && rc.confChangeC != nil {
            select {
      // 接管 kvstore 传递过去的数据
            case prop, ok := <-rc.proposeC:
                if !ok {rc.proposeC = nil} else {
                    // blocks until accepted by raft state machine
                    rc.node.Propose(context.TODO(), []byte(prop))
                }
      ......
            }
        }
        // client closed channel; shutdown raft if not already
        close(rc.stopc)
    }()
func (n *node) Propose(ctx context.Context, data []byte) error {return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

func (n *node) stepWait(ctx context.Context, m pb.Message) error {return n.stepWithWaitOption(ctx, m, true)
}

func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    ......
    ch := n.propc
    pm := msgWithResult{m: m}
    if wait {pm.result = make(chan error, 1)
    }
    select {
  // 将数据封装好后,传给 node.propc 这个通道,交给 node.run 办法解决
    case ch <- pm:
    // 这里是 true,也就是不 return
        if !wait {return nil}
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
  // block 直到有后果
    select {
    case err := <-pm.result:
        if err != nil {return err}
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
    return nil
}

raftNode.serveChannels 开启了一个 goroutine 循环检测是无数有数据音讯,有音讯后就开始封装并通过 chan 传递给node.run 来解决了,并 block 住,晓得node.run 返回处理结果

接下来看下 node.run 的解决

func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready

    r := n.rn.raft

    lead := None

    for {
        ......

        select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
    // 这里就跟下面的选主一样,选主也是一个音讯,日志同步也是一个音讯,所以都走到了这里
        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
      // 解决实现后,将后果通过通道传递给下面,开释后面的 block 逻辑
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }
        ......
        }
    }
}
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    ......

    default:
    // 调用本身注册的 step 办法来解决
        err := r.step(r, m)
        if err != nil {return err}
    }
    return nil
}

这里会调用 r.step() 来解决,而 step 这个办法对应不同的角色有不同的实现

follower 对应 stepFollower

leader 对应 stepLeader

咱们首先看下 stepFollower

func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
    case pb.MsgProp:
        if r.lead == None {r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
            return ErrProposalDropped
        } else if r.disableProposalForwarding {r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
            return ErrProposalDropped
        }
        m.To = r.lead
    // 将音讯转发给 leader
        r.send(m)
    ......
    return nil
}

依据 stepFollower 的逻辑能够看到,当 follower 拿到创立批改数据的音讯的时候,间接将这个申请转发给 leader 来解决

接下来看下stepLeader,对应的 leader 拿到建批改数据的音讯时的解决形式

func stepLeader(r *raft, m pb.Message) error {
    ......
    case pb.MsgProp:
        if len(m.Entries) == 0 {r.logger.Panicf("%x stepped empty MsgProp", r.id)
        }
        if r.prs.Progress[r.id] == nil {
            // If we are not currently a member of the range (i.e. this node
            // was removed from the configuration while serving as leader),
            // drop any new proposals.
            return ErrProposalDropped
        }
        
    ......

        if !r.appendEntry(m.Entries...) {return ErrProposalDropped}
        r.bcastAppend()
        return nil
    ......
    return nil
}

stepLeader 这里的外围逻辑就两步

  1. 尝试追加追加操作到 raftLog 里,如果追加失败则返回
  2. 告诉其余节点追加数据
追加 raftLog
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {li := r.raftLog.lastIndex()
  // 设置每个数据的 Term 和 Index,以示意程序
    for i := range es {es[i].Term = r.Term
        es[i].Index = li + 1 + uint64(i)
    }
    // Track the size of this uncommitted proposal.
  // 判断以后 cucommit 的数据大小 + 以后数据大小 是否大于 定于的最大 uncommitSize,如果大于则失败
    if !r.increaseUncommittedSize(es) {
        r.logger.Debugf(
            "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
            r.id,
        )
        // Drop the proposal.
        return false
    }
    // use latest "last" index after truncate/append
  // 往 raftLog 外面追加数据
    li = r.raftLog.append(es...)
  // 更新以后节点的 Match 和 Next,这里的 Match 示意以后节点的 raftLog 的 Index,如果给的数据的 Index < Match,则表明数据有误,会回绝追加
    r.prs.Progress[r.id].MaybeUpdate(li)
    // Regardless of maybeCommit's return, our caller will call bcastAppend.
  // 尝试提交,只有当超过 1 /2+ 1 节点 ack 这个数据后,才会提交
    r.maybeCommit()
    return true
}

func (r *raft) maybeCommit() bool {
  // 这里就返回所有节点 ack 的 commit log 的 Index
    mci := r.prs.Committed()
    return r.raftLog.maybeCommit(mci, r.Term)
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
  // 当 ack 的 Index 大于 committed 的时候,才会提交
    if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {l.commitTo(maxIndex)
        return true
    }
    return false
}

当追加 raftLog 的时候,同时尝试提交这条数据到存储系统,这时候也会跟选主时一样,判断一下整个集群是否超过 1 /2+ 1 节点收到数据并批准

告诉其余节点追加数据

追加完 raftLog 之后,其余节点还都没有解决,这时候数据也不会 commit 到存储系统,所以要先告诉一下其余节点,让他们先存储上,而后 leader 能力 commit

func (r *raft) bcastAppend() {
  // 循环发送给其余节点
    r.prs.Visit(func(id uint64, _ *tracker.Progress) {
        if id == r.id {return}
        r.sendAppend(id)
    })
}

func (r *raft) sendAppend(to uint64) {r.maybeSendAppend(to, true)
}

func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {pr := r.prs.Progress[to]
    if pr.IsPaused() {return false}
    m := pb.Message{}
    m.To = to

    term, errt := r.raftLog.term(pr.Next - 1)
    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    if len(ents) == 0 && !sendIfEmpty {return false}

    ......
    // 定义数据音讯的 Type,Index,Term 和追加的数据
        m.Type = pb.MsgApp
        m.Index = pr.Next - 1
        m.LogTerm = term
        m.Entries = ents
        m.Commit = r.raftLog.committed
        if n := len(m.Entries); n != 0 {
            switch pr.State {
            // optimistically increase the next when in StateReplicate
            case tracker.StateReplicate:
                last := m.Entries[n-1].Index
                pr.OptimisticUpdate(last)
                pr.Inflights.Add(last)
            case tracker.StateProbe:
                pr.ProbeSent = true
            default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
            }
        }
    // 发送给塔器节点
    r.send(m)
    return true
}

发送给其余节点的时候,组装好数据音讯,就间接调用网络组件进行发送就行了

接下来就是接管 follower 节点返回的音讯了,leader 发送的音讯类型是 MsgApp,那么 follower 节点返回的音讯类型就是 MsgAppResp

那就持续回到node.run 办法看下收到 follower 节点的音讯的解决

解决 follower 的音讯解决响应

又见到老朋友

func (n *node) run() {
    ......

    for {
        ......

        select {
        ......
        case m := <-n.recvc:
            // filter out response message from unknown From.
            if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {r.Step(m)
            }
    ......
    }
  }
}

node.runcase m := <-n.recvc: 在选主的时候曾经见到过了,这个也是解决其余节点网络申请或详情的入口

func (r *raft) Step(m pb.Message) error {
    ......

    switch m.Type {
    ......

    default:
        err := r.step(r, m)
        if err != nil {return err}
    }
    return nil
}

func stepLeader(r *raft, m pb.Message) error {
    // These message types do not require any progress for m.From.
    switch m.Type {
    ......
    // All other message types require a progress for m.From (pr).
    pr := r.prs.Progress[m.From]
    if pr == nil {r.logger.Debugf("%x no progress available for %x", r.id, m.From)
        return nil
    }
    switch m.Type {
    case pb.MsgAppResp:
        pr.RecentActive = true

        if m.Reject {
            // 这时候数据音讯的追加被 follower 回绝了,同时 follower 计算出来一个可能的 Index 点
            r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
                r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
            nextProbeIdx := m.RejectHint
            if m.LogTerm > 0 {
                // 依据 follower 返回的 Index 点和 Term,找到可能的 Index 去改正 follower 的数据
                nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
            }
      // reject 的 follower 节点,状态先批改为 StateProbe,表明这个 follower 的 lastIndex 须要先去探测,不明
      // 批改这个 follower 节点的 Next
            if pr.MaybeDecrTo(m.Index, nextProbeIdx) {r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
                if pr.State == tracker.StateReplicate {pr.BecomeProbe()
                }
        // 从批改后的 Index 日志点,持续尝试往 follower 追加数据
                r.sendAppend(m.From)
            }
        } else {oldPaused := pr.IsPaused()
      // 尝试更新 follower 节点在 Index 和 Next 地位
            if pr.MaybeUpdate(m.Index) {
                switch {
        // follower 节点追加胜利的话,就不必持续探测了,状态变成 StateReplicate,示意失常的 follower
                case pr.State == tracker.StateProbe:
                    pr.BecomeReplicate()
                case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
                    // TODO(tbg): we should also enter this branch if a snapshot is
                    // received that is below pr.PendingSnapshot but which makes it
                    // possible to use the log again.
                    r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
                    // Transition back to replicating state via probing state
                    // (which takes the snapshot into account). If we didn't
                    // move to replicating state, that would only happen with
                    // the next round of appends (but there may not be a next
                    // round for a while, exposing an inconsistent RaftStatus).
                    pr.BecomeProbe()
                    pr.BecomeReplicate()
                case pr.State == tracker.StateReplicate:
                    pr.Inflights.FreeLE(m.Index)
                }

        // 尝试将数据进行提交
                if r.maybeCommit() {
                    // committed index has progressed for the term, so it is safe
                    // to respond to pending read index requests
                    releasePendingReadIndexMessages(r)
          // 再一次播送,这时候播送的数据的 Commit 和 Index 及 Entries 都有变动,目标是为了让 follower 提交数据
                    r.bcastAppend()} else if oldPaused {
                    // If we were paused before, this node may be missing the
                    // latest commit index, so send it.
                    r.sendAppend(m.From)
                }
                // We've updated flow control information above, which may
                // allow us to send multiple (size-limited) in-flight messages
                // at once (such as when transitioning from probe to
                // replicate, or when freeTo() covers multiple messages). If
                // we have more entries to send, send as many messages as we
                // can (without sending empty messages for the commit index)
                for r.maybeSendAppend(m.From, false) { }
                // Transfer leadership is in progress.
        // 节点迁徙相干
                if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
                    r.sendTimeoutNow(m.From)
                }
            }
        }
    ......
    }
    return nil
}

// 尝试更新 follower 的 Match 和 Next 的索引值
func (pr *Progress) MaybeUpdate(n uint64) bool {
    var updated bool
    if pr.Match < n {
        pr.Match = n
        updated = true
        pr.ProbeAcked()}
    pr.Next = max(pr.Next, n+1)
    return updated
}
  
func (r *raft) maybeCommit() bool {
  // MaybeUpdate 中会保留每个响应音讯的节点的 Match,这里就依据 Match 计算出来 1 /2+ 1 以上节点共识的 Match 的 Index 点
    mci := r.prs.Committed()
  // 依据计算出来的 Index 点,尝试 commit
    return r.raftLog.maybeCommit(mci, r.Term)
}
  
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
  // 当 Index 大于 提交的 Index 点的时候 并且 对应的 Term == raftLog.Term,才会进行提交
    if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {l.commitTo(maxIndex)
        return true
    }
    return false
}
  
func (l *raftLog) commitTo(tocommit uint64) {
    // never decrease commit
    if l.committed < tocommit {if l.lastIndex() < tocommit {l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
        }
    // 将 raftLog 的 commit 点批改
        l.committed = tocommit
    }
}

当 leader 收到 follower 的 MsgAppResp 的音讯的时候,就开始进行解决了

follower 返回的处理结果有两种状态

  • 胜利

    • 这时候尝试会更新返回音讯的节点在 leader 内存中存储的 Match 和 Next,也就是 leader 记录的 follower 节点的 raft log 的状态
    • 判断这条日志是否超过 1 /2+ 1 以上的节点接管并解决了
    • 如果超过了,则将这条日志 commit,而 commit 的逻辑,在这里仅仅只是更新了 raftLog 的 committed 的记录值(期待 node.run 来解决),同时向所有节点播送以此让 follower commit 数据
    • 没有超过则期待下一次解决
  • 回绝

    • 如果 follower 节点回绝,则 follower 节点的响应信息会携带可能匹配的 Index 和 Term
    • leader 节点依据 follower 节点的响应,找到可能 match 的 Index 地位,并跟 follower 协调
    • 反复后面两个步骤晓得 macth 到了 leader 和 follower 的日志 Index 地位,开始给 follower 同步日志
    • 最初再回到胜利状态的解决
follower 节点回绝追加

这里有一点须要阐明一下,当 follower 节点 MsgAppResp 的后果为 reject 的时候,阐明 leader 和 follower 日志不统一,所以这时候,leader 须要去探测 follower 与 leader 节点统一的 Index 点,并从探测并纠正后的点去开始追加,也就是 findConflictByTerm() 的逻辑

这里用源码中的图来展现一下

示例一:

idx               1 2 3 4 5 6 7 8 9
                                    -----------------
term (Leader)     1 3 3 3 5 5 5 5 5
term (Follower)   1 1 1 1 2 2       

假如这是初始状态的 Leader 节点日志和 Follower 节点日志

当 leader 节点须要将 Index = 9 的数据同步给 follower 的时候,follower 发现自己在 Index(7,8)都没有数据,也就是 Match 不上,就会 reject 这个申请,并找到比 Index = 9 点的 Term = 5 小的 Term 的数据,也就是 follower 的最初一条数据  Index = 6, Term = 2

这时候 leader 收到了 follower 节点的回绝音讯,和 Index,Term,如果 leader 从这个节点间接追加,必定也会失败,因为 Index = 6 的 leader 和 follower 的 Term 都不一样

所以 leader 会再做一个计算,leader.Term <= folllower.Term,也就找到 Index = 1 这个地位
综上 就是 findConflictByTerm() 这个函数的作用

示例二:

idx               1 2 3 4 5 6 7 8 9
                            -----------------
term (Leader)     1 3 3 3 3 3 3 3 7
term (Follower)   1 3 3 4 4 5 5 5 6

leader 须要追加 Index = 9 的数据到 follower

follower 回绝后,并返回 Term = 6 (leader.Term == 7 > 6)

leader 收到音讯后,对日志进行回溯,这时候发现 Index = 8 合乎(leader.Term == 3 < 6)而后 leader 从新开始追加

follower 又回绝了,因为 Index = 8 时,follower.Term = 5,而 leader.Term = 3,所以这时候 follower 须要找到一个小于等于 Term 3 的地位,也就会回溯了 Index = 3
这时候 leader 从这里持续追加日志就胜利了
commit 数据

看到下面可能会有点纳闷,commit 数据如同啥都没干,就是 commit+1,这里就要回到 node.run 来看了;回顾选主的流程,选主的音讯是放到了 raft.msgs 外面,而后node.run 外面的一个协程循环读取,而后开始解决的,那么 commit 数据的逻辑是不是一样的呢

func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready

    r := n.rn.raft

    lead := None

    for {
        if advancec != nil {readyc = nil} else if n.rn.HasReady() {rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }
  ......
    select {
      ......
      // 将数据通过 readyc 发送给 serveChannels 来解决
      case readyc <- rd:
                n.rn.acceptReady(rd)
                advancec = n.advancec
      ......
    }
    }
}

func (rn *RawNode) HasReady() bool {
    r := rn.raft
    ......
  // 这里判断,是否有音讯(对应抉择的音讯)// hasNextEnts 则判断有没有 commit 然而还没有 apply 的数据
    if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {return true}
    if len(r.readStates) != 0 {return true}
    return false
}

// 判断是否有 commit 但为 apply 的数据
func (l *raftLog) hasNextEnts() bool {
  // 判断 commit 的 Index 是不是大于 apply 的 Index
    off := max(l.applied+1, l.firstIndex())
    return l.committed+1 > off
}

node.run 这里循环判断是否有须要 commit 的日志或追加的 msgs,如果有的话,则通过 readyc 发送给 serveChannels 来解决

接下来看下serveChannels 的解决

func (rc *raftNode) serveChannels() {

    ......

    // event loop on raft state machine updates
    for {
        select {
        ......

        // store raft entries to wal, then publish over commit channel
        case rd := <-rc.node.Ready():
            // Must save the snapshot file and WAL snapshot entry before saving any other entries
            // or hardstate to ensure that recovery after a snapshot restore is possible.
      ......

      // 存储到 storage
            rc.raftStorage.Append(rd.Entries)
      // 这里没有 Messages 了,都是 Entries,所以这里能够疏忽
            rc.transport.Send(rc.processMessages(rd.Messages))
      // apply 到 kvstore
            applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
            if !ok {rc.stop()
                return
            }
            rc.maybeTriggerSnapshot(applyDoneC)
            rc.node.Advance()

        case err := <-rc.transport.ErrorC:
            rc.writeError(err)
            return

        case <-rc.stopc:
            rc.stop()
            return
        }
    }
}

serveChannels 接管到 node.run 捞取到的 Entries 后,就开始将日志追加到 wal 和 storeage 外面,而后通过publishEntries 存储到 kvstore

接下来看下publishEntries 做了什么

func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {if len(ents) == 0 {return nil, true}

    data := make([]string, 0, len(ents))
  // 组装 data
    for i := range ents {switch ents[i].Type {
        case raftpb.EntryNormal:
            if len(ents[i].Data) == 0 {
                // ignore empty messages
                break
            }
            s := string(ents[i].Data)
            data = append(data, s)
        ......
        }
    }
    var applyDoneC chan struct{}

    if len(data) > 0 {applyDoneC = make(chan struct{}, 1)
        select {
    // 将组装好的 data 通过 commitc 传给 kvstore 解决,当 kvstore 解决实现后,再通过 applyDoneC 通道告诉过去
        case rc.commitC <- &commit{data, applyDoneC}:
        case <-rc.stopc:
            return nil, false
        }
    }

    // after commit, update appliedIndex
    rc.appliedIndex = ents[len(ents)-1].Index

    return applyDoneC, true
}

publishEntries 将数据组装好后,通过 commitC 传给 kvstore,而后更新掉 raftNode.appliedIndex,同时返回一个 chan,以便 kvstore 告诉 raftNode 数据存储胜利

readCommits 就比较简单了,收到数据存储到本身即可,同时敞开下层 raftNode 传递过去的通道,已告诉下层 raftNode 存储实现

func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
    for commit := range commitC {
    // commit == nil 的时候,kvstore 从 snapshot 文件外面复原数据
        if commit == nil {
            // signaled to load snapshot
            snapshot, err := s.loadSnapshot()
            if err != nil {log.Panic(err)
            }
            if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
                if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)
                }
            }
            continue
        }
        // 将数据存储到 kvstore 外面
        for _, data := range commit.data {
            var dataKv kv
            dec := gob.NewDecoder(bytes.NewBufferString(data))
            if err := dec.Decode(&dataKv); err != nil {log.Fatalf("raftexample: could not decode message (%v)", err)
            }
            s.mu.Lock()
            s.kvStore[dataKv.Key] = dataKv.Val
            s.mu.Unlock()}
    // 敞开 applyDoneC,以告诉下层模块解决实现
        close(commit.applyDoneC)
    }
    if err, ok := <-errorC; ok {log.Fatal(err)
    }
}

总结

咱们外围目标是为了理解 raft 协定,所以这里仅仅解析了 leader 选举和日志同步的能力,还有 wal 日志、snapshot、网络组件,咱们都没有解析,有趣味的同学能够本人追踪看看

整个 raftExample 的实现比拟简答,追踪起来也比拟容易,外围的代码逻辑次要集中在了 node.runraftNode.serveChannelsstepXXX 外面,代码分层也比拟清晰

正文完
 0