共计 4536 个字符,预计需要花费 12 分钟才能阅读完成。
前言
代码上传到集体 github 仓库 6.824
多线程编程因为状况过于简单, 单单通过运行几次 go test -run 2A 命令失去 PASS 是无奈证实代码的可靠性的.
能够通过该门课程助教提供的脚本 test
集体最开始写过的一版代码能通过 1000 次测试, 起初通过从新设计思考后才发现有显著的 bug.
所以同学们能够多应用该脚本多跑几次, 看看有无出错, 查找 log 日志发现错误在哪里.
有须要的同学还能够批改测试代码.
该版本代码能通过上述脚本运行 2000/2000 次测试无谬误, 集体不敢保障 bug free, 如有发现错误望能斧正.
运行模型
raft 次要由三个局部组成:
- 主部(只能由此局部批改 raft 状态, 计时等工作都由此局部进行)
- 发送信息局部(进行 rpc 调用)
- 解决接管信息局部(响应其它 raft 的 rpc 调用, 响应本人 rpc 调用收到的 reply)
同步变量
type Raft struct {
rpcMutex sync.Mutex // 将收到, 收回 rpc 调用实现之后的数据处理串行化 但并发调用 rpc
stateMutex sync.Mutex // 爱护 raft 状态的读写平安
// 用于外部数据同步
requestChan chan InnerRequest
responseChan chan InnerResponse
timer *time.Timer
}
raft 相当于状态机, 要扭转 raft 的状态只有两种办法 超时 和 接管信息
运行形式如下
- raft 主部串行执行, 在加锁的状态下批改完状态之后, 依据状况抉择是否 reset 计时器, 再开释锁, 应用 select 监听超时信号或者是 解决接管信息局部 发送的信号
- 发送信息局部较为简单, 只须要加锁状态下拷贝 raft 状态, 而后并发进行 rpc 调用即可(留神: 发送时加锁会导致不同的 raft 实体循环调用从而导致死锁)
- 接管的信息是并发达到的, 在所有处理函数前加锁 rpcMutex, 函数退出后再开释该锁, 使得该阶段串行化. 此外, 在执行时如果发现须要告诉 raft 批改状态, 还要通过 requestChan 和 responseChan 与主部进行通信, 因为都是 0 缓存, 往这两个 channel 写数据未被读出时, 写者处于阻塞状态.
代码次要局部
1. MainProcess
通过 rf.state 判断执行分支
func (rf *Raft) MainProcess() {
for {
switch rf.state {
case FOLLOWERSTATE:
rf.FollowerProcess()
case CANDIDATESTATE:
rf.currentTerm += 1
rf.votedFor = rf.me
rf.numOfVotedPeers = 1
for i := range rf.peers {
if i != rf.me {rf.votedStateOfPeers[i] = false
}
}
rf.CandidateProcess()
case LEADERSTATE:
rf.LeaderProcess()
case DEADSTATE:
return
}
}
}
2. CandidateProcess
func (rf *Raft) CandidateProcess() {
//candidate 一个 Term 内能够发送多轮申请选票
// 因而多设置了一个 tmpTimer 这一个计时器
tmpTimer := time.NewTimer(HEARTBEATS_INTERVAL)
//Term 计时器
rf.timer.Reset(GetTimeoutInterval())
// 并发发送选票申请
go rf.sendAllRequestVote()
for {
// 在进入监听状态之前要对 stateMutex 进行解锁
rf.stateMutex.Unlock()
select {
// 该轮 Term 超时
case <-rf.timer.C:
rf.stateMutex.Lock()
// 这里的冗余代码是为了不便揭示, 该版本有多处冗余代码
rf.state = CANDIDATESTATE
return
// 超时, 发动该 Term 内的又一轮选票申请
case <-tmpTimer.C:
rf.stateMutex.Lock()
tmpTimer.Reset(GetTimeoutInterval())
go rf.sendAllRequestVote()
continue
// 解决接管信息函数发来的信号
case tmp := <-rf.requestChan:
rf.stateMutex.Lock()
// 操作类型
operation := tmp.operation
// 该信号对应的 Term,
term := tmp.term
extraInf := tmp.extraInf
//Term 过期, 摈弃
if term < rf.currentTerm {rf.responseChan <- InnerResponse{false, UNDIFINE, rf.currentTerm, []int{}}
continue
}
switch operation {
case NEWTERM:
// 这里判断条件能够是 ==
if term <= rf.currentTerm {
// 因为解决局部阻塞在该 channel 上, 即便不操作, 也要开释信号
rf.responseChan <- InnerResponse{false, UNDIFINE, rf.currentTerm, []int{}}
//continue 用于不须要重启计时的操作的分支的退出
continue
} else {if !rf.timer.Stop() {<-rf.timer.C}
rf.state = FOLLOWERSTATE
rf.currentTerm = extraInf[0]
rf.votedFor = -1
rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}
//return 用于须要重启计时的操作的分支的退出
// 返回至 MainProcess()
// 因而这种分支之前须要排空 rf.timer.C
return
}
case LEGALLEADER:
if !rf.timer.Stop() {<-rf.timer.C}
rf.state = FOLLOWERSTATE
rf.currentTerm = extraInf[0]
rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}
return
case LATERCANDIDATE:
if !rf.timer.Stop() {<-rf.timer.C}
rf.state = FOLLOWERSTATE
rf.currentTerm = extraInf[0]
rf.votedFor = extraInf[1]
rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}
return
case VOTEFOR:
rf.responseChan <- InnerResponse{false, UNDIFINE, rf.currentTerm, []int{}}
continue
case GETVOTE:
if !rf.votedStateOfPeers[extraInf[1]] {
rf.numOfVotedPeers += 1
rf.votedStateOfPeers[extraInf[1]] = true
if rf.numOfVotedPeers > rf.numOfAllPeers/2 {if !rf.timer.Stop() {<-rf.timer.C}
rf.state = LEADERSTATE
rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}
return
} else {continue}
} else {continue}
case BEDEAD:
rf.state = DEADSTATE
rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}
if !rf.timer.Stop() {<-rf.timer.C}
return
}
}
}
}
须要留神的细节
1. 计时器 timer 的应用
在 assignment 的页面里提到了能够应用 time.Sleep()来代替计时性能, 因为 Timer 和 Ticker 难以使用正确, 然而应用 time.Sleep()办法终归是不优雅.
timer 通过 time.NewTimer(Duration)创立, 在通过指定的 Duration 工夫之后, 会往 timer.C 这个 channel 里发送信号, 应用 timer.Stop()能够进行计时, 应用 timer.Reset(Duration)能够重设工夫, 这些是通过简略地浏览文档就能失去的信息.
然而须要留神的一点是调用 timer.Stop()的返回值
在调用 timer.Stop()后正确将计时器进行后,timer.Stop()返回值为 true.
然而当 timer.Stop()在计时器进行后再调用则会返回 false, 为了不影响后序的信号传递, 须要将 timer.C 排空
if !timer.Stop(){<-timer.C}
// 谬误示范:
// 通过 select 语句判断 timer.C 中是否有信号, 若无信号间接通过 default 分支
// 然而问题在于一种状况 timer.Stop()未正确进行, 然而信号量还未发送到 timer.C 上, 此时程序间接从 default 语句通过, 而未正确处理信号量
if !timer.Stop(){
select{
case <- timer.C:
case default:
}
}
2. golang 闭包
和大小写首字母一样对 go 不相熟的人常踩的坑,for 循环中应用 for 定义的参数会因为协程援用同一块地址而导致运行谬误
func (rf *Raft) sendAllAppendEntries() {tmp := rf.getCopy()
args := AppendEntriesArgs{
Term: tmp.currentTerm,
LeaderId: tmp.me,
}
for i := range tmp.peers {
if i != rf.me {
// 应用 tmp_i
tmp_i := i
go rf.sendAppendEntries(tmp_i, &args, &AppendEntriesReply{})
}
}
}
3. goroutine id
调试代码时可能会遇到不可能呈现的输入, 例如 goroutine 杀不洁净导致输入谬误等, 这种状况下能够尝试打印 goroutine id 进行调试.
func GoID() int {var buf [64]byte
n := runtime.Stack(buf[:], false)
// 失去 id 字符串
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine"))[0]
id, err := strconv.Atoi(idField)
if err != nil {panic(fmt.Sprintf("cannot get goroutine id: %v", err))
}
return id
}