一、简介

etcd 是基于 raft 协议实现的分布式一致性键值存储。本文不介绍 etcd 的使用,而是讲解 etcd 源码中提供的 example,通过这个 example 来学习 etcd 是如何使用 raft 协议的。


二、实现

这个 example 位于 etcd 源码目录下的 contrib 目录中(contrib/raftexample)。

tree -L 1
.
├── Makefile
├── NOTICE
├── OWNERS
├── Procfile
├── Procfile.v2
├── README.md
├── ROADMAP.md
├── auth
├── bill-of-materials.json
├── bill-of-materials.override.json
├── build
├── build.bat
├── build.ps1
├── client
├── clientv3
├── code-of-conduct.md
├── contrib    # 今天的主角
├── docs
├── embed
├── etcd.conf.yml.sample
├── etcdctl
├── etcdmain
├── etcdserver
├── functional
├── functional.yaml
├── go.mod
├── go.sum
├── hack
├── integration
├── lease
├── logos
├── main.go
├── main_test.go
├── mvcc
├── pkg
├── proxy
├── raft
├── scripts
├── test
├── tests
├── tools
├── vendor
├── version
└── wal
tree -L 1 contrib/raftexample/
contrib/raftexample/
├── Procfile
├── README.md
├── doc.go
├── httpapi.go
├── kvstore.go
├── kvstore_test.go
├── listener.go
├── main.go
├── raft.go
└── raftexample_test.go

先看一下入口文件 main.go

