Background
Our company currently builds its scheduling on k8s (using io.fabric8:kubernetes-client:4.2.0). While running it, we hit the following problem:
[OkHttp WebSocket https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket close received. code: 1000, reason:
[OkHttp WebSocket https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Submitting reconnect task to the executor
[scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Scheduling reconnect task
[scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Current reconnect backoff is 1000 milliseconds (T0)
[reconnectAttempt|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@700f518a
199 - 2020-11-17T06:39:13.874Z -[merlion-k8s-backend]-[merlion-k8s-backend-6b4cc44855-s6wnq]: 06:39:13.873 [OkHttp https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket successfully opened
WARN PodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 135562761 (135563127)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:254)[kubernetes-client-4.2.2.jar:?]
at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [okhttp-3.12.0.jar:?]
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [okhttp-3.12.0.jar:?]
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.12.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
On its own this error would be unremarkable, but our code looks like this:
watchConnection = kubernetesClient.pods()
.withLabel(MERLION_TASK_LABEL, applicationId)
// .withResourceVersion(resourceVersion)
.watch(new TaskPodsWatcher())
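We had already commented out withResourceVersion(resourceVersion), yet too old resource version was still reported. (Had the call not been commented out, the error would simply mean that the resourceVersion set in our code was too old.)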
Analysis
Jumping straight to onClosed in WatchConnectionManager:
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
  logger.debug("WebSocket close received. code: {}, reason: {}", code, reason);
  if (forceClosed.get()) {
    logger.debug("Ignoring onClose for already closed/closing websocket");
    return;
  }
  if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
    closeEvent(new KubernetesClientException("Connection unexpectedly closed"));
    return;
  }
  scheduleReconnect();
}
The documentation comment for onClosed reads:
/**
* Invoked when both peers have indicated that no more messages will be transmitted and the
* connection has been successfully released. No further calls to this listener will be made.
*/
public void onClosed(WebSocket webSocket, int code, String reason) {}
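This suggests that, because no events had been transmitted for a long time, the connection was released and the WebSocket was closed (quite likely when there are few tasks). That triggers a reconnect through scheduleReconnect, which in turn calls runWatch():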
executor.schedule(new NamedRunnable("reconnectAttempt") {
  @Override
  public void execute() {
    try {
      runWatch();
      reconnectPending.set(false);
    } catch (Exception e) {
      // An unexpected error occurred and we didn't even get an onFailure callback.
      logger.error("Exception in reconnect", e);
      webSocketRef.set(null);
      closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
      close();
    }
  }
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
}
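The "Current reconnect backoff is 1000 milliseconds (T0)" line in the log above corresponds to the delay handed to executor.schedule. To illustrate that kind of exponential backoff, here is a minimal, self-contained sketch. The class name BackoffSketch, the 1000 ms base interval, and the cap exponent of 5 are assumptions chosen for illustration; they are not taken from the library source.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an exponential reconnect backoff; not the library's code.
class BackoffSketch {
    private final long baseIntervalMillis;   // assumed base delay, e.g. 1000 ms
    private final int maxExponent;           // assumed cap on the doubling
    private final AtomicInteger attempt = new AtomicInteger(0);

    BackoffSketch(long baseIntervalMillis, int maxExponent) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.maxExponent = maxExponent;
    }

    // Returns the delay for the next attempt: base * 2^min(attempt, maxExponent).
    long nextIntervalMillis() {
        int exponent = Math.min(attempt.getAndIncrement(), maxExponent);
        return baseIntervalMillis << exponent;
    }

    // Called after a successful reconnect so the next failure starts from the base delay.
    void reset() {
        attempt.set(0);
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(1000L, 5);
        for (int i = 0; i < 8; i++) {
            System.out.println("attempt " + i + " -> " + backoff.nextIntervalMillis() + " ms");
        }
    }
}
```

Capping the exponent keeps the delay bounded however many attempts fail, and resetting after a successful open means a later disconnect starts again from the base delay.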
And runWatch() in turn executes:
if (this.resourceVersion.get() != null) {
  httpUrlBuilder.addQueryParameter("resourceVersion", this.resourceVersion.get());
}
The value of this.resourceVersion is set in public void onMessage(WebSocket webSocket, String message):
WatchEvent event = readWatchEvent(message);
Object object = event.getObject();
if (object instanceof HasMetadata) {
  @SuppressWarnings("unchecked")
  T obj = (T) object;
  // Dirty cast - should always be valid though
  resourceVersion.set(((HasMetadata) obj).getMetadata().getResourceVersion());
  Watcher.Action action = Watcher.Action.valueOf(event.getType());
  watcher.eventReceived(action, obj);
} else if (object instanceof KubernetesResourceList) {
  @SuppressWarnings("unchecked")
  KubernetesResourceList list = (KubernetesResourceList) object;
  // Dirty cast - should always be valid though
  resourceVersion.set(list.getMetadata().getResourceVersion());
  Watcher.Action action = Watcher.Action.valueOf(event.getType());
  List<HasMetadata> items = list.getItems();
  if (items != null) {
    for (HasMetadata item : items) {
      watcher.eventReceived(action, (T) item);
    }
  }
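In other words, if by the time of the reconnect the resourceVersion recorded from the last received event is older than the oldest resourceVersion that etcd still retains, the too old resource version error is reported.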
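To make the failure mode concrete, here is a small self-contained model of a server that keeps only a bounded window of change versions. It is a sketch under stated assumptions, not how the API server is implemented: the class WatchWindowSketch, its retention capacity, and the method names are all hypothetical. The idea is that a watch filtered by label can sit idle while unrelated objects keep advancing the global version, so the version the client remembers eventually drops out of the window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a server that only retains a bounded window of change events.
class WatchWindowSketch {
    static final int HTTP_GONE = 410;

    // Thrown when the requested version has already fallen out of the retained window.
    static class GoneException extends RuntimeException {
        final int code = HTTP_GONE;

        GoneException(long requested, long oldest) {
            super("too old resource version: " + requested + " (" + oldest + ")");
        }
    }

    private final int capacity;                         // assumed retention size
    private final Deque<Long> retained = new ArrayDeque<>();
    private long latest = 0;

    WatchWindowSketch(int capacity) {
        this.capacity = capacity;
    }

    // Records one change and evicts the oldest one when the window is full.
    long recordChange() {
        latest++;
        retained.addLast(latest);
        if (retained.size() > capacity) {
            retained.removeFirst();
        }
        return latest;
    }

    // Returns versions newer than 'since'; null means "start from the current state".
    List<Long> watchFrom(Long since) {
        List<Long> result = new ArrayList<>();
        if (since == null) {
            return result;
        }
        long oldest = retained.isEmpty() ? latest : retained.peekFirst();
        // Changes between 'since' and 'oldest' were evicted, so the gap cannot be replayed.
        if (since < oldest - 1) {
            throw new GoneException(since, oldest);
        }
        for (long version : retained) {
            if (version > since) {
                result.add(version);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        WatchWindowSketch server = new WatchWindowSketch(3);
        long seen = server.recordChange();           // the client last saw version 1
        for (int i = 0; i < 5; i++) {
            server.recordChange();                   // unrelated changes push version 1 out
        }
        try {
            server.watchFrom(seen);
        } catch (GoneException e) {
            System.out.println(e.code + " " + e.getMessage());
        }
    }
}
```

Reconnecting with null instead of the stale version avoids the exception in this model, which is why a client has to either forget the version or re-list before watching again.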
Solution
Searching online for kubernetes-too-old-resource-version, we found that a member of the Kubernetes Client team had said:
Fabric8 does not handle it with plain watch. But it is handling it in SharedInformer API, see ReflectorWatcher. I would recommend using informer API when writing operators since it's better than plain list and watch
In other words, the plain watch mechanism cannot handle this case, but the SharedInformer API can, so that is what we use. As of November 16, 2020, the latest kubernetes-client version available to us was kubernetes-client:4.13.0. Our implementation:
val sharedInformerFactory = kubernetesClient.informers()
val podInformer = sharedInformerFactory
  .sharedIndexInformerFor(classOf[Pod], classOf[PodList],
    new OperationContext().withNamespace("test"), 30 * 1000L)
podInformer.addEventHandler(new ResourceEventHandler[Pod] {
  override def onAdd(obj: Pod): Unit = {
    eventReceived(obj, "ADD")
  }
  override def onDelete(obj: Pod, deletedFinalStateUnknown: Boolean): Unit = {
    eventReceived(obj, "DELETE")
  }
  override def onUpdate(oldObj: Pod, newObj: Pod): Unit = {
    eventReceived(newObj, "UPDATE")
  }
  private def idShouldUpdate(pod: Pod): Boolean = {
    pod.getMetadata.getLabels.getOrDefault(MERLION_TASK_LABEL, "") == applicationId
  }
  private def eventReceived(pod: Pod, action: String): Unit = {
    if (idShouldUpdate(pod)) {
      val podName = pod.getMetadata.getName
      logger.info(s"Received job pod update for pod named $podName, action ${action}")
      snapshotsStore.updatePod(pod)
    }
  }
})
sharedInformerFactory.startAllRegisteredInformers()
}
SharedInformerFactory works the same way as the k8s Informer mechanism and keeps event delivery reliable.
The key pieces are ReflectorWatcher, Reflector, and DefaultSharedIndexInformer. A brief walkthrough:
public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this.resyncCheckPeriodMillis = resyncPeriod;
this.defaultEventHandlerResyncPeriod = resyncPeriod;
this.processor = new SharedProcessor<>();
this.indexer = new Cache();
DeltaFIFO<T> fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer);
this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners);
controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());
}
DefaultSharedIndexInformer uses a DeltaFIFO to store events. this::handleDeltas is passed to the Controller and invoked there as processFunc, the argument of this.queue.pop. In other words, this function consumes the events in fifo:
private void processLoop() throws Exception {
  while (true) {
    try {
      this.queue.pop(this.processFunc);
    } catch (InterruptedException t) {
      log.error("DefaultController#processLoop got interrupted {}", t.getMessage(), t);
      return;
    } catch (Exception e) {
      log.error("DefaultController#processLoop recovered from crashing {}", e.getMessage(), e);
      throw e;
    }
  }
}
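The pop(processFunc) pattern is a blocking producer/consumer hand-off. The sketch below shows that shape with only the JDK. It assumes a plain LinkedBlockingQueue of strings in place of DeltaFIFO, and the names PopLoopSketch and runDemo are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the queue/controller pairing; names are illustrative.
class PopLoopSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: in the informer this role is played by the reflector.
    void add(String event) {
        queue.add(event);
    }

    // Blocks until an event is available, then hands it to the processing function.
    void pop(Consumer<String> processFunc) throws InterruptedException {
        processFunc.accept(queue.take());
    }

    // Consumer loop: keeps popping until the thread is interrupted.
    void processLoop(Consumer<String> processFunc) {
        while (true) {
            try {
                pop(processFunc);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Feeds the given events through a consumer thread and returns them in handling order.
    static List<String> runDemo(List<String> events) {
        PopLoopSketch sketch = new PopLoopSketch();
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(events.size());
        Thread consumer = new Thread(() -> sketch.processLoop(event -> {
            handled.add(event);
            done.countDown();
        }));
        consumer.start();
        events.forEach(sketch::add);
        try {
            done.await(5, TimeUnit.SECONDS);   // wait until every event was processed
            consumer.interrupt();              // then stop the loop
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        events.add("ADDED pod-a");
        events.add("MODIFIED pod-a");
        System.out.println(runDemo(events));
    }
}
```

Because a single thread consumes the queue, events reach the handler in the order they were enqueued. The handlers registered on the informer depend on the same property.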
queue is the DeltaFIFO passed in as a constructor argument, so queue is fifo. Where does the data in fifo come from? From the controller::run function:
if (fullResyncPeriod > 0) {
  reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
} else {
  reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
}
reflector.listAndWatch();
reflector.listAndWatch() implements a list-watch mechanism similar to the one in k8s:
public void listAndWatch() throws Exception {
  try {
    log.info("Started ReflectorRunnable watch for {}", apiTypeClass);
    reListAndSync();
    resyncExecutor.scheduleWithFixedDelay(this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS);
    startWatcher();
  } catch (Exception exception) {
    store.isPopulated(false);
    throw new RejectedExecutionException("Error while starting ReflectorRunnable watch", exception);
  }
}
reListAndSync pulls the full set of event data, and startWatcher starts a watch to receive incremental events. What is this watch? It is created as follows:
watch.set(listerWatcher.watch(new ListOptionsBuilder()
.withWatch(Boolean.TRUE).withResourceVersion(lastSyncResourceVersion.get()).withTimeoutSeconds(null).build(),
operationContext.getNamespace(), operationContext, watcher)
)
The watcher here is initialized in the Reflector constructor:
watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);
ReflectorWatcher implements Watcher, so it has the corresponding eventReceived and onClose methods:
@Override
public void eventReceived(Action action, T resource) {
  if (action == null) {
    final String errorMessage = String.format("Unrecognized event %s", resource.getMetadata().getName());
    log.error(errorMessage);
    throw new KubernetesClientException(errorMessage);
  }
  log.info("Event received {}", action.name());
  switch (action) {
    case ERROR:
      final String errorMessage = String.format("ERROR event for %s", resource.getMetadata().getName());
      log.error(errorMessage);
      throw new KubernetesClientException(errorMessage);
    case ADDED:
      store.add(resource);
      break;
    case MODIFIED:
      store.update(resource);
      break;
    case DELETED:
      store.delete(resource);
      break;
  }
  lastSyncResourceVersion.set(resource.getMetadata().getResourceVersion());
  log.debug("{}#Receiving resourceVersion {}", resource.getKind(), lastSyncResourceVersion.get());
}
@Override
public void onClose(KubernetesClientException exception) {
  log.error("Watch closing");
  Optional.ofNullable(exception)
    .map(e -> {
      log.debug("Exception received during watch", e);
      return exception;
    })
    .map(KubernetesClientException::getStatus)
    .map(Status::getCode)
    .filter(c -> c.equals(HttpURLConnection.HTTP_GONE))
    .ifPresent(c -> onHttpGone.run());
  onClose.run();
}
In eventReceived, every event is added to store, which is the fifo queue. In onClose, if the status is HTTP_GONE (that is, too old resource version), onHttpGone.run() is executed, followed by onClose.run().
onHttpGone is Reflector's reListAndSync function and onClose is Reflector's startWatcher function, so whenever the watcher is closed, the watch is started again.
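The control flow of that onClose can be reduced to two callbacks. The sketch below is a simplified illustration using only the JDK. CloseHandlingSketch and trace are hypothetical names, and the status code is passed in directly as a nullable Integer rather than being extracted from an exception.

```java
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the close handling described above; not the library's class.
class CloseHandlingSketch {
    private final Runnable onHttpGone;  // e.g. a full re-list that refreshes the version
    private final Runnable onClose;     // e.g. starting a new watch

    CloseHandlingSketch(Runnable onHttpGone, Runnable onClose) {
        this.onHttpGone = onHttpGone;
        this.onClose = onClose;
    }

    // statusCode is null when the watch closed without an error status.
    void handleClose(Integer statusCode) {
        if (statusCode != null && statusCode == HttpURLConnection.HTTP_GONE) {
            onHttpGone.run();   // only a 410 forces a re-list first
        }
        onClose.run();          // the watch is restarted in every case
    }

    // Records which callbacks ran, in order, for a given status code.
    static List<String> trace(Integer statusCode) {
        List<String> calls = new ArrayList<>();
        CloseHandlingSketch handler =
            new CloseHandlingSketch(() -> calls.add("relist"), () -> calls.add("watch"));
        handler.handleClose(statusCode);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("410  -> " + trace(410));
        System.out.println("null -> " + trace(null));
    }
}
```

The ordering matters: the re-list runs before the watch restarts, so the new watch begins from a version the server still retains.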
Note
In kubernetes-client:4.6.2, WatchConnectionManager's onMessage handles HTTP_GONE differently:
if (status.getCode() == HTTP_GONE) {
  logger.info("The resource version {} no longer exists. Scheduling a reconnect.", resourceVersion.get());
  resourceVersion.set(null);
  scheduleReconnect();
} else {
  logger.error("Error received: {}", status.toString());
}
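Once HTTP_GONE occurs, resourceVersion is set to null (so the watch resumes from the latest events) and a reconnect is scheduled immediately. Versions 4.13.0 and 4.2.0 do not reconnect immediately; they leave the handling to the user.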
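The effect of clearing the version on the next reconnect can be shown with a small state holder. This is a sketch under assumptions: GoneResetSketch is a hypothetical class, the reconnect is only counted rather than performed, and the query string format is illustrative rather than the exact URL the client builds.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of "forget the version on 410, then reconnect".
class GoneResetSketch {
    static final int HTTP_GONE = 410;

    private final AtomicReference<String> resourceVersion;
    private int reconnectsScheduled = 0;

    GoneResetSketch(String initialVersion) {
        this.resourceVersion = new AtomicReference<>(initialVersion);
    }

    // On 410 the remembered version is dropped and a reconnect is requested;
    // any other status leaves the state untouched in this sketch.
    void onErrorStatus(int code) {
        if (code == HTTP_GONE) {
            resourceVersion.set(null);
            reconnectsScheduled++;
        }
    }

    // Builds the query a reconnect would send; the parameter is omitted when no version is known.
    String watchQuery() {
        String version = resourceVersion.get();
        return version == null ? "watch=true" : "watch=true&resourceVersion=" + version;
    }

    int reconnectsScheduled() {
        return reconnectsScheduled;
    }

    public static void main(String[] args) {
        GoneResetSketch state = new GoneResetSketch("135562761");
        System.out.println("before: " + state.watchQuery());
        state.onErrorStatus(HTTP_GONE);
        System.out.println("after:  " + state.watchQuery());
    }
}
```

The trade-off of this approach is that the client resumes without a starting version, so changes that happened between the stale version and the reconnect are not replayed one by one. The informer's re-list step is what closes that gap.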