关于阿里云:OpenYurt-之-Yurthub-数据过滤框架解析

34次阅读

共计 8443 个字符,预计需要花费 22 分钟才能阅读完成。

作者:应健健,新华智云计算中心

OpenYurt 是业界首个非侵入的边缘计算云原生开源我的项目,通过边缘自治,云边协同,边缘单元化,边缘流量闭环等能力为用户提供云边一体化的应用体验。在 Openyurt 里边缘网络能够应用数据过滤框架在不同节点池里实现边缘流量闭环能力。

Yurthub 数据过滤框架解析

Yurthub 实质上是一层 kube-apiserver 的代理,在代理的根底上加了一层 cache,一来保障边缘节点离线的状况下能够应用本地 cache 保障业务稳定性, 无效的解决了边缘自治的问题。二来能够升高大量的 list & watch 操作对云上 api 产生肯定的负载。

Yurthub 的数据过滤通过节点上的 pod 以及 kubelet 的申请通过 Load Balancer 发送给 kube-apiserver,代理接管到响应音讯进行数据过滤解决,之后再将过滤后的数据返回给申请方。如果节点是边缘节点会依据申请类型对响应申请体中的资源进行本地缓存,如果是云端节点思考到网络状态良好不进行本地缓存。

Yurthub 的过滤框架实现原理图:

Yurthub 目前蕴含四种过滤规定,通过 addons 申请的 user-agent,resource,verb 判断通过那个过滤器进行相应的数据过滤。

四种过滤规定性能及实现

ServiceTopologyFilter 

次要针对 EndpointSlice 资源进行数据过滤, 但 Endpoint Slice 个性须要在 Kubernetes v1.18 或以上版本能力反对,如果在 1.18 版本以下倡议应用 endpointsFilter 过滤器。当通过该过滤器首先通过 kubernetes.io/service-name 找到 endpointSlice 资源所对应的 services 资源,之后判断 servces 资源是否存在 openyurt.io/topologyKeys 这个 Annotations,如果存在那么通过这个 Annotations 的值判断数据过滤规定,最初更新 response data 返回给 addons。

Annotations 的值分为两大类:

1、kubernetes.io/hostname: 只过滤出雷同节点的 endpoint ip

2、openyurt.io/nodepool 或者 kubernetes.io/zone: 通过这个 Annotations 获取对应节点池,最初遍历 endpointSlice 资源,通过 endpointSlice 里的 topology 字段中的 kubernetes.io/hostname 字段在 endpointSlice 对象里找到对应的 Endpoints,之后重组 endpointSlice 里的 Endpoints 后返回给 addons。

代码实现:

func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {
   var serviceTopologyType string
   // get the service Topology type
   if svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok {svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)
      if err != nil {klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)
         return endpointSlice
      }
 
      if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)
         return endpointSlice
      }
   }
 
   var newEps []discovery.Endpoint
   // if type of service Topology is 'kubernetes.io/hostname'
   // filter the endpoint just on the local host
   if serviceTopologyType == AnnotationServiceTopologyValueNode {
      for i := range endpointSlice.Endpoints {if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {newEps = append(newEps, endpointSlice.Endpoints[i])
         }
      }
      endpointSlice.Endpoints = newEps
   } else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
      // if type of service Topology is openyurt.io/nodepool
      // filter the endpoint just on the node which is in the same nodepool with current node
      currentNode, err := fh.nodeGetter(fh.nodeName)
      if err != nil {klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
         return endpointSlice
      }
      if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {nodePool, err := fh.nodePoolLister.Get(nodePoolName)
         if err != nil {klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)
            return endpointSlice
         }
         for i := range endpointSlice.Endpoints {if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {newEps = append(newEps, endpointSlice.Endpoints[i])
            }
         }
         endpointSlice.Endpoints = newEps
      }
   }
   return endpointSlice
}

EndpointsFilter

针对 endpoints 资源进行相应的数据过滤,首先判断 endpoint 是否存在对应的 service,通过 node 的 label: apps.openyurt.io/nodepool 获取节点池,之后获取节点池下的所有节点,遍历 endpoints.Subsets 下的资源找出同一个节点池的 Ready pod address 以及 NotReady pod address 重组成新的 endpoints 之后返回给 addons。

func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
   svcName := endpoints.Name
   _, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
   if err != nil {klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
      return endpoints
   }
   // filter the endpoints on the node which is in the same nodepool with current node
   currentNode, err := fh.nodeGetter(fh.nodeName)
   if err != nil {klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
      return endpoints
   }
   if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {nodePool, err := fh.nodePoolLister.Get(nodePoolName)
      if err != nil {klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
         return endpoints
      }
      var newEpSubsets []v1.EndpointSubset
      for i := range endpoints.Subsets {endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)
         endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)
         if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])
         }
      }
      endpoints.Subsets = newEpSubsets
      if len(endpoints.Subsets) == 0 {
         // this endpoints has no nodepool valid addresses for ingress controller, return nil to ignore it
         return nil
      }
   }
   return endpoints
}

