共计 6984 个字符,预计需要花费 18 分钟才能阅读完成。
1. 程序结构
lab2 的试验是要实现以下接口
// create a new Raft server instance: | |
rf := Make(peers, me, persister, applyCh) | |
// start agreement on a new log entry: | |
rf.Start(command interface{}) (index, term, isleader) | |
// ask a Raft for its current term, and whether it thinks it is leader | |
rf.GetState() (term, isLeader) | |
// each time a new entry is committed to the log, each Raft peer | |
// should send an ApplyMsg to the service (or tester). | |
type ApplyMsg |
其中参数在正文中根本都有解释。
Make 用来创立点对点 server,peers 是所有的 server,len(peers)也就是所有的 server 数量,me 就是以后的 server,也就是在 peers 中的下标。persister 和 applyCh 以及 Start 函数在 2A 试验中没有用上。
做试验前强烈建议多读几遍 raft 论文。
助教的 student guide 也很重要:
student guide
FAQ
locking and structure
2. 常见问题
- 什么时候须要重置超时选举工夫?在 studen guide 中写到有 3 种状况须要重置超时工夫:
a) 从以后的 leader 中收到 AppendEntries,收到过期的不须要重置
b) 开始一轮选举的时候,当然本轮没有选举后果开始下一轮的时候也应该重置
c) 当你给其余节点投票的时候
- 什么时候重置 votefFor?
当 term 扭转的时候重置 votedfor,例如投票的时候收到了 term 更高,那么不论此刻是否投票过,都应该转变为 follower 并且重置 votedfor;当 leader 掉线一段时间重连回来,此刻群组有新 leader,那么 old leader 应该转变为 follower 并且重置 votedfor。
- 在做 RPC 申请的时候不应该加锁。例如:
rf.mu.Lock()
rf. sendRequestVote(...)
rf.mu.UnLock()
这种写法是不合逻辑的,容易造成死锁,咱们仅需对共享资源加锁就足够了,比方这里对参数结构加锁。
- server 成为 leader 之后不须要进行 选举超时倒计时。
3. 实现 给出局部代码,仅供参考
3.1 流程
- 首先 raft 启动,在 Make 中做初始化,一开始所有的 server 都是 follower,并且每个 server 会随机调配一个选举超时工夫,同时所有的 server 都有雷同的心跳间隔时间。那么因为波及到角色转换,咱们最好为每一种角色写一个转换函数,convertToFollower,convertToCandidate,convertToLeader。
- 什么时候会转变到 follower 呢?不外乎在心跳,投票或者 appendEntries 的时候收到了更加新的 server 的信息,这时候须要转变到 follower 的同时更新本人的 Term
- 转变到 candidate 只有一种,当选举超时内没有收到心跳或者 appendEntries,那么超时之后被动变成 candidate
- 转变到 leader,只有在 candidate 收到大多数申请的时候,竞选胜利,成为 leader
func (rf* Raft) convertToCandidate(){ | |
rf.raftStatus = Candidate | |
rf.currentTerm++ | |
rf.votedFor = rf.me | |
rf.nonLeaderCond.Signal() // awake electionTimeoutTick} | |
func (rf* Raft) convertToFollower(newTerm int){ | |
rf.raftStatus = Follower | |
rf.currentTerm = newTerm | |
rf.votedFor = -1 | |
rf.nonLeaderCond.Signal() // awake electionTimeoutTick} | |
func (rf* Raft) convertToLeader(){rf.raftStatus = Leader} |
初始化的时候,为了避免瓜分选票的状况产生,须要随机设置不同的超时工夫,论文倡议是 150-300ms,倡议写个函数,用工夫戳做随机种子,论文倡议每次生成不同的超时工夫。
func (rf *Raft) getRandElecTime() int {rand.Seed(time.Now().UnixNano()) | |
electionTimeout := rand.Intn(150) + 150 // [150,300) | |
return electionTimeout | |
} | |
func (rf* Raft) resetElectionTimer() {rf.lastHeartbeatTime = time.Now().UnixNano() | |
rf.electionTimeout = rf.getRandElecTime()} |
依据论文解析,咱们须要长时间运行的 gorutinue(能够简略了解为死循环)来处理事件和统计选举超时。
func (rf *Raft) EventLoop() () { | |
for{rf.mu.Lock() | |
if rf.killed(){rf.mu.Unlock() | |
return | |
} | |
rf.mu.Unlock() | |
select { | |
case <- rf.electionTimeoutChan: | |
rf.mu.Lock() | |
DPrintf("[EventLoop]: Id %d Term %d State %s \t || \t election timeout , start an eletion\n", | |
rf.me, rf.currentTerm, state2name(rf.raftStatus)) | |
rf.mu.Unlock() | |
// start election, if election timeout | |
go rf.startElection() | |
case <-rf.heartbeatPeriodChan: | |
DPrintf("[EventLoop]: Id %d Term %d State %s \t || \t election timeout , start to send heartbeat\n", | |
rf.me, rf.currentTerm, state2name(rf.raftStatus)) | |
go rf.broadcastHeartbeat()} | |
} | |
} | |
func (rf *Raft) electionTimeoutTick() { | |
for {rf.mu.Lock() | |
_, isLeader := rf.GetState() | |
if rf.killed(){rf.mu.Unlock() | |
return | |
} | |
rf.mu.Unlock() | |
if isLeader { | |
// if is leader , no need to check election timeout | |
rf.nonLeaderCond.L.Lock() | |
rf.nonLeaderCond.Wait() | |
rf.nonLeaderCond.L.Unlock()}else { // follower and candidate | |
rf.mu.Lock() | |
elapseTime := time.Now().UnixNano() - rf.lastHeartbeatTime | |
if elapseTime/int64(time.Millisecond) > int64(rf.electionTimeout){DPrintf("[electionTimeoutTick]: Id %d Term %d State %s\t || \ttimeout,"+ | |
"convert to candidate\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus)) | |
DPrintf("[electionTimeoutTick] : %d %d\n",elapseTime/int64(time.Millisecond),int64(rf.electionTimeout)) | |
rf.electionTimeoutChan <- true | |
} | |
rf.mu.Unlock() | |
time.Sleep(time.Millisecond*10) | |
} | |
} | |
} |
试验局部要求心跳不超过 10 次 /s,也就是不小于 100ms/ 次,而心跳工夫距离应该小于选举超时工夫,因而设置为 100 到 150ms 之间。
func (rf* Raft) broadcastHeartbeat() { | |
for {rf.mu.Lock() | |
_,isLeader := rf.GetState() | |
if rf.killed(){rf.mu.Unlock() | |
return | |
} | |
rf.mu.Unlock() | |
if !isLeader { | |
// not leader , then return | |
return | |
} | |
// send heart beat | |
for i,_ := range rf.peers{ | |
if i == rf.me{continue} | |
rf.mu.Lock() | |
//prevLogIndex := len(rf.log)-1 | |
prevLogIndex,PrevLogTerm :=rf.getLastLogInfo() | |
args := AppendEntriesArgs{Term: rf.currentTerm,LeaderId: rf.me,PrevLogIndex: prevLogIndex, | |
PrevLogTerm: PrevLogTerm, LeaderCommit: rf.commitIndex} | |
var reply AppendEntriesReply | |
rf.mu.Unlock() | |
go func(index int, args *AppendEntriesArgs, reply* AppendEntriesReply) {ok := rf.sendAppendEntries(index,args,reply) | |
rf.mu.Lock() | |
defer rf.mu.Unlock() | |
if ok == false{ | |
// send heartbeat failed | |
//DPrintf("[broadcastHeartbeat]: %d send to peer's id %d failed term %d\n",args.LeaderId,index,reply.Term) | |
}else {//DPrintf("[broadcastHeartbeat]: Id %d Term %d State %s\t || \ttimeout,"+ | |
// "send heartbeat to %d success\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus),index) | |
if reply.Term > rf.currentTerm{ | |
// | |
DPrintf("[broadcastHeartbeat]: Id %d Term %d State %s\t || \ttimeout,"+ | |
"send heartbeat failed, reply term %d\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus),reply.Term) | |
rf.convertToFollower(reply.Term) | |
}else if reply.Term == rf.currentTerm && reply.Success == false{// follower's log index and log term not match}else {//} | |
} | |
// check reply | |
}(i,&args,&reply) | |
} | |
// sleep | |
time.Sleep(time.Duration(rf.heartbeatInterval)*time.Millisecond) | |
} | |
} |
- 开始选举。当某一台 server 的选举超时计时先倒数完,这台 server 依照规定,先给变成 candidate,term 加一,给本人投票,重置选举超时计时器,而后让其余 server 投票。只有超过半数,那么就入选 leader,开始给其余节点发心跳。
func (rf* Raft) startElection() {rf.mu.Lock() | |
rf.convertToCandidate() | |
nVotes := 1 // has voted in the term | |
// 3.reset election timeout | |
rf.resetElectionTimer() | |
DPrintf("[startElection]: Id %d Term %d state %s \n",rf.me, rf.currentTerm, state2name(rf.raftStatus)) | |
rf.mu.Unlock() | |
// 4.send request vote to other server | |
go func(nVotes* int ,rf* Raft) { | |
var wg sync.WaitGroup | |
winThreadHold := len(rf.peers)/2 + 1 | |
for i,_ := range rf.peers{ | |
if i == rf.me{continue} | |
rf.mu.Lock() | |
lastLogIndex ,LastLogTerm := rf.getLastLogInfo() | |
reqArgs := RequestVoteArgs{Term: rf.currentTerm,CandidateId: rf.me, LastLogIndex: lastLogIndex,LastLogTerm: LastLogTerm} | |
rf.mu.Unlock() | |
wg.Add(1) | |
var reply RequestVoteReply | |
go func(index int, rf* Raft, args* RequestVoteArgs, reply* RequestVoteReply) {defer wg.Done() | |
DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + | |
"start send request vote to %d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),index) | |
ok := rf.sendRequestVote(index,args,reply) | |
if ok == false {DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + | |
"send request vote to %d failed \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),index) | |
return | |
} | |
rf.mu.Lock() | |
defer rf.mu.Unlock() | |
// reject vote | |
if reply.VoteGranted == false{ | |
if reply.Term > rf.currentTerm{DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + | |
"peer term:%d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),reply.Term) | |
rf.convertToFollower(reply.Term) | |
} | |
}else{ | |
*nVotes += 1 | |
// if it self has became leader , then no need to do | |
_,isLeader := rf.GetState() | |
if isLeader{return} | |
if rf.raftStatus == Candidate && *nVotes >= winThreadHold {DPrintf("[startElection]: Id %d Term %d state %s \t || \t" + | |
"win election votes:%d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),*nVotes) | |
// win election | |
rf.convertToLeader() | |
// re init nextIndex | |
rf.reInitNextIndex() | |
// send heartbeat immediately to all server | |
DPrintf("start send heartbeat\n") | |
go rf.broadcastHeartbeat()} | |
} | |
}(i,rf,&reqArgs,&reply) | |
} | |
// wait all send finish | |
wg.Wait()}(&nVotes,rf) | |
} |
- 解决投票
那么这里次要是依据 term 和 lastlogindex 来判断是否投票。
不投票?
a) 比本人 Term 小
b) log 比本人旧
c) 在本次 Term 中曾经投票过了(args.Term == rf.currentTerm && rf.votedFor!= -1)
- 解决心跳
论文说了,entries 为空示意心跳信息。
如果收到了 term 更高的发来的心跳,应该转变为 follower 并重置选举超时计时器
收到以后 term 的,间接重置选举超时计时器
同时咱们应该在 reply 中附上本人的 term,以便 leader 检测本人是否是过期的 leader(leader 断线重连,term 比本人小)
本篇文章由一文多发平台 ArtiPub 主动公布