关于go:Go-clientSet-Watch-运行后随机性失效

50次阅读

共计 3554 个字符,预计需要花费 9 分钟才能阅读完成。

背景

List 和 Watch 机制是 kubernetes 中重要的机制之一。控制器通过 API Server 的 List API 来获取所有最新版本的 API 对象,通过 Watch API 来监听所有 API 对象的变动。
在程序设计过程中,往往也须要利用 List && Watch 机制,来察看 API 对象的状态,从而调用 EventHandler,做出响应。
基于此背景,Go 语言官网的 clientSet 中提供了相应的 API 接口供开发者应用。然而,笔者在应用 Watch 机制中踩到了不小坑。

问题

笔者在程序中创立 clientSet,并调用其 Watch 办法,监听某 Secret 资源变动,伪代码如下:

secretWatch, err := clientSet.CoreV1().Secrets("命名空间").Watch(context.TODO(), 
    metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", "API 对象名").String()})
for {for range secretWatch.ResultChan() {// 响应操作}
}

笔者在启动后,通过几番调试的确能够监听到信息,安心提交。
然而,通过一段运行工夫后,Watch 机制忽然失灵,而且无奈复原。本地 DeBug 也始终找不到异样点。

解决方案

问题剖析

clientSet 的 Watch 接口缺失可能监听了须要的信息,然而其难以解决各类异样。如下源码所示,因而当异样产生时,Watch 会主动敞开。

// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {// Stops watching. Will close the channel returned by ResultChan(). Releases
    // any resources used by the watch.
    Stop()

    // Returns a chan which will receive all the events. If an error occurs
    // or Stop() is called, this channel will be closed, in which case the
    // watch should be completely cleaned up.!!!明确说了在呈现谬误或者被调用 Stop 时,通道会主动敞开的
    ResultChan() <-chan Event}

解决方案

应用

应用 RetryWatcher 来替换传统 Watcher,简略来说,此类 RetryWatcher 可能保障 Watch 主动敞开后,从新拉起一个 Watcher,使得程序持续失常运行。
应用办法如下:

secretWatch, err := watchtools.NewRetryWatcher("资源版本", &cache.ListWatch{WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {return clientSet.CoreV1().Secrets("命名空间").Watch(context.TODO(), metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", "API 对象名称").String(),})
    },
})
for {for range secretWatch.ResultChan() {// 响应操作}
}

代码解读

理解应用后,笔者带着大家来深究一下源码:
首先来看看 newRetryWatcher 函数(NewRetryWatcher 接口会调用该外部函数),笔者发现其中启动了一个 rw.receive() 的协程。receive() 的官网解释是“reads the result from a watcher, restarting it if necessary.”,示意除了失常 Watch 之外,在必要时重启该 Watch。

func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
    switch initialResourceVersion {
    case "","0":
        // TODO: revisit this if we ever get WATCH v2 where it means start "now"
        //       without doing the synthetic list of objects at the beginning (see #74022)
        return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
    default:
        break
    }

    rw := &RetryWatcher{
        lastResourceVersion: initialResourceVersion,
        watcherClient:       watcherClient,
        stopChan:            make(chan struct{}),
        doneChan:            make(chan struct{}),
        resultChan:          make(chan watch.Event, 0),
        minRestartDelay:     minRestartDelay,
    }

    go rw.receive()
    return rw, nil
}

更深层次的代码须要理解 doReceive() 函数(如下代码)。若失常完结,doReceive() 返回 true,下层 receive 函数会调用 cancel 退出程序。若不失常完结,则返回 false,receive 会调用 NonSlidingUntilWithContext 重建 Watcher 持续监听。

func (rw *RetryWatcher) receive() {ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        select {
        case <-rw.stopChan:
            cancel()
            return
        case <-ctx.Done():
            return
        }
    }()

    // We use non sliding until so we don't introduce delays on happy path when WATCH call
    // timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
    wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {done, retryAfter := rw.doReceive()
        if done {cancel()
            return
        }

        time.Sleep(retryAfter)

        klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
    }, rw.minRestartDelay)
}

func (rw *RetryWatcher) doReceive() (bool, time.Duration) {// ...}

为了升高浏览难度,笔者将 doReceive() 重要的局部拆解。

  • 首先:因为 RetryWatch 最终还是调用的传统 Watch 函数,因而,先捕获获取 Watch 中呈现的 error,若发现则返回 false;
watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
        ResourceVersion:     rw.lastResourceVersion,
        AllowWatchBookmarks: true,
    })
    switch err {
    case nil:
        break

    case io.EOF:
        // ...
    // ...
    }
  • 其次:在获取音讯时,同样可能会出现异常,返回 false。
for {
        select {
        case <-rw.stopChan:
            // ...
        case event, ok := <-ch:
            if !ok {// ...}
            switch event.Type {
            case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
                // ...
            case watch.Error:
                // ...
            default:
                // ...
            }
        }
    }

正文完
 0