乐趣区

关于etcd:etcd的watch是怎么实现的

工作当中应用 etcd 作为配置核心,次要应用了 etcdclient 提供的 watch 接口对存储的配置进行实时监听更新,很好奇 etcd 外部是如何做到不丢数据并联通上下游的,于是翻看了局部 v3 版本实现代码,在惊叹大佬们的代码程度同时又在鄙视本人写的 lowB 代码。

简略应用
简略应用 etcdctl 命令行做一个演示,次要展现一下性能。

# 首先启动一个 etcd
$ ./etcd
# 存入数据, 存三次
$ etcdctl put testwatch 1
$ etcdctl put testwatch 2
$ etcdctl put testwatch 3
# watch key,--rev= 1 示意从版本号为 1 开始 watch
$ etcdctl --endpoints=127.0.0.1:23790 watch testwatch --rev=1 -w=json
{
    "Header":{
        "cluster_id":14841639068965178418,
        "member_id":10276657743932975437,
        "revision":27,
        "raft_term":25
    },
    "Events":[
        {
            "kv":{
                "key":"dGVzdHdhdGNo",
                "create_revision":25,
                "mod_revision":25,
                "version":1,
                "value":"MQ=="
            }
        },
​
        {
            "kv":{
                "key":"dGVzdHdhdGNo",
                "create_revision":25,
                "mod_revision":27,
                "version":3,
                "value":"Mw=="
            }
        }
    ],
    "CompactRevision":0,
    "Canceled":false,
    "Created":false
}
#此时下面返回了从 rev 为 1 开始的变动,这时候再次对该 key 做批改(put testwatch 4),还会源源不断 #输入更改后的内容等信息
{
    "Header":{
        "cluster_id":14841639068965178418,
        "member_id":10276657743932975437,
        "revision":28,
        "raft_term":25
    },
    "Events":[
        {
            "kv":{
                "key":"dGVzdHdhdGNo",
                "create_revision":25,
                "mod_revision":28,
                "version":4,
                "value":"NA=="
            }
        }
    ],
    "CompactRevision":0,
    "Canceled":false,
    "Created":false
}


当指定版本号时候会返回所有版本号前面的历史的批改记录,如果不指定则只会在发生变化时候返回变动后的键值。理解了最简略的用法后,咱们从上到下挖一挖 watch 机制的原理。

解决流程

etcd 服务启动后会启动 grpc 服务端,并注册 Watch 服务,写过 proto 文件的应该很相熟,客户端与服务端之间通过流式 grpc 做交互,每个客户端的 watch 申请对应到一个 Watch 办法,这个 Watch 办法就联通了客户端与 etcd 上游存储的变更,能够源源不断将变更的键值告诉到客户端,也能够监听客户端的一些操作 (如勾销监听) 并同步到 etcd 的上游。接下来从下层到上层的源码来剖析 watcher 机制的实现形式。

service Watch {rpc Watch(stream WatchRequest) returns (stream WatchResponse) {option (google.api.http) = {
        post: "/v3/watch"
        body: "*"
    };
  }
}

下层解决

每一个 Watch 申请都会创立一个 serverWatchStream 构造体,该构造体向上通过 gRPCStream 与客户端做交互,向下通过 watchStream 与 etcd 存储 mvcc 局部打交道。

//etcd/server/etcdserver/api/v3rpc/watch.go
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
    // 初始化一个 serverWatchStream 构造体
    sws := serverWatchStream{ 
        ......
        //etcd 启动时初始化的 watchableStore 赋值给 watchable,下文会提到。watchable: ws.watchable, 
        // 用于和客户端进行流式 grpc 交互的接口,提供了 Send 和 Recv 等办法,Send 示意发送信息到客户端,Recv 示意从客户端取信息
        gRPCStream:  stream,
        // 次要用于取出变更或者订阅的键值变动, 详见下方源码
        watchStream: ws.watchable.NewWatchStream(),
        // chan for sending control response like watcher created and canceled.
        ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
        ......
        closec: make(chan struct{}),
​
    }
    sws.wg.Add(1)
    go func() {
        // 向客户端发送变更事件
        sws.sendLoop()
        sws.wg.Done()}()
​
    errc := make(chan error, 1)
    go func() {
        // 解决客户端的申请,订阅 kv 或勾销操作等
        if rerr := sws.recvLoop(); rerr != nil {
            ...... 错误判断
            errc <- rerr
        }
    }()
