前言

代码上传到集体github仓库6.824
多线程编程因为状况过于简单,单单通过运行几次go test -run 2A命令失去PASS是无奈证实代码的可靠性的.
能够通过该门课程助教提供的脚本test
集体最开始写过的一版代码能通过1000次测试,起初通过从新设计思考后才发现有显著的bug.
所以同学们能够多应用该脚本多跑几次,看看有无出错,查找log日志发现错误在哪里.
有须要的同学还能够批改测试代码.
该版本代码能通过上述脚本运行2000/2000次测试无谬误,集体不敢保障bug free,如有发现错误望能斧正.

运行模型

raft次要由三个局部组成:

  1. 主部(只能由此局部批改raft状态, 计时等工作都由此局部进行)
  2. 发送信息局部(进行rpc调用)
  3. 解决接管信息局部(响应其它raft的rpc调用,响应本人rpc调用收到的reply)

同步变量

type Raft struct {    rpcMutex   sync.Mutex // 将收到,收回rpc调用实现之后的数据处理串行化 但并发调用rpc    stateMutex sync.Mutex // 爱护raft状态的读写平安    //用于外部数据同步    requestChan  chan InnerRequest    responseChan chan InnerResponse    timer *time.Timer}

raft相当于状态机,要扭转raft的状态只有两种办法 超时接管信息
运行形式如下

  1. raft主部串行执行,在加锁的状态下批改完状态之后,依据状况抉择是否reset计时器,再开释锁,应用select监听超时信号或者是解决接管信息局部发送的信号
  2. 发送信息局部较为简单,只须要加锁状态下拷贝raft状态,而后并发进行rpc调用即可(留神:发送时加锁会导致不同的raft实体循环调用从而导致死锁)
  3. 接管的信息是并发达到的,在所有处理函数前加锁rpcMutex,函数退出后再开释该锁,使得该阶段串行化. 此外,在执行时如果发现须要告诉raft批改状态,还要通过requestChan和responseChan与主部进行通信,因为都是0缓存,往这两个channel写数据未被读出时,写者处于阻塞状态.

代码次要局部

1. MainProcess

通过rf.state判断执行分支

func (rf *Raft) MainProcess() {    for {        switch rf.state {        case FOLLOWERSTATE:            rf.FollowerProcess()        case CANDIDATESTATE:            rf.currentTerm += 1            rf.votedFor = rf.me            rf.numOfVotedPeers = 1            for i := range rf.peers {                if i != rf.me {                    rf.votedStateOfPeers[i] = false                }            }            rf.CandidateProcess()        case LEADERSTATE:            rf.LeaderProcess()        case DEADSTATE:            return        }    }}

2. CandidateProcess

func (rf *Raft) CandidateProcess() {    //candidate一个Term内能够发送多轮申请选票    //因而多设置了一个tmpTimer这一个计时器    tmpTimer := time.NewTimer(HEARTBEATS_INTERVAL)    //Term计时器    rf.timer.Reset(GetTimeoutInterval())    //并发发送选票申请    go rf.sendAllRequestVote()    for {        //在进入监听状态之前要对stateMutex进行解锁        rf.stateMutex.Unlock()        select {        //该轮Term超时        case <-rf.timer.C:            rf.stateMutex.Lock()            //这里的冗余代码是为了不便揭示,该版本有多处冗余代码            rf.state = CANDIDATESTATE            return        //超时,发动该Term内的又一轮选票申请        case <-tmpTimer.C:            rf.stateMutex.Lock()            tmpTimer.Reset(GetTimeoutInterval())            go rf.sendAllRequestVote()            continue        //解决接管信息函数发来的信号        case tmp := <-rf.requestChan:            rf.stateMutex.Lock()            //操作类型            operation := tmp.operation            //该信号对应的Term,            term := tmp.term            extraInf := tmp.extraInf            //Term过期,摈弃            if term < rf.currentTerm {                rf.responseChan <- InnerResponse{false, UNDIFINE, rf.currentTerm, []int{}}                continue            }            switch operation {            case NEWTERM:                //这里判断条件能够是==                if term <= rf.currentTerm {                    //因为解决局部阻塞在该channel上,即便不操作,也要开释信号                    rf.responseChan <- InnerResponse{false, UNDIFINE, rf.currentTerm, []int{}}                    //continue用于不须要重启计时的操作的分支的退出                    continue                } else {                    if !rf.timer.Stop() {                        <-rf.timer.C                    }                    rf.state = FOLLOWERSTATE                    rf.currentTerm = extraInf[0]                    rf.votedFor = -1                    rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}                    //return用于须要重启计时的操作的分支的退出                    //返回至MainProcess()                    //因而这种分支之前须要排空rf.timer.C                    return                }            case LEGALLEADER:                if !rf.timer.Stop() {                    <-rf.timer.C                }                rf.state = FOLLOWERSTATE                rf.currentTerm = extraInf[0]                rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}                return            case LATERCANDIDATE:                if !rf.timer.Stop() {                    <-rf.timer.C                }                rf.state = FOLLOWERSTATE                rf.currentTerm = extraInf[0]                rf.votedFor = extraInf[1]                rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}                return            case VOTEFOR:                rf.responseChan <- InnerResponse{false, UNDIFINE, rf.currentTerm, []int{}}                continue            case GETVOTE:                if !rf.votedStateOfPeers[extraInf[1]] {                    rf.numOfVotedPeers += 1                    rf.votedStateOfPeers[extraInf[1]] = true                    if rf.numOfVotedPeers > rf.numOfAllPeers/2 {                        if !rf.timer.Stop() {                            <-rf.timer.C                        }                        rf.state = LEADERSTATE                        rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}                        return                    } else {                        continue                    }                } else {                    continue                }            case BEDEAD:                rf.state = DEADSTATE                rf.responseChan <- InnerResponse{true, UNDIFINE, rf.currentTerm, []int{}}                if !rf.timer.Stop() {                    <-rf.timer.C                }                return            }        }    }}

须要留神的细节

1. 计时器timer的应用

在assignment的页面里提到了能够应用time.Sleep()来代替计时性能,因为Timer和Ticker难以使用正确,然而应用time.Sleep()办法终归是不优雅.
timer通过time.NewTimer(Duration)创立,在通过指定的Duration工夫之后,会往timer.C这个channel里发送信号,应用timer.Stop()能够进行计时,应用timer.Reset(Duration)能够重设工夫,这些是通过简略地浏览文档就能失去的信息.
然而须要留神的一点是调用timer.Stop()的返回值
在调用timer.Stop()后正确将计时器进行后,timer.Stop()返回值为true.
然而当timer.Stop()在计时器进行后再调用则会返回false,为了不影响后序的信号传递,须要将timer.C排空

if !timer.Stop(){    <-timer.C}
//谬误示范://通过select语句判断timer.C中是否有信号,若无信号间接通过default分支//然而问题在于一种状况timer.Stop()未正确进行,然而信号量还未发送到timer.C上,此时程序间接从default语句通过,而未正确处理信号量if !timer.Stop(){    select{    case <- timer.C:    case default:    }}

2. golang闭包

和大小写首字母一样对go不相熟的人常踩的坑,for循环中应用for定义的参数会因为协程援用同一块地址而导致运行谬误

func (rf *Raft) sendAllAppendEntries() {    tmp := rf.getCopy()    args := AppendEntriesArgs{        Term:     tmp.currentTerm,        LeaderId: tmp.me,    }    for i := range tmp.peers {        if i != rf.me {            //应用tmp_i            tmp_i := i            go rf.sendAppendEntries(tmp_i, &args, &AppendEntriesReply{})        }    }}

3. goroutine id

调试代码时可能会遇到不可能呈现的输入,例如goroutine杀不洁净导致输入谬误等,这种状况下能够尝试打印goroutine id进行调试.

func GoID() int {    var buf [64]byte    n := runtime.Stack(buf[:], false)    // 失去id字符串    idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]    id, err := strconv.Atoi(idField)    if err != nil {        panic(fmt.Sprintf("cannot get goroutine id: %v", err))    }    return id}