说到 raftexample,很多人可能很生疏,我晓得 raft,我也晓得 example,哪来的 raftexample?这里做下简略的介绍,raftexample 是 etcd 外面 raft 模块实现的简略示例,它实现了一个简略的基于 raft 协定的 kvstore 存储集群零碎,并提供了 rest api 以供应用
而 raftexample 仅有以下几个文件,几百行代码,通过浏览,能够帮忙咱们更好的了解 raft 协定,投入产出比超大
演示动画
官网举荐的动画演示地址:http://thesecretlivesofdata.c…
leader 选举
日志同步
概念解析
逻辑时钟
逻辑时钟其实是一个定时器 time.Tick
,每隔一段时间触发一次,是推动 raft 选主的外围,在这外面有几个属性先理解下
electionElapsed:逻辑时钟推动计数,follower 和 leader 没推动一次逻辑时钟,这个数值就会 +1;follower 收到 leader 的心跳音讯后会重置为 0;leader 则在每次发送心跳前重置为 0
heartbeatElapsed:leader 的逻辑时钟推动计数,不仅仅会减少 electionElapsed 计数,还会减少 heartbeatElapsed 的计数
heartbeatTimeout:当 leader 的 heartbeatElapsed 计数达到 heartbeatTimeout 预约义的值的时候,就会向各个节点发送一次心跳
electionTimeout:当 leader 的 electionElapsed 的逻辑时钟推动次数超过这个值的时候,如果 leader 同时开启了 checkQuorum 来判断以后集群各节点的存活状态时,这时候 leader 就会进行探活(探活不会发动网络申请,靠本身存储的各个节点状态)
randomizedElectionTimeout:随机生产的一个值,当 follower 的 electionElapsed 计数达到这个值的时候,就会开始发动新一轮选举
所以,能够看进去,逻辑时钟次要是推动 leader 心跳和探活、follower 的选举的
follower 发动选举的条件:electionElapsed >= randomizedElectionTimeout
leader 发送心跳的条件:heartbeatElapsed >= heartbeatTimeout
leader 发动集群节点探活的条件:electionElapsed > electionTimeout
这里就有一个问题了,leader 节点为什么会有 electionElapsed 和 heartbeatElapsed 两个计数,一个不就能够了吗?
其实是因为,当 leader 发动探活或者计数满足探活条件的时候,electionElapsed 就会被置为 0,所以对于 leader 而言,electionElapsed 跟 heartbeatElapsed 的值并不统一,也不同步
raft.Peer
Peer: 集群中的节点,每一个退出集群的利用过程,都是一个节点
type Peer struct {
ID uint64
Context []byte}
ID: 节点的 id,过程初始化的时候,会给集群中的每个节点调配一个 ID
Context: 上下文
raftpb.Message
Message: 这是 raftexample(前面简称 raft 来代替)中的一个重要的构造体,是所有音讯的形象,包含且不限于选举 / 增加数据 / 配置变更,都是一种音讯类型
type Message struct {
Type MessageType
To uint64
From uint64
Term uint64
LogTerm uint64
Index uint64
Entries []Entry
Commit uint64
Snapshot Snapshot
Reject bool
RejectHint uint64
Context []byte}
Type: 音讯类型,raft 中就是依据不同的音讯类型来实现不同的逻辑解决的
- MsgHup MessageType = 0 // follower 节点认为 leader 挂了的时候,发动选举
- MsgBeat MessageType = 1 // leader 才会发送的本地音讯,raft 初始化时会定义一个心跳超时的次数,当逻辑时钟推动的次数超过定义的心跳超时次数后,就会触发这个音讯,而后 leader 会向 follower 发送 MsgHeartbeat 音讯
- MsgProp MessageType = 2 // 写入数据或批改集群配置的音讯类型
- MsgApp MessageType = 3 // leader 向 follower 播送追加日志的音讯
- MsgAppResp MessageType = 4 // follower 响应 leader 的追加日志申请的音讯类型
- MsgVote MessageType = 5 // candinator 发送选举命令的音讯类型
- MsgVoteResp MessageType = 6 // 其余节点响应 candinator 选举命令的音讯类型
- MsgSnap MessageType = 7 // leader 向 follower 发送快照数据的音讯类型
- MsgHeartbeat MessageType = 8 // leader 向 follower 发送的心跳音讯
- MsgHeartbeatResp MessageType = 9 // follower 响应的心跳音讯
- MsgUnreachable MessageType = 10 // 音讯不可达,本地音讯
- MsgSnapStatus MessageType = 11 // 报告发送给 follower 节点的 Snap 音讯状态,是否发送胜利
- MsgCheckQuorum MessageType = 12 // 这个也是本地音讯,用于 leader 判断以后存活节点的状态,如果不超过一半节点存活则降为 follower 节点
- MsgTransferLeader MessageType = 13 // 转移 leader 权
- MsgTimeoutNow MessageType = 14 // 当发送 MsgTransferLeader 的 follower 的日志与 leader 始终的时候,leader 发送 MsgTimeoutNow 给这个 follower,follower 开始进行选举
- MsgReadIndex MessageType = 15 // follower 节点向 leader 节点申请获取 commit index 的地位
- MsgReadIndexResp MessageType = 16 // leader 节点响应 MsgReadIndex
- MsgPreVote MessageType = 17 // preVote 音讯是 Vote 音讯前可选的音讯,如果开启了 preVote,则 follower 在转成 candidator 前进行一次 preVote 与投票,这时候 Term 任期不会减少,防止因为网络分区造成有余 1 / 2 的分区节点的频繁选主
- MsgPreVoteResp MessageType = 18 // 其余节点对 MsgPreVote 的响应
To: 音讯是发送给的节点的 ID
From: 音讯发送方节点的 ID
Term: 以后的任期
LogTerm:当 leader 发送给 follower 节点日志的时候,follower 节点回绝的时候,会匹配到一个适合的日志地位尝试让 leader 开始同步,这个日志点的日志对应的 Term 就是 LogTerm
Index: 音讯在日志中的 Index
Entries: 日志记录
Commit: 音讯发送节点的日志 commit 的地位
Snapshot:在传输快照时的快照信息
Reject:节点是否回绝收到的音讯;例如,follower 收到 leader 的 MsgApp 音讯的时候,发现日志不能间接追加,就会回绝掉 leader 的这个日志同步音讯
RejectHint:follower 回绝掉 leader 节点音讯后,计算出来的一个可能匹配 leader 日志的索引地位
Context:上下文
raftpb.Entry
Entry: 就是常说的日志
type Entry struct {
Term uint64
Index uint64
Type EntryType
Data []byte}
Term:这条日志对应的任期,每个日志都是由 leader 同步过去的,而 leader 都有一个任期,这个就是同步日志时 leader 的任期
Index:索引,也是每条日志的一个标识
Type:日志的类型,EntryNormal 示意惯例日志;EntryConfChange/EntryConfChangeV2 示意配置变更的日志
Data:就是日志存储的数据
简略看两个 demo
leader 和 follower 日志不同步的状况
idx 1 2 3 4 5 6 7 8 9
-----------------
term (Leader) 1 3 3 3 5 5 5 5 5
term (Follower) 1 1 1 1 2 2
leader 和 follower 日志同步的状况
idx 1 2 3 4 5 6 7 8 9
-----------------
term (Leader) 1 3 3 3 5 5 5 5 5
term (Follower) 1 3 3 3 5 5 5 5 5
leader 和 follower 节点日志的同步就是通过 Entries 外面每个 Entry 的 Index 和 Term 是否雷同来判断的
留神
- 前面所有的数据增删改查、配置增删、选举等音讯统称为消息日志或日志
- 在日志同步这里,咱们会疏忽 wal 日志和 snapshot 相干的内容
源码解析
构造体介绍
raftNode
type raftNode struct {
// 接管 kvstore 的存储数据日志
proposeC <-chan string // proposed messages (k,v)
// 接管 kvHttpApi 的配置变更日志
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
// 日志同步到 kvstore
commitC chan<- *commit // entries committed to log (k,v)
// 模块间出错的音讯告诉通道
errorC chan<- error // errors from raft session
// 节点的 id
id int // client ID for raft session
// 以后集群下的所有节点 ip:ports
peers []string // raft peer URLs
// 以后节点是否接入一个集群,启动时候依据这个判断是重启还是启动一个新的节点
join bool // node is joining an existing cluster
// wal 日志目录
waldir string // path to WAL directory
// snapshot 目录
snapdir string // path to snapshot directory
// 获取 snapshot 的办法
getSnapshot func() ([]byte, error)
// 用于集群的状态
confState raftpb.ConfState
// snapshot 的日志的 Index
snapshotIndex uint64
// 曾经 applied 的日志的 Index
appliedIndex uint64
// raft backing for the commit/error channel
// node 的实例,实现了 Node 接口的办法
node raft.Node
// storage 实例
raftStorage *raft.MemoryStorage
// wal 实例
wal *wal.WAL
// snapshot 实例,治理 snapshot
snapshotter *snap.Snapshotter
// 与 snapshot 交互的通道,判断 snapshot 实例是否创立实现
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
// 两次 snapshot 之间的 apply 的日志最小条数
// 当上一次 snapshot 之后,apply 的日志超过这个数量,就会开启新一轮 snapshot,用于及时开释 wal 和 raftLog 中的存储压力
snapCount uint64
// 网络组件
transport *rafthttp.Transport
// 通道告诉敞开 serveChannel,敞开网络组件
stopc chan struct{} // signals proposal channel closed
// 敞开 raft node 的 http 服务
httpstopc chan struct{} // signals http server to shutdown
// raft node 的 http 敞开胜利后告诉其余模块的通道
httpdonec chan struct{} // signals http server shutdown complete
// 日志组件
logger *zap.Logger
}
raft
type raft struct {
// 节点 id
id uint64
// 以后节点的 Term 任期
Term uint64
// 以后节点在选举时,投票给了谁,初始化时为 0,也就是谁都没有投给
Vote uint64
// 与 readIndex 申请无关,这里不多做介绍
readStates []ReadState
// the log
raftLog *raftLog
// 单条音讯最大的 size
maxMsgSize uint64
// 最大的 uncommit 日志数量,当 uncommit 日志数量大于这个值的时候,就不再追加日志了
maxUncommittedSize uint64
// TODO(tbg): rename to trk.
// 集中群各个节点状态,蕴含了节点日志复制状况等,上面会独自介绍一下
prs tracker.ProgressTracker
// 状态,follower、candidator、leader 等
state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
// 记录了以后节点待发送的音讯,这里的音讯会被及时生产
msgs []pb.Message
// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
// leader 转换对象的 id
leadTransferee uint64
// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64
// an estimate of the size of the uncommitted tail of the Raft log. Used to
// prevent unbounded log growth. Only maintained by the leader. Reset on
// term changes.
// 未 commit 的日志的数量
uncommittedSize uint64
readOnly *readOnly
// number of ticks since it reached last electionTimeout when it is leader
// or candidate.
// number of ticks since it reached last electionTimeout or received a
// valid message from current leader when it is a follower.
electionElapsed int
// number of ticks since it reached last heartbeatTimeout.
// only leader keeps heartbeatElapsed.
heartbeatElapsed int
// 是否开启节点探活
checkQuorum bool
// 是否开启 preVote
preVote bool
heartbeatTimeout int
electionTimeout int
// randomizedElectionTimeout is a random number between
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
// when raft changes its state to follower or candidate.
randomizedElectionTimeout int
// 是否不容许数据转发给 leader,开启的话,follower 节点收到的数据日志,就间接抛弃掉,而不会转发给 leader
disableProposalForwarding bool
// 逻辑时钟办法,leader 对应 tickHeartbeat,follower 和 candidate 对应 tickElection
tick func()
// 解决音讯的办法,leader 对应 stepLeader,follower 对应 stepFollower,candidate 对应 stepCandidate
step stepFunc
logger Logger
// pendingReadIndexMessages is used to store messages of type MsgReadIndex
// that can't be answered as new leader didn't committed any log in
// current term. Those will be handled as fast as first log is committed in
// current term.
pendingReadIndexMessages []pb.Message}
这里有两个点解释下:
checkQuorum:这个开关是用于 leader 判断各个节点的沉闷状态的,leader 给 follower 发送信息的时候都会设置一个沉闷态,然而如果呈现网络分区的状况下,leader 所在的分区小于 1 / 2 节点,那么这个 leader 也就没用了,能够被动将本人降级为 follower 节点
preVote:preVote 也是为了网络分区的状况设置的,当网络分区后,某个分区外面的节点数小于 1 / 2 节点数,则 follower 会频繁的发动选举,发动选举时就会将本人的 Term +1,然而并不会选举胜利,所以会陷入一个循环:发动选举 ->term+1-> 选举失败 -> 发动选举 ->term+1……,当网络分区接触时,这个 Term 可能曾经很大了,合并后真正的 leader 的 Term 可能都没有分区外面的 follower 的 Term 大,就会影响日志的准确性了,所以网络分区中的节点会先发动 preVote,这时候 Term 不变,preVote 胜利后才会真正的发动选举流程
raftLog
raftLog 是节点中治理日志的组件
type raftLog struct {
// storage contains all stable entries since the last snapshot.
// storage 组件,外面存储了 snapshot 之后的所有的 stable 日志记录
storage Storage
// unstable contains all unstable entries and snapshot.
// they will be saved into storage.
// 记录了所有 unstable 的日志记录和 snapshot
unstable unstable
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
// commit 日志的最大的 Index 的值,这个是统计的 storage 组件外面的
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
// 曾经 applied 日志的最大的 Index 的值
applied uint64
logger Logger
// maxNextEntsSize is the maximum number aggregate byte size of the messages
// returned from calls to nextEnts.
maxNextEntsSize uint64
}
// Invariant: applied <= committed
在官网的 applied 字段的正文上,有这么一段话:applied <= committed 恒成立,这是为什么
咱们先简略理解一下日志的生命周期,前面跟踪日志同步的时候再具体介绍
用户提交一个创立数据的申请 -> kvstore 生成一条数据日志 -> 日志存储到 unstable 构造中 -> 日志追加到 storage 外面 -> 日志同步给其余节点 -> 节点同步实现,commit 日志,更新 committed 地位 -> kvstore 存储 -> 更新 applied 地位
启动流程
func main() {cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
kvport := flag.Int("port", 9121, "key-value server port")
join := flag.Bool("join", false, "join an existing cluster")
flag.Parse()
// 创立 proposeC 和 confChangeC 通道,这两个通道用于 kvstore raftNode http 三个模块的交互,所以在里面创立
proposeC := make(chan string)
defer close(proposeC)
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)
// raft provides a commit stream for the proposals from the http api
var kvs *kvstore
getSnapshot := func() ([]byte, error) {return kvs.getSnapshot() }
// 启动 raftNode 模块
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
// 启动 kvstore 模块
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
// the key-value http handler will propose updates to raft
// 启动 httpKVApi 模块,用于接管申请
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
raftexample 的启动流程是比较简单的,将所有模块启动,并创立了各个模块之间的交互通道,便于各个模块间通信,并且实现了个股模块之间的解耦
- raftNode: raftexample 的外围模块,提供了 raft 协定,选主、日志同步等能力
- kvstore: raftexample 的存储模块,最终提交的日志,都会存储在这外面,在 raftexample 里,这个存储系统实质是个 map
- httpKvApi: 提供的与外界交互的 api,能够通过 httpapi 来实现存储数据的创立批改和删除以及集群节点的减少和删除
咱们这里依照 httpKvApi->kvstore->raftNode 由简入深的节奏解析
httpKVAPI 模块
httpKvApi 是提供了一个 http 服务,通过不同的 method 来操作不同的对象(数据和集群节点),实现了数据和节点的增删改查
启动
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
// 定义 http server
srv := http.Server{Addr: ":" + strconv.Itoa(port),
Handler: &httpKVAPI{
store: kv,
confChangeC: confChangeC,
},
}
go func() {
// 启动 http server
if err := srv.ListenAndServe(); err != nil {log.Fatal(err)
}
}()
// exit when raft goes down
// 与上层的 raft 通信,如果 raft 挂了,则本模块也要退出
if err, ok := <-errorC; ok {log.Fatal(err)
}
}
启动的逻辑比较简单
异步启动了一个 http 服务,并且阻塞监听 error chan,第一防止过程退出,第二档 raft 模块退出后,http 也能够及时退出
申请解决
申请解决对立集中在了 ServeHTTP 这一个办法外面
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
defer r.Body.Close()
switch r.Method {
// Put 形式,则表明是数据减少或批改
case http.MethodPut:
v, err := io.ReadAll(r.Body)
if err != nil {log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
h.store.Propose(key, string(v))
// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
// 这里间接返回,而底层则会进行数据一致性的解决等操作,所以如果立刻进行 GET 申请的话,还是有可能获取老的数据
w.WriteHeader(http.StatusNoContent)
case http.MethodGet:
// Get 形式,间接到 kvstore 外面获取数据的 value
if v, ok := h.store.Lookup(key); ok {w.Write([]byte(v))
} else {http.Error(w, "Failed to GET", http.StatusNotFound)
}
case http.MethodPost:
// Post 形式是减少集群节点,解析进去 nodeId,并通过 confChangeC 传给 raftNode
url, err := io.ReadAll(r.Body)
if err != nil {log.Printf("Failed to read on POST (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
// Delete 形式则是删除一个集群节点,并通过 confChangeC 传给 raftNode 来解决
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on DELETE", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
default:
w.Header().Set("Allow", http.MethodPut)
w.Header().Add("Allow", http.MethodGet)
w.Header().Add("Allow", http.MethodPost)
w.Header().Add("Allow", http.MethodDelete)
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
下面就是 httpKvApi 的所有解决的状况了
- Put 申请:增加或批改 kvstore 外面存储的数据,但这里并不是间接批改,而是调用
h.store.Propose
(即kvstore.Propose
)来解决,kvstore.Propose
则是通过 proposeC 将数据给到 raftNode 来保证数据一致性后再提交保留到 kvstore 外面,这里前面会具体解析,也是 raft 的外围 - Get 申请:这个办法就比较简单了,间接到 kvstore 外面读取数据,而 Put 办法则是增加和批改数据,然而 Put 不间接批改,所以如果 Put 后立即获取某个 key 的 value,则有可能 raftNode 还没有解决完提交到 kvstore 外面,而导致读取的还是老的数据
- Post 申请:增加一个集群节点,这个节点会被 leader 通晓并将此节点退出 follower
- Delete 申请:删除一个节点,也是交给 leader 来解决
kvstore 模块
创立 kvstore
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore {
// 实例化 kvstore,example 外面存储系统就是一个 map
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
// 加载 snapshot
snapshot, err := s.loadSnapshot()
if err != nil {log.Panic(err)
}
// 如果 snapshot 不为空,则先从 snapshot 外面把数据恢复
if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)
}
}
// read commits from raft into kvStore map until error
// 开启一个 goroutine,读取从 raftNode 外面提交的 commit 数据
go s.readCommits(commitC, errorC)
return s
}
// 加载 snapshot
func (s *kvstore) loadSnapshot() (*raftpb.Snapshot, error) {snapshot, err := s.snapshotter.Load()
// ErrNoSnapshot 示意没有 snapshot,这里消化掉 err,防止下层模块退出
if err == snap.ErrNoSnapshot {return nil, nil}
if err != nil {return nil, err}
// 找到了,则返回
return snapshot, nil
}
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {return s.loadMatching(func(*raftpb.Snapshot) bool {return true})
}
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
// 这里返回 snapshot 的文件列表,依照工夫排序,也就是从新到旧排序
names, err := s.snapNames()
if err != nil {return nil, err}
var snap *raftpb.Snapshot
// 遍历 snapshot 文件,其实也就是读取最新的文件
for _, name := range names {
// loadSnap 就不追了,就是读取 snapshot 文件的数据,并反序列化为构造体对象
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {return snap, nil}
}
// 没找到获取没有适合的 snapshot,就返回 ErrNoSnapshot
return nil, ErrNoSnapshot
}
// 从 snapshot 外面复原数据到 kvstore,这里的入参是下面 snapshot.Data,也就是[]byte
func (s *kvstore) recoverFromSnapshot(snapshot []byte) error {var store map[string]string
if err := json.Unmarshal(snapshot, &store); err != nil {return err}
// 反序列化为 map 后,间接赋值给 kvstore
s.mu.Lock()
defer s.mu.Unlock()
s.kvStore = store
return nil
}
创立的逻辑:
- 首先实例化 kvstore 这个构造体,并实例化存储系统,在 example 外面也就是一个 map
- 而后本地寻找是否有 snapshot,并读取最新的 snapshot,反序列化为 snapshot 构造体
- 如果找到了 snapshot,则将 snapshot.Data 反序列化为 map,给到 kvstore,也就从 snapshot 外面复原了 kvstore 的存储数据
- 最初开启一个 goroutine,通过 commitC 与 raftNode 交互,读取数据并存储
数据存储
下面说了,kvstore 会开启一个通道 chan 与 raftNode 通信,读取数据并存储,这里看下具体的逻辑
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
for commit := range commitC {
// 如果读取到 nil,则从 snapshot 外面再次复原数据
if commit == nil {
// signaled to load snapshot
snapshot, err := s.loadSnapshot()
if err != nil {log.Panic(err)
}
if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)
}
}
continue
}
// 读取数据
for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()}
// 敞开 commit 的 chan,以告诉下层模块存储实现
close(commit.applyDoneC)
}
// 如果遇到 error chan,跟 httpkvapi 模块一样,退出以后模块
if err, ok := <-errorC; ok {log.Fatal(err)
}
}
commitC 这个通道和解决逻辑就比较简单了
这个 goroutine 就卡在这个 chan,有数据过去就进行解决,并告诉下层模块解决实现
如果 raftNode 遇到异样退出了,则通过 error chan 告诉,kvstore 模块也就退出了
数据查找
func (s *kvstore) Lookup(key string) (string, bool) {s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.kvStore[key]
return v, ok
}
查找就比较简单了,数据结构就是一个 map,间接从 map 外面读数据就行了
数据提交
后面说过数据存储,那么数据提交和数据存储是什么关系,为什么会有两个办法来解决数据呢?
其实 raft 协定外面咱们能够晓得,当用户提交一个数据创立 / 批改的申请的时候,首先把数据提交过去,而后 raftNode 会把数据告诉到上面的各个节点,当数据有一半节点以上接管到的时候,才会真正存储到存储系统外面
func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {log.Fatal(err)
}
s.proposeC <- buf.String()}
所以,这里的数据提交,其实就是通过 proposeC 给到 raftNode,raftNode 解决完后,再通过 commitC 给到 kvstore 存储起来
总结 & 备注
- 这里的 map+lock 形式,效率会低很多,不如间接用 sync.Map;当然这里只是个 example,也就不须要思考那么多
- 数据提交存储流程:当用户通过 httpkvapi 创立一个数据批改 / 创立的申请的时候,kvstore 会先把这个数据通过 proposeC 转发给 raftNode,而后 raftNode 解决实现后才会真正存储到数据库(也就是本身的 map)
raftNode 模块
启动 raftNode
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {commitC := make(chan *commit)
errorC := make(chan error)
rc := &raftNode{
proposeC: proposeC,
confChangeC: confChangeC,
commitC: commitC,
errorC: errorC,
id: id,
peers: peers,
join: join,
waldir: fmt.Sprintf("raftexample-%d", id),
snapdir: fmt.Sprintf("raftexample-%d-snap", id),
getSnapshot: getSnapshot,
snapCount: defaultSnapshotCount,
stopc: make(chan struct{}),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
logger: zap.NewExample(),
snapshotterReady: make(chan *snap.Snapshotter, 1),
// rest of structure populated after WAL replay
}
// 异步启动 raft
go rc.startRaft()
// 返回 commitC,errorC rc.snapshotterReady,用于跟其余模块通信
return commitC, errorC, rc.snapshotterReady
}
func (rc *raftNode) startRaft() {if !fileutil.Exist(rc.snapdir) {if err := os.Mkdir(rc.snapdir, 0750); err != nil {log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
}
}
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL()
// signal replay has finishei
// 告诉其余模块 snapshotter 初始化实现
rc.snapshotterReady <- rc.snapshotter
rpeers := make([]raft.Peer, len(rc.peers))
for i := range rpeers {rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
// 初始化 raft 的相干配置,用于启动 raft
c := &raft.Config{ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntriesSize: 1 << 30,
}
// 依据 oldwal 和 join,判断是新启动的节点还是重启的节点
// 如果是重启,则能够从 snapshot 外面复原数据
if oldwal || rc.join {rc.node = raft.RestartNode(c)
} else {rc.node = raft.StartNode(c, rpeers)
}
// 初始化网络组件
rc.transport = &rafthttp.Transport{
Logger: rc.logger,
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: stats.NewServerStats("",""),
LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
rc.transport.Start()
// 启动与各个节点的网络 pipeline 通信通道
for i := range rc.peers {
if i+1 != rc.id {rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
go rc.serveRaft()
go rc.serveChannels()}
raftNode 启动流程:
- 启动 raft,返回 commitC 与 kvstore 通信,errorC 与其余模块通信,以告诉异样及时退出,snapshotterReady chan 与 kvstore 通信以告诉 snapshot 初始化实现
- 初始化 snapshotter,回放 wal 日志
- 初始化 raft.Config,依据 wal 目录是否存在与 join 配置来判断是新节点还是旧的节点,旧的节点则从 snapshot 外面复原数据即可
- 初始化网络组件,并与各个节点进行通信,初始化各个节点的 pipeline 通道
- 启动 raft server 服务
- 启动 raftNode 的各个通道,与各个模块间进行通信
启动 node
func StartNode(c *Config, peers []Peer) Node {if len(peers) == 0 {panic("no peers given; use RestartNode instead")
}
// 初始化 node 节点,并初始化配置
rn, err := NewRawNode(c)
if err != nil {panic(err)
}
// 将各个节点的信息存储到 raftLog 外面,期待日志同步,而后周知各个节点配置变更的音讯
err = rn.Bootstrap(peers)
if err != nil {c.Logger.Warningf("error occurred during starting a new node: %v", err)
}
// 实例化 node 节点
n := newNode(rn)
// node 就开始在后盾异步运行了,直至退出
go n.run()
return &n
}
func NewRawNode(config *Config) (*RawNode, error) {
// 依据配置,实例化 raft
r := newRaft(config)
rn := &RawNode{raft: r,}
// 初始化 Leader, State, Term Vote CommitIndex 等 raft 本身属性
rn.prevSoftSt = r.softState()
rn.prevHardSt = r.hardState()
return rn, nil
}
func newRaft(c *Config) *raft {if err := c.validate(); err != nil {panic(err.Error())
}
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
hs, cs, err := c.Storage.InitialState()
if err != nil {panic(err) // TODO(bdarnell)
}
r := &raft{
id: c.ID,
lead: None,
isLearner: false,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
checkQuorum: c.CheckQuorum,
preVote: c.PreVote,
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
// 初始化了各个节点在以后节点的状态及存储信息等,包含投票信息,日志 commit 节点,是否沉闷等
cfg, prs, err := confchange.Restore(confchange.Changer{
Tracker: r.prs,
LastIndex: raftlog.lastIndex(),}, cs)
if err != nil {panic(err)
}
assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
if !IsEmptyHardState(hs) {r.loadState(hs)
}
// 更新 raftLog 的 apply index
if c.Applied > 0 {raftlog.appliedTo(c.Applied)
}
// 启动后,就会变更集群中的 follower 节点
r.becomeFollower(r.Term, None)
var nodesStrs []string
for _, n := range r.prs.VoterNodes() {nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
return r
}
// newNode 就初始化了各个 chan,与各个模块进行通信
func newNode(rn *RawNode) node {
return node{propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
// make tickc a buffered chan, so raft node can buffer some ticks when the node
// is busy processing raft messages. Raft node will resume process buffered
// ticks when it becomes idle.
tickc: make(chan struct{}, 128),
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
rn: rn,
}
}
Node 节点的启动流程:
- 实例化 node 节点,并初始化 softState 和 hardState,也就是 Leader, State, Term Vote CommitIndex 等 raft 本身属性
- 实例化 raft,初始化了 raftLog,而后捞取 storage 的 hardState 和配置信息
- 初始化了各个节点在以后节点的状态及存储信息等,包含投票信息,日志 commit 节点,是否沉闷等
- 依据 storage 捞取的 hardState 更新节点的 hardState
- 降级为 follower
启动 serverRaft
func (rc *raftNode) serveRaft() {url, err := url.Parse(rc.peers[rc.id-1])
if err != nil {log.Fatalf("raftexample: Failed parsing URL (%v)", err)
}
ln, err := newStoppableListener(url.Host, rc.httpstopc)
if err != nil {log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
}
err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
select {
case <-rc.httpstopc:
default:
log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
}
close(rc.httpdonec)
}
serverRaft 就是启动了一个 http 服务,用于各个节点间通信;这里跟 httpKvApi 不同,httpKvApi 只有是用于与集群通信,而 raft server 则是各个节点间选主,日志同步等到通信
启动流程介绍完了后,各个对象也都筹备好了,状态也都就绪了,前面就是开始正式的干活了 -leader 选举和日志同步
leader 选举
raftNode 选主是由逻辑时钟推动的,逻辑时钟的逻辑,后面曾经介绍了,这里跟着代码看看具体的实现
leader 会定时给 follower 发送心跳,同时 follower 会有个定时器查看心跳距离,如果心跳距离超过设定的工夫,就会触发选主了
定时器触发
func (rc *raftNode) serveChannels() {ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
......
for {
select {
case <-ticker.C:
rc.node.Tick()
......
}
}
}
func (n *node) Tick() {
select {case n.tickc <- struct{}{}:
case <-n.done:
default:
n.rn.raft.logger.Warningf("%x A tick missed to fire. Node blocks too long!", n.rn.raft.id)
}
}
定时器接管解决
定时器触发是在 serveChannels
外面实现的,而后会通过 tickc 给到node.run
办法解决
func (n *node) run() {
......
switch {
......
case <-n.tickc:
n.rn.Tick()
......
}
}
func (rn *RawNode) Tick() {rn.raft.tick()
}
func (r *raft) tickElection() {
r.electionElapsed++
// 判断心跳是否超时,每次 tick electionElapsed++,如果 electionElapsed >= raft 初始化时的 randomizedElectionTimeout
// 则认为心跳超时了,这时候就能够进行选主了
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {r.logger.Debugf("error occurred during election: %v", err)
}
}
}
// 心跳超时判断
func (r *raft) pastElectionTimeout() bool {return r.electionElapsed >= r.randomizedElectionTimeout}
follower 节点每次逻辑时钟推动,就会给 electionElapsed+1,而在 raft 初始化的时候,会设置一个 randomizedElectionTimeout,当逻辑时钟推动的次数超过 randomizedElectionTimeout 的时候,就会触发 leader 选举流程了
那么什么时候 electionElapsed 会被重置呢
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
......
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
......
}
当节点启动的时候,就会切换成 follower 的状态,当 follower 收到 leader 的 MsgHeartbeat 时,就会重置 electionElapsed
这里当判断 electionElapsed > randomizedElectionTimeout,也就是 leader 心跳超时的时候,就开始发动了选主的投票流程了
发动投票
leader 选举这里咱们先不思考投票音讯的日志解决
func (r *raft) Step(m pb.Message) error {
......
switch m.Type {
case pb.MsgHup:
// preVote 是个开关,用于选举前置判断,例如以后节点被网络分区等状况造成的异样,只有 preVote 通过后才会发动投票
// preVote 和 vote 逻辑差不多,就不多剖析了
if r.preVote {r.hup(campaignPreElection)
} else {r.hup(campaignElection)
}
......
}
}
func (r *raft) hup(t CampaignType) {
// 如果以后节点是 leader,则疏忽
if r.state == StateLeader {r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}
// 判断以后节点是否能够晋升为 leader
if !r.promotable() {r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
// 获取未提交的 raftLog
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}
// 开始竞选
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}
func (r *raft) campaign(t CampaignType) {if !r.promotable() {// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
}
var term uint64
var voteMsg pb.MessageType
// 更新 raft 的状态
if t == campaignPreElection {r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
// 给本身投票,并记录投票后果,判断是否能升级
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
// 这里则是 preVote 胜利了,则是发动真正的投票环节
if t == campaignPreElection {r.campaign(campaignElection)
} else {
// 投票胜利了,则晋升为 leader
r.becomeLeader()}
return
}
// 投票后果未满足降职后果,则开始周知其余节点进行投票
var ids []uint64
{idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool {return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {continue}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {ctx = []byte(t)
}
// 将投票的音讯发送给其余节点,这里只是把音讯存储起来,期待其余 goroutine 生产发送
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
这里咱们先注意一下,当节点开启了preVote
后,竞选类型就是campaignPreElection
,而后晋升为PreCandidate
;否则竞选类型是campaignElection
,继而晋升为 Candidate
,咱们看下这两个角色别离干了什么
func (r *raft) becomeCandidate() {// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateLeader {panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
func (r *raft) becomePreCandidate() {// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateLeader {panic("invalid transition [leader -> pre-candidate]")
}
// Becoming a pre-candidate changes our step functions and state,
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r.step = stepCandidate
r.prs.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}
能够看到,PreCandidate
这里的 Term 没有变动,而 Candidate
的 Term + 1,跟咱们匹配上咱们后面说的 preVote
的作用了 - 防止网络分区造成的频繁选主
poll
的作用是记录某个 id 给本人的投票后果,并判断整体投票后果是否实现
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
if v {r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
} else {r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
}
r.prs.RecordVote(id, v)
granted, rejected, result = r.prs.TallyVotes()
r.logger.Infof("%d check poll result, votes = %d, rejected = %d, voteResult = %d, record = %v, voters = %v",
r.id, granted, rejected, result, r.prs.Votes, r.prs.Voters)
return
}
func (r *raft) send(m pb.Message) {
if m.From == None {m.From = r.id}
......
// 把音讯追加到 slice 外面,期待其余 goroutine 生产
r.msgs = append(r.msgs, m)
}
选主流程的逻辑如下
- 首先判断本身是否是 leader 或本身是否能够降职
- 是否开启了 preVote 环节,开启的话,则先走一遍 preVote 流程,跟 vote 流程差不多
- 而后批改本身节点的状态和信息,降职成为 Candidate
- 给本身投个票,并计算投票后果是否胜利,如果胜利则间接晋升为 leader,开启 leader 相干的流程
- 如果投票还未完结,则开始给其余节点发送投票信息
- 这里的投票信息只是寄存在 raft.msgs 这个 slice 外面了,期待其余 goroutine 生产
那么到此,这里的发动投票环节完结,然而投票音讯还没有收回去,也没有接管投票后果,这部分逻辑就还要回归到 raftNode.serveChannels
和 node.run
来看了
投票音讯发送
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
if advancec != nil {readyc = nil} else if n.rn.HasReady() {
// 判断是否有音讯须要解决
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
// 获取 msg 和 raft 状态,组装成 ready 构造
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
.......
select {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
// 接管到 follower 的投票后果
// 这里是网络组件相干的解决,当节点收到其余的网络申请的时候,会通过 recvc 通道传递过去解决
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
// 从新进入 Step 函数解决,留神这里的 m.Type == MsgVoteResp
r.Step(m)
}
......
// 在这里,将下面组装好的 ready 构造体,通过 readyc 传给 raftNode.serveChannels 解决
case readyc <- rd:
// 更新 raft 状态和删除 msgs
n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
case <-n.stop:
close(n.done)
return
}
}
}
投票后果解决
func (r *raft) Step(m pb.Message) error {
......
switch m.Type {
.....
default:
err := r.step(r, m)
if err != nil {return err}
}
return nil
}
这里从新进入 Step 函数来解决音讯,然而这时候,收到的音讯类型是 MsgVoteResp
,间接执行 default 步骤了
同时,此时节点的状态曾经发动投票了,所以节点晋升为 Candidate
, r.step 对应的也就是 r.stepCandidate 函数了
func stepCandidate(r *raft, m pb.Message) error {
......
case myVoteRespType:
// 记录 follower 节点的投票后果,并计算投票的后果(输或赢)gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
// 博得选举,则晋升为 leader 节点
case quorum.VoteWon:
if r.state == StatePreCandidate {r.campaign(campaignElection)
} else {r.becomeLeader()
r.bcastAppend()}
// 输掉抉择,则降级为 follower
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
// 其余状况下,就是票数还有余与判断选举后果,则持续期待其余节点的投票
}
......
return nil
}
stepCandidate 则将 follower 传递过去的投票后果记录,并从新统计选主的后果,如果获得成功取得超过一半的票数,则晋升为 leader,并告诉到各个节点;如果输掉选主,则降级为 follower;其余状况,就是选主还在持续进行中,还有其余节点没有投票,则持续期待其余节点投票
总结
- raft 外面会有一个定时器,定时触发,来推动选主逻辑的运行
- 每次定时器触发的时候,会对属性 electionElapsed++,当 electionElapsed 的值超过初始化时创立的 randomizedElectionTimeout,则认为 leader 发送心跳超时,开始触发选主逻辑
- follower 节点判断 preVote 的属性是否设置为 true,如果为 true 的话,则执行 preVote 逻辑,这里是为了防止相似于网络分区造成的节点数量有余集群的 1 / 2 造成的频繁选主
- follower 节点将本身设置为 Candidate,同时开始给本身投票
- 判断投票后果是否足以博得选举,如果能够的话,则晋升为 leader,并公布告诉
- 投票后果不足以博得选举的时候,将投票工作分发给其余各个节点
- 其余各个节点的投票后果通过 node.recvc 这个通道传给 raftNode,并在
run
函数外面解决 - 每次收到其余节点的投票后果后,从新执行一下投票的判断,晓得博得选举或输掉选举
日志同步
数据处理申请
数据处理是通过 httpKVAPI 来解决的,用户的 Put method 申请时,就是创立或批改数据
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
defer r.Body.Close()
switch r.Method {
case http.MethodPut:
v, err := io.ReadAll(r.Body)
if err != nil {log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
h.store.Propose(key, string(v))
// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
......
}
func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {log.Fatal(err)
}
s.proposeC <- buf.String()}
用于创立或批改数据的申请,并不是间接存储到 kvstore 的,而是通过 proposeC 转给 raftNode 来解决的
raftNode 接收数据
func (rc *raftNode) serveChannels() {
......
// send proposals over raft
go func() {confChangeCount := uint64(0)
for rc.proposeC != nil && rc.confChangeC != nil {
select {
// 接管 kvstore 传递过去的数据
case prop, ok := <-rc.proposeC:
if !ok {rc.proposeC = nil} else {
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
}
......
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
func (n *node) Propose(ctx context.Context, data []byte) error {return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
func (n *node) stepWait(ctx context.Context, m pb.Message) error {return n.stepWithWaitOption(ctx, m, true)
}
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
......
ch := n.propc
pm := msgWithResult{m: m}
if wait {pm.result = make(chan error, 1)
}
select {
// 将数据封装好后,传给 node.propc 这个通道,交给 node.run 办法解决
case ch <- pm:
// 这里是 true,也就是不 return
if !wait {return nil}
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
// block 直到有后果
select {
case err := <-pm.result:
if err != nil {return err}
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
return nil
}
raftNode.serveChannels
开启了一个 goroutine 循环检测是无数有数据音讯,有音讯后就开始封装并通过 chan 传递给node.run
来解决了,并 block 住,晓得node.run
返回处理结果
接下来看下 node.run 的解决
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
......
select {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
// 这里就跟下面的选主一样,选主也是一个音讯,日志同步也是一个音讯,所以都走到了这里
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
// 解决实现后,将后果通过通道传递给下面,开释后面的 block 逻辑
if pm.result != nil {
pm.result <- err
close(pm.result)
}
......
}
}
}
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
......
default:
// 调用本身注册的 step 办法来解决
err := r.step(r, m)
if err != nil {return err}
}
return nil
}
这里会调用 r.step() 来解决,而 step 这个办法对应不同的角色有不同的实现
follower 对应 stepFollower
leader 对应 stepLeader
咱们首先看下 stepFollower
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.lead == None {r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
} else if r.disableProposalForwarding {r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
return ErrProposalDropped
}
m.To = r.lead
// 将音讯转发给 leader
r.send(m)
......
return nil
}
依据 stepFollower
的逻辑能够看到,当 follower 拿到创立批改数据的音讯的时候,间接将这个申请转发给 leader 来解决
接下来看下stepLeader
,对应的 leader 拿到建批改数据的音讯时的解决形式
func stepLeader(r *raft, m pb.Message) error {
......
case pb.MsgProp:
if len(m.Entries) == 0 {r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if r.prs.Progress[r.id] == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return ErrProposalDropped
}
......
if !r.appendEntry(m.Entries...) {return ErrProposalDropped}
r.bcastAppend()
return nil
......
return nil
}
stepLeader
这里的外围逻辑就两步
- 尝试追加追加操作到 raftLog 里,如果追加失败则返回
- 告诉其余节点追加数据
追加 raftLog
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {li := r.raftLog.lastIndex()
// 设置每个数据的 Term 和 Index,以示意程序
for i := range es {es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
// Track the size of this uncommitted proposal.
// 判断以后 cucommit 的数据大小 + 以后数据大小 是否大于 定于的最大 uncommitSize,如果大于则失败
if !r.increaseUncommittedSize(es) {
r.logger.Debugf(
"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
r.id,
)
// Drop the proposal.
return false
}
// use latest "last" index after truncate/append
// 往 raftLog 外面追加数据
li = r.raftLog.append(es...)
// 更新以后节点的 Match 和 Next,这里的 Match 示意以后节点的 raftLog 的 Index,如果给的数据的 Index < Match,则表明数据有误,会回绝追加
r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
// 尝试提交,只有当超过 1 /2+ 1 节点 ack 这个数据后,才会提交
r.maybeCommit()
return true
}
func (r *raft) maybeCommit() bool {
// 这里就返回所有节点 ack 的 commit log 的 Index
mci := r.prs.Committed()
return r.raftLog.maybeCommit(mci, r.Term)
}
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
// 当 ack 的 Index 大于 committed 的时候,才会提交
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {l.commitTo(maxIndex)
return true
}
return false
}
当追加 raftLog 的时候,同时尝试提交这条数据到存储系统,这时候也会跟选主时一样,判断一下整个集群是否超过 1 /2+ 1 节点收到数据并批准
告诉其余节点追加数据
追加完 raftLog 之后,其余节点还都没有解决,这时候数据也不会 commit 到存储系统,所以要先告诉一下其余节点,让他们先存储上,而后 leader 能力 commit
func (r *raft) bcastAppend() {
// 循环发送给其余节点
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {return}
r.sendAppend(id)
})
}
func (r *raft) sendAppend(to uint64) {r.maybeSendAppend(to, true)
}
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {pr := r.prs.Progress[to]
if pr.IsPaused() {return false}
m := pb.Message{}
m.To = to
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if len(ents) == 0 && !sendIfEmpty {return false}
......
// 定义数据音讯的 Type,Index,Term 和追加的数据
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in StateReplicate
case tracker.StateReplicate:
last := m.Entries[n-1].Index
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
case tracker.StateProbe:
pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
// 发送给塔器节点
r.send(m)
return true
}
发送给其余节点的时候,组装好数据音讯,就间接调用网络组件进行发送就行了
接下来就是接管 follower 节点返回的音讯了,leader 发送的音讯类型是 MsgApp,那么 follower 节点返回的音讯类型就是 MsgAppResp
那就持续回到node.run
办法看下收到 follower 节点的音讯的解决
解决 follower 的音讯解决响应
又见到老朋友
func (n *node) run() {
......
for {
......
select {
......
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {r.Step(m)
}
......
}
}
}
node.run
的 case m := <-n.recvc:
在选主的时候曾经见到过了,这个也是解决其余节点网络申请或详情的入口
func (r *raft) Step(m pb.Message) error {
......
switch m.Type {
......
default:
err := r.step(r, m)
if err != nil {return err}
}
return nil
}
func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
......
// All other message types require a progress for m.From (pr).
pr := r.prs.Progress[m.From]
if pr == nil {r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject {
// 这时候数据音讯的追加被 follower 回绝了,同时 follower 计算出来一个可能的 Index 点
r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
nextProbeIdx := m.RejectHint
if m.LogTerm > 0 {
// 依据 follower 返回的 Index 点和 Term,找到可能的 Index 去改正 follower 的数据
nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
// reject 的 follower 节点,状态先批改为 StateProbe,表明这个 follower 的 lastIndex 须要先去探测,不明
// 批改这个 follower 节点的 Next
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == tracker.StateReplicate {pr.BecomeProbe()
}
// 从批改后的 Index 日志点,持续尝试往 follower 追加数据
r.sendAppend(m.From)
}
} else {oldPaused := pr.IsPaused()
// 尝试更新 follower 节点在 Index 和 Next 地位
if pr.MaybeUpdate(m.Index) {
switch {
// follower 节点追加胜利的话,就不必持续探测了,状态变成 StateReplicate,示意失常的 follower
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
// TODO(tbg): we should also enter this branch if a snapshot is
// received that is below pr.PendingSnapshot but which makes it
// possible to use the log again.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.BecomeProbe()
pr.BecomeReplicate()
case pr.State == tracker.StateReplicate:
pr.Inflights.FreeLE(m.Index)
}
// 尝试将数据进行提交
if r.maybeCommit() {
// committed index has progressed for the term, so it is safe
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
// 再一次播送,这时候播送的数据的 Commit 和 Index 及 Entries 都有变动,目标是为了让 follower 提交数据
r.bcastAppend()} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r.maybeSendAppend(m.From, false) { }
// Transfer leadership is in progress.
// 节点迁徙相干
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
......
}
return nil
}
// 尝试更新 follower 的 Match 和 Next 的索引值
func (pr *Progress) MaybeUpdate(n uint64) bool {
var updated bool
if pr.Match < n {
pr.Match = n
updated = true
pr.ProbeAcked()}
pr.Next = max(pr.Next, n+1)
return updated
}
func (r *raft) maybeCommit() bool {
// MaybeUpdate 中会保留每个响应音讯的节点的 Match,这里就依据 Match 计算出来 1 /2+ 1 以上节点共识的 Match 的 Index 点
mci := r.prs.Committed()
// 依据计算出来的 Index 点,尝试 commit
return r.raftLog.maybeCommit(mci, r.Term)
}
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
// 当 Index 大于 提交的 Index 点的时候 并且 对应的 Term == raftLog.Term,才会进行提交
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {l.commitTo(maxIndex)
return true
}
return false
}
func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {if l.lastIndex() < tocommit {l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
}
// 将 raftLog 的 commit 点批改
l.committed = tocommit
}
}
当 leader 收到 follower 的 MsgAppResp 的音讯的时候,就开始进行解决了
follower 返回的处理结果有两种状态
-
胜利
- 这时候尝试会更新返回音讯的节点在 leader 内存中存储的 Match 和 Next,也就是 leader 记录的 follower 节点的 raft log 的状态
- 判断这条日志是否超过 1 /2+ 1 以上的节点接管并解决了
- 如果超过了,则将这条日志 commit,而 commit 的逻辑,在这里仅仅只是更新了 raftLog 的 committed 的记录值(期待
node.run
来解决),同时向所有节点播送以此让 follower commit 数据 - 没有超过则期待下一次解决
-
回绝
- 如果 follower 节点回绝,则 follower 节点的响应信息会携带可能匹配的 Index 和 Term
- leader 节点依据 follower 节点的响应,找到可能 match 的 Index 地位,并跟 follower 协调
- 反复后面两个步骤晓得 macth 到了 leader 和 follower 的日志 Index 地位,开始给 follower 同步日志
- 最初再回到胜利状态的解决
follower 节点回绝追加
这里有一点须要阐明一下,当 follower 节点 MsgAppResp 的后果为 reject 的时候,阐明 leader 和 follower 日志不统一,所以这时候,leader 须要去探测 follower 与 leader 节点统一的 Index 点,并从探测并纠正后的点去开始追加,也就是 findConflictByTerm() 的逻辑
这里用源码中的图来展现一下
示例一:
idx 1 2 3 4 5 6 7 8 9
-----------------
term (Leader) 1 3 3 3 5 5 5 5 5
term (Follower) 1 1 1 1 2 2
假如这是初始状态的 Leader 节点日志和 Follower 节点日志
当 leader 节点须要将 Index = 9 的数据同步给 follower 的时候,follower 发现自己在 Index(7,8)都没有数据,也就是 Match 不上,就会 reject 这个申请,并找到比 Index = 9 点的 Term = 5 小的 Term 的数据,也就是 follower 的最初一条数据 Index = 6, Term = 2
这时候 leader 收到了 follower 节点的回绝音讯,和 Index,Term,如果 leader 从这个节点间接追加,必定也会失败,因为 Index = 6 的 leader 和 follower 的 Term 都不一样
所以 leader 会再做一个计算,leader.Term <= folllower.Term,也就找到 Index = 1 这个地位
综上 就是 findConflictByTerm() 这个函数的作用
示例二:
idx 1 2 3 4 5 6 7 8 9
-----------------
term (Leader) 1 3 3 3 3 3 3 3 7
term (Follower) 1 3 3 4 4 5 5 5 6
leader 须要追加 Index = 9 的数据到 follower
follower 回绝后,并返回 Term = 6 (leader.Term == 7 > 6)
leader 收到音讯后,对日志进行回溯,这时候发现 Index = 8 合乎(leader.Term == 3 < 6)而后 leader 从新开始追加
follower 又回绝了,因为 Index = 8 时,follower.Term = 5,而 leader.Term = 3,所以这时候 follower 须要找到一个小于等于 Term 3 的地位,也就会回溯了 Index = 3
这时候 leader 从这里持续追加日志就胜利了
commit 数据
看到下面可能会有点纳闷,commit 数据如同啥都没干,就是 commit+1,这里就要回到 node.run
来看了;回顾选主的流程,选主的音讯是放到了 raft.msgs 外面,而后node.run
外面的一个协程循环读取,而后开始解决的,那么 commit 数据的逻辑是不是一样的呢
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
if advancec != nil {readyc = nil} else if n.rn.HasReady() {rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
......
select {
......
// 将数据通过 readyc 发送给 serveChannels 来解决
case readyc <- rd:
n.rn.acceptReady(rd)
advancec = n.advancec
......
}
}
}
func (rn *RawNode) HasReady() bool {
r := rn.raft
......
// 这里判断,是否有音讯(对应抉择的音讯)// hasNextEnts 则判断有没有 commit 然而还没有 apply 的数据
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {return true}
if len(r.readStates) != 0 {return true}
return false
}
// 判断是否有 commit 但为 apply 的数据
func (l *raftLog) hasNextEnts() bool {
// 判断 commit 的 Index 是不是大于 apply 的 Index
off := max(l.applied+1, l.firstIndex())
return l.committed+1 > off
}
node.run
这里循环判断是否有须要 commit 的日志或追加的 msgs,如果有的话,则通过 readyc 发送给 serveChannels
来解决
接下来看下serveChannels
的解决
func (rc *raftNode) serveChannels() {
......
// event loop on raft state machine updates
for {
select {
......
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
// Must save the snapshot file and WAL snapshot entry before saving any other entries
// or hardstate to ensure that recovery after a snapshot restore is possible.
......
// 存储到 storage
rc.raftStorage.Append(rd.Entries)
// 这里没有 Messages 了,都是 Entries,所以这里能够疏忽
rc.transport.Send(rc.processMessages(rd.Messages))
// apply 到 kvstore
applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
if !ok {rc.stop()
return
}
rc.maybeTriggerSnapshot(applyDoneC)
rc.node.Advance()
case err := <-rc.transport.ErrorC:
rc.writeError(err)
return
case <-rc.stopc:
rc.stop()
return
}
}
}
serveChannels
接管到 node.run
捞取到的 Entries 后,就开始将日志追加到 wal 和 storeage 外面,而后通过publishEntries
存储到 kvstore
接下来看下publishEntries
做了什么
func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {if len(ents) == 0 {return nil, true}
data := make([]string, 0, len(ents))
// 组装 data
for i := range ents {switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
s := string(ents[i].Data)
data = append(data, s)
......
}
}
var applyDoneC chan struct{}
if len(data) > 0 {applyDoneC = make(chan struct{}, 1)
select {
// 将组装好的 data 通过 commitc 传给 kvstore 解决,当 kvstore 解决实现后,再通过 applyDoneC 通道告诉过去
case rc.commitC <- &commit{data, applyDoneC}:
case <-rc.stopc:
return nil, false
}
}
// after commit, update appliedIndex
rc.appliedIndex = ents[len(ents)-1].Index
return applyDoneC, true
}
publishEntries
将数据组装好后,通过 commitC
传给 kvstore,而后更新掉 raftNode.appliedIndex,同时返回一个 chan,以便 kvstore 告诉 raftNode 数据存储胜利
而 readCommits
就比较简单了,收到数据存储到本身即可,同时敞开下层 raftNode 传递过去的通道,已告诉下层 raftNode 存储实现
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
for commit := range commitC {
// commit == nil 的时候,kvstore 从 snapshot 文件外面复原数据
if commit == nil {
// signaled to load snapshot
snapshot, err := s.loadSnapshot()
if err != nil {log.Panic(err)
}
if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)
}
}
continue
}
// 将数据存储到 kvstore 外面
for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()}
// 敞开 applyDoneC,以告诉下层模块解决实现
close(commit.applyDoneC)
}
if err, ok := <-errorC; ok {log.Fatal(err)
}
}
总结
咱们外围目标是为了理解 raft 协定,所以这里仅仅解析了 leader 选举和日志同步的能力,还有 wal 日志、snapshot、网络组件,咱们都没有解析,有趣味的同学能够本人追踪看看
整个 raftExample 的实现比拟简答,追踪起来也比拟容易,外围的代码逻辑次要集中在了 node.run
、raftNode.serveChannels
和 stepXXX
外面,代码分层也比拟清晰