​
    // 期待勾销操作
    select {
    case err = <-errc:
        ......
        // 如果 recvloop 出错返回,敞开 ctrlStream
        close(sws.ctrlStream)
    case <-stream.Context().Done():
        ...... 错误处理
    }
    
    // 敞开连贯与申请,敞开 channel,期待两个协程退出
    sws.close() 
    return err
}
​
func (sws *serverWatchStream) close() {sws.watchStream.Close()
    close(sws.closec)// 敞开 closec,用于告诉 sendLoop 协程退出
    sws.wg.Wait()// 期待全副退出}
​
//etcd/server/mvcc/watchable_store.go
func (s *watchableStore) NewWatchStream() WatchStream {
    return &watchStream{
        //etcd 启动时初始化的 watchableStore
        watchable: s,
        // 这个管道用于从 etcd 外面拿到变更数据,调用 Chan()能够取出数据,buf 长度是 128
        ch:        make(chan WatchResponse, chanBufLen), 
        ......
    }
}

serverWatchStream 与高低互通的形式是通过两个协程。一个 sendLoop,次要向客户端同步变更订阅数据。一个 recvloop,次要接管客户端的申请并向上游发送创立对某个键值的订阅申请,以及勾销等操作。

管制协程退出以及后续解决的形式应用了 waitGroup 的形式,能够在 sws.close()的实现里看到敞开 watchStream 以及期待组等操作。

recvLoop

recvLoop 负责了创立监听键值的操作,通过监听 grpc 流式的操作,调用 Recv 办法取出客户端的申请,并作出对应的响应。recvLoop 的数据次要流向是从外向内。

//etcd/server/etcdserver/api/v3rpc/watch.go
func (sws *serverWatchStream) recvLoop() error {
    for {req, err := sws.gRPCStream.Recv()
        // 出错解决操作
        ......
        // 对申请进行断言判断,并别离解决各种类型的 request
        switch uv := req.RequestUnion.(type) {
        case *pb.WatchRequest_CreateRequest: // 客户端的 watch 创立申请
            ......
            // 在上游创立一个服务这个客户端监听的 watcher,客户端订阅的可能是一个 key,也可能是一个范畴内的 key,返回一个 watchid。id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
            if err == nil {
                // 将申请体信息中局部参数写入 serverWatchStream
                sws.mu.Lock()
                if creq.ProgressNotify {sws.progress[id] = true
                }// 是否返回上一个 kv
                if creq.PrevKv {sws.prevKV[id] = true
                }// 是否分包
                if creq.Fragment {sws.fragment[id] = true
                }
                sws.mu.Unlock()}       
            ......
        case *pb.WatchRequest_CancelRequest:
            // 勾销订阅申请,会实现删除订阅的 watcher 等操作
            ......
        // 其余操作解决
            ......
    }
}
​

sendLoop

watchStream 作为重要的变更数据源,sendLoop 会轮询调用它的 Chan()办法,该办法就是在获取 watchStream 中的 ch 管道中的数据。

func (sws *serverWatchStream) sendLoop() {
    ......
    // 因为客户端勾销或者其余起因导致程序返回,收尾操作,清理沉积的音讯事件
    defer func() {
        ......
        // 革除 ch 中积压的数据,不便垃圾回收?for ws := range sws.watchStream.Chan() {mvcc.ReportEventReceived(len(ws.Events))
        }
        for _, wrs := range pending {
            for _, ws := range wrs {mvcc.ReportEventReceived(len(ws.Events))
            }
        }
    }()
​
    for {
        select {case wresp, ok := <-sws.watchStream.Chan(): // 从 chan 中读取数据
            //ch 被敞开,间接返回
            if !ok {return}
​
            evs := wresp.Events
            events := make([]*mvccpb.Event, len(evs))
            ......
            // 遍历接管到的变更事件封装到 events
            for i := range evs {events[i] = &evs[i]
                ......
            }
            // 将 events 封装到 WatchResponse
            wr := &pb.WatchResponse{Header:          sws.newResponseHeader(wresp.Revision),
                WatchId:         int64(wresp.WatchID),
                Events:          events,
                CompactRevision: wresp.CompactRevision,
                Canceled:        canceled,
            }
            // 如果不是指定的 watchid,放到 pending 队列
            if _, okID := ids[wresp.WatchID]; !okID {
                // buffer if id not yet announced
                wrs := append(pending[wresp.WatchID], wr)
                pending[wresp.WatchID] = wrs
                continue
            }
​
            .....
            // 判断是否须要分包,并调用 Send 办法将事件发送到客户端
            var serr error
            if !fragmented && !ok {serr = sws.gRPCStream.Send(wr)
            } else {serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)
            }
            ......// 错误处理
        case c, ok := <-sws.ctrlStream:// 解决 ctrlStream
            // 敞开则间接返回
            ......
        case <-sws.closec:// 判断 closec 是否敞开,如果敞开,间接返回
            return
        }
    }
}

