工作当中应用etcd作为配置核心,次要应用了etcdclient提供的watch接口对存储的配置进行实时监听更新,很好奇etcd外部是如何做到不丢数据并联通上下游的,于是翻看了局部v3版本实现代码,在惊叹大佬们的代码程度同时又在鄙视本人写的lowB代码。

简略应用
简略应用etcdctl命令行做一个演示,次要展现一下性能。

# 首先启动一个etcd$ ./etcd# 存入数据,存三次$ etcdctl put testwatch 1$ etcdctl put testwatch 2$ etcdctl put testwatch 3# watch key,--rev=1示意从版本号为1开始watch$ etcdctl --endpoints=127.0.0.1:23790 watch testwatch --rev=1 -w=json{    "Header":{        "cluster_id":14841639068965178418,        "member_id":10276657743932975437,        "revision":27,        "raft_term":25    },    "Events":[        {            "kv":{                "key":"dGVzdHdhdGNo",                "create_revision":25,                "mod_revision":25,                "version":1,                "value":"MQ=="            }        },        {            "kv":{                "key":"dGVzdHdhdGNo",                "create_revision":25,                "mod_revision":27,                "version":3,                "value":"Mw=="            }        }    ],    "CompactRevision":0,    "Canceled":false,    "Created":false}#此时下面返回了从rev为1开始的变动,这时候再次对该key做批改(put testwatch 4),还会源源不断#输入更改后的内容等信息{    "Header":{        "cluster_id":14841639068965178418,        "member_id":10276657743932975437,        "revision":28,        "raft_term":25    },    "Events":[        {            "kv":{                "key":"dGVzdHdhdGNo",                "create_revision":25,                "mod_revision":28,                "version":4,                "value":"NA=="            }        }    ],    "CompactRevision":0,    "Canceled":false,    "Created":false}


当指定版本号时候会返回所有版本号前面的历史的批改记录,如果不指定则只会在发生变化时候返回变动后的键值。理解了最简略的用法后,咱们从上到下挖一挖watch机制的原理。

解决流程

etcd服务启动后会启动grpc服务端,并注册Watch服务,写过proto文件的应该很相熟,客户端与服务端之间通过流式grpc做交互,每个客户端的watch申请对应到一个Watch办法,这个Watch办法就联通了客户端与etcd上游存储的变更,能够源源不断将变更的键值告诉到客户端,也能够监听客户端的一些操作(如勾销监听)并同步到etcd的上游。接下来从下层到上层的源码来剖析watcher机制的实现形式。

service Watch {  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {      option (google.api.http) = {        post: "/v3/watch"        body: "*"    };  }}

下层解决

每一个Watch申请都会创立一个serverWatchStream构造体,该构造体向上通过gRPCStream与客户端做交互,向下通过watchStream与etcd存储mvcc局部打交道。

//etcd/server/etcdserver/api/v3rpc/watch.gofunc (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {    //初始化一个serverWatchStream构造体    sws := serverWatchStream{         ......        //etcd启动时初始化的watchableStore赋值给watchable,下文会提到。        watchable: ws.watchable,         //用于和客户端进行流式grpc交互的接口,提供了Send和Recv等办法,Send示意发送信息到客户端,Recv示意从客户端取信息        gRPCStream:  stream,        //次要用于取出变更或者订阅的键值变动,详见下方源码        watchStream: ws.watchable.NewWatchStream(),        // chan for sending control response like watcher created and canceled.        ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),        ......        closec: make(chan struct{}),    }    sws.wg.Add(1)    go func() {        //向客户端发送变更事件        sws.sendLoop()        sws.wg.Done()    }()    errc := make(chan error, 1)    go func() {        //解决客户端的申请,订阅kv或勾销操作等        if rerr := sws.recvLoop(); rerr != nil {            ......错误判断            errc <- rerr        }    }()    //期待勾销操作    select {    case err = <-errc:        ......        //如果recvloop出错返回,敞开ctrlStream        close(sws.ctrlStream)    case <-stream.Context().Done():        ......错误处理    }        //敞开连贯与申请,敞开channel,期待两个协程退出    sws.close()     return err}func (sws *serverWatchStream) close() {    sws.watchStream.Close()    close(sws.closec)//敞开closec,用于告诉sendLoop协程退出    sws.wg.Wait()//期待全副退出}//etcd/server/mvcc/watchable_store.gofunc (s *watchableStore) NewWatchStream() WatchStream {    return &watchStream{        //etcd启动时初始化的watchableStore        watchable: s,        //这个管道用于从etcd外面拿到变更数据,调用Chan()能够取出数据,buf长度是128        ch:        make(chan WatchResponse, chanBufLen),         ......    }}