func main() {    cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")    id := flag.Int("id", 1, "node ID")    kvport := flag.Int("port", 9121, "key-value server port")    join := flag.Bool("join", false, "join an existing cluster")    flag.Parse()    proposeC := make(chan string)    defer close(proposeC)    confChangeC := make(chan raftpb.ConfChange)    defer close(confChangeC)    // raft provides a commit stream for the proposals from the http api    var kvs *kvstore    getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)    kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)    // the key-value http handler will propose updates to raft    serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)}

main 函数做了一些初始化工作。接下来看一下 newRaftNode,它位于 raft.go 文件中:

// newRaftNode initiates a raft instance and returns a committed log entry// channel and error channel. Proposals for log updates are sent over the// provided the proposal channel. All log entries are replayed over the// commit channel, followed by a nil message (to indicate the channel is// current), then new log entries. To shutdown, close proposeC and read errorC.func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,    confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {    commitC := make(chan *string)    errorC := make(chan error)    // 初始化 raftnode 这个raft node 是etcd 中利用层面的 raft node 在 raft 协定层面也是用一个 raft node 通过利用层面的构造体定义能够发现 在构造体中是存在一个 raft 协定层面的node的,这两个node是一对一的关系    rc := &raftNode{        proposeC:    proposeC,        confChangeC: confChangeC,        commitC:     commitC,        errorC:      errorC,        id:          id,        peers:       peers,        join:        join,        waldir:      fmt.Sprintf("raftexample-%d", id),        snapdir:     fmt.Sprintf("raftexample-%d-snap", id),        getSnapshot: getSnapshot,        snapCount:   defaultSnapshotCount,        stopc:       make(chan struct{}),        httpstopc:   make(chan struct{}),        httpdonec:   make(chan struct{}),        snapshotterReady: make(chan *snap.Snapshotter, 1),        // rest of structure populated after WAL replay    }    go rc.startRaft()    return commitC, errorC, rc.snapshotterReady}

看一下 startRaft 做了什么,它同样位于当前文件(raft.go)中:

// startRaft finishes initialising rc. newRaftNode runs it as a goroutine.
// It prepares the snapshot directory and snapshotter, replays the WAL, creates
// (or restarts) the raft-library node, and starts the peer transport. Finally
// it launches the raft HTTP server and the main event loop.
func (rc *raftNode) startRaft() {
	if !fileutil.Exist(rc.snapdir) {
		if err := os.Mkdir(rc.snapdir, 0750); err != nil {
			log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
		}
	}
	// Create the snapshotter for snapdir and publish it on snapshotterReady.
	// newRaftNode returns that channel to its caller.
	rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
	rc.snapshotterReady <- rc.snapshotter

	// Record whether a WAL already existed *before* replaying it. That
	// decides below whether the raft node is restarted or started fresh.
	oldwal := wal.Exist(rc.waldir)
	// Replay the WAL and keep the handle for later writes.
	// NOTE(review): replayWAL presumably also loads the replayed state into
	// rc.raftStorage, which is passed to raft.Config below — confirm.
	rc.wal = rc.replayWAL()

	// Peers are identified by their 1-based position in the peer list.
	rpeers := make([]raft.Peer, len(rc.peers))
	for i := range rpeers {
		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
	}
	// Configuration for the raft consensus library. Tick counts are in calls
	// to rc.node.Tick(), which serveChannels makes every 100ms.
	c := &raft.Config{
		ID:                        uint64(rc.id),
		ElectionTick:              10,
		HeartbeatTick:             1,
		Storage:                   rc.raftStorage,
		MaxSizePerMsg:             1024 * 1024,
		MaxInflightMsgs:           256,
		MaxUncommittedEntriesSize: 1 << 30,
	}

	// Create the raft-library node. With an existing WAL, the node restarts
	// from the persisted state. Otherwise it boots with the initial peer
	// list, or with no peers when joining an existing cluster.
	if oldwal {
		rc.node = raft.RestartNode(c)
	} else {
		startPeers := rpeers
		if rc.join {
			startPeers = nil
		}
		rc.node = raft.StartNode(c, startPeers)
	}

	// The transport carries raft messages between cluster members. The raft
	// library only produces and consumes messages; delivering them over the
	// network is left to the application. rc itself is registered as the
	// Raft handler for messages received from peers.
	rc.transport = &rafthttp.Transport{
		Logger:      zap.NewExample(),
		ID:          types.ID(rc.id),
		ClusterID:   0x1000,
		Raft:        rc,
		ServerStats: stats.NewServerStats("", ""),
		LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
		ErrorC:      make(chan error),
	}

	// Start the transport, then register every other member with its URL so
	// messages can be delivered to it. This node's own ID is skipped.
	rc.transport.Start()
	for i := range rc.peers {
		if i+1 != rc.id {
			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
		}
	}

	// serveRaft is the transport's HTTP server for peer communication.
	// serveChannels runs the event loop that drives rc.node.
	go rc.serveRaft()
	go rc.serveChannels()
}

再看一下 serveChannels,这里是重点,下面我们来分析一下整个过程:

// serveChannels is raftNode's main event loop. It first seeds confState and
// the snapshot/applied indexes from the storage snapshot. It then starts a
// goroutine that forwards client proposals and conf changes into raft. After
// that it loops: ticking raft every 100ms and processing each Ready batch
// that raft returns. The loop ends when the transport reports an error, stopc
// is closed, or publishing committed entries fails.
func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	// Drives raft's logical clock (see rc.node.Tick below).
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// Client writes (the HTTP API's PUT, via kvstore.Propose) arrive on
	// proposeC, and membership changes arrive on confChangeC. Both go through
	// raft so that every node agrees on them before they are applied.
	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)

		// Runs until either input channel has been closed.
		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			// A proposal from the kv store: hand it to raft.
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop))
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					// Give each conf change a locally increasing ID.
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()

	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()

		// store raft entries to wal, then publish over commit channel
		// Each Ready batch carries the work raft needs the application to do:
		// state/entries to persist, an optional snapshot, messages to send,
		// and newly committed entries.
		case rd := <-rc.node.Ready():
			// Persist the hard state and new entries to the WAL first.
			// NOTE(review): any error result of wal.Save is not checked here.
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				// A snapshot arrived. Save it, load it into raft's storage,
				// and hand it to publishSnapshot.
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			// Append the new entries to raft's log storage (rc.raftStorage).
			rc.raftStorage.Append(rd.Entries)
			// Send outgoing raft messages to the other peers.
			rc.transport.Send(rd.Messages)
			// Pass the committed entries (filtered by entriesToApply) to
			// publishEntries; a false result stops the node.
			if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot()
			// Tell raft this Ready batch has been handled so it can deliver
			// the next one.
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return

		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}