下层解决的流程能够用下图来示意,次要体现了 serverWatchStream 这个桥梁的作用。

上层解决

下面说到,再 recvLoop 中会调用 watchStream.Watch 办法,该办法会生成一个 watchID,而后调用其成员 watchable 的 watch 办法创立一个对于订阅键值的 watcher。

//etcd/server/mvcc/watcher.go
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
    // 生成 watchID 操作
    ......
    // 调用 watch 办法,次要关注 key 以及 ch 的流向
    w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
    ......
}
​
//etcd/server/mvcc/watchable_store.go
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
    //key 以及 ch 被封装到了 watcher 构造体中
    wa := &watcher{
        key:    key,
        end:    end,
        minRev: startRev,
        id:     id,
        ch:     ch,
        fcs:    fcs,
    }
    .....
}
​

watcher 的 ch 收到变更数据,就会被下层的 sendLoop 捕捉并推送给客户端。在此之前 watcher 会被放在 watchableStore 的某个汇合中,期待监听的 key 变更。

watchableStore

上文提到 watchableStore 是在 etcd 初始化时候创立的一个全局的配置项。因为客户端不止一个,监听的键值不止一对,并且 ch 如果满了可能会被阻塞,所以该配置项共配置了三个批次的 watcher 汇合,别离是 synced,unsynced,victims,别离寄存位于不同阶段的 watcher。

//etcd/server/mvcc/watchable_store.go
type watchableStore struct {
    *store
    // victims 是在变更产生,发送数据到 ch 然而通道满时,被阻塞的 watcher 汇合
    victims []watcherBatch
    victimc chan struct{}// 通道用于告诉是否须要清理 victims
​
    // 未同步实现的 watcher,unsynced watcherGroup
​
    // 曾经同步实现,在期待新的新的变更事件的 watcher 队列
    synced watcherGroup
    ......
}
​

在 etcd 启动初始化 watchableStore 时候,会启动两个异步协程清理 unsynced 和 victims 汇合中的 watcher。

//etcd/server/mvcc/watchable_store.go
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
    ......
    s := &watchableStore{store:    NewStore(lg, b, le, cfg),
        victimc:  make(chan struct{}, 1),
        unsynced: newWatcherGroup(),
        synced:   newWatcherGroup(),
        stopc:    make(chan struct{}),
    }
    // 创立两个协程,用于解决 watcher 数据
    s.wg.Add(2)
    go s.syncWatchersLoop() // 革除 unsync,每隔 100ms 调用一次 syncWatchers
    go s.syncVictimsLoop()  // 革除 victim 中沉积的 event
    return s
}

unsynced

产生 watcher 沉积的起因次要是两种,一种是当客户端执行 watch 时候指定了历史版本号,该操作须要从 boltDB 中取值,不能间接放到 synced 队列中期待新的变更,须要放到 unsync 中。

//etcd/server/mvcc/watchable_store.go
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
    //key 以及 ch 被封装到了 watcher 构造体中
    wa := &watcher{
        key:    key,
        end:    end,
        minRev: startRev,
        id:     id,
        ch:     ch,
        fcs:    fcs,
    }
    s.mu.Lock()
    s.revMu.RLock()
    // 如果指定的版本号 version 是历史的版本号,则将 watcher 放到 unsynced 中
    synced := startRev > s.store.currentRev || startRev == 0
    if synced {
        // 放到 synced
        s.synced.add(wa) // 使 sync 队列加上 watcher
    } else {
        // 放到 unsynced
        s.unsynced.add(wa)
    }
    return wa, func() { s.cancelWatcher(wa) }// 返回 watcher 以及勾销的办法    
}

victims

第二种是积压的起因是因为检测到了 watch 变更,在将数据发送到 ch 时候,ch 缓冲已满,此时须要将 watcher 存到其余区域(victims)。如果硬往里放数据,该协程会被 park 住,阻塞其余操作。

检测到 watcher 变更产生在 put 一个键值时候,此时事务提交,最终写入之前会调用 notify()办法检测是否有针对该键值的 watcher。

//etcd/server/mvcc/watchable_store_txn.go
func (tw *watchableStoreTxnWrite) End() {
    ......
    tw.s.mu.Lock()
    // 提交事件之前调用 notify()
    tw.s.notify(rev, evs)
    tw.TxnWrite.End()
    tw.s.mu.Unlock()}

