Speaking of raftexample, many people may find it unfamiliar: I know raft, and I know example, but where does raftexample come from? Here is a brief introduction. raftexample is a simple example built on etcd's raft module: it implements a small raft-based kvstore cluster and exposes a REST API for use.
raftexample consists of only a handful of files and a few hundred lines of code. Reading it helps us understand the raft protocol much better, so the return on investment is huge.
Demo animation
The animated demo recommended by the official docs: http://thesecretlivesofdata.c...
Leader election
Log replication
Concepts
Logical clock
The logical clock is essentially a timer (time.Tick)
that fires at a fixed interval; it is the core driver of raft leader election. A few related fields are worth understanding first:
electionElapsed: the logical-clock tick counter. Each tick on a follower or leader increments it by 1; a follower resets it to 0 when it receives a heartbeat from the leader, while the leader resets it to 0 whenever its own election-timeout-based liveness check fires.
heartbeatElapsed: the leader's heartbeat tick counter. When the leader's logical clock ticks, it increments not only electionElapsed but also heartbeatElapsed.
heartbeatTimeout: when the leader's heartbeatElapsed counter reaches this predefined value, the leader sends a heartbeat to every node.
electionTimeout: when the leader's electionElapsed tick count exceeds this value and checkQuorum is enabled for tracking node liveness, the leader performs a liveness check (the check issues no network requests; it relies on the per-node state the leader already keeps).
randomizedElectionTimeout: a randomly generated value; when a follower's electionElapsed counter reaches it, the follower starts a new round of election.
So, as you can see, the logical clock mainly drives the leader's heartbeats and liveness checks and the follower's elections:
A follower starts an election when: electionElapsed >= randomizedElectionTimeout
The leader sends a heartbeat when: heartbeatElapsed >= heartbeatTimeout
The leader performs a cluster liveness check when: electionElapsed > electionTimeout
This raises a question: why does the leader keep two counters, electionElapsed and heartbeatElapsed? Wouldn't one be enough?
The reason is that whenever the leader performs a liveness check (i.e. the counter satisfies the probing condition), electionElapsed is reset to 0. So on the leader, electionElapsed and heartbeatElapsed do not hold the same value and are not in sync.
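To make the relationship between the two counters concrete, here is a deliberately simplified sketch of the two tick paths (illustrative only, reusing the field names above; not the etcd code itself):

```go
package main

import "fmt"

// tickSim is a highly simplified model of raft's logical clock,
// only meant to illustrate how the counters interact.
type tickSim struct {
	electionElapsed           int
	heartbeatElapsed          int
	heartbeatTimeout          int
	electionTimeout           int
	randomizedElectionTimeout int
	checkQuorum               bool
}

// tickElection models what a follower/candidate does on every tick.
func (r *tickSim) tickElection() {
	r.electionElapsed++
	if r.electionElapsed >= r.randomizedElectionTimeout {
		r.electionElapsed = 0
		fmt.Println("follower: election timeout reached, start campaigning (MsgHup)")
	}
}

// tickHeartbeat models what a leader does on every tick: both counters advance.
func (r *tickSim) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++
	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			fmt.Println("leader: run liveness check (MsgCheckQuorum)")
		}
	}
	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		fmt.Println("leader: broadcast heartbeat (MsgBeat)")
	}
}

func main() {
	r := &tickSim{heartbeatTimeout: 1, electionTimeout: 10, randomizedElectionTimeout: 13, checkQuorum: true}
	for i := 0; i < 3; i++ {
		r.tickHeartbeat()
	}
}
```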
raft.Peer
Peer: a node in the cluster; every application process that joins the cluster is a node.
```go
type Peer struct {
	ID      uint64
	Context []byte
}
```
ID: the node's ID; when the process starts, every node in the cluster is assigned an ID
Context: the context payload
raftpb.Message
Message: an important struct in raftexample (hereafter simply raft). It is the abstraction over all messages: elections, data writes, configuration changes and more are all just message types.
```go
type Message struct {
	Type       MessageType
	To         uint64
	From       uint64
	Term       uint64
	LogTerm    uint64
	Index      uint64
	Entries    []Entry
	Commit     uint64
	Snapshot   Snapshot
	Reject     bool
	RejectHint uint64
	Context    []byte
}
```
Type: the message type; raft dispatches to different handling logic based on it
- MsgHup MessageType = 0 // sent when a follower believes the leader is down, to start an election
- MsgBeat MessageType = 1 // a local message only the leader sends; raft defines a heartbeat timeout at initialization, and once the logical clock has ticked past it this message fires, after which the leader sends MsgHeartbeat to the followers
- MsgProp MessageType = 2 // message type for writing data or changing the cluster configuration
- MsgApp MessageType = 3 // the leader broadcasting log entries to append to the followers
- MsgAppResp MessageType = 4 // a follower's response to the leader's append request
- MsgVote MessageType = 5 // message type a candidate uses to request votes
- MsgVoteResp MessageType = 6 // other nodes' responses to a candidate's vote request
- MsgSnap MessageType = 7 // message type for the leader sending snapshot data to a follower
- MsgHeartbeat MessageType = 8 // heartbeat the leader sends to the followers
- MsgHeartbeatResp MessageType = 9 // a follower's heartbeat response
- MsgUnreachable MessageType = 10 // message unreachable; a local message
- MsgSnapStatus MessageType = 11 // reports whether the snapshot message sent to a follower succeeded
- MsgCheckQuorum MessageType = 12 // also a local message; the leader uses it to check which nodes are alive, and steps down to follower if no more than half of them are
- MsgTransferLeader MessageType = 13 // transfer leadership
- MsgTimeoutNow MessageType = 14 // once the follower that requested MsgTransferLeader has a log identical to the leader's, the leader sends it MsgTimeoutNow and the follower immediately starts an election
- MsgReadIndex MessageType = 15 // a follower asks the leader for the current commit index
- MsgReadIndexResp MessageType = 16 // the leader's response to MsgReadIndex
- MsgPreVote MessageType = 17 // an optional message sent before Vote; with preVote enabled, a follower runs a pre-vote round before becoming a candidate, without incrementing the Term, to avoid repeated elections caused by a network partition containing fewer than half of the nodes
- MsgPreVoteResp MessageType = 18 // other nodes' responses to MsgPreVote
To: the ID of the node the message is sent to
From: the ID of the sending node
Term: the current term
LogTerm: when a follower rejects log entries sent by the leader, it picks a plausible log position for the leader to resync from; LogTerm is the term of the entry at that position
Index: the log index the message refers to
Entries: the log entries
Commit: the commit position of the sender's log
Snapshot: the snapshot payload when transferring a snapshot
Reject: whether the node rejects the received message; for example, when a follower receives the leader's MsgApp and finds the entries cannot be appended directly, it rejects that replication message
RejectHint: after rejecting the leader's message, the index the follower computes as a likely match with the leader's log
Context: the context payload
raftpb.Entry
Entry: what we usually call a log entry
```go
type Entry struct {
	Term  uint64
	Index uint64
	Type  EntryType
	Data  []byte
}
```
Term: the term of this entry; every entry is replicated by a leader, and each leader has a term, so this is the leader's term at the time it replicated the entry
Index: the index, which also identifies each entry
Type: the entry type; EntryNormal is a regular entry, while EntryConfChange/EntryConfChangeV2 are configuration-change entries
Data: the payload the entry carries
Let's look at two quick demos
The case where the leader's and follower's logs are out of sync:
```
idx              1 2 3 4 5 6 7 8 9
-----------------------------------
term (Leader)    1 3 3 3 5 5 5 5 5
term (Follower)  1 1 1 1 2 2
```
The case where the leader's and follower's logs are in sync:
```
idx              1 2 3 4 5 6 7 8 9
-----------------------------------
term (Leader)    1 3 3 3 5 5 5 5 5
term (Follower)  1 3 3 3 5 5 5 5 5
```
Whether the leader's and a follower's logs are in sync is determined by comparing the Index and Term of each Entry in Entries.
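As a minimal illustration of that check (a simplified model, not the etcd implementation), a follower only accepts an append if the entry preceding the new ones matches both Index and Term:

```go
package main

import "fmt"

// entry mirrors the fields of raftpb.Entry that matter for log matching.
type entry struct {
	Term  uint64
	Index uint64
}

// matchTerm reports whether the local log contains an entry at index i with term t.
// This is a simplified model of the check raft performs before appending.
func matchTerm(log []entry, i, t uint64) bool {
	for _, e := range log {
		if e.Index == i {
			return e.Term == t
		}
	}
	return false
}

func main() {
	// the out-of-sync follower log from the demo above
	follower := []entry{{1, 1}, {1, 2}, {1, 3}, {1, 4}, {2, 5}, {2, 6}}
	// The leader claims the entry preceding its new ones is (index=8, term=5).
	fmt.Println(matchTerm(follower, 8, 5)) // false -> the follower would reject MsgApp
	fmt.Println(matchTerm(follower, 4, 1)) // true  -> entries after index 4 could be appended
}
```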
Note
- From here on, all messages such as data CRUD, configuration changes and elections are collectively referred to as message logs, or simply logs
- For log replication we ignore everything related to the WAL and snapshots
Source code walkthrough
Structs
raftNode
type raftNode struct { // 接管kvstore的存储数据日志 proposeC <-chan string // proposed messages (k,v) // 接管kvHttpApi的配置变更日志 confChangeC <-chan raftpb.ConfChange // proposed cluster config changes // 日志同步到kvstore commitC chan<- *commit // entries committed to log (k,v) // 模块间出错的音讯告诉通道 errorC chan<- error // errors from raft session // 节点的id id int // client ID for raft session // 以后集群下的所有节点ip:ports peers []string // raft peer URLs // 以后节点是否接入一个集群,启动时候依据这个判断是重启还是启动一个新的节点 join bool // node is joining an existing cluster // wal 日志目录 waldir string // path to WAL directory // snapshot 目录 snapdir string // path to snapshot directory // 获取snapshot的办法 getSnapshot func() ([]byte, error) // 用于集群的状态 confState raftpb.ConfState // snapshot 的日志的Index snapshotIndex uint64 // 曾经applied的日志的Index appliedIndex uint64 // raft backing for the commit/error channel // node的实例,实现了Node接口的办法 node raft.Node // storage实例 raftStorage *raft.MemoryStorage // wal实例 wal *wal.WAL // snapshot实例,治理snapshot snapshotter *snap.Snapshotter // 与snapshot交互的通道,判断snapshot实例是否创立实现 snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready // 两次snapshot之间的apply的日志最小条数 // 当上一次snapshot之后,apply的日志超过这个数量,就会开启新一轮snapshot,用于及时开释wal和raftLog中的存储压力 snapCount uint64 // 网络组件 transport *rafthttp.Transport // 通道告诉敞开serveChannel,敞开网络组件 stopc chan struct{} // signals proposal channel closed // 敞开raft node的http服务 httpstopc chan struct{} // signals http server to shutdown // raft node 的http敞开胜利后告诉其余模块的通道 httpdonec chan struct{} // signals http server shutdown complete // 日志组件 logger *zap.Logger}
raft
type raft struct { // 节点id id uint64 // 以后节点的Term 任期 Term uint64 // 以后节点在选举时,投票给了谁,初始化时为0,也就是谁都没有投给 Vote uint64 // 与readIndex申请无关,这里不多做介绍 readStates []ReadState // the log raftLog *raftLog // 单条音讯最大的size maxMsgSize uint64 // 最大的uncommit 日志数量,当uncommit日志数量大于这个值的时候,就不再追加日志了 maxUncommittedSize uint64 // TODO(tbg): rename to trk. // 集中群各个节点状态,蕴含了节点日志复制状况等,上面会独自介绍一下 prs tracker.ProgressTracker // 状态,follower、candidator、leader等 state StateType // isLearner is true if the local raft node is a learner. isLearner bool // 记录了以后节点待发送的音讯,这里的音讯会被及时生产 msgs []pb.Message // the leader id lead uint64 // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. // leader转换对象的id leadTransferee uint64 // Only one conf change may be pending (in the log, but not yet // applied) at a time. This is enforced via pendingConfIndex, which // is set to a value >= the log index of the latest pending // configuration change (if any). Config changes are only allowed to // be proposed if the leader's applied index is greater than this // value. pendingConfIndex uint64 // an estimate of the size of the uncommitted tail of the Raft log. Used to // prevent unbounded log growth. Only maintained by the leader. Reset on // term changes. // 未commit的日志的数量 uncommittedSize uint64 readOnly *readOnly // number of ticks since it reached last electionTimeout when it is leader // or candidate. // number of ticks since it reached last electionTimeout or received a // valid message from current leader when it is a follower. electionElapsed int // number of ticks since it reached last heartbeatTimeout. // only leader keeps heartbeatElapsed. heartbeatElapsed int // 是否开启节点探活 checkQuorum bool // 是否开启preVote preVote bool heartbeatTimeout int electionTimeout int // randomizedElectionTimeout is a random number between // [electiontimeout, 2 * electiontimeout - 1]. It gets reset // when raft changes its state to follower or candidate. randomizedElectionTimeout int // 是否不容许数据转发给leader,开启的话,follower节点收到的数据日志,就间接抛弃掉,而不会转发给leader disableProposalForwarding bool // 逻辑时钟办法,leader对应tickHeartbeat,follower和candidate对应tickElection tick func() // 解决音讯的办法,leader对应stepLeader,follower对应stepFollower,candidate对应stepCandidate step stepFunc logger Logger // pendingReadIndexMessages is used to store messages of type MsgReadIndex // that can't be answered as new leader didn't committed any log in // current term. Those will be handled as fast as first log is committed in // current term. pendingReadIndexMessages []pb.Message}
Two points here deserve explanation:
checkQuorum: this switch lets the leader track each node's liveness. Whenever the leader exchanges messages with a follower it marks that follower as active. If a network partition occurs and the leader's partition holds fewer than half of the nodes, that leader is useless, so it can proactively step down to follower.
preVote: preVote also exists for the network-partition case. After a partition, followers in a partition with fewer than half of the nodes keep starting elections; each attempt increments their Term but can never win, so they loop: start election -> term+1 -> lose -> start election -> term+1 ... By the time the partition heals, that Term may be huge, possibly larger than the real leader's Term, which would disrupt log correctness. So a partitioned node first runs preVote, during which the Term does not change, and only starts a real election after the preVote succeeds.
raftLog
raftLog is the component inside a node that manages the log
```go
type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	// (the storage component: holds all stable entries since the last snapshot)
	storage Storage
	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	// (holds all unstable entries and the unstable snapshot)
	unstable unstable
	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	// (the highest committed Index, tracked against the storage component)
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	// (the highest Index that has already been applied)
	applied uint64
	logger Logger
	// maxNextEntsSize is the maximum number aggregate byte size of the messages
	// returned from calls to nextEnts.
	maxNextEntsSize uint64
}
```
// Invariant: applied <= committed
The upstream comment on the applied field says that applied <= committed always holds. Why is that?
Let's first take a quick look at the lifecycle of a log entry; it is covered in more detail when we trace log replication later.
A user submits a write request -> kvstore produces a data log entry -> the entry is stored in the unstable structure -> the entry is appended to storage -> the entry is replicated to the other nodes -> once replication succeeds the entry is committed and the committed position is advanced -> kvstore stores the data -> the applied position is advanced
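A tiny sketch of why the invariant holds (illustrative only, not the etcd code): the apply cursor is only ever advanced up to, never past, the commit cursor:

```go
package main

import "fmt"

// miniLog models only the two cursors of raftLog that matter for the invariant.
type miniLog struct {
	committed uint64
	applied   uint64
}

// commitTo advances the commit cursor; it never moves backwards.
func (l *miniLog) commitTo(i uint64) {
	if i > l.committed {
		l.committed = i
	}
}

// appliedTo advances the apply cursor, but never past committed,
// which is what keeps applied <= committed true at all times.
func (l *miniLog) appliedTo(i uint64) {
	if i == 0 {
		return
	}
	if i > l.committed {
		panic(fmt.Sprintf("applied(%d) is out of range [committed(%d)]", i, l.committed))
	}
	if i > l.applied {
		l.applied = i
	}
}

func main() {
	l := &miniLog{}
	l.commitTo(5)
	l.appliedTo(3)
	fmt.Println(l.applied, l.committed) // 3 5
}
```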
Startup flow
func main() { cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") id := flag.Int("id", 1, "node ID") kvport := flag.Int("port", 9121, "key-value server port") join := flag.Bool("join", false, "join an existing cluster") flag.Parse() // 创立proposeC和confChangeC通道,这两个通道用于kvstore raftNode http三个模块的交互,所以在里面创立 proposeC := make(chan string) defer close(proposeC) confChangeC := make(chan raftpb.ConfChange) defer close(confChangeC) // raft provides a commit stream for the proposals from the http api var kvs *kvstore getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() } // 启动raftNode模块 commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) // 启动kvstore模块 kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // the key-value http handler will propose updates to raft // 启动httpKVApi模块,用于接管申请 serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)}
raftexample's startup flow is fairly simple: it starts every module and creates the channels the modules use to interact, which keeps communication easy and the modules decoupled.
- raftNode: the core module of raftexample; it provides the raft protocol, including leader election and log replication
- kvstore: the storage module of raftexample; committed log entries end up stored here, and in raftexample this store is essentially a map
- httpKvApi: the API exposed to the outside world; through it you can create, modify and delete stored data, and add or remove cluster nodes
We will walk through the modules from simple to deep, in the order httpKvApi -> kvstore -> raftNode.
The httpKVAPI module
httpKvApi exposes an HTTP service; different HTTP methods operate on different objects (data and cluster nodes), providing create/read/update/delete for both data and nodes.
Startup
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) { // 定义http server srv := http.Server{ Addr: ":" + strconv.Itoa(port), Handler: &httpKVAPI{ store: kv, confChangeC: confChangeC, }, } go func() { // 启动http server if err := srv.ListenAndServe(); err != nil { log.Fatal(err) } }() // exit when raft goes down // 与上层的raft通信,如果raft挂了,则本模块也要退出 if err, ok := <-errorC; ok { log.Fatal(err) }}
The startup logic is straightforward:
it starts an HTTP server asynchronously and then blocks reading the error channel, which both keeps the process from exiting and lets the HTTP layer exit promptly once the raft module goes down.
Request handling
All request handling is concentrated in the single ServeHTTP method.
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { key := r.RequestURI defer r.Body.Close() switch r.Method { // Put形式,则表明是数据减少或批改 case http.MethodPut: v, err := io.ReadAll(r.Body) if err != nil { log.Printf("Failed to read on PUT (%v)\n", err) http.Error(w, "Failed on PUT", http.StatusBadRequest) return } h.store.Propose(key, string(v)) // Optimistic-- no waiting for ack from raft. Value is not yet // committed so a subsequent GET on the key may return old value // 这里间接返回,而底层则会进行数据一致性的解决等操作,所以如果立刻进行GET申请的话,还是有可能获取老的数据 w.WriteHeader(http.StatusNoContent) case http.MethodGet: // Get形式,间接到kvstore外面获取数据的value if v, ok := h.store.Lookup(key); ok { w.Write([]byte(v)) } else { http.Error(w, "Failed to GET", http.StatusNotFound) } case http.MethodPost: // Post形式是减少集群节点,解析进去nodeId,并通过confChangeC传给raftNode url, err := io.ReadAll(r.Body) if err != nil { log.Printf("Failed to read on POST (%v)\n", err) http.Error(w, "Failed on POST", http.StatusBadRequest) return } nodeId, err := strconv.ParseUint(key[1:], 0, 64) if err != nil { log.Printf("Failed to convert ID for conf change (%v)\n", err) http.Error(w, "Failed on POST", http.StatusBadRequest) return } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, NodeID: nodeId, Context: url, } h.confChangeC <- cc // As above, optimistic that raft will apply the conf change w.WriteHeader(http.StatusNoContent) case http.MethodDelete: // Delete形式则是删除一个集群节点,并通过confChangeC传给raftNode来解决 nodeId, err := strconv.ParseUint(key[1:], 0, 64) if err != nil { log.Printf("Failed to convert ID for conf change (%v)\n", err) http.Error(w, "Failed on DELETE", http.StatusBadRequest) return } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, NodeID: nodeId, } h.confChangeC <- cc // As above, optimistic that raft will apply the conf change w.WriteHeader(http.StatusNoContent) default: w.Header().Set("Allow", http.MethodPut) w.Header().Add("Allow", http.MethodGet) w.Header().Add("Allow", http.MethodPost) w.Header().Add("Allow", http.MethodDelete) http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) }}
That covers every case httpKvApi handles:
- PUT request: adds or modifies data stored in kvstore, but not by writing it directly; instead it calls h.store.Propose (i.e. kvstore.Propose), which hands the data to raftNode through proposeC so that consistency is guaranteed before the data is committed and saved into kvstore. This is the core of raft and is analyzed in detail later.
- GET request: much simpler, it reads the value straight from kvstore. Because PUT does not modify the store directly, a GET issued immediately after a PUT may still return the old value if raftNode has not yet finished committing the change into kvstore.
- POST request: adds a cluster node; the leader learns about it and admits it into the cluster as a follower.
- DELETE request: removes a node, again handled by the leader.
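As a quick usage sketch, assuming a node is listening on 127.0.0.1:9121 (the default --port in main above) and that http://127.0.0.1:42379 is the hypothetical peer URL of a node to add, the API could be exercised with a plain Go HTTP client:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	base := "http://127.0.0.1:9121" // assumed address of one raftexample node

	// PUT /my-key proposes the value "hello" through raft.
	req, _ := http.NewRequest(http.MethodPut, base+"/my-key", strings.NewReader("hello"))
	if resp, err := http.DefaultClient.Do(req); err != nil {
		panic(err)
	} else {
		resp.Body.Close()
	}

	// GET /my-key reads whatever has been applied to kvstore so far;
	// right after the PUT it may still return the old value.
	resp, err := http.Get(base + "/my-key")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println("value:", string(body))

	// POST /4 with the peer URL as the body asks the cluster to add node 4.
	req, _ = http.NewRequest(http.MethodPost, base+"/4", strings.NewReader("http://127.0.0.1:42379"))
	if resp, err := http.DefaultClient.Do(req); err != nil {
		panic(err)
	} else {
		resp.Body.Close()
	}
}
```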
The kvstore module
Creating the kvstore
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore { // 实例化kvstore,example外面存储系统就是一个map s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter} // 加载snapshot snapshot, err := s.loadSnapshot() if err != nil { log.Panic(err) } // 如果snapshot不为空,则先从snapshot外面把数据恢复 if snapshot != nil { log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index) if err := s.recoverFromSnapshot(snapshot.Data); err != nil { log.Panic(err) } } // read commits from raft into kvStore map until error // 开启一个goroutine,读取从raftNode外面提交的commit数据 go s.readCommits(commitC, errorC) return s}// 加载snapshotfunc (s *kvstore) loadSnapshot() (*raftpb.Snapshot, error) { snapshot, err := s.snapshotter.Load() // ErrNoSnapshot 示意没有snapshot,这里消化掉err,防止下层模块退出 if err == snap.ErrNoSnapshot { return nil, nil } if err != nil { return nil, err } // 找到了,则返回 return snapshot, nil}func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { return s.loadMatching(func(*raftpb.Snapshot) bool { return true })}func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) { // 这里返回snapshot的文件列表,依照工夫排序,也就是从新到旧排序 names, err := s.snapNames() if err != nil { return nil, err } var snap *raftpb.Snapshot // 遍历snapshot文件,其实也就是读取最新的文件 for _, name := range names { // loadSnap就不追了,就是读取snapshot文件的数据,并反序列化为构造体对象 if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) { return snap, nil } } // 没找到获取没有适合的snapshot,就返回ErrNoSnapshot return nil, ErrNoSnapshot}// 从snapshot外面复原数据到kvstore,这里的入参是下面snapshot.Data,也就是[]bytefunc (s *kvstore) recoverFromSnapshot(snapshot []byte) error { var store map[string]string if err := json.Unmarshal(snapshot, &store); err != nil { return err } // 反序列化为map后,间接赋值给kvstore s.mu.Lock() defer s.mu.Unlock() s.kvStore = store return nil}
The creation logic:
- First instantiate the kvstore struct and its backing store, which in the example is just a map
- Then look locally for a snapshot, read the newest one, and deserialize it into a snapshot struct
- If a snapshot is found, deserialize snapshot.Data into a map and hand it to kvstore, thereby restoring kvstore's data from the snapshot
- Finally start a goroutine that talks to raftNode over commitC, reading committed data and storing it
Storing data
As mentioned above, kvstore communicates with raftNode over a channel, reading data and storing it. Here is the concrete logic:
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) { for commit := range commitC { // 如果读取到nil,则从snapshot外面再次复原数据 if commit == nil { // signaled to load snapshot snapshot, err := s.loadSnapshot() if err != nil { log.Panic(err) } if snapshot != nil { log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index) if err := s.recoverFromSnapshot(snapshot.Data); err != nil { log.Panic(err) } } continue } // 读取数据 for _, data := range commit.data { var dataKv kv dec := gob.NewDecoder(bytes.NewBufferString(data)) if err := dec.Decode(&dataKv); err != nil { log.Fatalf("raftexample: could not decode message (%v)", err) } s.mu.Lock() s.kvStore[dataKv.Key] = dataKv.Val s.mu.Unlock() } // 敞开commit的chan,以告诉下层模块存储实现 close(commit.applyDoneC) } // 如果遇到error chan,跟httpkvapi模块一样,退出以后模块 if err, ok := <-errorC; ok { log.Fatal(err) }}
The commitC channel and its handling are quite simple:
the goroutine blocks on the channel, processes whatever arrives, and notifies the upper module once it is done.
If raftNode exits due to an error, it signals through the error channel and the kvstore module exits too.
Looking up data
```go
func (s *kvstore) Lookup(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.kvStore[key]
	return v, ok
}
```
Lookup is trivial: the data structure is a map, so we just read from it.
Proposing data
We covered storing data above, so how does proposing data relate to storing it, and why are there two methods for handling data?
From the raft protocol we know that when a user submits a create/update request, the data is first proposed; raftNode then replicates it to the other nodes, and only after more than half of the nodes have received it is it actually written to the store.
```go
func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	s.proposeC <- buf.String()
}
```
So proposing data simply means handing it to raftNode via proposeC; once raftNode is done, the data comes back through commitC and is stored in kvstore.
Summary & notes
- The map+lock approach here is noticeably less efficient than simply using sync.Map (see the sketch after this list); of course, this is just an example, so it does not need to worry about that
- Propose/store flow: when a user sends a create/modify request through httpkvapi, kvstore first forwards the data to raftNode via proposeC, and only after raftNode finishes processing is it actually stored in the database (i.e. kvstore's own map)
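For reference, a minimal sketch of the sync.Map variant mentioned in the first note (an illustration under that assumption, not how raftexample is actually written):

```go
package main

import (
	"fmt"
	"sync"
)

// syncKVStore swaps the map+RWMutex pair for sync.Map,
// which avoids lock contention on read-heavy workloads.
type syncKVStore struct {
	kv sync.Map
}

func (s *syncKVStore) Lookup(key string) (string, bool) {
	v, ok := s.kv.Load(key)
	if !ok {
		return "", false
	}
	return v.(string), true
}

// apply would be called from the commit path instead of writing to a plain map.
func (s *syncKVStore) apply(key, val string) {
	s.kv.Store(key, val)
}

func main() {
	s := &syncKVStore{}
	s.apply("foo", "bar")
	fmt.Println(s.Lookup("foo"))
}
```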
The raftNode module
Starting raftNode
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) { commitC := make(chan *commit) errorC := make(chan error) rc := &raftNode{ proposeC: proposeC, confChangeC: confChangeC, commitC: commitC, errorC: errorC, id: id, peers: peers, join: join, waldir: fmt.Sprintf("raftexample-%d", id), snapdir: fmt.Sprintf("raftexample-%d-snap", id), getSnapshot: getSnapshot, snapCount: defaultSnapshotCount, stopc: make(chan struct{}), httpstopc: make(chan struct{}), httpdonec: make(chan struct{}), logger: zap.NewExample(), snapshotterReady: make(chan *snap.Snapshotter, 1), // rest of structure populated after WAL replay } // 异步启动raft go rc.startRaft() // 返回commitC,errorC rc.snapshotterReady,用于跟其余模块通信 return commitC, errorC, rc.snapshotterReady}func (rc *raftNode) startRaft() { if !fileutil.Exist(rc.snapdir) { if err := os.Mkdir(rc.snapdir, 0750); err != nil { log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err) } } rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir) oldwal := wal.Exist(rc.waldir) rc.wal = rc.replayWAL() // signal replay has finishei // 告诉其余模块 snapshotter初始化实现 rc.snapshotterReady <- rc.snapshotter rpeers := make([]raft.Peer, len(rc.peers)) for i := range rpeers { rpeers[i] = raft.Peer{ID: uint64(i + 1)} } // 初始化raft 的相干配置,用于启动raft c := &raft.Config{ ID: uint64(rc.id), ElectionTick: 10, HeartbeatTick: 1, Storage: rc.raftStorage, MaxSizePerMsg: 1024 * 1024, MaxInflightMsgs: 256, MaxUncommittedEntriesSize: 1 << 30, } // 依据oldwal 和 join,判断是新启动的节点还是重启的节点 // 如果是重启,则能够从snapshot外面复原数据 if oldwal || rc.join { rc.node = raft.RestartNode(c) } else { rc.node = raft.StartNode(c, rpeers) } // 初始化网络组件 rc.transport = &rafthttp.Transport{ Logger: rc.logger, ID: types.ID(rc.id), ClusterID: 0x1000, Raft: rc, ServerStats: stats.NewServerStats("", ""), LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)), ErrorC: make(chan error), } rc.transport.Start() // 启动与各个节点的网络pipeline通信通道 for i := range rc.peers { if i+1 != rc.id { rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) } } go rc.serveRaft() go rc.serveChannels()}
raftNode startup flow:
- Start raft and return commitC for talking to kvstore, errorC for notifying the other modules so they can exit promptly on failure, and the snapshotterReady channel for telling kvstore that snapshot initialization is complete
- Initialize the snapshotter and replay the WAL
- Initialize raft.Config and use the existence of the WAL directory together with the join flag to decide whether this is a brand-new node or a restarted one; a restarted node simply recovers its data from the snapshot
- Initialize the transport, connect to the other nodes, and set up the pipeline channel to each peer
- Start the raft server
- Start raftNode's channels for communicating with the other modules
Starting the node
func StartNode(c *Config, peers []Peer) Node { if len(peers) == 0 { panic("no peers given; use RestartNode instead") } // 初始化node节点,并初始化配置 rn, err := NewRawNode(c) if err != nil { panic(err) } // 将各个节点的信息存储到raftLog外面,期待日志同步,而后周知各个节点配置变更的音讯 err = rn.Bootstrap(peers) if err != nil { c.Logger.Warningf("error occurred during starting a new node: %v", err) } // 实例化node节点 n := newNode(rn) // node就开始在后盾异步运行了,直至退出 go n.run() return &n}func NewRawNode(config *Config) (*RawNode, error) { // 依据配置,实例化raft r := newRaft(config) rn := &RawNode{ raft: r, } // 初始化Leader, State, Term Vote CommitIndex 等raft本身属性 rn.prevSoftSt = r.softState() rn.prevHardSt = r.hardState() return rn, nil}func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) } r := &raft{ id: c.ID, lead: None, isLearner: false, raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, checkQuorum: c.CheckQuorum, preVote: c.PreVote, readOnly: newReadOnly(c.ReadOnlyOption), disableProposalForwarding: c.DisableProposalForwarding, } // 初始化了各个节点在以后节点的状态及存储信息等,包含投票信息,日志commit节点,是否沉闷等 cfg, prs, err := confchange.Restore(confchange.Changer{ Tracker: r.prs, LastIndex: raftlog.lastIndex(), }, cs) if err != nil { panic(err) } assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs)) if !IsEmptyHardState(hs) { r.loadState(hs) } // 更新raftLog的apply index if c.Applied > 0 { raftlog.appliedTo(c.Applied) } // 启动后,就会变更集群中的follower节点 r.becomeFollower(r.Term, None) var nodesStrs []string for _, n := range r.prs.VoterNodes() { nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) return r}// newNode就初始化了各个chan,与各个模块进行通信func newNode(rn *RawNode) node { return node{ propc: make(chan msgWithResult), recvc: make(chan pb.Message), confc: make(chan pb.ConfChangeV2), confstatec: make(chan pb.ConfState), readyc: make(chan Ready), advancec: make(chan struct{}), // make tickc a buffered chan, so raft node can buffer some ticks when the node // is busy processing raft messages. Raft node will resume process buffered // ticks when it becomes idle. tickc: make(chan struct{}, 128), done: make(chan struct{}), stop: make(chan struct{}), status: make(chan chan Status), rn: rn, }}
Node startup flow:
- Instantiate the node and initialize softState and hardState, i.e. raft's own attributes such as Leader, State, Term, Vote and CommitIndex
- Instantiate raft, initialize raftLog, then fetch the hardState and configuration information from storage
- Initialize the state this node keeps for every peer, including vote records, log commit positions, liveness, and so on
- Update the node's hardState from the hardState fetched out of storage
- Become a follower of the cluster
Starting serveRaft
func (rc *raftNode) serveRaft() { url, err := url.Parse(rc.peers[rc.id-1]) if err != nil { log.Fatalf("raftexample: Failed parsing URL (%v)", err) } ln, err := newStoppableListener(url.Host, rc.httpstopc) if err != nil { log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err) } err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln) select { case <-rc.httpstopc: default: log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err) } close(rc.httpdonec)}
serveRaft simply starts an HTTP server used for inter-node communication. Unlike httpKvApi, which only serves clients talking to the cluster, the raft server carries node-to-node traffic such as leader election and log replication.
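The only slightly unusual piece here is newStoppableListener: a listener whose Accept can be interrupted by closing a stop channel, so the HTTP server shuts down together with the node. A minimal sketch of that pattern (an illustration, not the exact raftexample code):

```go
package main

import (
	"errors"
	"net"
	"net/http"
)

// stoppableListener wraps a net.Listener so Accept returns once stopc is closed.
type stoppableListener struct {
	net.Listener
	stopc <-chan struct{}
}

var errStopped = errors.New("listener stopped")

func (ln stoppableListener) Accept() (net.Conn, error) {
	connc := make(chan net.Conn, 1)
	errc := make(chan error, 1)
	go func() {
		c, err := ln.Listener.Accept()
		if err != nil {
			errc <- err
			return
		}
		connc <- c
	}()
	select {
	case <-ln.stopc:
		return nil, errStopped
	case c := <-connc:
		return c, nil
	case err := <-errc:
		return nil, err
	}
}

func main() {
	stopc := make(chan struct{})
	base, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	srv := &http.Server{Handler: http.NotFoundHandler()}
	go srv.Serve(stoppableListener{Listener: base, stopc: stopc})
	close(stopc) // Serve returns shortly after Accept observes the stop signal
}
```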
With the startup flow covered, all objects are created and all state is ready; what follows is the real work: leader election and log replication.
Leader election
Leader election in raftNode is driven by the logical clock, whose logic was introduced earlier; here we follow the code to see the concrete implementation.
The leader periodically sends heartbeats to the followers, while each follower has a timer checking the heartbeat interval; if the interval exceeds the configured limit, leader election is triggered.
Timer firing
func (rc *raftNode) serveChannels() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() ...... for { select { case <-ticker.C: rc.node.Tick() ...... } }}func (n *node) Tick() { select { case n.tickc <- struct{}{}: case <-n.done: default: n.rn.raft.logger.Warningf("%x A tick missed to fire. Node blocks too long!", n.rn.raft.id) }}
Handling the timer
The timer firing is implemented inside serveChannels, and the tick is then delivered through tickc to the node.run method for handling.
func (n *node) run() { ...... switch { ...... case <-n.tickc: n.rn.Tick() ...... }}func (rn *RawNode) Tick() { rn.raft.tick()}func (r *raft) tickElection() { r.electionElapsed++ // 判断心跳是否超时,每次tick electionElapsed++,如果electionElapsed >= raft初始化时的randomizedElectionTimeout // 则认为心跳超时了,这时候就能够进行选主了 if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil { r.logger.Debugf("error occurred during election: %v", err) } }}// 心跳超时判断func (r *raft) pastElectionTimeout() bool { return r.electionElapsed >= r.randomizedElectionTimeout}
Every logical-clock tick on a follower increments electionElapsed. When raft is initialized a randomizedElectionTimeout is chosen, and once the tick count exceeds it, the leader-election flow is triggered.
So when does electionElapsed get reset?
```go
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	......
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	......
}
```
When a node starts it switches into the follower state, and whenever the follower receives the leader's MsgHeartbeat it resets electionElapsed.
Once electionElapsed >= randomizedElectionTimeout, i.e. the leader's heartbeat has timed out, the node starts the voting flow of leader election.
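Why is the timeout randomized? Each node picks it uniformly from [electiontimeout, 2*electiontimeout - 1] (per the comment on the raft struct above), so nodes rarely time out at the same moment and split the vote. A small sketch of that choice:

```go
package main

import (
	"fmt"
	"math/rand"
)

// resetRandomizedElectionTimeout mimics how raft re-picks the timeout
// every time a node becomes follower or candidate.
func resetRandomizedElectionTimeout(electionTimeout int) int {
	// a value in [electionTimeout, 2*electionTimeout-1]
	return electionTimeout + rand.Intn(electionTimeout)
}

func main() {
	const electionTimeout = 10
	for i := 0; i < 3; i++ {
		fmt.Println(resetRandomizedElectionTimeout(electionTimeout))
	}
}
```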
Starting a vote
For leader election we will ignore the log handling attached to vote messages for now.
func (r *raft) Step(m pb.Message) error { ...... switch m.Type { case pb.MsgHup: // preVote 是个开关,用于选举前置判断,例如以后节点被网络分区等状况造成的异样,只有preVote通过后才会发动投票 // preVote 和 vote逻辑差不多,就不多剖析了 if r.preVote { r.hup(campaignPreElection) } else { r.hup(campaignElection) } ...... }}func (r *raft) hup(t CampaignType) { // 如果以后节点是leader,则疏忽 if r.state == StateLeader { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) return } // 判断以后节点是否能够晋升为leader if !r.promotable() { r.logger.Warningf("%x is unpromotable and can not campaign", r.id) return } // 获取未提交的raftLog ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) if err != nil { r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) } if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n) return } // 开始竞选 r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) r.campaign(t)}
func (r *raft) campaign(t CampaignType) { if !r.promotable() { // This path should not be hit (callers are supposed to check), but // better safe than sorry. r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id) } var term uint64 var voteMsg pb.MessageType // 更新raft的状态 if t == campaignPreElection { r.becomePreCandidate() voteMsg = pb.MsgPreVote // PreVote RPCs are sent for the next term before we've incremented r.Term. term = r.Term + 1 } else { r.becomeCandidate() voteMsg = pb.MsgVote term = r.Term } // 给本身投票,并记录投票后果,判断是否能升级 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. // 这里则是preVote胜利了,则是发动真正的投票环节 if t == campaignPreElection { r.campaign(campaignElection) } else { // 投票胜利了,则晋升为leader r.becomeLeader() } return } // 投票后果未满足降职后果,则开始周知其余节点进行投票 var ids []uint64 { idMap := r.prs.Voters.IDs() ids = make([]uint64, 0, len(idMap)) for id := range idMap { ids = append(ids, id) } sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) } for _, id := range ids { if id == r.id { continue } r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term) var ctx []byte if t == campaignTransfer { ctx = []byte(t) } // 将投票的音讯发送给其余节点,这里只是把音讯存储起来,期待其余goroutine生产发送 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) }}
Note here that when a node has preVote enabled, the campaign type is campaignPreElection and the node is promoted to PreCandidate; otherwise the campaign type is campaignElection and the node is promoted to Candidate. Let's see what these two roles each do.
func (r *raft) becomeCandidate() { // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == StateLeader { panic("invalid transition [leader -> candidate]") } r.step = stepCandidate r.reset(r.Term + 1) r.tick = r.tickElection r.Vote = r.id r.state = StateCandidate r.logger.Infof("%x became candidate at term %d", r.id, r.Term)}func (r *raft) becomePreCandidate() { // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == StateLeader { panic("invalid transition [leader -> pre-candidate]") } // Becoming a pre-candidate changes our step functions and state, // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. r.step = stepCandidate r.prs.ResetVotes() r.tick = r.tickElection r.lead = None r.state = StatePreCandidate r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)}
As you can see, a PreCandidate leaves its Term unchanged while a Candidate increments Term by 1, which matches the purpose of preVote described earlier: avoiding repeated elections caused by network partitions.
poll records the vote a given node id cast for us and checks whether the overall vote has been decided.
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) { if v { r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) } else { r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) } r.prs.RecordVote(id, v) granted, rejected, result = r.prs.TallyVotes() r.logger.Infof("%d check poll result, votes = %d, rejected = %d, voteResult = %d, record = %v, voters = %v", r.id, granted, rejected, result, r.prs.Votes, r.prs.Voters) return}
```go
func (r *raft) send(m pb.Message) {
	if m.From == None {
		m.From = r.id
	}
	......
	// append the message to the msgs slice, to be consumed and sent by another goroutine
	r.msgs = append(r.msgs, m)
}
```
The logic of the election flow:
- First check whether this node is already the leader and whether it is eligible for promotion
- If preVote is enabled, run the preVote round first; it is almost the same as the vote round
- Then update the node's own state and information and promote it to Candidate
- Vote for itself and tally whether the vote is already won; if so, it becomes leader directly and starts the leader-side flow
- If the vote is not yet decided, start sending vote requests to the other nodes
- These vote requests are only appended to the raft.msgs slice, waiting to be consumed by another goroutine
At this point the vote-initiation step is finished, but the vote messages have not actually been sent out and no vote results have been received; for that part of the logic we have to go back to raftNode.serveChannels and node.run.
Sending the vote messages
func (n *node) run() { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} var rd Ready r := n.rn.raft lead := None for { if advancec != nil { readyc = nil } else if n.rn.HasReady() { // 判断是否有音讯须要解决 // Populate a Ready. Note that this Ready is not guaranteed to // actually be handled. We will arm readyc, but there's no guarantee // that we will actually send on it. It's possible that we will // service another channel instead, loop around, and then populate // the Ready again. We could instead force the previous Ready to be // handled first, but it's generally good to emit larger Readys plus // it simplifies testing (by emitting less frequently and more // predictably). // 获取msg和raft状态,组装成ready构造 rd = n.rn.readyWithoutAccept() readyc = n.readyc } ....... select { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. case pm := <-propc: m := pm.m m.From = r.id err := r.Step(m) if pm.result != nil { pm.result <- err close(pm.result) } // 接管到follower的投票后果 // 这里是网络组件相干的解决,当节点收到其余的网络申请的时候,会通过recvc通道传递过去解决 case m := <-n.recvc: // filter out response message from unknown From. if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { // 从新进入Step函数解决,留神这里的m.Type == MsgVoteResp r.Step(m) } ...... // 在这里,将下面组装好的ready构造体,通过readyc传给raftNode.serveChannels解决 case readyc <- rd: // 更新raft状态和删除msgs n.rn.acceptReady(rd) advancec = n.advancec case <-advancec: n.rn.Advance(rd) rd = Ready{} advancec = nil case c := <-n.status: c <- getStatus(r) case <-n.stop: close(n.done) return } }}
Handling the vote results
func (r *raft) Step(m pb.Message) error { ...... switch m.Type { ..... default: err := r.step(r, m) if err != nil { return err } } return nil}
Here we re-enter the Step function to handle the message, but this time the received message type is MsgVoteResp, so it falls straight through to the default branch.
Also, the node has already started the election and been promoted to Candidate, so r.step now points to the stepCandidate function.
func stepCandidate(r *raft, m pb.Message) error { ...... case myVoteRespType: // 记录follower节点的投票后果,并计算投票的后果(输或赢) gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) switch res { // 博得选举,则晋升为leader节点 case quorum.VoteWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } // 输掉抉择,则降级为follower case quorum.VoteLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) // 其余状况下,就是票数还有余与判断选举后果,则持续期待其余节点的投票 } ...... return nil}
stepCandidate records the vote result sent back by a follower and re-tallies the election. If the candidate has won more than half of the votes, it becomes leader and notifies every node; if it has lost, it steps down to follower; otherwise the election is still in progress, some nodes have not voted yet, and it keeps waiting.
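The tally itself is plain majority counting. A minimal sketch of the won/lost/pending decision (a simplification of what the progress tracker computes, not its real API):

```go
package main

import "fmt"

type voteResult int

const (
	votePending voteResult = iota // not enough information yet
	voteLost                      // a quorum can no longer be reached
	voteWon                       // a quorum has granted
)

func (v voteResult) String() string {
	return [...]string{"pending", "lost", "won"}[v]
}

// tally decides the election outcome for a cluster of `voters` nodes,
// given the votes recorded so far (true = granted, false = rejected).
func tally(voters int, votes map[uint64]bool) voteResult {
	quorum := voters/2 + 1
	granted, rejected := 0, 0
	for _, v := range votes {
		if v {
			granted++
		} else {
			rejected++
		}
	}
	if granted >= quorum {
		return voteWon
	}
	// once so many nodes have rejected that a quorum is impossible, the election is lost
	if rejected > voters-quorum {
		return voteLost
	}
	return votePending
}

func main() {
	votes := map[uint64]bool{1: true} // the candidate's own vote
	fmt.Println(tally(3, votes))      // pending: 1 of 3
	votes[2] = true
	fmt.Println(tally(3, votes))      // won: 2 of 3
}
```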
Summary
- raft has a timer that fires periodically to drive the election logic
- Each time the timer fires, electionElapsed is incremented; once it exceeds the randomizedElectionTimeout created at initialization, the leader's heartbeat is considered timed out and the election logic is triggered
- The follower checks whether preVote is set to true; if so it runs the preVote logic first, which avoids repeated elections in cases like a network partition that leaves fewer than half of the cluster's nodes
- The follower sets itself to Candidate and votes for itself
- It checks whether the votes are enough to win the election; if so it becomes leader and announces it
- If the votes are not yet enough to win, it distributes vote requests to the other nodes
- The other nodes' vote results are delivered to raftNode through the node.recvc channel and handled in the run function
- Each time another node's vote result arrives, the tally is re-evaluated, until the election is won or lost
Log replication
Data write requests
Data writes are handled through httpKVAPI; a user's PUT request creates or modifies data.
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { key := r.RequestURI defer r.Body.Close() switch r.Method { case http.MethodPut: v, err := io.ReadAll(r.Body) if err != nil { log.Printf("Failed to read on PUT (%v)\n", err) http.Error(w, "Failed on PUT", http.StatusBadRequest) return } h.store.Propose(key, string(v)) // Optimistic-- no waiting for ack from raft. Value is not yet // committed so a subsequent GET on the key may return old value w.WriteHeader(http.StatusNoContent)......}
```go
func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	s.proposeC <- buf.String()
}
```
A create/modify request is not written to kvstore directly; it is forwarded through proposeC to raftNode for handling.
raftNode receiving the data
func (rc *raftNode) serveChannels() { ...... // send proposals over raft go func() { confChangeCount := uint64(0) for rc.proposeC != nil && rc.confChangeC != nil { select { // 接管kvstore传递过去的数据 case prop, ok := <-rc.proposeC: if !ok { rc.proposeC = nil } else { // blocks until accepted by raft state machine rc.node.Propose(context.TODO(), []byte(prop)) } ...... } } // client closed channel; shutdown raft if not already close(rc.stopc) }()
func (n *node) Propose(ctx context.Context, data []byte) error { return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})}func (n *node) stepWait(ctx context.Context, m pb.Message) error { return n.stepWithWaitOption(ctx, m, true)}func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { ...... ch := n.propc pm := msgWithResult{m: m} if wait { pm.result = make(chan error, 1) } select { // 将数据封装好后,传给node.propc这个通道,交给node.run办法解决 case ch <- pm: // 这里是true,也就是不return if !wait { return nil } case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } // block直到有后果 select { case err := <-pm.result: if err != nil { return err } case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } return nil}
raftNode.serveChannels starts a goroutine that loops checking whether there is a data message; when one arrives it wraps it up and passes it through a channel to node.run for handling, then blocks until node.run returns the result.
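The blocking is built on a small "message plus result channel" pattern. A simplified, self-contained illustration of that pattern (not the real stepWait signatures):

```go
package main

import (
	"errors"
	"fmt"
)

// msgWithResult pairs a proposal with a channel the handler uses to report the outcome.
type msgWithResult struct {
	data   string
	result chan error
}

// propose sends the proposal into the processing loop and blocks until it is handled.
func propose(propc chan<- msgWithResult, done <-chan struct{}, data string) error {
	pm := msgWithResult{data: data, result: make(chan error, 1)}
	select {
	case propc <- pm:
	case <-done:
		return errors.New("stopped")
	}
	select {
	case err := <-pm.result:
		return err
	case <-done:
		return errors.New("stopped")
	}
}

func main() {
	propc := make(chan msgWithResult)
	done := make(chan struct{})

	// the processing loop, standing in for node.run
	go func() {
		for pm := range propc {
			fmt.Println("processing:", pm.data)
			pm.result <- nil // report success back to the blocked caller
			close(pm.result)
		}
	}()

	if err := propose(propc, done, "key=value"); err != nil {
		fmt.Println("propose failed:", err)
	}
}
```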
Next, let's look at how node.run handles it.
func (n *node) run() { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} var rd Ready r := n.rn.raft lead := None for { ...... select { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. // 这里就跟下面的选主一样,选主也是一个音讯,日志同步也是一个音讯,所以都走到了这里 case pm := <-propc: m := pm.m m.From = r.id err := r.Step(m) // 解决实现后,将后果通过通道传递给下面,开释后面的block逻辑 if pm.result != nil { pm.result <- err close(pm.result) } ...... } }}
func (r *raft) Step(m pb.Message) error { // Handle the message term, which may result in our stepping down to a follower. ...... default: // 调用本身注册的step办法来解决 err := r.step(r, m) if err != nil { return err } } return nil}
This calls r.step() to handle the message, and step has a different implementation depending on the role:
follower corresponds to stepFollower
leader corresponds to stepLeader
Let's look at stepFollower first.
func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return ErrProposalDropped } else if r.disableProposalForwarding { r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) return ErrProposalDropped } m.To = r.lead // 将音讯转发给leader r.send(m) ...... return nil}
From stepFollower's logic we can see that when a follower receives a create/modify message, it simply forwards the request to the leader.
Next let's look at stepLeader, i.e. how the leader handles a create/modify message.
func stepLeader(r *raft, m pb.Message) error { ...... case pb.MsgProp: if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } if r.prs.Progress[r.id] == nil { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. return ErrProposalDropped } ...... if !r.appendEntry(m.Entries...) { return ErrProposalDropped } r.bcastAppend() return nil ...... return nil}
The core logic of stepLeader here is just two steps:
- Try to append the entries to raftLog; if the append fails, return
- Tell the other nodes to append the data
Appending to raftLog
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { li := r.raftLog.lastIndex() // 设置每个数据的Term和Index,以示意程序 for i := range es { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } // Track the size of this uncommitted proposal. // 判断以后cucommit的数据大小+以后数据大小 是否大于 定于的最大uncommitSize,如果大于则失败 if !r.increaseUncommittedSize(es) { r.logger.Debugf( "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id, ) // Drop the proposal. return false } // use latest "last" index after truncate/append // 往raftLog外面追加数据 li = r.raftLog.append(es...) // 更新以后节点的Match和Next,这里的Match示意以后节点的raftLog的Index,如果给的数据的Index < Match,则表明数据有误,会回绝追加 r.prs.Progress[r.id].MaybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. // 尝试提交,只有当超过1/2+1节点ack这个数据后,才会提交 r.maybeCommit() return true}func (r *raft) maybeCommit() bool { // 这里就返回所有节点ack的commit log的Index mci := r.prs.Committed() return r.raftLog.maybeCommit(mci, r.Term)}func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { // 当ack的Index 大于 committed的时候,才会提交 if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } return false}
While appending to raftLog, the leader also attempts to commit the entry to the store; just like in leader election, it checks whether at least 1/2+1 of the cluster's nodes have received and acknowledged the data.
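r.prs.Committed() boils down to: collect every node's acknowledged Match index and take the highest index that a quorum has reached. A minimal sketch of that computation (a simplification of what the tracker does):

```go
package main

import (
	"fmt"
	"sort"
)

// quorumCommitted returns the highest log index that at least
// a majority of nodes have acknowledged (their Match index).
func quorumCommitted(match map[uint64]uint64) uint64 {
	idxs := make([]uint64, 0, len(match))
	for _, m := range match {
		idxs = append(idxs, m)
	}
	// sort descending; the value at position quorum-1 is reached by `quorum` nodes
	sort.Slice(idxs, func(i, j int) bool { return idxs[i] > idxs[j] })
	quorum := len(match)/2 + 1
	return idxs[quorum-1]
}

func main() {
	// the leader has appended up to 9; one follower acked 9, the other only 4
	match := map[uint64]uint64{1: 9, 2: 9, 3: 4}
	fmt.Println(quorumCommitted(match)) // 9: indices up to 9 are safe to commit
}
```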
Telling the other nodes to append
After appending to its own raftLog, none of the other nodes have processed the entry yet, so the data cannot be committed to the store; the leader must first notify the other nodes so they store it, and only then can the leader commit.
func (r *raft) bcastAppend() { // 循环发送给其余节点 r.prs.Visit(func(id uint64, _ *tracker.Progress) { if id == r.id { return } r.sendAppend(id) })}func (r *raft) sendAppend(to uint64) { r.maybeSendAppend(to, true)}func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.prs.Progress[to] if pr.IsPaused() { return false } m := pb.Message{} m.To = to term, errt := r.raftLog.term(pr.Next - 1) ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) if len(ents) == 0 && !sendIfEmpty { return false } ...... // 定义数据音讯的Type,Index,Term和追加的数据 m.Type = pb.MsgApp m.Index = pr.Next - 1 m.LogTerm = term m.Entries = ents m.Commit = r.raftLog.committed if n := len(m.Entries); n != 0 { switch pr.State { // optimistically increase the next when in StateReplicate case tracker.StateReplicate: last := m.Entries[n-1].Index pr.OptimisticUpdate(last) pr.Inflights.Add(last) case tracker.StateProbe: pr.ProbeSent = true default: r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) } } // 发送给塔器节点 r.send(m) return true}
When sending to the other nodes, the leader just assembles the data message and hands it to the transport.
Next comes receiving the followers' replies: the leader sent messages of type MsgApp, so the followers reply with MsgAppResp.
So let's go back to the node.run method to see how a follower's message is handled.
Handling the follower's response
An old friend again:
func (n *node) run() { ...... for { ...... select { ...... case m := <-n.recvc: // filter out response message from unknown From. if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { r.Step(m) } ...... } }}
We already saw node.run's case m := <-n.recvc: branch during leader election; it is also the entry point for handling requests and responses arriving from other nodes over the network.
func (r *raft) Step(m pb.Message) error { ...... switch m.Type { ...... default: err := r.step(r, m) if err != nil { return err } } return nil}func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { ...... // All other message types require a progress for m.From (pr). pr := r.prs.Progress[m.From] if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return nil } switch m.Type { case pb.MsgAppResp: pr.RecentActive = true if m.Reject { // 这时候数据音讯的追加被follower 回绝了,同时follower计算出来一个可能的Index点 r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d", r.id, m.RejectHint, m.LogTerm, m.From, m.Index) nextProbeIdx := m.RejectHint if m.LogTerm > 0 { // 依据follower返回的Index点和Term,找到可能的Index去改正follower的数据 nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) } // reject的follower节点,状态先批改为StateProbe,表明这个follower的lastIndex须要先去探测,不明 // 批改这个follower节点的Next if pr.MaybeDecrTo(m.Index, nextProbeIdx) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) if pr.State == tracker.StateReplicate { pr.BecomeProbe() } // 从批改后的Index日志点,持续尝试往follower追加数据 r.sendAppend(m.From) } } else { oldPaused := pr.IsPaused() // 尝试更新follower节点在Index和Next地位 if pr.MaybeUpdate(m.Index) { switch { // follower节点追加胜利的话,就不必持续探测了,状态变成StateReplicate,示意失常的follower case pr.State == tracker.StateProbe: pr.BecomeReplicate() case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: // TODO(tbg): we should also enter this branch if a snapshot is // received that is below pr.PendingSnapshot but which makes it // possible to use the log again. r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't // move to replicating state, that would only happen with // the next round of appends (but there may not be a next // round for a while, exposing an inconsistent RaftStatus). pr.BecomeProbe() pr.BecomeReplicate() case pr.State == tracker.StateReplicate: pr.Inflights.FreeLE(m.Index) } // 尝试将数据进行提交 if r.maybeCommit() { // committed index has progressed for the term, so it is safe // to respond to pending read index requests releasePendingReadIndexMessages(r) // 再一次播送,这时候播送的数据的Commit和Index及Entries都有变动,目标是为了让follower提交数据 r.bcastAppend() } else if oldPaused { // If we were paused before, this node may be missing the // latest commit index, so send it. r.sendAppend(m.From) } // We've updated flow control information above, which may // allow us to send multiple (size-limited) in-flight messages // at once (such as when transitioning from probe to // replicate, or when freeTo() covers multiple messages). If // we have more entries to send, send as many messages as we // can (without sending empty messages for the commit index) for r.maybeSendAppend(m.From, false) { } // Transfer leadership is in progress. // 节点迁徙相干 if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From) r.sendTimeoutNow(m.From) } } } ...... 
} return nil}// 尝试更新follower的Match和Next的索引值func (pr *Progress) MaybeUpdate(n uint64) bool { var updated bool if pr.Match < n { pr.Match = n updated = true pr.ProbeAcked() } pr.Next = max(pr.Next, n+1) return updated} func (r *raft) maybeCommit() bool { // MaybeUpdate 中会保留每个响应音讯的节点的Match,这里就依据Match计算出来1/2+1以上节点共识的Match的Index点 mci := r.prs.Committed() // 依据计算出来的Index点,尝试commit return r.raftLog.maybeCommit(mci, r.Term)} func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { // 当Index 大于 提交的Index点的时候 并且 对应的Term == raftLog.Term,才会进行提交 if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } return false} func (l *raftLog) commitTo(tocommit uint64) { // never decrease commit if l.committed < tocommit { if l.lastIndex() < tocommit { l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) } // 将raftLog的commit点批改 l.committed = tocommit }}
When the leader receives a follower's MsgAppResp it starts processing it.
The follower's reply can be in one of two states.
Accepted
- The leader updates the Match and Next it keeps in memory for the replying node, i.e. its record of that follower's raft log state
- It checks whether at least 1/2+1 of the nodes have received and processed this entry
- If so, the entry is committed; committing here merely updates raftLog's committed value (leaving the rest to node.run), and the leader broadcasts again so the followers commit the data as well
- If not, it waits for the next round
Rejected
- If the follower rejects, its response carries a candidate Index and Term that might match
- From that response the leader computes a likely matching Index and coordinates with the follower
- The previous two steps repeat until the leader's and follower's logs match at some Index, and then the leader starts replicating the log to the follower from there
- Finally the flow returns to the accepted case
When the follower rejects the append
One point needs explaining: when the follower's MsgAppResp is a reject, the leader's and follower's logs are inconsistent, so the leader has to probe for the Index at which the follower still agrees with it and resume appending from that corrected point; that is the logic of findConflictByTerm().
The diagrams from the source code illustrate this.
Example 1:
```
idx              1 2 3 4 5 6 7 8 9
-----------------------------------
term (Leader)    1 3 3 3 5 5 5 5 5
term (Follower)  1 1 1 1 2 2
```
Suppose this is the initial state of the leader's and follower's logs. When the leader tries to replicate the entry at Index = 9, the follower finds it has nothing at Index 7 and 8, i.e. no match, so it rejects the request and looks for an entry whose Term is smaller than the Term at Index = 9 (Term = 5); that is its own last entry, Index = 6, Term = 2. The leader receives the rejection along with that Index and Term. If it appended from that point directly it would fail again, because at Index = 6 the leader's and follower's Terms differ. So the leader does one more computation, walking back to the last index in its own log whose Term is <= the follower's hinted Term, which lands on Index = 1. That, in short, is what findConflictByTerm() does.
Example 2:
```
idx              1 2 3 4 5 6 7 8 9
-----------------------------------
term (Leader)    1 3 3 3 3 3 3 3 7
term (Follower)  1 3 3 4 4 5 5 5 6
```
The leader needs to append the entry at Index = 9 to the follower. The follower rejects and returns Term = 6 (leader.Term == 7 > 6). On receiving the reply the leader walks its log back and finds that Index = 8 qualifies (leader.Term == 3 < 6), so it retries the append from there. The follower rejects again, because at Index = 8 follower.Term = 5 while leader.Term = 3, so the follower now looks for a position with Term <= 3, which backtracks to Index = 3. From there the leader's append finally succeeds.
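The leader-side probing can be sketched as: given the follower's hint (index, term), walk the leader's own log backwards to the last entry whose term does not exceed the hinted term. A simplified model of findConflictByTerm, using the Example 1 logs:

```go
package main

import "fmt"

// findConflictByTerm walks backwards from `index` and returns the largest index
// whose term does not exceed `term`. terms[i] holds the term of log index i+1.
func findConflictByTerm(terms []uint64, index, term uint64) uint64 {
	for ; index > 0; index-- {
		if terms[index-1] <= term {
			return index
		}
	}
	return 0
}

func main() {
	//                 idx: 1  2  3  4  5  6  7  8  9
	leader := []uint64{1, 3, 3, 3, 5, 5, 5, 5, 5}

	// Example 1: the follower rejected with hint (index=6, term=2);
	// the leader backs off all the way to index 1 (term 1 <= 2).
	fmt.Println(findConflictByTerm(leader, 6, 2)) // 1
}
```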
Committing the data
Looking at the above you might be puzzled: committing seems to do almost nothing, it just advances the commit index. For the rest we have to go back to node.run. Recall the election flow: the vote messages were placed into raft.msgs and a loop inside node.run read and processed them. Is the commit logic handled the same way?
func (n *node) run() { var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} var rd Ready r := n.rn.raft lead := None for { if advancec != nil { readyc = nil } else if n.rn.HasReady() { rd = n.rn.readyWithoutAccept() readyc = n.readyc } ...... select { ...... // 将数据通过readyc发送给serveChannels来解决 case readyc <- rd: n.rn.acceptReady(rd) advancec = n.advancec ...... } }}func (rn *RawNode) HasReady() bool { r := rn.raft ...... // 这里判断,是否有音讯(对应抉择的音讯) // hasNextEnts 则判断有没有commit然而还没有apply的数据 if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() { return true } if len(r.readStates) != 0 { return true } return false}// 判断是否有commit但为apply的数据func (l *raftLog) hasNextEnts() bool { // 判断commit的Index是不是大于apply的Index off := max(l.applied+1, l.firstIndex()) return l.committed+1 > off}
node.run loops checking whether there are entries that need to be committed or msgs to send; if so, they are pushed through readyc to serveChannels for processing.
Next, let's look at how serveChannels handles it.
func (rc *raftNode) serveChannels() { ...... // event loop on raft state machine updates for { select { ...... // store raft entries to wal, then publish over commit channel case rd := <-rc.node.Ready(): // Must save the snapshot file and WAL snapshot entry before saving any other entries // or hardstate to ensure that recovery after a snapshot restore is possible. ...... // 存储到storage rc.raftStorage.Append(rd.Entries) // 这里没有Messages了,都是Entries,所以这里能够疏忽 rc.transport.Send(rc.processMessages(rd.Messages)) // apply到kvstore applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)) if !ok { rc.stop() return } rc.maybeTriggerSnapshot(applyDoneC) rc.node.Advance() case err := <-rc.transport.ErrorC: rc.writeError(err) return case <-rc.stopc: rc.stop() return } }}
After serveChannels receives the Entries gathered by node.run, it appends them to the WAL and storage, then stores them into kvstore via publishEntries.
Next, what does publishEntries do?
func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) { if len(ents) == 0 { return nil, true } data := make([]string, 0, len(ents)) // 组装data for i := range ents { switch ents[i].Type { case raftpb.EntryNormal: if len(ents[i].Data) == 0 { // ignore empty messages break } s := string(ents[i].Data) data = append(data, s) ...... } } var applyDoneC chan struct{} if len(data) > 0 { applyDoneC = make(chan struct{}, 1) select { // 将组装好的data 通过commitc传给kvstore解决,当kvstore解决实现后,再通过applyDoneC通道告诉过去 case rc.commitC <- &commit{data, applyDoneC}: case <-rc.stopc: return nil, false } } // after commit, update appliedIndex rc.appliedIndex = ents[len(ents)-1].Index return applyDoneC, true}
publishEntries assembles the data, sends it to kvstore through commitC, updates raftNode.appliedIndex, and returns a channel that kvstore uses to notify raftNode once the data has been stored.
readCommits, in turn, is simple: it stores the data it receives and closes the channel passed down by raftNode to signal that the store is complete.
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) { for commit := range commitC { // commit == nil的时候,kvstore从snapshot文件外面复原数据 if commit == nil { // signaled to load snapshot snapshot, err := s.loadSnapshot() if err != nil { log.Panic(err) } if snapshot != nil { log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index) if err := s.recoverFromSnapshot(snapshot.Data); err != nil { log.Panic(err) } } continue } // 将数据存储到kvstore外面 for _, data := range commit.data { var dataKv kv dec := gob.NewDecoder(bytes.NewBufferString(data)) if err := dec.Decode(&dataKv); err != nil { log.Fatalf("raftexample: could not decode message (%v)", err) } s.mu.Lock() s.kvStore[dataKv.Key] = dataKv.Val s.mu.Unlock() } // 敞开applyDoneC,以告诉下层模块解决实现 close(commit.applyDoneC) } if err, ok := <-errorC; ok { log.Fatal(err) }}
Summary
Our main goal is to understand the raft protocol, so we only analyzed leader election and log replication; the WAL, snapshots and the transport were left out, and interested readers can trace them on their own.
The implementation of raftexample as a whole is fairly simple and easy to trace; the core code logic is concentrated in node.run, raftNode.serveChannels and the stepXXX functions, and the code is cleanly layered.