Lab2
这个 lab 是要实现一个 Raft 一致性协定。
lab 地址:http://nil.csail.mit.edu/6.82…
raft 论文地址:http://nil.csail.mit.edu/6.82…
测试形式:
$ cd src/raft
$ go test
Test (2A): initial election ...
--- FAIL: TestInitialElection2A (5.09s)
config.go:333: expected one leader, got none
Test (2A): election after network failure ...
--- FAIL: TestReElection2A (4.99s)
config.go:333: expected one leader, got none
Test (2B): basic agreement ...
--- FAIL: TestBasicAgree2B (10.05s)
config.go:478: one(100) failed to reach agreement
Test (2B): RPC byte count ...
--- FAIL: TestRPCBytes2B (10.05s)
config.go:478: one(99) failed to reach agreement
Test (2B): agreement despite follower disconnection ...
--- FAIL: TestFailAgree2B (10.00s)
config.go:478: one(101) failed to reach agreement
Test (2B): no agreement if too many followers disconnect ...
lab 阐明:通过在 raft/raft.go
中增加代码来实现 Raft 协定。在该文件中,您将找到框架代码,以及如何发送和接管 rpc 的示例。
您的实现必须反对以下接口,测试和最终的 kv 服务器将应用该接口。您能够在 raft.go 的评论中找到更多细节。
// create a new Raft server instance:
rf := Make(peers, me, persister, applyCh)
// start agreement on a new log entry:
rf.Start(command interface{}) (index, term, isleader)
// ask a Raft for its current term, and whether it thinks it is leader
rf.GetState() (term, isLeader)
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester).
type ApplyMsg
服务调用 Make(peers,me,…)
来创立 Raft 节点。peer
参数是 Raft 对等点 (包含这个) 的网络标识符数组,用于 RPC。
参数 me
是对等体数组中这个对等体的索引。Start
要求 Raft 启动解决,将命令附加到日志中。Start()
应该立刻返回,而不须要期待日志追加实现。
该服务心愿您的实现为每个新提交的日志条目发送 ApplyMsg
到Make()
的 applyCh 通道参数。
go 蕴含发送 RPC(sendRequestVote()
)和解决传入 RPC(RequestVote()
)的示例代码。你的 Raft 对等体应该应用 labrpc
包(源文件在 src/labrpc
中)来通信。
测试人员能够通知 labrpc
提早 rpc,重新排列它们,并抛弃它们以模仿各种网络故障。当您能够长期批改 labrpc 时,请确保您的 Raft 与原始 labrpc 一起工作,因为咱们将应用它来测试和评分。
Raft 实例只能与 RPC 交互; 例如,它们不容许应用共享的 Go 变量或文件进行通信。后续的试验建设在这个试验的根底上,所以给本人足够的工夫来编写牢靠的代码是很重要的。
Lab2A
实现 Raft Leader
选举和心跳 (没有日志条目标AppendEntries rpc
)。
第 2A 局部的指标是选出一个领导者,如果没有失谬误,这个领导者依然是领导者,如果老领导者失败了,或者与老领导者的数据包失落了,新的领导者会接替他。
运行 go test -run 2A 来测试 2A 代码。
首先咱们须要定义分明 Raft 以及两种 rpc 的参数类型:
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// 2A
state State // 身份,follower/candidate/leader
currentTerm int // 以后任期
votedFor int // 投票给哪个节点
logs []Entry // 日志
ElectionTimer *time.Timer // 选举超时定时器,对于所有 slave 节点来说,如果超时,则成为候选人,发动选举
HeartBeatTimer *time.Timer // 心跳定时器,用于同步日志
}
type RequestVoteArgs struct {
// 2A
Term int // 候选人的日志
CandidateId int // 候选人的 id
LastLogIndex int // 候选人最初一条日志的 index
LastLogTerm int // 候选人最初一条日志的任期
}
type RequestVoteReply struct {
// 2A
Term int // 返回的任期
VoteGranted bool // 是否投票
}
type AppendEntriesArgs struct {
Term int // leader 的 term
LeaderId int // leader 的 id
PrevLogIndex int // 之前附加日志的 index
PrevLogTerm int // 之前附加日志的 term
Entries []Entry // 附加日志(如果只是心跳则是空的)LeaderCommitIndex int // leader 的已提交日志 index
}
type AppendEntriesReply struct {
Term int // 任期
Success bool // 心跳 / 同步日志是否胜利
}
咱们须要关注这样几个 loop:
Raft 的超时选举 loop 和 Raft 的心跳 loop,他们一直循环解决选举定时器超时和心跳定时器超时的事件。
func (rf *Raft) ElectionLoop() {
for {
select {
case <-rf.ElectionTimer.C:
// 选举定时器超时,发动选举
rf.mu.Lock()
// 成为候选者
rf.SwitchState(Candidate)
rf.mu.Unlock()
ch := make(chan bool, len(rf.peers))
lastLogTerm, lastLogIndex := rf.getLastState()
args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
// 申请 slave 投票
for slave, _ := range rf.peers {
if slave == rf.me {
// 不必给本人发送
continue
}
go func(slave int) {
// call
reply := RequestVoteReply{}
flag := rf.sendRequestVote(slave, &args, &reply)
if flag {
if reply.VoteGranted {ch <- true}
}
}(slave)
}
// 期待投票并成为 leader
go rf.WaitForVote(ch)
}
}
}
// 心跳
func (rf *Raft) HeartBeatLoop() {
for {
select {
case <-rf.HeartBeatTimer.C:
// 同步所有的心跳
rf.sendHeartBeat()
rf.mu.Lock()
rf.ResetHeartBeatTimer()
rf.mu.Unlock()}
}
}
几个留神点是:
-
每次心跳发送完之后就要重置选举定时器,防止 leader 触发选举。
func (rf *Raft) sendHeartBeat() { // 发送心跳 rf.mu.Lock() if rf.state != Leader {rf.mu.Unlock() return } rf.mu.Unlock() heartBeatNum := 0 wg := sync.WaitGroup{} wg.Add(len(rf.peers) - 1) log.Printf("HeartBeat timeout,server=%v\n", rf.me) for slave := 0; slave < len(rf.peers); slave++ { if slave == rf.me {continue} go func(slave int) {defer wg.Done() rf.mu.Lock() // 互斥锁保障一次心跳的信息是某个状态的 args := &AppendEntriesArgs{ Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: len(rf.logs) - 1, PrevLogTerm: rf.logs[len(rf.logs)-1].Term, Entries: nil, LeaderCommitIndex: rf.commitIndex, } rf.mu.Unlock() reply := &AppendEntriesReply{} if ok := rf.sendAppendEntries(slave, args, reply); ok {rf.mu.Lock() heartBeatNum++ rf.mu.Unlock()} }(slave) } // 期待所有 rpc 实现 wg.Wait() log.Printf("[HeartBeat]%v recv all slave reply: %v\n", rf.me, heartBeatNum) rf.mu.Lock() rf.ResetElectionTimer() // 重置选举定时器 rf.mu.Unlock()}
- 心跳定时器和选举定时器超时后都要重置定时器。
- 每次进行 rpc 通信后要依据 term 查看本人的状态,若对方 term 大于本人,则本人变为 follower,votefor=-1。
两个 RPC(AppendEntries、RequestVote)实现是本 lab 的重中之重:
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {rf.mu.Lock()
defer rf.mu.Unlock()
// log.Printf("[AppendEntries] Before: Server[%v] state[%v],currentTerm[%v],args[%+v]\n", rf.me, rf.state, rf.currentTerm, args)
rf.checkTermOrUpdateState(args.Term)
reply.Term = rf.currentTerm
reply.Success = true
// reply false if term < currentTerm
if args.Term < rf.currentTerm {
reply.Success = false
return
}
// 到这里阐明是实在的 leader
rf.ResetElectionTimer()
// 如果 term 雷同,然而本人是候选者,则转变为追随者
if args.Term >= rf.currentTerm && rf.state == Candidate {rf.SwitchState(Follower)
}
// 如果对方 term 大于本人,则更新 term,并且成为追随者
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.SwitchState(Follower)
}
if args.Entries == nil || len(args.Entries) <= 0 {
// 日志为空,阐明这是心跳
return
}
// 如果 preLogIndex 不存在 logs 中或者 preLogIndex 的日志对应 term 不匹配,则同步日志失败
if args.PrevLogIndex >= len(rf.logs) || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
return
}
// 如果一个曾经存在日志在新的地位抵触,删除曾经存在日志
i := 0
for ; i < len(args.Entries); i++ {
index := i + args.PrevLogIndex + 1
entry := args.Entries[i]
// 先把之前雷同的局部同步
if index < len(rf.logs) && rf.logs[index].Term != entry.Term {rf.logs = rf.logs[:index]
break
}
}
// 补充之前不在 logs 中的日志
for ; i < len(args.Entries); i++ {rf.logs = append(rf.logs, args.Entries[i])
}
// 更新 leadercommit
if args.LeaderCommitIndex > rf.commitIndex {
// commitIndex 不能超出 rf.logs 的范畴
rf.commitIndex = min(args.LeaderCommitIndex, len(rf.logs)-1)
}
}
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {// Your code here (2A, 2B).
// 裸露给其余 raft 节点的 rpc 接口
// 2A:
rf.mu.Lock()
defer rf.mu.Unlock()
// log.Printf("[RequestVote] Start: Server[%v] state[%v],currentTerm[%v],args[%+v]\n", rf.me, rf.state, rf.currentTerm, args)
reply.Term = rf.currentTerm
reply.VoteGranted = false
// 任期大于对方回绝
rf.checkTermOrUpdateState(args.Term)
if args.Term < rf.currentTerm {return}
// 本人曾经投票并且没有投票给对方,回绝
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {return}
// 留神,如果两个被申请投票节点是 leader 节点,然而其实两个节点的 term 雷同,log 也完全相同
// 如何保障 leader 不会投票给申请方,这里须要保障 leader 的 voteFor 是 leader 本人
if rf.currentTerm < args.Term {
rf.state = Follower
rf.votedFor = args.CandidateId
reply.VoteGranted = true
return
}
lastLogTerm, lastLogIndex := rf.getLastState()
if args.LastLogTerm > lastLogTerm ||
args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex {
// 如果最初一条日志的任期号和候选人的任期号雷同,则比拟已提交日志的 index(目标是保障已提交日志不会被笼罩)reply.VoteGranted = true
// 成为 follower
rf.state = Follower
rf.votedFor = args.CandidateId
} else {return}
}
这里的实现次要是要仔细阅读论文,而后多 debug。
Lab2B
测试命令:go test -run 2B
。
咱们先来浏览一下提醒:
- 第一个指标应该是先通过 TestBasicAgree2B()。首先是实现 Start(), 而后编写代码通过 AppendEntries rpc 发送和接管新的日志条目。
- 您将须要执行选举限度(论文的 5.4.1 节)。
- 在 2B 实验室晚期的测试中,一个无奈达成统一的办法是,即便领导人还活着,也要举办屡次选举。
寻找选举计时器治理中的破绽,或者在博得选举后没有立刻发送心跳。您的代码可能有反复查看某些事件的循环。
不要让这些循环间断执行而不暂停,因为这会使您的实现十分慢,从而导致测试失败。
应用 Go 的条件变量,或者插入工夫。Sleep(10 * time. 毫秒)在每个循环迭代中。
咱们先来看看 start
函数的实现,这里留神函数的返回值并且将 command 生成对应的 log 增加到日志中即可。返回值 index、term、isLeader,
别离表白这个 command 的序列号(日志中的程序)、任期、以及对应 raft 节点是否是 leader(只有 leader 才会将 command 增加到日志中)。
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
if term, isLeader = rf.GetState(); isLeader {rf.mu.Lock()
rf.logs = append(rf.logs, Entry{
Term: rf.currentTerm,
Command: command,
})
rf.matchIndex[rf.me] = rf.nextIndex[rf.me] + len(rf.logs) - 1
index = len(rf.logs) - 1
if Debug > 0 {log.Printf("[Start] Server[%v] replicate command to log | current term[%d] | current state[%d]\n | log length[%d]",
rf.me, rf.currentTerm, rf.state, len(rf.logs))
}
rf.mu.Unlock()}
return index, term, isLeader
}
咱们留神到第 2 个提醒,这里要实现选举限度,咱们先来看看论文中是如何形容的:
在任何基于领导人的一致性算法中,领导人都必须存储所有曾经提交的日志条目。
在某些一致性算法中,例如 Viewstamped Replication,某个节点即便是一开始并没有蕴含所有曾经提交的日志条目,
它也能被选为领导者。这些算法都蕴含一些额定的机制来辨认失落的日志条目并把他们传送给新的领导人,
要么是在选举阶段要么在之后很快进行。可怜的是,这种办法会导致相当大的额定的机制和复杂性。
Raft 应用了一种更加简略的办法,它能够保障所有之前的任期号中曾经提交的日志条目在选举的时候都会呈现在新的领导人中,
不须要传送这些日志条目给领导人。这意味着日志条目标传送是单向的,只从领导人传给跟随者,
并且领导人从不会笼罩本身本地日志中曾经存在的条目。
Raft 应用投票的形式来阻止一个候选人博得选举除非这个候选人蕴含了所有曾经提交的日志条目。
候选人为了博得选举必须分割集群中的大部分节点,这意味着每一个曾经提交的日志条目在这些服务器节点中必定存在于至多一个节点上。
如果候选人的日志至多和大多数的服务器节点一样新(这个新的定义会在上面探讨),那么他肯定持有了所有曾经提交的日志条目。
申请投票 RPC 实现了这样的限度:RPC 中蕴含了候选人的日志信息,而后投票人会回绝掉那些日志没有本人新的投票申请。
Raft 通过比拟两份日志中最初一条日志条目标索引值和任期号定义谁的日志比拟新。
如果两份日志最初的条目标任期号不同,那么任期号大的日志更加新。
如果两份日志最初的条目任期号雷同,那么日志比拟长的那个就更加新。
用程序语言示意为:
args.LastLogTerm > lastLogTerm ||
args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex
// 如果候选人的最初一个日志的 term 大于以后节点的 lastLogTerm,则阐明肯定更新
// 如果 lastLogTerm 相等,则比拟 lastLogIndex
// 残缺代码如下:func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {rf.mu.Lock()
defer rf.mu.Unlock()
// 任期大于对方回绝
rf.checkTermOrUpdateState(args.Term)
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}
// 本人曾经投票并且没有投票给对方,回绝
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
reply.Term = rf.currentTerm
reply.VoteGranted = false
}
lastLogTerm, lastLogIndex := rf.getLastState()
if args.LastLogTerm > lastLogTerm || args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex {
// 如果最初一条日志的任期号和候选人的任期号雷同,则比拟已提交日志的 index(目标是保障已提交日志不会被笼罩)reply.VoteGranted = true
// 成为 follower
rf.state = Follower
rf.votedFor = args.CandidateId
} else {return}
}
提醒 3 通知咱们,即便某些 leader 还活着的时候依然可能存在反复选举的问题,
咱们最好在一个候选人成为 leader 时立即向其余节点发送心跳来防止屡次选举。
实现如下:
func (rf *Raft) Election() {
// 选举定时器超时,发动选举
rf.mu.Lock()
// 成为候选者
rf.SwitchState(Candidate)
lastLogTerm, lastLogIndex := rf.getLastState()
args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
rf.mu.Unlock()
// 申请 slave 投票
numberVoted := 1
for slave, _ := range rf.peers {
if slave == rf.me {continue}
go func(slave int) {reply := RequestVoteReply{}
if rf.sendRequestVote(slave, &args, &reply) {rf.mu.Lock()
if args.Term != rf.currentTerm {return}
if reply.VoteGranted {
numberVoted += 1
if numberVoted > len(rf.peers)/2 && rf.state == Candidate {rf.SwitchState(Leader)
for i := 0; i < len(rf.peers); i++ {rf.matchIndex[i] = 0
rf.nextIndex[i] = len(rf.logs)
}
rf.mu.Unlock()
// 发送心跳
rf.sendHeartBeat()
rf.mu.Lock()}
}
rf.mu.Unlock()}
}(slave)
}
}
发送心跳的代码如下:
func (rf *Raft) sendHeartBeat() {
// 发送心跳
rf.mu.Lock()
if rf.state != Leader {rf.mu.Unlock()
return
}
rf.ResetElectionTimer()
rf.ResetHeartBeatTimer()
if Debug > 0 {log.Printf("Server[%d]start send heartbeat\n", rf.me)
}
rf.mu.Unlock()
for slave := 0; slave < len(rf.peers); slave++ {
if slave == rf.me {continue}
go func(slave int) {for !rf.sendHeartBeatTo(slave) {}}(slave)
}
}
func (rf *Raft) sendHeartBeatTo(slave int) (res bool) {
res = true
rf.mu.Lock()
// 互斥锁保障一次心跳的信息是某个状态的
args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[slave] - 1, // 示意下一个要同步给 slave 的 index
PrevLogTerm: rf.logs[rf.nextIndex[slave]-1].Term,
Entries: rf.logs[rf.nextIndex[slave]:],
LeaderCommitIndex: rf.commitIndex,
}
rf.mu.Unlock()
reply := &AppendEntriesReply{}
if ok := rf.sendAppendEntries(slave, args, reply); ok {rf.mu.Lock()
if rf.state != Leader {
// 留神,不是 leader 了
rf.mu.Unlock()
return
}
if rf.currentTerm != args.Term {
// 不是同一个任期内的 leader
rf.mu.Unlock()
return
}
if reply.Success {
// 发送心跳胜利了,而后更改已匹配日志条目和下一次发送日志条目
rf.matchIndex[slave] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[slave] = rf.matchIndex[slave] + 1
// 查看
rf.checkN()} else {
if reply.Term > rf.currentTerm {rf.SwitchState(Follower)
rf.currentTerm = reply.Term
rf.persist()
rf.mu.Unlock()
return false
}
rf.nextIndex[slave] = reply.ConflictIndex
rf.mu.Unlock()
return false
}
rf.mu.Unlock()}
return
}
几个留神的中央,首先发送心跳前要查看身份是否是 leader,只有 leader 能力发送心跳。
要在临界区内生成心跳的参数,而后发送时也要查看是否是 leader。
rpc 胜利后,如果心跳承受了,要更新 leader 的 matchIndex、nextIndex(日志同步状态),并且调用 checkN()
来更新
commitIndex。
如果心跳没有被承受,则依据返回值的 conflictIndex,来更新日志同步状态以便下一次心跳。
对于几个测试的注意事项:
TestFailAgree2B:这个测试要多留神的是 AppendEntries()函数的解决。
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {rf.mu.Lock()
defer rf.mu.Unlock()
rf.checkTermOrUpdateState(args.Term)
reply.Term = rf.currentTerm
reply.Success = true
// reply false if term < currentTerm
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}
// 到这里阐明是实在的 leader
rf.ResetElectionTimer()
// 如果本人是候选者,则转变为追随者
if rf.state == Candidate {rf.SwitchState(Follower)
}
// 如果 preLogIndex 不存在 logs 中或者 preLogIndex 的日志对应 term 不匹配,则同步日志失败
if args.PrevLogIndex >= len(rf.logs) || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
reply.Term = rf.currentTerm
// 找到 conflictIndex
if len(rf.logs) <= args.PrevLogIndex {reply.ConflictIndex = len(rf.logs)
} else {
// 找到某个 term 的第一个 index
for i := args.PrevLogIndex; i >= 1; i-- {if rf.logs[i].Term != rf.logs[i-1].Term {
reply.ConflictIndex = i
break
}
}
}
return
}
// 如果一个曾经存在日志在新的地位抵触,删除曾经存在日志
conflicIndex := 0
isMatch := true
firstIndex := args.PrevLogIndex + 1
for i := 0; i < len(args.Entries); i++ {if firstIndex+i >= len(rf.logs) || rf.logs[firstIndex+i].Term != args.Entries[i].Term {
isMatch = false
conflicIndex = i
break
}
}
if !isMatch {rf.logs = append(rf.logs[:conflicIndex+firstIndex], args.Entries[conflicIndex:]...)
rf.persist() // 长久化
if Debug > 0 {log.Printf("Server[%d] append entries[conflic=%d,start=%d,len=%d]from server[%d]\n",
rf.me, conflicIndex, conflicIndex+firstIndex, len(args.Entries)-conflicIndex, args.LeaderId)
}
}
// 更新 leadercommit
if args.LeaderCommitIndex > rf.commitIndex {
// commitIndex 不能超出 rf.logs 的范畴
rf.commitIndex = min(args.LeaderCommitIndex, len(rf.logs)-1)
go rf.Apply()}
reply.Term = rf.currentTerm
reply.Success = true
}
TestBackup2B:这个测试次要是不要打印日志,日志会影响效率。