作者:丛国庆
在 Kubernetes(简称 K8s,一个可移植容器的编排管理工具)体系中,etcd 存储集群的数据信息,kube-apiserver 作为对立入口,任何对数据的操作都必须通过 kube-apiserver。因而 Dubbo 想要以 Kubernetes 作为注册核心,必须调用 kube-apiserver 获取服务地址列表,那是以什么样的机制放弃信息的可靠性、实时性、程序性、高性能呢?答案就是 基于 List/Watch 的 Informer 组件。
List/Watch 机制介绍
List / Watch 机制是 Kubernetes 中实现集群管制模块最外围的设计之一,它采纳对立的异步音讯解决机制,保障了音讯的实时性、可靠性、程序性和性能等,为申明式格调的 API 奠定了良好的根底。
List 是向 kube-apiserver 调用 list API 获取资源列表,基于 HTTP 短链接实现。
Watch 则是向 kube-apiserver 调用 watch API 监听资源变更事件,基于 HTTP 长链接,通过 Chunked transfer encoding(分块传输编码) 来实现音讯告诉。
当客户端调用 watch API 时,kube-apiserver 在 response 的 HTTP Header 中设置 Transfer-Encoding 的值为 chunked,示意采纳分块传输编码,客户端收到该信息后,便和服务端连贯,并期待下一个数据块,即资源的事件信息。例如:
$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/endpoints?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 14 Seo 2022 20:22:59 GMT
Transfer-Encoding: chunked
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
Dubbo 基于 Watch 的服务发现
以上图为例,dubbo-kubernetes 服务发现以 Kubernetes 为 Registry,provider 注册地址到注册核心,consumer 从注册核心读取和订阅 provider 地址列表。在 Dubbo3.1 版本之前,consumer 订阅是通过 Fabric8 Kubernetes Java Client 提供的 watch API 实现,监听 kube-apiserver 中资源的 create、update 和 delete 事件,如下:
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {Watch watch = kubernetesClient.endpoints()
.inNamespace(namespace).withName(serviceName).watch(new Watcher<Endpoints>() {
// 资源更改的事件回调
@Override
public void eventReceived(Action action, Endpoints resource) {notifyServiceChanged(serviceName, listener);
...
}
});
...
}
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
...
listener.onEvent(event);
...
}
监听到资源变动后,调用 notifyServiceChanged 办法从 kube-apiserver 全量拉取资源 list 数据,放弃 Dubbo 本地侧服务列表。
@Override
public List<ServiceInstance> getInstances(String serviceName){
// 间接调用 kube-apiserver
Endpoints endpoints = kubernetesClient.endpoints().inNamespace(namespace)
.withName(serviceName).get();
return toServiceInstance(endpoints, serviceName);
}
这样的操作存在很重大的问题,因为 watch 对应的回调函数会将更新的资源返回,Dubbo 社区思考到保护老本较高,之前并没有在本地保护对于 CRD 资源的缓存,这样每次监听到变动后调用 list 从 kube-apiserver 获取对应 serviceName 的 endpoints 信息,无疑减少了一次对 kube-apiserver 的间接拜访。
kubernetes-client 为解决客户端须要自行保护缓存的问题,推出了 informer 机制。
Informer 机制介绍
Informer 模块是 Kubernetes 中的根底组件,以 List/Watch 为根底,负责各组件与 kube-apiserver 的资源与事件同步。Kubernetes 中的组件,如果要拜访 Kubernetes 中的 Object,绝大部分状况下会应用 Informer 中的 Lister()办法,而非间接调用 kube-apiserver。
以 Pod 资源为例,介绍下 informer 的要害逻辑(与下图步骤一一对应):
- Informer 在初始化时,Reflector 会先调用 List 取得所有的 Pod,同时调用 Watch 长连贯监听 kube-apiserver。
- Reflector 拿到全副 Pod 后,将 Add Pod 这个事件发送到 DeltaFIFO。
- DeltaFIFO 随后 pop 这个事件到 Informer 解决。
- Informer 向 Indexer 公布 Add Pod 事件。
- Indexer 接到告诉后,间接操作 Store 中的数据(key->value 格局)。
- Informer 触发 EventHandler 回调。
- 将 key 推到 Workqueue 队列中。
- 从 WorkQueue 中 pop 一个 key。
- 而后依据 key 去 Indexer 取到 val。依据以后的 EventHandler 进行 Add Pod 操作(用户自定义的回调函数)。
- 随后当 Watch 到 kube-apiserver 资源有扭转的时候,再反复 2-9 步骤。
(来源于 kubernetes/sample-controller)
Informer 要害设计
- 本地缓存:Informer 只会调用 K8s List 和 Watch 两种类型的 API。Informer 在初始化的时,先调用 List 取得某种 resource 的全副 Object,缓存在内存中; 而后,调用 Watch API 去 watch 这种 resource,去保护这份缓存; 最初,Informer 就不再调用 kube-apiserver。Informer 形象了 cache 这个组件,并且实现了 store 接口,后续获取资源间接通过本地的缓存来进行获取。
- 无界队列:为了协调数据生产与生产的不统一状态,在客户端中通过实现了一个无界队列 DeltaFIFO 来进行数据的缓冲,当 reflector 获取到数据之后,只须要将数据推到到 DeltaFIFO 中,则就能够持续 watch 后续事件,从而缩小阻塞工夫,如上图 2-3 步骤所示。
- 事件去重:在 DeltaFIFO 中,如果针对某个资源的事件反复被触发,则就只会保留雷同事件最初一个事件作为后续解决,有 resourceVersion 惟一键保障,不会反复生产。
- 复用连贯:每一种资源都实现了 Informer 机制,容许监控不同的资源事件。为了防止同一个资源建设多个 Informer,每个 Informer 应用一个 Reflector 与 apiserver 建设链接,导致 kube-apiserver 负载过高的状况,K8s 中形象了 sharedInformer 的概念,即共享的 Informer, 能够使同一类资源 Informer 共享一个 Reflector。外部定义了一个 map 字段,用于寄存所有 Infromer 的字段。针对同一资源只建设一个连贯,减小 kube-apiserver 的负载。
Dubbo 引入 Informer 机制后的服务发现
Dubbo 3.1.1 后引入 Informer 机制,Informer 组件会利用其个性在 consumer 侧内存中保护 Kubernetes 环境中的所有地址列表。
资源监听由 Watch API 更换为 Informer API
以 Endpoints 为例,将本来的 watch 替换为 Informer,回调函数别离为 onAdd、onUpdate、onDelete,回调参数传的都是 Informer store 中的资源全量值。
/**
* 监听 Endpoints
*/
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
SharedIndexInformer<Endpoints> endInformer = kubernetesClient
.endpoints().inNamespace(namespace)
.withName(serviceName).inform(new ResourceEventHandler<Endpoints>() {
@Override
public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {notifyServiceChanged(serviceName, listener, toServiceInstance(newEndpoints, serviceName));
}
// 省略掉 onAdd 和 onDelete
...
});
...
}
/**
* 告诉订阅者 Service 扭转
*/
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
// 公布事件
listener.onEvent(event);
}
getInstances() 优化
引入 Informer 后,无需间接调用 List 接口,而是间接从 Informer 的 store 中获取,缩小对 kube-apiserver 的间接调用。
public List<ServiceInstance> getInstances(String serviceName) {
Endpoints endpoints = null;
SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
if (endInformer != null) {
// 间接从 informer 的 store 中获取 Endpoints 信息
List<Endpoints> endpointsList = endInformer.getStore().list();
if (endpointsList.size() > 0) {endpoints = endpointsList.get(0);
}
}
// 如果 endpoints 通过下面解决仍为空,属于异常情况,那就从 kube-apiserver 拉取
if (endpoints == null) {endpoints = kubernetesClient.endpoints()
.inNamespace(namespace).withName(serviceName).get();}
return toServiceInstance(endpoints, serviceName);
}
论断
优化为 Informer 后,Dubbo 的服务发现不必每次间接调用 kube-apiserver,减小了 kube-apiserver 的压力,也大大减少了响应工夫,助力 Dubbo 从传统架构迁徙到 Kubernetes 中。