让我们回到 main.go 文件中。我们从 newRaftNode 这个函数一路深入,走了很远,这个函数最后返回了几个值:

    // commitC: committed log entries; errorC: raft errors; snapshotterReady: delivers the snapshotter once created.
    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

接着这行代码继续向下分析 newKVStore:

// Wait for the snapshotter, then build the in-memory kv store that reads commitC.
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

这是一个简单的内存 kv 存储,模仿了 etcd 中的 kv 存储。etcd v3 使用的是 bolt(一个用 Go 语言开发的 kv 存储)。这个 example 为了说明 raft 协议在 etcd 中的应用,只用内存数据结构简单地构造了一个 kv 存储。代码做的事情就是读取 commitC 这个 channel 中的信息,然后将其存储到 map 中,这里就不具体分析了。

// Serve the HTTP API; this call blocks until errorC yields an error (fatal) or is closed.
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)

紧接着启动了http服务,具体实现在 httpapi.go 文件中

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {    key := r.RequestURI    switch {    // 看一下put操作    case r.Method == "PUT":        v, err := ioutil.ReadAll(r.Body)        if err != nil {            log.Printf("Failed to read on PUT (%v)\n", err)            http.Error(w, "Failed on PUT", http.StatusBadRequest)            return        }        // stor 是 kvstor 内存kv存储,        h.store.Propose(key, string(v))        // Optimistic-- no waiting for ack from raft. Value is not yet        // committed so a subsequent GET on the key may return old value        w.WriteHeader(http.StatusNoContent)    case r.Method == "GET":        if v, ok := h.store.Lookup(key); ok {            w.Write([]byte(v))        } else {            http.Error(w, "Failed to GET", http.StatusNotFound)        }    case r.Method == "POST":        url, err := ioutil.ReadAll(r.Body)        if err != nil {            log.Printf("Failed to read on POST (%v)\n", err)            http.Error(w, "Failed on POST", http.StatusBadRequest)            return        }        nodeId, err := strconv.ParseUint(key[1:], 0, 64)        if err != nil {            log.Printf("Failed to convert ID for conf change (%v)\n", err)            http.Error(w, "Failed on POST", http.StatusBadRequest)            return        }        cc := raftpb.ConfChange{            Type:    raftpb.ConfChangeAddNode,            NodeID:  nodeId,            Context: url,        }        h.confChangeC <- cc        // As above, optimistic that raft will apply the conf change        w.WriteHeader(http.StatusNoContent)    case r.Method == "DELETE":        nodeId, err := strconv.ParseUint(key[1:], 0, 64)        if err != nil {            log.Printf("Failed to convert ID for conf change (%v)\n", err)            http.Error(w, "Failed on DELETE", http.StatusBadRequest)            return        }        cc := raftpb.ConfChange{            Type:   raftpb.ConfChangeRemoveNode,            NodeID: nodeId,        }        h.confChangeC <- cc 
       // As above, optimistic that raft will apply the conf change        w.WriteHeader(http.StatusNoContent)    default:        w.Header().Set("Allow", "PUT")        w.Header().Add("Allow", "GET")        w.Header().Add("Allow", "POST")        w.Header().Add("Allow", "DELETE")        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)    }}// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {    srv := http.Server{        Addr: ":" + strconv.Itoa(port),        Handler: &httpKVAPI{            store:       kv,            confChangeC: confChangeC,        },    }    go func() {        if err := srv.ListenAndServe(); err != nil {            log.Fatal(err)        }    }()    // exit when raft goes down    if err, ok := <-errorC; ok {        log.Fatal(err)    }}
func (s *kvstore) Propose(k string, v string) {    var buf bytes.Buffer    if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {        log.Fatal(err)    }     // 发送申请数据到proposeC中,下面咱们剖析过有中央在监听这个proposeC channel    s.proposeC <- buf.String()}

到此,整个 example 中的 raft 流程就结束了,看上去还是蛮简单的。接下来会专门分析一下 raft 协议内部的原理。