func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
    var victim watcherBatch
    //newWatcherBatch 会遍历 watchableStore 的 synced 队列,并拿 evs 中 kv 比照是否有监听的 key,返回一个 watcher 汇合
    //for range 遍历 newWatcherBatch 返回的 watcher 汇合
    for w, eb := range newWatcherBatch(&s.synced, evs) {
        ......
        // 调用 send 办法将 event 发送到 ch 中,未阻塞的话,会被最上层的 sendLoop 接管到。if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {pendingEventsGauge.Add(float64(len(eb.evs))) //promethous 操作
        } else {
            // 将 watcher 增加到 victims 汇合中
            w.minRev = rev + 1
            if victim == nil {victim = make(watcherBatch)
            }
            w.victim = true
            victim[w] = eb
            // 删除 synced 队列中的该 watch
            s.synced.delete(w)
        }
    }
    s.addVictim(victim)
}

//etcd/server/mvcc/watchable_store.go
func (w *watcher) send(wr WatchResponse) bool {progressEvent := len(wr.Events) == 0
    // 过滤事件
    ......
    // 将音讯发送到 channel,如果 ch 满了就走 default
    select {
    case w.ch <- wr: 
        return true
    default:
        return false
    }
}

func (s *watchableStore) addVictim(victim watcherBatch) {
    // 空间接返回
    if victim == nil {return}
    // 减少 watcher 到 victims,并发送信号告诉
    s.victims = append(s.victims, victim)
    select {case s.victimc <- struct{}{}:
    default:
    }
}

unsynced 清理

上文提到,两个沉积队列的 watcher 清理的形式是通过两个异步协程做到的。接下来咱们先看 unsynced 队列的清理形式。

//etcd/server/mvcc/watchable_store.go
func (s *watchableStore) syncWatchersLoop() {
    for {
        ......
        // 如果队列大于 0,则进入 syncWatchers()同步 watcher
        if lastUnsyncedWatchers > 0 {unsyncedWatchers = s.syncWatchers()
        }
        ......
        // 定时器解决
        ......
    }
}

func (s *watchableStore) syncWatchers() int {
    // 选出 unsync 队列中的 watcher,返回一个 watcherGroup
    wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
    ......
    // 从 boltdb 中取出所有键值以及对应版本号
    tx := s.store.b.ReadTx()
    tx.RLock()
    revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
    tx.RUnlock()
    // 因为下面取出的是所有的 kv 对以及版本号,所有要应用 watcherGroup 筛选进去监听的键值对应事件
    evs := kvsToEvents(s.store.lg, wg, revs, vs)

    var victims watcherBatch
    wb := newWatcherBatch(wg, evs)
    for w := range wg.watchers {
        ........
        // 发送音讯到 watcher 对应的 ch,如果阻塞,放入 victims 队列中
        if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {pendingEventsGauge.Add(float64(len(eb.evs)))
        } else {
            if victims == nil {// 为空就 make 一个
                victims = make(watcherBatch)
            }
            w.victim = true// 置标记位
        }

        if w.victim {victims[w] = eb
        } else {
            .......
            // 音讯发送完了,把 watcher 放入 synced 队列,期待新的变更事件
            s.synced.add(w)
        }
        // 把 unsynced 中的 watcher 勾销掉
        s.unsynced.delete(w)
    }
    // 减少 victim
    s.addVictim(victims)
    ......
    return s.unsynced.size()}

syncWatchersLoop 通过一个定时器每隔 100ms 轮询一次 unsynced watcher 队列,如果队列不为空,就筛选出数据中的对应键值对以及相应版本号,并最终返还给客户端,将 watcher 挪动到 synced 队列。

victims 清理

victims 的清理也是通过一个异步协程的形式,如果 victims 队列不为空状况下,会始终以 10ms 的轮询速度来进行清理,在不产生拥挤时候该异步协程会阻塞在最上面的 select,当上游开释信号,则开始一波清理。

//etcd/server/mvcc/watchable_store.go
func (s *watchableStore) syncVictimsLoop() {defer s.wg.Done()

    for {
        // 通过 moveVictims 革除沉积数据
        for s.moveVictims() != 0 {}
        s.mu.RLock()
        isEmpty := len(s.victims) == 0
        s.mu.RUnlock()

        var tickc <-chan time.Time
        if !isEmpty {tickc = time.After(10 * time.Millisecond)
        }

        select {
        case <-tickc:
        case <-s.victimc:// 接管到信号,开始进行清理
        case <-s.stopc:
            return
        }
    }
}