MasterServiceFilter

针对 services 下的域名进行 ip 以及端口替换,这个过滤器的场景次要在于边缘端的 pod 无缝应用 InClusterConfig 拜访集群资源。

func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err)
      return b, nil
   }
 
   // return data un-mutated if not ServiceList
   serviceList, ok := list.(*v1.ServiceList)
   if !ok {return b, nil}
 
   // mutate master service
   for i := range serviceList.Items {if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName {serviceList.Items[i].Spec.ClusterIP = fh.host
         for j := range serviceList.Items[i].Spec.Ports {if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName {serviceList.Items[i].Spec.Ports[j].Port = fh.port
               break
            }
         }
         klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req))
         break
      }
   }
 
   // return the mutated serviceList
   return fh.serializer.Encode(serviceList)
}

DiscardCloudService

该过滤器针对两种 service 其中的一种类型是 LoadBalancer,因为边缘端无法访问 LoadBalancer 类型的资源,所以该过滤器会将这种类型的资源间接过滤掉。另外一种是针对 kube-system 名称空间下的 x-tunnel-server-internal-svc,这个 services 次要存在 cloud 节点用于拜访 yurt-tunnel-server,对于 edge 节点会间接过滤掉该 service。

func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)
      return b, nil
   }
 
   serviceList, ok := list.(*v1.ServiceList)
   if ok {var svcNew []v1.Service
      for i := range serviceList.Items {nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)
         // remove lb service
         if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
               continue
            }
         }
 
         // remove cloud clusterIP service
         if _, ok := cloudClusterIPService[nsName]; ok {klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
            continue
         }
 
         svcNew = append(svcNew, serviceList.Items[i])
      }
      serviceList.Items = svcNew
      return fh.serializer.Encode(serviceList)
   }
 
   return b, nil
}

过滤框架现状

目前的过滤框架比拟生硬,将资源过滤硬编码至代码中,只能是已注册的资源能力进行相应的过滤, 为了解决这个问题,须要对过滤框架进行相应的革新。

解决方案

计划一:

应用参数或者环境变量的模式自定义过滤配置,然而这种形式有以下弊病:

1、配置简单须要将所以须要自定义的配置写入到启动参数或者读取环境变量 例如下格局:

--filter_serviceTopology=coredns/endpointslices#list,kube-proxy/services#list;watch --filter_endpointsFilter=nginx-ingress-controller/endpoints#list;watch

2、无奈热更新,每次批改配置都须要重启 Yurthub 失效。

计划二:

1、应用 configmap 的模式自定义过滤配置升高配置复杂度配置格局 (user-agent/resource#list,watch) 多个资源通过逗号隔开。如下所示:

filter_endpoints: coredns/endpoints#list;watch,test/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""filter_masterservice:""

2、利用 Informer 机制保障配置实时失效

综合以上两点在 OpenYurt 中咱们抉择了解决方案二。

开发过程中遇到的问题

在边缘端 Informer watch 的 api 地址是 Yurthub 的代理地址,那么 Yurthub 在启动代理端口之前都是无奈保障 configmap 的数据是失常的。如果在启动实现之后 addons 的申请先于 configmap 数据更新 这个时候会导致数据在没有过滤的状况下就返回给了 addons,这样会导致很多预期以外的问题。

为了解决这个问题 咱们须要在 apporve 中退出 WaitForCacheSync 保证数据同步实现之后能力返回相应的过滤数据,然而在 apporve 中退出 WaitForCacheSync 也间接导致 configmap 进行 watch 的时候也会被阻塞,所以须要在 WaitForCacheSync 之前退出一个白名单机制,当 Yurthub 应用 list & watch 拜访 configmap 的时候咱们间接不进行数据过滤,相应的代码逻辑如下:

func (a *approver) Approve(comp, resource, verb string) bool {if a.isWhitelistReq(comp, resource, verb) {return false}
   if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {panic("wait for configMap cache sync timeout")
   }
   a.Lock()
   defer a.Unlock()
   for _, requests := range a.nameToRequests {
      for _, request := range requests {if request.Equal(comp, resource, verb) {return true}
      }
   }
   return false
}

总结

1、通过上述的扩大能力能够看出,YurtHub 不仅仅是边缘节点上的带有数据缓存能力的反向代理。而是对 Kubernetes 节点利用生命周期治理加了一层新的封装,提供边缘计算所须要的外围管控能力。

2、YurtHub 不仅仅实用于边缘计算场景,其实能够作为节点侧的一个常备组件,实用于应用 Kubernetes 的任意场景。置信这也会驱动 YurtHub 向更高性能,更高稳定性倒退。

点击​​ 此处 ​​​,立刻理解 OpenYurt 我的项目!​

正文完
 0