Background

Our company currently schedules workloads on Kubernetes using io.fabric8:kubernetes-client:4.2.0. While running, we hit the following problem:

    [OkHttp WebSocket https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket close received. code: 1000, reason:
    [OkHttp WebSocket https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Submitting reconnect task to the executor
    [scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Scheduling reconnect task
    [scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Current reconnect backoff is 1000 milliseconds (T0)
    [reconnectAttempt|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@700f518a199 - 2020-11-17T06:39:13.874Z -[merlion-k8s-backend]-[merlion-k8s-backend-6b4cc44855-s6wnq]: 06:39:13.873
    [OkHttp https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket successfully opened
    WARN PodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
    io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 135562761 (135563127)
        at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:254) [kubernetes-client-4.2.2.jar:?]
        at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [okhttp-3.12.0.jar:?]
        at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [okhttp-3.12.0.jar:?]
        at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [okhttp-3.12.0.jar:?]
        at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [okhttp-3.12.0.jar:?]
        at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [okhttp-3.12.0.jar:?]
        at okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [okhttp-3.12.0.jar:?]
        at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.12.0.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]

On its own this error would be unremarkable, but our code looks like this:

    watchConnection = kubernetesClient.pods()
      .withLabel(MERLION_TASK_LABEL, applicationId)
      // .withResourceVersion(resourceVersion)
      .watch(new TaskPodsWatcher())

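We had already commented out withResourceVersion(resourceVersion). (Had it not been commented out, the error would simply mean that the resourceVersion set in our code was too small.) Yet "too old resource version" was still reported.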

Analysis

Jump straight to onClosed in WatchConnectionManager:

    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
      logger.debug("WebSocket close received. code: {}, reason: {}", code, reason);
      if (forceClosed.get()) {
        logger.debug("Ignoring onClose for already closed/closing websocket");
        return;
      }
      if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
        closeEvent(new KubernetesClientException("Connection unexpectedly closed"));
        return;
      }
      scheduleReconnect();
    }

The documentation for onClosed in OkHttp's WebSocketListener reads:

    /**
     * Invoked when both peers have indicated that no more messages will be transmitted and the
     * connection has been successfully released. No further calls to this listener will be made.
     */
    public void onClosed(WebSocket webSocket, int code, String reason) {
    }

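This suggests that the connection was released because no events had been transmitted for a long time, which in turn closed the WebSocket (this is quite likely when there are not many tasks). A reconnect is then triggered through scheduleReconnect, and that method calls runWatch():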

    executor.schedule(new NamedRunnable("reconnectAttempt") {
      @Override
      public void execute() {
        try {
          runWatch();
          reconnectPending.set(false);
        } catch (Exception e) {
          // An unexpected error occurred and we didn't even get an onFailure callback.
          logger.error("Exception in reconnect", e);
          webSocketRef.set(null);
          closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
          close();
        }
      }
    }, nextReconnectInterval(), TimeUnit.MILLISECONDS);

Inside runWatch(), the following is executed:

    if (this.resourceVersion.get() != null) {
      httpUrlBuilder.addQueryParameter("resourceVersion", this.resourceVersion.get());
    }

The value of this.resourceVersion is set in the method public void onMessage(WebSocket webSocket, String message):

    WatchEvent event = readWatchEvent(message);
    Object object = event.getObject();
    if (object instanceof HasMetadata) {
      @SuppressWarnings("unchecked")
      T obj = (T) object;
      // Dirty cast - should always be valid though
      resourceVersion.set(((HasMetadata) obj).getMetadata().getResourceVersion());
      Watcher.Action action = Watcher.Action.valueOf(event.getType());
      watcher.eventReceived(action, obj);
    } else if (object instanceof KubernetesResourceList) {
      @SuppressWarnings("unchecked")
      KubernetesResourceList list = (KubernetesResourceList) object;
      // Dirty cast - should always be valid though
      resourceVersion.set(list.getMetadata().getResourceVersion());
      Watcher.Action action = Watcher.Action.valueOf(event.getType());
      List<HasMetadata> items = list.getItems();
      if (items != null) {
        for (HasMetadata item : items) {
          watcher.eventReceived(action, (T) item);
        }
      }
    }

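In other words, if by the time of the reconnect the resourceVersion cached from the last received event is older than the oldest resourceVersion still retained by etcd, the "too old resource version" error is reported.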
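To make the failure mode concrete, here is a minimal, self-contained sketch. It is a toy model, not fabric8 or Kubernetes code: ToyApiServer, its retention window of 3 versions, and the "resume is possible if the next version is still retained" rule are all assumptions made for illustration. The point it tries to show is that the version cached by the watcher only advances when a matching pod event arrives, while unrelated changes keep pushing the server's history forward.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (not fabric8 or Kubernetes code) of a server that retains a bounded change history. */
class ToyApiServer {
    static final int HTTP_OK = 200;
    static final int HTTP_GONE = 410;

    private final int retained;                       // hypothetical retention window, in versions
    private final Deque<Long> history = new ArrayDeque<>();
    private long currentVersion = 0;

    ToyApiServer(int retained) {
        this.retained = retained;
    }

    /** Every change in the cluster bumps the version, whether or not a given watch selects it. */
    void recordChange() {
        history.addLast(++currentVersion);
        if (history.size() > retained) {
            history.removeFirst();                    // older history is compacted away
        }
    }

    /** 200 if a watch can resume after resourceVersion, 410 if that history is gone. */
    int watchFrom(Long resourceVersion) {
        if (resourceVersion == null) {
            return HTTP_OK;                           // no version given: start from "now"
        }
        Long oldestRetained = history.peekFirst();    // null when nothing was recorded yet
        if (oldestRetained == null) {
            return HTTP_OK;
        }
        return resourceVersion + 1 >= oldestRetained ? HTTP_OK : HTTP_GONE;
    }

    /** Replays the scenario: one matching event, then unrelated churn while the watch idles. */
    static int[] demo() {
        ToyApiServer server = new ToyApiServer(3);
        server.recordChange();                        // version 1: our pod, the watcher caches 1
        Long cached = 1L;
        for (int i = 0; i < 4; i++) {
            server.recordChange();                    // versions 2..5: unrelated resources
        }
        return new int[] {
            server.watchFrom(cached),                 // reconnect with the stale cached version
            server.watchFrom(null)                    // reconnect without a version
        };
    }

    public static void main(String[] args) {
        int[] codes = demo();
        System.out.println("stale resourceVersion -> " + codes[0]);
        System.out.println("no resourceVersion    -> " + codes[1]);
    }
}
```

In this model the reconnect with the cached version is rejected with 410 while a reconnect without any version is accepted, which matches the behavior we observed even though our own code never set a resourceVersion.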

Solution

Searching online for kubernetes-too-old-resource-version turns up this comment from a Kubernetes Client team member:

Fabric8 does not handle it with plain watch. But it is handling it in SharedInformer API, see ReflectorWatcher. I would recommend using informer API when writing operators since it's better than plain list and watch

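In other words, the plain watch mechanism cannot handle this case, but the SharedInformer API can, so we switched to SharedInformer. As of November 16, 2020, the latest kubernetes-client release we could obtain was kubernetes-client:4.13.0. The implementation: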

    val sharedInformerFactory = kubernetesClient.informers()
    val podInformer = sharedInformerFactory
      .sharedIndexInformerFor(classOf[Pod], classOf[PodList],
        new OperationContext().withNamespace("test"), 30 * 1000L)
    podInformer.addEventHandler(new ResourceEventHandler[Pod] {
      override def onAdd(obj: Pod): Unit = {
        eventReceived(obj, "ADD")
      }

      override def onDelete(obj: Pod, deletedFinalStateUnknown: Boolean): Unit = {
        eventReceived(obj, "DELETE")
      }

      override def onUpdate(oldObj: Pod, newObj: Pod): Unit = {
        eventReceived(newObj, "UPDATE")
      }

      private def idShouldUpdate(pod: Pod): Boolean = {
        pod.getMetadata.getLabels.getOrDefault(MERLION_TASK_LABEL, "") == applicationId
      }

      private def eventReceived(pod: Pod, action: String): Unit = {
        if (idShouldUpdate(pod)) {
          val podName = pod.getMetadata.getName
          logger.info(s"Received job pod update for pod named $podName, action ${action}")
          snapshotsStore.updatePod(pod)
        }
      }
    })
    sharedInformerFactory.startAllRegisteredInformers()

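SharedInformerFactory works the same way as the Kubernetes informer mechanism and is able to deliver events reliably. The most important pieces are ReflectorWatcher, Reflector, and DefaultSharedIndexInformer. A brief walkthrough: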

    public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod,
        OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
      this.resyncCheckPeriodMillis = resyncPeriod;
      this.defaultEventHandlerResyncPeriod = resyncPeriod;
      this.processor = new SharedProcessor<>();
      this.indexer = new Cache();
      DeltaFIFO<T> fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer);
      this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas,
          processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners);
      controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());
    }

DefaultSharedIndexInformer uses a DeltaFIFO to store events. this::handleDeltas is handed to the Controller and invoked as the processFunc argument of this.queue.pop; in other words, this function consumes the events in the fifo:

    private void processLoop() throws Exception {
      while (true) {
        try {
          this.queue.pop(this.processFunc);
        } catch (InterruptedException t) {
          log.error("DefaultController#processLoop got interrupted {}", t.getMessage(), t);
          return;
        } catch (Exception e) {
          log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
          throw e;
        }
      }
    }
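The hand-off described here is a plain producer/consumer arrangement. The sketch below illustrates it with a standard BlockingQueue instead of the real DeltaFIFO; the Delta record, the thread name, and the fixed count of three events are assumptions for illustration only, and the real queue does more (for example, grouping deltas per object key).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified stand-in for the DeltaFIFO hand-off; not the fabric8 implementation. */
class DeltaQueueSketch {

    /** Hypothetical delta record: an action plus the name of the affected pod. */
    static final class Delta {
        final String action;
        final String podName;

        Delta(String action, String podName) {
            this.action = action;
            this.podName = podName;
        }
    }

    static List<String> demo() {
        BlockingQueue<Delta> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();
        int expected = 3;

        // Consumer: plays the role of processLoop() popping deltas and calling handleDeltas.
        Thread controller = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    Delta delta = queue.take();        // blocks until a delta is available
                    handled.add(delta.action + ":" + delta.podName);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "informer-controller-sketch");
        controller.start();

        // Producer: plays the role of whatever feeds list/watch results into the queue.
        queue.add(new Delta("ADDED", "pod-a"));
        queue.add(new Delta("MODIFIED", "pod-a"));
        queue.add(new Delta("DELETED", "pod-a"));

        try {
            controller.join();                         // wait until all deltas are handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Decoupling the two sides this way means a slow event handler only delays consumption; it does not block the thread that reads from the API server.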

The queue is the DeltaFIFO that was passed in as a constructor argument, so queue is the fifo. Where does the data in the fifo come from? From the controller::run function:

    if (fullResyncPeriod > 0) {
      reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
    } else {
      reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
    }
    reflector.listAndWatch();

This calls reflector.listAndWatch(), which implements a list-watch mechanism similar to the one in Kubernetes itself:

    public void listAndWatch() throws Exception {
      try {
        log.info("Started ReflectorRunnable watch for {}", apiTypeClass);
        reListAndSync();
        resyncExecutor.scheduleWithFixedDelay(this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS);
        startWatcher();
      } catch (Exception exception) {
        store.isPopulated(false);
        throw new RejectedExecutionException("Error while starting ReflectorRunnable watch", exception);
      }
    }

reListAndSync pulls the full set of event data, and startWatcher opens a watch to receive incremental events. The watch is set up as follows:

    watch.set(
      listerWatcher.watch(
        new ListOptionsBuilder()
          .withWatch(Boolean.TRUE)
          .withResourceVersion(lastSyncResourceVersion.get())
          .withTimeoutSeconds(null)
          .build(),
        operationContext.getNamespace(), operationContext, watcher)
    );

The watcher here is initialized in the Reflector constructor:

    watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);

ReflectorWatcher implements Watcher, so it has the corresponding eventReceived and onClose methods:

    @Override
    public void eventReceived(Action action, T resource) {
      if (action == null) {
        final String errorMessage = String.format("Unrecognized event %s", resource.getMetadata().getName());
        log.error(errorMessage);
        throw new KubernetesClientException(errorMessage);
      }
      log.info("Event received {}", action.name());
      switch (action) {
        case ERROR:
          final String errorMessage = String.format("ERROR event for %s", resource.getMetadata().getName());
          log.error(errorMessage);
          throw new KubernetesClientException(errorMessage);
        case ADDED:
          store.add(resource);
          break;
        case MODIFIED:
          store.update(resource);
          break;
        case DELETED:
          store.delete(resource);
          break;
      }
      lastSyncResourceVersion.set(resource.getMetadata().getResourceVersion());
      log.debug("{}#Receiving resourceVersion {}", resource.getKind(), lastSyncResourceVersion.get());
    }

    @Override
    public void onClose(KubernetesClientException exception) {
      log.error("Watch closing");
      Optional.ofNullable(exception)
        .map(e -> {
          log.debug("Exception received during watch", e);
          return exception;
        })
        .map(KubernetesClientException::getStatus)
        .map(Status::getCode)
        .filter(c -> c.equals(HttpURLConnection.HTTP_GONE))
        .ifPresent(c -> onHttpGone.run());
      onClose.run();
    }

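In eventReceived, every event is added to the store, which is the fifo queue. In onClose, if the status is HTTP_GONE (that is, "too old resource version"), onHttpGone.run() is executed, followed by onClose.run(). onHttpGone is the Reflector's reListAndSync function and onClose is the Reflector's startWatcher function, so whenever the watcher is closed, the watch is started again.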
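The recovery order can be reduced to a few lines. The following is a sketch under stated assumptions rather than the library's code: the method names mirror the excerpt above, but their bodies are stand-ins, and the starting version "100" and server version 500 are made-up values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of the re-list-then-re-watch recovery; names mirror the excerpt, bodies are stand-ins. */
class RelistOnGoneSketch {
    static final int HTTP_GONE = 410;

    private final AtomicReference<String> lastSyncResourceVersion = new AtomicReference<>("100");
    private final List<String> calls = new ArrayList<>();
    private final long serverVersion = 500;           // hypothetical current server version

    /** Stand-in for a full list: adopts the server's current resourceVersion. */
    void reListAndSync() {
        lastSyncResourceVersion.set(String.valueOf(serverVersion));
        calls.add("list");
    }

    /** Stand-in for opening a watch from the last synced resourceVersion. */
    void startWatcher() {
        calls.add("watch@" + lastSyncResourceVersion.get());
    }

    /** Same ordering as the onClose excerpt: re-list only on 410, then always re-watch. */
    void onClose(Integer statusCode) {
        if (statusCode != null && statusCode == HTTP_GONE) {
            reListAndSync();
        }
        startWatcher();
    }

    static List<String> demo() {
        RelistOnGoneSketch reflector = new RelistOnGoneSketch();
        reflector.onClose(null);                      // ordinary close: resume from version 100
        reflector.onClose(HTTP_GONE);                 // 410: refresh the version, then resume
        return reflector.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the re-list runs before the new watch is opened, the watch always starts from a version the server still knows about, which is what the plain watch in our original code could not guarantee.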

Note

In kubernetes-client:4.6.2, onMessage in WatchConnectionManager handles HTTP_GONE differently:

    if (status.getCode() == HTTP_GONE) {
      logger.info("The resource version {} no longer exists. Scheduling a reconnect.", resourceVersion.get());
      resourceVersion.set(null);
      scheduleReconnect();
    } else {
      logger.error("Error received: {}", status.toString());
    }

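Once HTTP_GONE occurs, resourceVersion is set to null, which means the watch resumes from the latest events, and a reconnect is scheduled immediately. Versions 4.13.0 and 4.2.0 do not reconnect immediately; they leave it to the user to handle.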
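For users who stay on a plain watch, the 4.6.2 behavior can be approximated in their own handler. The sketch below only models the state change: GoneHandlingSketch, nextWatchQuery, and the query string format are hypothetical and exist purely to show that clearing the cached version removes the resourceVersion parameter from the next request.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of a 4.6.2-style reaction to 410; a simplified stand-in, not the library code. */
class GoneHandlingSketch {
    static final int HTTP_GONE = 410;

    private final AtomicReference<String> resourceVersion;
    private int scheduledReconnects = 0;

    GoneHandlingSketch(String initialResourceVersion) {
        this.resourceVersion = new AtomicReference<>(initialResourceVersion);
    }

    /** Query string of the next watch request; the parameter is omitted when the version is null. */
    String nextWatchQuery() {
        String version = resourceVersion.get();
        return version == null ? "watch=true" : "watch=true&resourceVersion=" + version;
    }

    /** On 410, forget the stale version and count a reconnect; other codes are left alone. */
    void onErrorStatus(int code) {
        if (code == HTTP_GONE) {
            resourceVersion.set(null);
            scheduledReconnects++;
        }
    }

    int scheduledReconnects() {
        return scheduledReconnects;
    }

    public static void main(String[] args) {
        GoneHandlingSketch manager = new GoneHandlingSketch("135562761");
        System.out.println(manager.nextWatchQuery());
        manager.onErrorStatus(HTTP_GONE);
        System.out.println(manager.nextWatchQuery());
    }
}
```

One trade-off to keep in mind: as far as we understand, resuming without a resourceVersion does not replay the individual changes that happened during the gap, which is one reason the informer approach re-lists first.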