func (s *watchableStore) moveVictims() (moved int) {
    // 把 victims 队列取出来,并置 s.victims 为空,后续应用 newVictim 代替
    s.mu.Lock()
    victims := s.victims
    s.victims = nil
    s.mu.Unlock()
    var newVictim watcherBatch
    // 遍历队列,尝试发送
    for _, wb := range victims {
        // 尝试发送,发送阻塞放到 newVictim
        for w, eb := range wb {
            rev := w.minRev - 1
            if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {pendingEventsGauge.Add(float64(len(eb.evs)))
            } else {
                if newVictim == nil {newVictim = make(watcherBatch)
                }
                newVictim[w] = eb
                continue
            }
            moved++
        }
        ......
        // 遍历并判断是否将音讯发送完了
        for w, eb := range wb {if newVictim != nil && newVictim[w] != nil {
                // couldn't send watch response; stays victim
                continue
            }
            w.victim = false
            if eb.moreRev != 0 {w.minRev = eb.moreRev}
            // 如果版本号小于以后版本,则导入 unsync 队列
            if w.minRev <= curRev {s.unsynced.add(w)
            } else {
                // 放入 sync 队列
                s.synced.add(w)
            }
        }
        s.store.revMu.RUnlock()
        s.mu.Unlock()}
    // 把新的队列搁置到 victims 中
    if len(newVictim) > 0 {s.mu.Lock()
        s.victims = append(s.victims, newVictim)
        s.mu.Unlock()}

    return moved
}

清理过程也是通过尝试发送,发送受到阻塞则放入新的 victims,发送胜利则进一步判断是将 watcher 队列挪动到 synced 或 unsynced 队列中,最初应用新的 victims 赋值,这样做保障了不会产生数据的失落。

勾销监听

在创立监听操作时候,在 watchableStore 中返回了对应的勾销办法。

//etcd/server/mvcc/watchable_store.go
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
    // 创立 watcher 以及放入队列操作
    ......
    // 返回 watcher 以及勾销的办法
    return wa, func() { s.cancelWatcher(wa) }
}
//cancelWatcher 的实现
func (s *watchableStore) cancelWatcher(wa *watcher) {
    for {s.mu.Lock()
        // 尝试从各个队列删除
        if s.unsynced.delete(wa) {
            .....
            break
        } else if s.synced.delete(wa) {
            ......
            break
        }
        .......
        //victims 删除
        var victimBatch watcherBatch
        .......
        if victimBatch != nil {
            ......
            delete(victimBatch, wa)
            break
        }
        s.mu.Unlock()
        time.Sleep(time.Millisecond)
    }

    wa.ch = nil
    s.mu.Unlock()}

//etcd/server/mvcc/watcher.go
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
    ......
    w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
    // 将 cancelWatcher 办法赋值到 watchStream 的名为 cancel 的 map 中
    ws.cancels[id] = c
    ws.watchers[id] = w
    return id, nil
}

在客户端勾销对键值的监听时候,会对应到下层的 recvLoop 中。

func (sws *serverWatchStream) recvLoop() error { // 接管客户端的申请
    for {req, err := sws.gRPCStream.Recv()
        ......
        // 对申请进行断言判断,并别离解决各种类型的 request
        switch uv := req.RequestUnion.(type) {
        case *pb.WatchRequest_CreateRequest: // 监听的可能是一个范畴
            ......
        case *pb.WatchRequest_CancelRequest:
            if uv.CancelRequest != nil {
                // 获取勾销监听的 watchID
                id := uv.CancelRequest.WatchId
                // 勾销监听操作
                err := sws.watchStream.Cancel(mvcc.WatchID(id))
                ......
            }
         ......
    }
}
    
func (ws *watchStream) Cancel(id WatchID) error {ws.mu.Lock()
    cancel, ok := ws.cancels[id]// 取出 cancel 函数
    ......
    // 执行 cancel
    cancel()
    ......
    return nil
}

小结

咱们从上到下梳理了 watch 机制的实现形式,对于咱们了解 watch 来说,最重要的是了解两个数据结构,serverWatchStream 和 watchableStore。

serverWatchStream 贯通客户端的 grpc 流与上层的变更通道,watchableStore 则保护了监听的键值汇合并在上层解决用户订阅的键值。整体流程框架能够用下图示意,能够看出,这一乏味且可靠的设计背地是对 channel 与 goroutine 的奇妙使用。

退出移动版