1.程序结构
lab2的试验是要实现以下接口
// create a new Raft server instance:rf := Make(peers, me, persister, applyCh)// start agreement on a new log entry:rf.Start(command interface{}) (index, term, isleader)// ask a Raft for its current term, and whether it thinks it is leaderrf.GetState() (term, isLeader)// each time a new entry is committed to the log, each Raft peer// should send an ApplyMsg to the service (or tester).type ApplyMsg
其中参数在正文中根本都有解释。
Make 用来创立点对点server ,peers是所有的server,len(peers)也就是所有的server数量,me就是以后的server,也就是在peers中的下标。persister和applyCh 以及Start函数在2A试验中没有用上。
做试验前强烈建议多读几遍raft论文。
助教的student guide 也很重要:
student guide
FAQ
locking and structure
2.常见问题
- 什么时候须要重置超时选举工夫?在 studen guide中写到有3种状况须要重置超时工夫:
a) 从以后的leader 中收到 AppendEntries ,收到过期的不须要重置
b) 开始一轮选举的时候,当然本轮没有选举后果开始下一轮的时候也应该重置
c) 当你给其余节点投票的时候
- 什么时候重置votefFor?
当term扭转的时候重置votedfor,例如投票的时候收到了term更高,那么不论此刻是否投票过,都应该转变为follower并且重置votedfor;当leader掉线一段时间重连回来,此刻群组有新leader,那么old leader应该转变为follower并且重置votedfor。
- 在做RPC申请的时候不应该加锁。例如:
rf.mu.Lock()
rf. sendRequestVote(...)
rf.mu.UnLock()
这种写法是不合逻辑的,容易造成死锁,咱们仅需对共享资源加锁就足够了,比方这里对参数结构加锁。
- server 成为leader之后不须要进行 选举超时倒计时。
3.实现 给出局部代码,仅供参考
3.1 流程
- 首先 raft 启动,在Make 中做初始化,一开始所有的server都是follower,并且每个server会随机调配一个选举超时工夫,同时所有的server都有雷同的心跳间隔时间。那么因为波及到角色转换,咱们最好为每一种角色写一个转换函数,convertToFollower,convertToCandidate,convertToLeader。
- 什么时候会转变到follower呢?不外乎在心跳,投票或者appendEntries的时候收到了更加新的server的信息,这时候须要转变到follower的同时更新本人的Term
- 转变到candidate只有一种,当选举超时内没有收到心跳或者appendEntries,那么超时之后被动变成candidate
- 转变到leader,只有在candidate收到大多数申请的时候,竞选胜利,成为leader
func (rf* Raft) convertToCandidate(){ rf.raftStatus = Candidate rf.currentTerm++ rf.votedFor = rf.me rf.nonLeaderCond.Signal() // awake electionTimeoutTick}func (rf* Raft) convertToFollower(newTerm int){ rf.raftStatus = Follower rf.currentTerm = newTerm rf.votedFor = -1 rf.nonLeaderCond.Signal() // awake electionTimeoutTick}func (rf* Raft) convertToLeader(){ rf.raftStatus = Leader}
初始化的时候,为了避免瓜分选票的状况产生,须要随机设置不同的超时工夫,论文倡议是150-300ms ,倡议写个函数,用工夫戳做随机种子,论文倡议每次生成不同的超时工夫。
func (rf *Raft) getRandElecTime() int { rand.Seed(time.Now().UnixNano()) electionTimeout := rand.Intn(150) + 150 // [150,300) return electionTimeout}func (rf* Raft) resetElectionTimer() { rf.lastHeartbeatTime = time.Now().UnixNano() rf.electionTimeout = rf.getRandElecTime()}
依据论文解析,咱们须要长时间运行的gorutinue (能够简略了解为死循环)来处理事件和统计选举超时。
func (rf *Raft) EventLoop() () { for{ rf.mu.Lock() if rf.killed(){ rf.mu.Unlock() return } rf.mu.Unlock() select { case <- rf.electionTimeoutChan: rf.mu.Lock() DPrintf("[EventLoop]: Id %d Term %d State %s \t || \t election timeout , start an eletion\n", rf.me, rf.currentTerm, state2name(rf.raftStatus)) rf.mu.Unlock() // start election, if election timeout go rf.startElection() case <-rf.heartbeatPeriodChan: DPrintf("[EventLoop]: Id %d Term %d State %s \t || \t election timeout , start to send heartbeat\n", rf.me, rf.currentTerm, state2name(rf.raftStatus)) go rf.broadcastHeartbeat() } }}func (rf *Raft) electionTimeoutTick() { for { rf.mu.Lock() _, isLeader := rf.GetState() if rf.killed(){ rf.mu.Unlock() return } rf.mu.Unlock() if isLeader { // if is leader , no need to check election timeout rf.nonLeaderCond.L.Lock() rf.nonLeaderCond.Wait() rf.nonLeaderCond.L.Unlock() }else { // follower and candidate rf.mu.Lock() elapseTime := time.Now().UnixNano() - rf.lastHeartbeatTime if elapseTime/int64(time.Millisecond) > int64(rf.electionTimeout){ DPrintf("[electionTimeoutTick]: Id %d Term %d State %s\t || \ttimeout,"+ "convert to candidate\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus)) DPrintf("[electionTimeoutTick] : %d %d\n",elapseTime/int64(time.Millisecond),int64(rf.electionTimeout)) rf.electionTimeoutChan <- true } rf.mu.Unlock() time.Sleep(time.Millisecond*10) } }}
试验局部要求心跳不超过10次/s,也就是不小于100ms/次,而心跳工夫距离应该小于选举超时工夫,因而设置为100到150ms之间。
func (rf* Raft) broadcastHeartbeat() { for { rf.mu.Lock() _,isLeader := rf.GetState() if rf.killed(){ rf.mu.Unlock() return } rf.mu.Unlock() if !isLeader { // not leader , then return return } // send heart beat for i,_ := range rf.peers{ if i == rf.me{ continue } rf.mu.Lock() //prevLogIndex := len(rf.log)-1 prevLogIndex,PrevLogTerm :=rf.getLastLogInfo() args := AppendEntriesArgs{Term: rf.currentTerm,LeaderId: rf.me,PrevLogIndex: prevLogIndex, PrevLogTerm: PrevLogTerm, LeaderCommit: rf.commitIndex} var reply AppendEntriesReply rf.mu.Unlock() go func(index int, args *AppendEntriesArgs, reply* AppendEntriesReply) { ok := rf.sendAppendEntries(index,args,reply) rf.mu.Lock() defer rf.mu.Unlock() if ok == false{ // send heartbeat failed //DPrintf("[broadcastHeartbeat]: %d send to peer's id %d failed term %d\n",args.LeaderId,index,reply.Term) }else { //DPrintf("[broadcastHeartbeat]: Id %d Term %d State %s\t || \ttimeout,"+ // "send heartbeat to %d success\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus),index) if reply.Term > rf.currentTerm{ // DPrintf("[broadcastHeartbeat]: Id %d Term %d State %s\t || \ttimeout,"+ "send heartbeat failed, reply term %d\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus),reply.Term) rf.convertToFollower(reply.Term) }else if reply.Term == rf.currentTerm && reply.Success == false{ // follower's log index and log term not match }else { // } } // check reply }(i,&args,&reply) } // sleep time.Sleep(time.Duration(rf.heartbeatInterval)*time.Millisecond) }}
- 开始选举。当某一台server 的选举超时计时先倒数完,这台server依照规定,先给变成candidate,term加一,给本人投票,重置选举超时计时器,而后让其余server投票。只有超过半数,那么就入选leader,开始给其余节点发心跳。
func (rf* Raft) startElection() { rf.mu.Lock() rf.convertToCandidate() nVotes := 1 // has voted in the term // 3.reset election timeout rf.resetElectionTimer() DPrintf("[startElection]: Id %d Term %d state %s \n",rf.me, rf.currentTerm, state2name(rf.raftStatus)) rf.mu.Unlock() // 4.send request vote to other server go func(nVotes* int ,rf* Raft) { var wg sync.WaitGroup winThreadHold := len(rf.peers)/2 + 1 for i,_ := range rf.peers{ if i == rf.me{ continue } rf.mu.Lock() lastLogIndex ,LastLogTerm := rf.getLastLogInfo() reqArgs := RequestVoteArgs{Term: rf.currentTerm,CandidateId: rf.me, LastLogIndex: lastLogIndex,LastLogTerm: LastLogTerm} rf.mu.Unlock() wg.Add(1) var reply RequestVoteReply go func(index int, rf* Raft, args* RequestVoteArgs, reply* RequestVoteReply) { defer wg.Done() DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + "start send request vote to %d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),index) ok := rf.sendRequestVote(index,args,reply) if ok == false { DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + "send request vote to %d failed \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),index) return } rf.mu.Lock() defer rf.mu.Unlock() // reject vote if reply.VoteGranted == false{ if reply.Term > rf.currentTerm{ DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + "peer term:%d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),reply.Term) rf.convertToFollower(reply.Term) } }else{ *nVotes += 1 // if it self has became leader , then no need to do _,isLeader := rf.GetState() if isLeader{ return } if rf.raftStatus == Candidate && *nVotes >= winThreadHold { DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + "win election votes:%d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),*nVotes) // win election rf.convertToLeader() // re init nextIndex rf.reInitNextIndex() // send heartbeat immediately to all server DPrintf("start send heartbeat\n") go rf.broadcastHeartbeat() } } }(i,rf,&reqArgs,&reply) } // wait all send finish wg.Wait() }(&nVotes,rf)}
- 解决投票
那么这里次要是依据term和lastlogindex来判断是否投票。
不投票?
a) 比本人Term小
b) log比本人旧
c) 在本次Term中曾经投票过了(args.Term == rf.currentTerm && rf.votedFor!= -1)
- 解决心跳
论文说了,entries为空示意心跳信息。
如果收到了term更高的发来的心跳,应该转变为follower并重置选举超时计时器
收到以后term的,间接重置选举超时计时器
同时咱们应该在reply中附上本人的term,以便leader检测本人是否是过期的leader(leader断线重连,term比本人小)
本篇文章由一文多发平台ArtiPub主动公布