一、简介
etcd 是基于 raft 协定实现的分布式一致性 jian 值存储,本篇文章不介绍 etcd 的应用,本文解说在 etcd 源码中提供的 example,通过这个 example 来学习 etcd 是如何应用 raft 协定。
二、实现
这个 example 在 etcd 源码目录下的 contrib 目录中
tree -L 1
.
├── Makefile
├── NOTICE
├── OWNERS
├── Procfile
├── Procfile.v2
├── README.md
├── ROADMAP.md
├── auth
├── bill-of-materials.json
├── bill-of-materials.override.json
├── build
├── build.bat
├── build.ps1
├── client
├── clientv3
├── code-of-conduct.md
├── contrib # 明天的配角
├── docs
├── embed
├── etcd.conf.yml.sample
├── etcdctl
├── etcdmain
├── etcdserver
├── functional
├── functional.yaml
├── go.mod
├── go.sum
├── hack
├── integration
├── lease
├── logos
├── main.go
├── main_test.go
├── mvcc
├── pkg
├── proxy
├── raft
├── scripts
├── test
├── tests
├── tools
├── vendor
├── version
└── wal
tree -L 1 contrib/raftexample/
contrib/raftexample/
├── Procfile
├── README.md
├── doc.go
├── httpapi.go
├── kvstore.go
├── kvstore_test.go
├── listener.go
├── main.go
├── raft.go
└── raftexample_test.go
先看一下入口文件 main.go
func main() {cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
kvport := flag.Int("port", 9121, "key-value server port")
join := flag.Bool("join", false, "join an existing cluster")
flag.Parse()
proposeC := make(chan string)
defer close(proposeC)
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)
// raft provides a commit stream for the proposals from the http api
var kvs *kvstore
getSnapshot := func() ([]byte, error) {return kvs.getSnapshot() }
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
// the key-value http handler will propose updates to raft
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
进行了一些初始化动作,看一下 newRaftNode 在 raft.go 文件中
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {commitC := make(chan *string)
errorC := make(chan error)
// 初始化 raftnode 这个 raft node 是 etcd 中利用层面的 raft node 在 raft 协定层面也是用一个 raft node 通过利用层面的构造体定义能够发现 在构造体中是存在一个 raft 协定层面的 node 的,这两个 node 是一对一的关系
rc := &raftNode{
proposeC: proposeC,
confChangeC: confChangeC,
commitC: commitC,
errorC: errorC,
id: id,
peers: peers,
join: join,
waldir: fmt.Sprintf("raftexample-%d", id),
snapdir: fmt.Sprintf("raftexample-%d-snap", id),
getSnapshot: getSnapshot,
snapCount: defaultSnapshotCount,
stopc: make(chan struct{}),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
snapshotterReady: make(chan *snap.Snapshotter, 1),
// rest of structure populated after WAL replay
}
go rc.startRaft()
return commitC, errorC, rc.snapshotterReady
}
看一下 startRaft 做了什么,还是在以后文件下
func (rc *raftNode) startRaft() {if !fileutil.Exist(rc.snapdir) {if err := os.Mkdir(rc.snapdir, 0750); err != nil {log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
}
}
// 获取快照实例
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
rc.snapshotterReady <- rc.snapshotter
// 重放 wal 日志到内存,因为 etcd 保护了内存索引,查问时会通过内存索引获取到信息,这个信息是指 key 的值和版本号
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL()
rpeers := make([]raft.Peer, len(rc.peers))
for i := range rpeers {rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
// 和 raft 一致性协定的相干配置
c := &raft.Config{ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntriesSize: 1 << 30,
}
// 初始化 raft 协定层面的 node
if oldwal {rc.node = raft.RestartNode(c)
} else {
startPeers := rpeers
if rc.join {startPeers = nil}
rc.node = raft.StartNode(c, startPeers)
}
// 初始化 transport,transport 用来和 etcd 集群中其余节点间进行通信并传递信息的桥梁,raft 协定只是实现了音讯和状态的统一,然而没有实现传输音讯的代码,这部分须要 etcd 利用层面来实现
rc.transport = &rafthttp.Transport{Logger: zap.NewExample(),
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: stats.NewServerStats("",""),
LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
// 记录集群中实例信息,用来通信
rc.transport.Start()
for i := range rc.peers {
if i+1 != rc.id {rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
// serveRaft 是 transport 的 httpserver 用来解决通信,先不看
go rc.serveRaft()
go rc.serveChannels()}
看一下 serveChannels,这里是重点,上面咱们来剖析一下整个过程
func (rc *raftNode) serveChannels() {snap, err := rc.raftStorage.Snapshot()
if err != nil {panic(err)
}
rc.confState = snap.Metadata.ConfState
rc.snapshotIndex = snap.Metadata.Index
rc.appliedIndex = snap.Metadata.Index
defer rc.wal.Close()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// 当 e 咱们对 etcd 进行操作时,增删改查时,都是一个 proposals,这个 proposals 要传递到 raft 协定中,让其保护集群中各个节点的统一状态
// send proposals over raft
go func() {confChangeCount := uint64(0)
for rc.proposeC != nil && rc.confChangeC != nil {
select {
// 收到一个 proposals 后发送到 raft 协定中,前面会看到当一个 http 申请进来时会向这个 proposeC 传递数据的
case prop, ok := <-rc.proposeC:
if !ok {rc.proposeC = nil} else {
// blocks until accepted by raft state machine
// 发送
rc.node.Propose(context.TODO(), []byte(prop))
}
case cc, ok := <-rc.confChangeC:
if !ok {rc.confChangeC = nil} else {
confChangeCount++
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// store raft entries to wal, then publish over commit channel
// 当 raft 协定解决完后,会返回给下层利用一条音讯,由 etcd 利用层面进行解决,raft 协定层做了什么先不剖析,接下来会专门写一篇文章来剖析 raft 协定的流程
case rd := <-rc.node.Ready():
// 长久化到 wal 日志
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
// 保留到快照
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
// 增加到内存索引中
rc.raftStorage.Append(rd.Entries)
// 发送到集群中其余节点
rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {rc.stop()
return
}
rc.maybeTriggerSnapshot()
// 从 raft 协定中获取下一条待处理的音讯
rc.node.Advance()
case err := <-rc.transport.ErrorC:
rc.writeError(err)
return
case <-rc.stopc:
rc.stop()
return
}
}
}
让咱们回到 main.go 文件中,从 newRaftNode 这个函数始终走了很远进去,这个函数最初返回了 几个参数
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
接着这行代码持续向下剖析 newKVStore
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
着是一个简略的 内存 kv 存储,模仿了 etcd 中的 kv 存储,在 etcd 中 v3 版本是用 bolt 这个 golang 语言开发的 kv 存储,这个 example 为了阐明 raft 协定在 etcd 中的利用所以简略用内存构造结构了 kv 存储。代码里做的事件就是读取 commitC 这个 cahnnel 中的信息,而后将信息存储到 map 中,就不具体分析了
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
紧接着启动了 http 服务,具体实现在 httpapi.go 文件中
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
switch {
// 看一下 put 操作
case r.Method == "PUT":
v, err := ioutil.ReadAll(r.Body)
if err != nil {log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
// stor 是 kvstor 内存 kv 存储,h.store.Propose(key, string(v))
// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
case r.Method == "GET":
if v, ok := h.store.Lookup(key); ok {w.Write([]byte(v))
} else {http.Error(w, "Failed to GET", http.StatusNotFound)
}
case r.Method == "POST":
url, err := ioutil.ReadAll(r.Body)
if err != nil {log.Printf("Failed to read on POST (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
case r.Method == "DELETE":
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on DELETE", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
default:
w.Header().Set("Allow", "PUT")
w.Header().Add("Allow", "GET")
w.Header().Add("Allow", "POST")
w.Header().Add("Allow", "DELETE")
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := http.Server{Addr: ":" + strconv.Itoa(port),
Handler: &httpKVAPI{
store: kv,
confChangeC: confChangeC,
},
}
go func() {if err := srv.ListenAndServe(); err != nil {log.Fatal(err)
}
}()
// exit when raft goes down
if err, ok := <-errorC; ok {log.Fatal(err)
}
}
func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {log.Fatal(err)
}
// 发送申请数据到 proposeC 中,下面咱们剖析过有中央在监听这个 proposeC channel
s.proposeC <- buf.String()}
到此整个 example 中的 raft 流程完结了,看上去还是蛮简略的,接下来会专门剖析一下 raft 协定外部的原理。