serverWatchStream与高低互通的形式是通过两个协程。一个sendLoop,次要向客户端同步变更订阅数据。一个recvloop,次要接管客户端的申请并向上游发送创立对某个键值的订阅申请,以及勾销等操作。

管制协程退出以及后续解决的形式应用了waitGroup的形式,能够在sws.close()的实现里看到敞开watchStream以及期待组等操作。

recvLoop

recvLoop负责了创立监听键值的操作,通过监听grpc流式的操作,调用Recv办法取出客户端的申请,并作出对应的响应。recvLoop的数据次要流向是从外向内。

//etcd/server/etcdserver/api/v3rpc/watch.gofunc (sws *serverWatchStream) recvLoop() error {    for {        req, err := sws.gRPCStream.Recv()        //出错解决操作        ......        //对申请进行断言判断,并别离解决各种类型的request        switch uv := req.RequestUnion.(type) {        case *pb.WatchRequest_CreateRequest: //客户端的watch创立申请            ......            //在上游创立一个服务这个客户端监听的watcher,客户端订阅的可能是一个key,也可能是一个范畴内的key,返回一个watchid。            id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)            if err == nil {                //将申请体信息中局部参数写入serverWatchStream                sws.mu.Lock()                if creq.ProgressNotify {                    sws.progress[id] = true                }//是否返回上一个kv                if creq.PrevKv {                    sws.prevKV[id] = true                }//是否分包                if creq.Fragment {                    sws.fragment[id] = true                }                sws.mu.Unlock()            }                   ......        case *pb.WatchRequest_CancelRequest:            //勾销订阅申请,会实现删除订阅的watcher等操作            ......        //其余操作解决            ......    }}

sendLoop

watchStream作为重要的变更数据源,sendLoop会轮询调用它的Chan()办法,该办法就是在获取watchStream中的ch管道中的数据。

