Lab2
这个lab是要实现一个Raft一致性协定。
lab地址:http://nil.csail.mit.edu/6.82…
raft论文地址:http://nil.csail.mit.edu/6.82…
测试形式:
$ cd src/raft
$ go test
Test (2A): initial election ...
--- FAIL: TestInitialElection2A (5.09s)
config.go:333: expected one leader, got none
Test (2A): election after network failure ...
--- FAIL: TestReElection2A (4.99s)
config.go:333: expected one leader, got none
Test (2B): basic agreement ...
--- FAIL: TestBasicAgree2B (10.05s)
config.go:478: one(100) failed to reach agreement
Test (2B): RPC byte count ...
--- FAIL: TestRPCBytes2B (10.05s)
config.go:478: one(99) failed to reach agreement
Test (2B): agreement despite follower disconnection ...
--- FAIL: TestFailAgree2B (10.00s)
config.go:478: one(101) failed to reach agreement
Test (2B): no agreement if too many followers disconnect ...
lab阐明: 通过在raft/raft.go
中增加代码来实现Raft协定。在该文件中,您将找到框架代码,以及如何发送和接管rpc的示例。
您的实现必须反对以下接口,测试和最终的kv服务器将应用该接口。您能够在raft.go的评论中找到更多细节。
// create a new Raft server instance:
rf := Make(peers, me, persister, applyCh)
// start agreement on a new log entry:
rf.Start(command interface{}) (index, term, isleader)
// ask a Raft for its current term, and whether it thinks it is leader
rf.GetState() (term, isLeader)
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester).
type ApplyMsg
服务调用Make(peers,me,…)
来创立Raft节点。peer
参数是Raft对等点(包含这个)的网络标识符数组,用于RPC。
参数me
是对等体数组中这个对等体的索引。Start
要求Raft启动解决,将命令附加到日志中。Start()
应该立刻返回,而不须要期待日志追加实现。
该服务心愿您的实现为每个新提交的日志条目发送ApplyMsg
到Make()
的applyCh通道参数。
go蕴含发送RPC(sendRequestVote()
)和解决传入RPC(RequestVote()
)的示例代码。你的Raft对等体应该应用labrpc
包(源文件在src/labrpc
中)来通信。
测试人员能够通知labrpc
提早rpc,重新排列它们,并抛弃它们以模仿各种网络故障。当您能够长期批改labrpc时,请确保您的Raft与原始labrpc一起工作,因为咱们将应用它来测试和评分。
Raft实例只能与RPC交互;例如,它们不容许应用共享的Go变量或文件进行通信。后续的试验建设在这个试验的根底上,所以给本人足够的工夫来编写牢靠的代码是很重要的。
Lab2A
实现Raft Leader
选举和心跳(没有日志条目标AppendEntries rpc
)。
第2A局部的指标是选出一个领导者,如果没有失谬误,这个领导者依然是领导者,如果老领导者失败了,或者与老领导者的数据包失落了,新的领导者会接替他。
运行go test -run 2A 来测试2A代码。
首先咱们须要定义分明Raft以及两种rpc的参数类型:
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// 2A
state State // 身份,follower/candidate/leader
currentTerm int // 以后任期
votedFor int // 投票给哪个节点
logs []Entry // 日志
ElectionTimer *time.Timer // 选举超时定时器,对于所有slave节点来说,如果超时,则成为候选人,发动选举
HeartBeatTimer *time.Timer // 心跳定时器,用于同步日志
}
type RequestVoteArgs struct {
// 2A
Term int // 候选人的日志
CandidateId int // 候选人的id
LastLogIndex int // 候选人最初一条日志的index
LastLogTerm int // 候选人最初一条日志的任期
}
type RequestVoteReply struct {
// 2A
Term int // 返回的任期
VoteGranted bool // 是否投票
}
type AppendEntriesArgs struct {
Term int // leader的term
LeaderId int // leader的id
PrevLogIndex int // 之前附加日志的index
PrevLogTerm int // 之前附加日志的term
Entries []Entry // 附加日志(如果只是心跳则是空的)
LeaderCommitIndex int // leader的已提交日志index
}
type AppendEntriesReply struct {
Term int // 任期
Success bool // 心跳/同步日志是否胜利
}
咱们须要关注这样几个loop:
Raft的超时选举loop和Raft的心跳loop,他们一直循环解决选举定时器超时和心跳定时器超时的事件。
func (rf *Raft) ElectionLoop() {
for {
select {
case <-rf.ElectionTimer.C:
// 选举定时器超时,发动选举
rf.mu.Lock()
// 成为候选者
rf.SwitchState(Candidate)
rf.mu.Unlock()
ch := make(chan bool, len(rf.peers))
lastLogTerm, lastLogIndex := rf.getLastState()
args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
// 申请slave投票
for slave, _ := range rf.peers {
if slave == rf.me {
// 不必给本人发送
continue
}
go func(slave int) {
// call
reply := RequestVoteReply{}
flag := rf.sendRequestVote(slave, &args, &reply)
if flag {
if reply.VoteGranted {
ch <- true
}
}
}(slave)
}
// 期待投票并成为leader
go rf.WaitForVote(ch)
}
}
}
// 心跳
func (rf *Raft) HeartBeatLoop() {
for {
select {
case <-rf.HeartBeatTimer.C:
// 同步所有的心跳
rf.sendHeartBeat()
rf.mu.Lock()
rf.ResetHeartBeatTimer()
rf.mu.Unlock()
}
}
}
几个留神点是:
-
每次心跳发送完之后就要重置选举定时器,防止leader触发选举。
func (rf *Raft) sendHeartBeat() { // 发送心跳 rf.mu.Lock() if rf.state != Leader { rf.mu.Unlock() return } rf.mu.Unlock() heartBeatNum := 0 wg := sync.WaitGroup{} wg.Add(len(rf.peers) - 1) log.Printf("HeartBeat timeout,server=%v\n", rf.me) for slave := 0; slave < len(rf.peers); slave++ { if slave == rf.me { continue } go func(slave int) { defer wg.Done() rf.mu.Lock() // 互斥锁保障一次心跳的信息是某个状态的 args := &AppendEntriesArgs{ Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: len(rf.logs) - 1, PrevLogTerm: rf.logs[len(rf.logs)-1].Term, Entries: nil, LeaderCommitIndex: rf.commitIndex, } rf.mu.Unlock() reply := &AppendEntriesReply{} if ok := rf.sendAppendEntries(slave, args, reply); ok { rf.mu.Lock() heartBeatNum++ rf.mu.Unlock() } }(slave) } // 期待所有rpc实现 wg.Wait() log.Printf("[HeartBeat]%v recv all slave reply: %v\n", rf.me, heartBeatNum) rf.mu.Lock() rf.ResetElectionTimer() // 重置选举定时器 rf.mu.Unlock() }
- 心跳定时器和选举定时器超时后都要重置定时器。
- 每次进行rpc通信后要依据term查看本人的状态,若对方term大于本人,则本人变为follower,votefor=-1。
两个RPC(AppendEntries、RequestVote)实现是本lab的重中之重:
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
// log.Printf("[AppendEntries] Before: Server[%v] state[%v],currentTerm[%v],args[%+v]\n", rf.me, rf.state, rf.currentTerm, args)
rf.checkTermOrUpdateState(args.Term)
reply.Term = rf.currentTerm
reply.Success = true
// reply false if term < currentTerm
if args.Term < rf.currentTerm {
reply.Success = false
return
}
// 到这里阐明是实在的leader
rf.ResetElectionTimer()
// 如果term雷同,然而本人是候选者,则转变为追随者
if args.Term >= rf.currentTerm && rf.state == Candidate {
rf.SwitchState(Follower)
}
// 如果对方term大于本人,则更新term,并且成为追随者
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.SwitchState(Follower)
}
if args.Entries == nil || len(args.Entries) <= 0 {
// 日志为空,阐明这是心跳
return
}
// 如果preLogIndex不存在logs中或者preLogIndex的日志对应term不匹配,则同步日志失败
if args.PrevLogIndex >= len(rf.logs) || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
return
}
// 如果一个曾经存在日志在新的地位抵触,删除曾经存在日志
i := 0
for ; i < len(args.Entries); i++ {
index := i + args.PrevLogIndex + 1
entry := args.Entries[i]
// 先把之前雷同的局部同步
if index < len(rf.logs) && rf.logs[index].Term != entry.Term {
rf.logs = rf.logs[:index]
break
}
}
// 补充之前不在logs中的日志
for ; i < len(args.Entries); i++ {
rf.logs = append(rf.logs, args.Entries[i])
}
// 更新leadercommit
if args.LeaderCommitIndex > rf.commitIndex {
// commitIndex不能超出rf.logs的范畴
rf.commitIndex = min(args.LeaderCommitIndex, len(rf.logs)-1)
}
}
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
// 裸露给其余raft节点的rpc接口
// 2A:
rf.mu.Lock()
defer rf.mu.Unlock()
// log.Printf("[RequestVote] Start: Server[%v] state[%v],currentTerm[%v],args[%+v]\n", rf.me, rf.state, rf.currentTerm, args)
reply.Term = rf.currentTerm
reply.VoteGranted = false
// 任期大于对方回绝
rf.checkTermOrUpdateState(args.Term)
if args.Term < rf.currentTerm {
return
}
// 本人曾经投票并且没有投票给对方,回绝
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
return
}
// 留神,如果两个被申请投票节点是leader节点,然而其实两个节点的term雷同,log也完全相同
// 如何保障leader不会投票给申请方,这里须要保障leader的voteFor是leader本人
if rf.currentTerm < args.Term {
rf.state = Follower
rf.votedFor = args.CandidateId
reply.VoteGranted = true
return
}
lastLogTerm, lastLogIndex := rf.getLastState()
if args.LastLogTerm > lastLogTerm ||
args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex {
// 如果最初一条日志的任期号和候选人的任期号雷同,则比拟已提交日志的index(目标是保障已提交日志不会被笼罩)
reply.VoteGranted = true
// 成为follower
rf.state = Follower
rf.votedFor = args.CandidateId
} else {
return
}
}
这里的实现次要是要仔细阅读论文,而后多debug。
Lab2B
测试命令:go test -run 2B
。
咱们先来浏览一下提醒:
- 第一个指标应该是先通过TestBasicAgree2B()。首先是实现Start(),而后编写代码通过AppendEntries rpc发送和接管新的日志条目。
- 您将须要执行选举限度(论文的5.4.1节)。
- 在2B实验室晚期的测试中,一个无奈达成统一的办法是,即便领导人还活着,也要举办屡次选举。
寻找选举计时器治理中的破绽,或者在博得选举后没有立刻发送心跳。您的代码可能有反复查看某些事件的循环。
不要让这些循环间断执行而不暂停,因为这会使您的实现十分慢,从而导致测试失败。
应用Go的条件变量,或者插入工夫。Sleep(10 * time.毫秒)在每个循环迭代中。
咱们先来看看start
函数的实现,这里留神函数的返回值并且将command生成对应的log增加到日志中即可。返回值index、term、isLeader,
别离表白这个command的序列号(日志中的程序)、任期、以及对应raft节点是否是leader(只有leader才会将command增加到日志中)。
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
if term, isLeader = rf.GetState(); isLeader {
rf.mu.Lock()
rf.logs = append(rf.logs, Entry{
Term: rf.currentTerm,
Command: command,
})
rf.matchIndex[rf.me] = rf.nextIndex[rf.me] + len(rf.logs) - 1
index = len(rf.logs) - 1
if Debug > 0 {
log.Printf("[Start] Server[%v] replicate command to log | current term[%d] | current state[%d]\n | log length[%d]",
rf.me, rf.currentTerm, rf.state, len(rf.logs))
}
rf.mu.Unlock()
}
return index, term, isLeader
}
咱们留神到第2个提醒,这里要实现选举限度,咱们先来看看论文中是如何形容的:
在任何基于领导人的一致性算法中,领导人都必须存储所有曾经提交的日志条目。
在某些一致性算法中,例如 Viewstamped Replication,某个节点即便是一开始并没有蕴含所有曾经提交的日志条目,
它也能被选为领导者。这些算法都蕴含一些额定的机制来辨认失落的日志条目并把他们传送给新的领导人,
要么是在选举阶段要么在之后很快进行。可怜的是,这种办法会导致相当大的额定的机制和复杂性。
Raft应用了一种更加简略的办法,它能够保障所有之前的任期号中曾经提交的日志条目在选举的时候都会呈现在新的领导人中,
不须要传送这些日志条目给领导人。这意味着日志条目标传送是单向的,只从领导人传给跟随者,
并且领导人从不会笼罩本身本地日志中曾经存在的条目。
Raft应用投票的形式来阻止一个候选人博得选举除非这个候选人蕴含了所有曾经提交的日志条目。
候选人为了博得选举必须分割集群中的大部分节点,这意味着每一个曾经提交的日志条目在这些服务器节点中必定存在于至多一个节点上。
如果候选人的日志至多和大多数的服务器节点一样新(这个新的定义会在上面探讨),那么他肯定持有了所有曾经提交的日志条目。
申请投票RPC实现了这样的限度:RPC中蕴含了候选人的日志信息,而后投票人会回绝掉那些日志没有本人新的投票申请。
Raft通过比拟两份日志中最初一条日志条目标索引值和任期号定义谁的日志比拟新。
如果两份日志最初的条目标任期号不同,那么任期号大的日志更加新。
如果两份日志最初的条目任期号雷同,那么日志比拟长的那个就更加新。
用程序语言示意为:
args.LastLogTerm > lastLogTerm ||
args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex
// 如果候选人的最初一个日志的term大于以后节点的lastLogTerm,则阐明肯定更新
// 如果lastLogTerm相等,则比拟lastLogIndex
// 残缺代码如下:
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
// 任期大于对方回绝
rf.checkTermOrUpdateState(args.Term)
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}
// 本人曾经投票并且没有投票给对方,回绝
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
reply.Term = rf.currentTerm
reply.VoteGranted = false
}
lastLogTerm, lastLogIndex := rf.getLastState()
if args.LastLogTerm > lastLogTerm || args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex {
// 如果最初一条日志的任期号和候选人的任期号雷同,则比拟已提交日志的index(目标是保障已提交日志不会被笼罩)
reply.VoteGranted = true
// 成为follower
rf.state = Follower
rf.votedFor = args.CandidateId
} else {
return
}
}
提醒3通知咱们,即便某些leader还活着的时候依然可能存在反复选举的问题,
咱们最好在一个候选人成为leader时立即向其余节点发送心跳来防止屡次选举。
实现如下:
func (rf *Raft) Election() {
// 选举定时器超时,发动选举
rf.mu.Lock()
// 成为候选者
rf.SwitchState(Candidate)
lastLogTerm, lastLogIndex := rf.getLastState()
args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
rf.mu.Unlock()
// 申请slave投票
numberVoted := 1
for slave, _ := range rf.peers {
if slave == rf.me {
continue
}
go func(slave int) {
reply := RequestVoteReply{}
if rf.sendRequestVote(slave, &args, &reply) {
rf.mu.Lock()
if args.Term != rf.currentTerm {
return
}
if reply.VoteGranted {
numberVoted += 1
if numberVoted > len(rf.peers)/2 && rf.state == Candidate {
rf.SwitchState(Leader)
for i := 0; i < len(rf.peers); i++ {
rf.matchIndex[i] = 0
rf.nextIndex[i] = len(rf.logs)
}
rf.mu.Unlock()
// 发送心跳
rf.sendHeartBeat()
rf.mu.Lock()
}
}
rf.mu.Unlock()
}
}(slave)
}
}
发送心跳的代码如下:
func (rf *Raft) sendHeartBeat() {
// 发送心跳
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return
}
rf.ResetElectionTimer()
rf.ResetHeartBeatTimer()
if Debug > 0 {
log.Printf("Server[%d]start send heartbeat\n", rf.me)
}
rf.mu.Unlock()
for slave := 0; slave < len(rf.peers); slave++ {
if slave == rf.me {
continue
}
go func(slave int) {
for !rf.sendHeartBeatTo(slave) {
}
}(slave)
}
}
func (rf *Raft) sendHeartBeatTo(slave int) (res bool) {
res = true
rf.mu.Lock()
// 互斥锁保障一次心跳的信息是某个状态的
args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[slave] - 1, // 示意下一个要同步给slave的index
PrevLogTerm: rf.logs[rf.nextIndex[slave]-1].Term,
Entries: rf.logs[rf.nextIndex[slave]:],
LeaderCommitIndex: rf.commitIndex,
}
rf.mu.Unlock()
reply := &AppendEntriesReply{}
if ok := rf.sendAppendEntries(slave, args, reply); ok {
rf.mu.Lock()
if rf.state != Leader {
// 留神,不是leader了
rf.mu.Unlock()
return
}
if rf.currentTerm != args.Term {
// 不是同一个任期内的leader
rf.mu.Unlock()
return
}
if reply.Success {
// 发送心跳胜利了,而后更改已匹配日志条目和下一次发送日志条目
rf.matchIndex[slave] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[slave] = rf.matchIndex[slave] + 1
// 查看
rf.checkN()
} else {
if reply.Term > rf.currentTerm {
rf.SwitchState(Follower)
rf.currentTerm = reply.Term
rf.persist()
rf.mu.Unlock()
return false
}
rf.nextIndex[slave] = reply.ConflictIndex
rf.mu.Unlock()
return false
}
rf.mu.Unlock()
}
return
}
几个留神的中央,首先发送心跳前要查看身份是否是leader,只有leader能力发送心跳。
要在临界区内生成心跳的参数,而后发送时也要查看是否是leader。
rpc胜利后,如果心跳承受了,要更新leader的matchIndex、nextIndex(日志同步状态),并且调用checkN()
来更新
commitIndex。
如果心跳没有被承受,则依据返回值的conflictIndex,来更新日志同步状态以便下一次心跳。
对于几个测试的注意事项:
TestFailAgree2B:这个测试要多留神的是AppendEntries()函数的解决。
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.checkTermOrUpdateState(args.Term)
reply.Term = rf.currentTerm
reply.Success = true
// reply false if term < currentTerm
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}
// 到这里阐明是实在的leader
rf.ResetElectionTimer()
// 如果本人是候选者,则转变为追随者
if rf.state == Candidate {
rf.SwitchState(Follower)
}
// 如果preLogIndex不存在logs中或者preLogIndex的日志对应term不匹配,则同步日志失败
if args.PrevLogIndex >= len(rf.logs) || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
reply.Term = rf.currentTerm
// 找到conflictIndex
if len(rf.logs) <= args.PrevLogIndex {
reply.ConflictIndex = len(rf.logs)
} else {
// 找到某个term的第一个index
for i := args.PrevLogIndex; i >= 1; i-- {
if rf.logs[i].Term != rf.logs[i-1].Term {
reply.ConflictIndex = i
break
}
}
}
return
}
// 如果一个曾经存在日志在新的地位抵触,删除曾经存在日志
conflicIndex := 0
isMatch := true
firstIndex := args.PrevLogIndex + 1
for i := 0; i < len(args.Entries); i++ {
if firstIndex+i >= len(rf.logs) || rf.logs[firstIndex+i].Term != args.Entries[i].Term {
isMatch = false
conflicIndex = i
break
}
}
if !isMatch {
rf.logs = append(rf.logs[:conflicIndex+firstIndex], args.Entries[conflicIndex:]...)
rf.persist() //长久化
if Debug > 0 {
log.Printf("Server[%d] append entries[conflic=%d,start=%d,len=%d]from server[%d]\n",
rf.me, conflicIndex, conflicIndex+firstIndex, len(args.Entries)-conflicIndex, args.LeaderId)
}
}
// 更新leadercommit
if args.LeaderCommitIndex > rf.commitIndex {
// commitIndex不能超出rf.logs的范畴
rf.commitIndex = min(args.LeaderCommitIndex, len(rf.logs)-1)
go rf.Apply()
}
reply.Term = rf.currentTerm
reply.Success = true
}
TestBackup2B:这个测试次要是不要打印日志,日志会影响效率。
发表回复