func (sws *serverWatchStream) sendLoop() {    ......    //因为客户端勾销或者其余起因导致程序返回,收尾操作,清理沉积的音讯事件    defer func() {        ......        // 革除ch中积压的数据,不便垃圾回收?        for ws := range sws.watchStream.Chan() {            mvcc.ReportEventReceived(len(ws.Events))        }        for _, wrs := range pending {            for _, ws := range wrs {                mvcc.ReportEventReceived(len(ws.Events))            }        }    }()    for {        select {        case wresp, ok := <-sws.watchStream.Chan(): //从chan中读取数据            //ch被敞开,间接返回            if !ok {                return            }            evs := wresp.Events            events := make([]*mvccpb.Event, len(evs))            ......            //遍历接管到的变更事件封装到events            for i := range evs {                events[i] = &evs[i]                ......            }            //将events封装到WatchResponse            wr := &pb.WatchResponse{                Header:          sws.newResponseHeader(wresp.Revision),                WatchId:         int64(wresp.WatchID),                Events:          events,                CompactRevision: wresp.CompactRevision,                Canceled:        canceled,            }            //如果不是指定的watchid,放到pending队列            if _, okID := ids[wresp.WatchID]; !okID {                // buffer if id not yet announced                wrs := append(pending[wresp.WatchID], wr)                pending[wresp.WatchID] = wrs                continue            }            .....            //判断是否须要分包,并调用Send办法将事件发送到客户端            var serr error            if !fragmented && !ok {                serr = sws.gRPCStream.Send(wr)            } else {                serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)            }            ......//错误处理        case c, ok := <-sws.ctrlStream://解决ctrlStream            //敞开则间接返回            ......        case <-sws.closec://判断closec是否敞开,如果敞开,间接返回            return        }    }}

下层解决的流程能够用下图来示意,次要体现了serverWatchStream这个桥梁的作用。

上层解决

下面说到,再recvLoop中会调用watchStream.Watch办法,该办法会生成一个watchID,而后调用其成员watchable的watch办法创立一个对于订阅键值的watcher。

//etcd/server/mvcc/watcher.gofunc (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {    //生成watchID操作    ......    //调用watch办法,次要关注key以及ch的流向    w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)    ......}//etcd/server/mvcc/watchable_store.gofunc (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {    //key以及ch被封装到了watcher构造体中    wa := &watcher{        key:    key,        end:    end,        minRev: startRev,        id:     id,        ch:     ch,        fcs:    fcs,    }    .....}

watcher的ch收到变更数据,就会被下层的sendLoop捕捉并推送给客户端。在此之前watcher会被放在watchableStore的某个汇合中,期待监听的key变更。

watchableStore

上文提到watchableStore是在etcd初始化时候创立的一个全局的配置项。因为客户端不止一个,监听的键值不止一对,并且ch如果满了可能会被阻塞,所以该配置项共配置了三个批次的watcher汇合,别离是synced,unsynced,victims,别离寄存位于不同阶段的watcher。

//etcd/server/mvcc/watchable_store.gotype watchableStore struct {    *store    // victims是在变更产生,发送数据到ch然而通道满时,被阻塞的watcher汇合    victims []watcherBatch    victimc chan struct{}//通道用于告诉是否须要清理victims    //未同步实现的watcher,    unsynced watcherGroup    //曾经同步实现,在期待新的新的变更事件的watcher队列    synced watcherGroup    ......}

在etcd启动初始化watchableStore时候,会启动两个异步协程清理unsynced和victims汇合中的watcher。

//etcd/server/mvcc/watchable_store.gofunc newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {    ......    s := &watchableStore{        store:    NewStore(lg, b, le, cfg),        victimc:  make(chan struct{}, 1),        unsynced: newWatcherGroup(),        synced:   newWatcherGroup(),        stopc:    make(chan struct{}),    }    //创立两个协程,用于解决watcher数据    s.wg.Add(2)    go s.syncWatchersLoop() //革除unsync,每隔 100ms调用一次 syncWatchers    go s.syncVictimsLoop()  //革除victim中沉积的event    return s}

unsynced

产生watcher沉积的起因次要是两种,一种是当客户端执行watch时候指定了历史版本号,该操作须要从boltDB中取值,不能间接放到synced队列中期待新的变更,须要放到unsync中。

//etcd/server/mvcc/watchable_store.gofunc (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {    //key以及ch被封装到了watcher构造体中    wa := &watcher{        key:    key,        end:    end,        minRev: startRev,        id:     id,        ch:     ch,        fcs:    fcs,    }    s.mu.Lock()    s.revMu.RLock()    //如果指定的版本号version是历史的版本号,则将watcher放到unsynced中    synced := startRev > s.store.currentRev || startRev == 0    if synced {        //放到synced        s.synced.add(wa) //使sync队列加上watcher    } else {        //放到unsynced        s.unsynced.add(wa)    }    return wa, func() { s.cancelWatcher(wa) }//返回watcher以及勾销的办法    }

victims

第二种是积压的起因是因为检测到了watch变更,在将数据发送到ch时候,ch缓冲已满,此时须要将watcher存到其余区域(victims)。如果硬往里放数据,该协程会被park住,阻塞其余操作。

检测到watcher变更产生在put一个键值时候,此时事务提交,最终写入之前会调用notify()办法检测是否有针对该键值的watcher。

//etcd/server/mvcc/watchable_store_txn.gofunc (tw *watchableStoreTxnWrite) End() {    ......    tw.s.mu.Lock()    //提交事件之前调用notify()    tw.s.notify(rev, evs)    tw.TxnWrite.End()    tw.s.mu.Unlock()}func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {    var victim watcherBatch    //newWatcherBatch会遍历watchableStore的synced队列,并拿evs中kv比照是否有监听的key,返回一个watcher汇合    //for range遍历newWatcherBatch返回的watcher汇合    for w, eb := range newWatcherBatch(&s.synced, evs) {        ......        //调用send办法将event发送到ch中,未阻塞的话,会被最上层的sendLoop接管到。        if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {            pendingEventsGauge.Add(float64(len(eb.evs))) //promethous操作        } else {            //将watcher增加到victims汇合中            w.minRev = rev + 1            if victim == nil {                victim = make(watcherBatch)            }            w.victim = true            victim[w] = eb            //删除synced队列中的该watch            s.synced.delete(w)        }    }    s.addVictim(victim)}//etcd/server/mvcc/watchable_store.gofunc (w *watcher) send(wr WatchResponse) bool {    progressEvent := len(wr.Events) == 0    //过滤事件    ......    //将音讯发送到channel,如果ch满了就走default    select {    case w.ch <- wr:         return true    default:        return false    }}func (s *watchableStore) addVictim(victim watcherBatch) {    //空间接返回    if victim == nil {        return    }    //减少watcher到victims,并发送信号告诉    s.victims = append(s.victims, victim)    select {    case s.victimc <- struct{}{}:    default:    }}

unsynced清理

上文提到,两个沉积队列的watcher清理的形式是通过两个异步协程做到的。接下来咱们先看unsynced队列的清理形式。

//etcd/server/mvcc/watchable_store.gofunc (s *watchableStore) syncWatchersLoop() {    for {        ......        //如果队列大于0,则进入syncWatchers()同步watcher        if lastUnsyncedWatchers > 0 {            unsyncedWatchers = s.syncWatchers()        }        ......        //定时器解决        ......    }}func (s *watchableStore) syncWatchers() int {    //选出unsync队列中的watcher,返回一个watcherGroup    wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)    ......    //从boltdb中取出所有键值以及对应版本号    tx := s.store.b.ReadTx()    tx.RLock()    revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)    tx.RUnlock()    //因为下面取出的是所有的kv对以及版本号,所有要应用watcherGroup筛选进去监听的键值对应事件    evs := kvsToEvents(s.store.lg, wg, revs, vs)    var victims watcherBatch    wb := newWatcherBatch(wg, evs)    for w := range wg.watchers {        ........        //发送音讯到watcher对应的ch,如果阻塞,放入victims队列中        if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {            pendingEventsGauge.Add(float64(len(eb.evs)))        } else {            if victims == nil {//为空就make一个                victims = make(watcherBatch)            }            w.victim = true//置标记位        }        if w.victim {            victims[w] = eb        } else {            .......            //音讯发送完了,把watcher放入synced队列,期待新的变更事件            s.synced.add(w)        }        //把unsynced中的watcher勾销掉        s.unsynced.delete(w)    }    //减少victim    s.addVictim(victims)    ......    return s.unsynced.size()}

syncWatchersLoop通过一个定时器每隔100ms轮询一次unsynced watcher队列,如果队列不为空,就筛选出数据中的对应键值对以及相应版本号,并最终返还给客户端,将watcher挪动到synced队列。

victims清理

victims的清理也是通过一个异步协程的形式,如果victims队列不为空状况下,会始终以10ms的轮询速度来进行清理,在不产生拥挤时候该异步协程会阻塞在最上面的select,当上游开释信号,则开始一波清理。

//etcd/server/mvcc/watchable_store.gofunc (s *watchableStore) syncVictimsLoop() {    defer s.wg.Done()    for {        //通过moveVictims革除沉积数据        for s.moveVictims() != 0 {        }        s.mu.RLock()        isEmpty := len(s.victims) == 0        s.mu.RUnlock()        var tickc <-chan time.Time        if !isEmpty {            tickc = time.After(10 * time.Millisecond)        }        select {        case <-tickc:        case <-s.victimc://接管到信号,开始进行清理        case <-s.stopc:            return        }    }}func (s *watchableStore) moveVictims() (moved int) {    //把victims队列取出来,并置s.victims为空,后续应用newVictim代替    s.mu.Lock()    victims := s.victims    s.victims = nil    s.mu.Unlock()    var newVictim watcherBatch    //遍历队列,尝试发送    for _, wb := range victims {        // 尝试发送,发送阻塞放到newVictim        for w, eb := range wb {            rev := w.minRev - 1            if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {                pendingEventsGauge.Add(float64(len(eb.evs)))            } else {                if newVictim == nil {                    newVictim = make(watcherBatch)                }                newVictim[w] = eb                continue            }            moved++        }        ......        //遍历并判断是否将音讯发送完了        for w, eb := range wb {            if newVictim != nil && newVictim[w] != nil {                // couldn't send watch response; stays victim                continue            }            w.victim = false            if eb.moreRev != 0 {                w.minRev = eb.moreRev            }            //如果版本号小于以后版本,则导入unsync队列            if w.minRev <= curRev {                s.unsynced.add(w)            } else {                //放入sync队列                s.synced.add(w)            }        }        s.store.revMu.RUnlock()        s.mu.Unlock()    }    //把新的队列搁置到victims中    if len(newVictim) > 0 {        s.mu.Lock()        s.victims = append(s.victims, newVictim)        s.mu.Unlock()    }    return moved}

清理过程也是通过尝试发送,发送受到阻塞则放入新的victims,发送胜利则进一步判断是将watcher队列挪动到synced或unsynced队列中,最初应用新的victims赋值,这样做保障了不会产生数据的失落。

勾销监听

在创立监听操作时候,在watchableStore中返回了对应的勾销办法。

//etcd/server/mvcc/watchable_store.gofunc (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {    //创立watcher以及放入队列操作    ......    //返回watcher以及勾销的办法    return wa, func() { s.cancelWatcher(wa) }}//cancelWatcher的实现func (s *watchableStore) cancelWatcher(wa *watcher) {    for {        s.mu.Lock()        //尝试从各个队列删除        if s.unsynced.delete(wa) {            .....            break        } else if s.synced.delete(wa) {            ......            break        }        .......        //victims删除        var victimBatch watcherBatch        .......        if victimBatch != nil {            ......            delete(victimBatch, wa)            break        }        s.mu.Unlock()        time.Sleep(time.Millisecond)    }    wa.ch = nil    s.mu.Unlock()}//etcd/server/mvcc/watcher.gofunc (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {    ......    w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)    //将cancelWatcher办法赋值到watchStream的名为cancel的map中    ws.cancels[id] = c    ws.watchers[id] = w    return id, nil}

在客户端勾销对键值的监听时候,会对应到下层的recvLoop中。

func (sws *serverWatchStream) recvLoop() error { //接管客户端的申请    for {        req, err := sws.gRPCStream.Recv()        ......        //对申请进行断言判断,并别离解决各种类型的request        switch uv := req.RequestUnion.(type) {        case *pb.WatchRequest_CreateRequest: //监听的可能是一个范畴            ......        case *pb.WatchRequest_CancelRequest:            if uv.CancelRequest != nil {                //获取勾销监听的watchID                id := uv.CancelRequest.WatchId                //勾销监听操作                err := sws.watchStream.Cancel(mvcc.WatchID(id))                ......            }         ......    }}    func (ws *watchStream) Cancel(id WatchID) error {    ws.mu.Lock()    cancel, ok := ws.cancels[id]//取出cancel函数    ......    //执行cancel    cancel()    ......    return nil}

小结

咱们从上到下梳理了watch机制的实现形式,对于咱们了解watch来说,最重要的是了解两个数据结构,serverWatchStream和watchableStore。

serverWatchStream贯通客户端的grpc流与上层的变更通道,watchableStore则保护了监听的键值汇合并在上层解决用户订阅的键值。整体流程框架能够用下图示意,能够看出,这一乏味且可靠的设计背地是对channel与goroutine的奇妙使用。