Du Yanghao, Senior Engineer at Tencent Cloud, passionate about open source, containers, and Kubernetes. Currently focused on R&D for image registries, Kubernetes cluster high availability & backup/restore, and edge computing.

Preface

SuperEdge service group uses application-grid-wrapper to implement topology awareness, enabling closed-loop service access within the same nodeunit.

Before diving into application-grid-wrapper, here is a brief introduction to the topology awareness feature natively supported by upstream Kubernetes.

The Kubernetes service topology awareness feature was released as alpha in v1.17 to provide topology-aware routing and proximity-based access. Users add a topologyKeys field to a Service to indicate the topology key type; only endpoints within the same topology domain are accessed. Three topologyKeys are currently available:

  • "kubernetes.io/hostname":拜访本节点内(kubernetes.io/hostname label value雷同)的 endpoint,如果没有则 service 拜访失败
  • "topology.kubernetes.io/zone":拜访雷同zone域内(topology.kubernetes.io/zone label value 雷同)的 endpoint,如果没有则 service 拜访失败
  • "topology.kubernetes.io/region":拜访雷同region域内(topology.kubernetes.io/region label value雷同)的 endpoint,如果没有则 service 拜访失败

Besides specifying a single topology key as above, these keys can also be combined into a list, for example: ["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"]. This means: prefer endpoints on the local node; if none exist, fall back to endpoints in the same zone; if still none exist, fall back to endpoints in the same region; if none exist at all, the access fails.

In addition, "*" can be appended at the end of the list (it may only be the last entry), meaning: if all of the preceding topology domains fail, access any valid endpoint, i.e. no topology restriction at all. Example:

# A Service that prefers node local, zonal, then regional endpoints but falls back to cluster wide endpoints.
apiVersion: v1
kind: Service
metadata:
  name: my-service
spec:
  selector:
    app: my-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 9376
  topologyKeys:
    - "kubernetes.io/hostname"
    - "topology.kubernetes.io/zone"
    - "topology.kubernetes.io/region"
    - "*"

Compared with the upstream implementation, the topology awareness implemented by service group differs as follows:

  • The service group topology key is customizable, namely gridUniqKey, which is more flexible to use; the upstream implementation currently offers only three choices: "kubernetes.io/hostname", "topology.kubernetes.io/zone", and "topology.kubernetes.io/region"
  • service group accepts only a single topology key, i.e. only valid endpoints within the local topology domain can be accessed, with no access to endpoints in other topology domains; upstream can fall back to endpoints in other topology domains via the topologyKeys list and "*"

With the topology awareness implemented by service group, the service is configured as follows:

# A Service that prefers only endpoints within the same zone1 topology domain.
apiVersion: v1
kind: Service
metadata:
  annotations:
    topologyKeys: '["zone1"]'
  labels:
    superedge.io/grid-selector: servicegrid-demo
  name: servicegrid-demo-svc
spec:
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
  selector:
    appGrid: echo

Having introduced the topology awareness implemented by service group, we now dive into the source code to analyze the implementation details. As before, the analysis starts from a usage example:

# step1: labels edge nodes
$ kubectl get nodes
NAME    STATUS   ROLES    AGE   VERSION
node0   Ready    <none>   16d   v1.16.7
node1   Ready    <none>   16d   v1.16.7
node2   Ready    <none>   16d   v1.16.7
# nodeunit1(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1
# nodeunit2(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2
$ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2
...
# step3: deploy echo ServiceGrid
$ cat <<EOF | kubectl --kubeconfig config apply -f -
apiVersion: superedge.io/v1
kind: ServiceGrid
metadata:
  name: servicegrid-demo
  namespace: default
spec:
  gridUniqKey: zone1
  template:
    selector:
      appGrid: echo
    ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
EOF
servicegrid.superedge.io/servicegrid-demo created
# note that there is only one relevant service generated
$ kubectl get svc
NAME                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)   AGE
kubernetes             ClusterIP   192.168.0.1       <none>        443/TCP   16d
servicegrid-demo-svc   ClusterIP   192.168.6.139     <none>        80/TCP    10m
# step4: access servicegrid-demo-svc(service topology and closed-looped)
# execute on node0
$ curl 192.168.6.139|grep "node name"
        node name:      node0
# execute on node1 and node2
$ curl 192.168.6.139|grep "node name"
        node name:      node2
$ curl 192.168.6.139|grep "node name"
        node name:      node1

After the ServiceGrid CR is created, the ServiceGrid Controller is responsible for generating the corresponding service from the ServiceGrid (including the topologyKeys annotation built from serviceGrid.Spec.GridUniqKey); application-grid-wrapper then implements topology awareness based on this service. Both are analyzed in turn below.

ServiceGrid Controller Analysis

The ServiceGrid Controller logic is largely the same as that of the DeploymentGrid Controller, as follows:

  • 1. Create and maintain the CRDs required by service group (including ServiceGrid)
  • 2. Watch ServiceGrid events and enqueue ServiceGrids into the work queue; in a loop, dequeue each ServiceGrid, parse it, and create and maintain the corresponding service
  • 3. Watch service events and enqueue the related ServiceGrid into the work queue for the processing above, assisting the logic above to achieve overall reconciliation

Note the differences from the DeploymentGrid Controller:

  • A single ServiceGrid object generates only one service
  • Only service events need to be additionally watched; node events are not needed, since node CRUD is irrelevant to ServiceGrid
  • The service generated for a ServiceGrid is named: {ServiceGrid}-svc
func (sgc *ServiceGridController) syncServiceGrid(key string) error {
    startTime := time.Now()
    klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)
    defer func() {
        klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(2).Infof("service grid %v has been deleted", key)
        return nil
    }
    if err != nil {
        return err
    }

    if sg.Spec.GridUniqKey == "" {
        sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty", "This service grid has an empty grid key")
        return nil
    }

    // get service workload list of this grid
    svcList, err := sgc.getServiceForGrid(sg)
    if err != nil {
        return err
    }

    if sg.DeletionTimestamp != nil {
        return nil
    }

    // sync service grid relevant services workload
    return sgc.reconcile(sg, svcList)
}

func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {
    svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())
    if err != nil {
        return nil, err
    }

    labelSelector, err := common.GetDefaultSelector(sg.Name)
    if err != nil {
        return nil, err
    }
    canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
        fresh, err := sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        if fresh.UID != sg.UID {
            return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace,
                sg.Name, fresh.UID, sg.UID)
        }
        return fresh, nil
    })

    cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)
    return cm.ClaimService(svcList)
}

func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList []*corev1.Service) error {
    var (
        adds    []*corev1.Service
        updates []*corev1.Service
        deletes []*corev1.Service
    )

    sgTargetSvcName := util.GetServiceName(g)
    isExistingSvc := false
    for _, svc := range svcList {
        if svc.Name == sgTargetSvcName {
            isExistingSvc = true
            template := util.KeepConsistence(g, svc)
            if !apiequality.Semantic.DeepEqual(template, svc) {
                updates = append(updates, template)
            }
        } else {
            deletes = append(deletes, svc)
        }
    }

    if !isExistingSvc {
        adds = append(adds, util.CreateService(g))
    }

    return sgc.syncService(adds, updates, deletes)
}

func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {
    svc := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      GetServiceName(sg),
            Namespace: sg.Namespace,
            // Append existed ServiceGrid labels to service to be created
            Labels: func() map[string]string {
                if sg.Labels != nil {
                    newLabels := sg.Labels
                    newLabels[common.GridSelectorName] = sg.Name
                    newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey
                    return newLabels
                } else {
                    return map[string]string{
                        common.GridSelectorName:        sg.Name,
                        common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey,
                    }
                }
            }(),
            Annotations: make(map[string]string),
        },
        Spec: sg.Spec.Template,
    }

    keys := make([]string, 1)
    keys[0] = sg.Spec.GridUniqKey
    keyData, _ := json.Marshal(keys)
    svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)

    return svc
}

Since the logic is similar to DeploymentGrid, the details are not expanded here; the focus is on the application-grid-wrapper part.

application-grid-wrapper Analysis

After the ServiceGrid Controller has created the service, application-grid-wrapper comes into play:

apiVersion: v1
kind: Service
metadata:
  annotations:
    topologyKeys: '["zone1"]'
  creationTimestamp: "2021-03-03T07:33:30Z"
  labels:
    superedge.io/grid-selector: servicegrid-demo
  name: servicegrid-demo-svc
  namespace: default
  ownerReferences:
  - apiVersion: superedge.io/v1
    blockOwnerDeletion: true
    controller: true
    kind: ServiceGrid
    name: servicegrid-demo
    uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581
  resourceVersion: "127987090"
  selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc
  uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
spec:
  clusterIP: 192.168.161.1
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
  selector:
    appGrid: echo
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}

To achieve zero intrusion into Kubernetes, a wrapper layer needs to be added between kube-proxy and the apiserver.

The call chain is as follows:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
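For concreteness, the following is a minimal sketch of how kube-proxy could be pointed at the wrapper instead of kube-apiserver via its kubeconfig; the listen address and port below are assumptions for illustration only, not values taken from the SuperEdge manifests.

# Hypothetical kube-proxy kubeconfig; the wrapper address/port are assumed for illustration.
apiVersion: v1
kind: Config
clusters:
- cluster:
    # kube-proxy talks to the local application-grid-wrapper instead of kube-apiserver
    server: http://127.0.0.1:51006
  name: default
contexts:
- context:
    cluster: default
  name: default
current-context: default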

Therefore, application-grid-wrapper runs an HTTP server that accepts requests from kube-proxy, as follows:

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
    ...
    klog.Infof("Start to run interceptor server")
    /* filter
     */
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}

    if insecure {
        return server.ListenAndServe()
    }
    ...
    server.TLSConfig = tlsConfig
    return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {
    handler := http.Handler(http.NewServeMux())

    handler = s.interceptEndpointsRequest(handler)
    handler = s.interceptServiceRequest(handler)
    handler = s.interceptEventRequest(handler)
    handler = s.interceptNodeRequest(handler)
    handler = s.logger(handler)

    if debug {
        handler = s.debugger(handler)
    }

    return handler
}

Here an interceptorServer is first created, and then handler functions are registered, from the outermost to the innermost as follows:

  • debug: accepts debug requests and returns the wrapper's pprof runtime information
  • logger: logs requests
  • node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns node information
  • event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver

    func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {
                handler.ServeHTTP(w, r)
                return
            }

            targetURL, _ := url.Parse(s.restConfig.Host)
            reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
            reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)
            reverseProxy.ServeHTTP(w, r)
        })
    }
  • service: accepts kube-proxy service List&Watch (/api/v1/services) requests and responds based on the storageCache contents (GetServices)
  • endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and responds based on the storageCache contents (GetEndpoints)

Below, the cache logic is analyzed first, and then we come back to the concrete List&Watch handling logic of the http handlers.

To implement topology awareness, the wrapper maintains its own cache covering nodes, services, and endpoints. As can be seen, handler functions for these three resource types are registered in setupInformers:

type storageCache struct {
    // hostName is the nodeName of node which application-grid-wrapper deploys on
    hostName         string
    wrapperInCluster bool

    // mu lock protect the following map structure
    mu           sync.RWMutex
    servicesMap  map[types.NamespacedName]*serviceContainer
    endpointsMap map[types.NamespacedName]*endpointsContainer
    nodesMap     map[types.NamespacedName]*nodeContainer

    // service watch channel
    serviceChan chan<- watch.Event
    // endpoints watch channel
    endpointsChan chan<- watch.Event
}
...
func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {
    msc := &storageCache{
        hostName:         hostName,
        wrapperInCluster: wrapperInCluster,
        servicesMap:      make(map[types.NamespacedName]*serviceContainer),
        endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),
        nodesMap:         make(map[types.NamespacedName]*nodeContainer),
        serviceChan:      serviceNotifier,
        endpointsChan:    endpointsNotifier,
    }

    return msc
}
...
func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
    ...
    if err := s.setupInformers(ctx.Done()); err != nil {
        return err
    }

    klog.Infof("Start to run interceptor server")
    /* filter
     */
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}
    ...
    return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {
    klog.Infof("Start to run service and endpoints informers")
    noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
    if err != nil {
        klog.Errorf("can't parse proxy label, %v", err)
        return err
    }

    noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
    if err != nil {
        klog.Errorf("can't parse headless label, %v", err)
        return err
    }

    labelSelector := labels.NewSelector()
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

    resyncPeriod := time.Minute * 5
    client := kubernetes.NewForConfigOrDie(s.restConfig)
    nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
    informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = labelSelector.String()
        }))

    nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
    serviceInformer := informerFactory.Core().V1().Services().Informer()
    endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()

    /*
     */
    nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)
    serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)
    endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)

    go nodeInformer.Run(stop)
    go serviceInformer.Run(stop)
    go endpointsInformer.Run(stop)

    if !cache.WaitForNamedCacheSync("node", stop,
        nodeInformer.HasSynced,
        serviceInformer.HasSynced,
        endpointsInformer.HasSynced) {
        return fmt.Errorf("can't sync informers")
    }

    return nil
}

func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {
    return &nodeHandler{cache: sc}
}

func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {
    return &serviceHandler{cache: sc}
}

func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {
    return &endpointsHandler{cache: sc}
}

NodeEventHandler, ServiceEventHandler, and EndpointsEventHandler are analyzed in turn below:

1. NodeEventHandler

NodeEventHandler watches node-related events and adds the node along with its labels to storageCache.nodesMap (keyed by nodeName, with the node and its labels as the value).

func (nh *nodeHandler) add(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Adding node %v", nodeKey)
    sc.nodesMap[nodeKey] = &nodeContainer{
        node:   node,
        labels: node.Labels,
    }
    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

func (nh *nodeHandler) update(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Updating node %v", nodeKey)
    nodeContainer, found := sc.nodesMap[nodeKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("Updating non-existed node %v", nodeKey)
        return
    }

    nodeContainer.node = node
    // return directly when labels of node stay unchanged
    if reflect.DeepEqual(node.Labels, nodeContainer.labels) {
        sc.mu.Unlock()
        return
    }
    nodeContainer.labels = node.Labels

    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}
...

At the same time, since node changes affect endpoints, rebuildEndpointsMap is called to refresh storageCache.endpointsMap.

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
    evts := make([]watch.Event, 0)
    for name, endpointsContainer := range sc.endpointsMap {
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
            continue
        }
        sc.endpointsMap[name].modified = newEps
        evts = append(evts, watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        })
    }
    return evts
}

rebuildEndpointsMap is the core function of the cache and also where the topology awareness algorithm is implemented:

// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
func pruneEndpoints(hostName string,
    nodes map[types.NamespacedName]*nodeContainer,
    services map[types.NamespacedName]*serviceContainer,
    eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {

    epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}

    if wrapperInCluster {
        eps = genLocalEndpoints(eps)
    }

    // dangling endpoints
    svc, ok := services[epsKey]
    if !ok {
        klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)
        return eps
    }

    // normal service
    if len(svc.keys) == 0 {
        klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)
        return eps
    }

    // topology endpoints
    newEps := eps.DeepCopy()
    for si := range newEps.Subsets {
        subnet := &newEps.Subsets[si]
        subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)
        subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)
    }
    klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets)

    return newEps
}

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,
    addresses []v1.EndpointAddress) []v1.EndpointAddress {
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]
    if !found {
        return nil
    }

    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
    for i := range addresses {
        addr := addresses[i]
        if nodeName := addr.NodeName; nodeName != nil {
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
            if !found {
                continue
            }
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
            }
        }
    }

    return filteredEndpointAddresses
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }

    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]

        if v1found && v2found && val1 == val2 {
            return true
        }
    }

    return false
}

The algorithm logic is as follows:

  • Check whether the endpoints object is the default kubernetes service; if so, convert it to the lite-apiserver address (127.0.0.1) and port (51003) of the edge node the wrapper is deployed on
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    superedge.io/local-endpoint: 127.0.0.1
    superedge.io/local-port: "51003"
  name: kubernetes
  namespace: default
subsets:
- addresses:
  - ip: 172.31.0.60
  ports:
  - name: https
    port: xxx
    protocol: TCP

func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {
    if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {
        return eps
    }

    klog.V(4).Infof("begin to gen local ep %v", eps)
    ipAddress, e := eps.Annotations[EdgeLocalEndpoint]
    if !e {
        return eps
    }

    portStr, e := eps.Annotations[EdgeLocalPort]
    if !e {
        return eps
    }

    klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)
    port, err := strconv.ParseInt(portStr, 10, 32)
    if err != nil {
        klog.Errorf("parse int %s err %v", portStr, err)
        return eps
    }

    ip := net.ParseIP(ipAddress)
    if ip == nil {
        klog.Warningf("parse ip %s nil", ipAddress)
        return eps
    }

    nep := eps.DeepCopy()
    nep.Subsets = []v1.EndpointSubset{
        {
            Addresses: []v1.EndpointAddress{
                {
                    IP: ipAddress,
                },
            },
            Ports: []v1.EndpointPort{
                {
                    Protocol: v1.ProtocolTCP,
                    Port:     int32(port),
                    Name:     "https",
                },
            },
        },
    }

    klog.V(4).Infof("gen new endpoint complete %v", nep)
    return nep
}

The purpose of this is that services on edge nodes which access the apiserver in-cluster (InCluster mode) reach the local lite-apiserver rather than the cloud apiserver.
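Based on genLocalEndpoints above, the default/kubernetes endpoints object returned to kube-proxy on an edge node would look roughly like the following; this is a sketch derived from the superedge.io/local-endpoint and superedge.io/local-port annotations shown in the example above.

# Sketch of the rewritten default/kubernetes endpoints after genLocalEndpoints,
# derived from the superedge.io/local-endpoint and superedge.io/local-port annotations
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    superedge.io/local-endpoint: 127.0.0.1
    superedge.io/local-port: "51003"
  name: kubernetes
  namespace: default
subsets:
- addresses:
  - ip: 127.0.0.1
  ports:
  - name: https
    port: 51003
    protocol: TCP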

  • Look up the corresponding service in the storageCache.servicesMap cache by the endpoints name (namespace/name); if that service has no topologyKeys, no topology conversion is needed (not a service group service)
func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {
    if !hasTopologyKey(objectMeta) {
        return nil
    }

    var keys []string
    keyData := objectMeta.Annotations[TopologyAnnotationsKey]
    if err := json.Unmarshal([]byte(keyData), &keys); err != nil {
        klog.Errorf("can't parse topology keys %s, %v", keyData, err)
        return nil
    }

    return keys
}
  • Call filterConcernedAddresses to filter endpoint.Subsets Addresses and NotReadyAddresses, keeping only the endpoints that fall within the same topology domain defined by the service topologyKeys
// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,
    addresses []v1.EndpointAddress) []v1.EndpointAddress {
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]
    if !found {
        return nil
    }

    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
    for i := range addresses {
        addr := addresses[i]
        if nodeName := addr.NodeName; nodeName != nil {
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
            if !found {
                continue
            }
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
            }
        }
    }

    return filteredEndpointAddresses
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }

    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]

        if v1found && v2found && val1 == val2 {
            return true
        }
    }

    return false
}

Note: if the edge node the wrapper is deployed on does not carry the label referenced by the service topologyKeys, that service cannot be accessed from the node either.
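To make this concrete, here is a small self-contained sketch that reuses hasIntersectionLabel from the code above and shows that when the host node lacks the topology key label, no endpoint address survives filtering; the node label values are made up for illustration.

package main

import "fmt"

// hasIntersectionLabel is copied verbatim from the wrapper code above.
func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }
    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]
        if v1found && v2found && val1 == val2 {
            return true
        }
    }
    return false
}

func main() {
    topologyKeys := []string{"zone1"} // the service's gridUniqKey

    labeledHost := map[string]string{"zone1": "nodeunit1"}  // host node carries the key
    unlabeledHost := map[string]string{"foo": "bar"}        // host node lacks the key (hypothetical labels)
    endpointNode := map[string]string{"zone1": "nodeunit1"} // node backing the endpoint address

    // true: same zone1 value, so filterConcernedAddresses would keep this address
    fmt.Println(hasIntersectionLabel(topologyKeys, labeledHost, endpointNode))
    // false: the host has no zone1 label, so every address is filtered out and the service is unreachable
    fmt.Println(hasIntersectionLabel(topologyKeys, unlabeledHost, endpointNode))
}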

Back in rebuildEndpointsMap: after pruneEndpoints has refreshed the endpoints within the same topology domain, the modified endpoints are assigned to storageCache.endpointsMap[endpoint].modified (this field records the endpoints as modified by topology awareness).

func (nh *nodeHandler) add(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Adding node %v", nodeKey)
    sc.nodesMap[nodeKey] = &nodeContainer{
        node:   node,
        labels: node.Labels,
    }
    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
    evts := make([]watch.Event, 0)
    for name, endpointsContainer := range sc.endpointsMap {
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
            continue
        }
        sc.endpointsMap[name].modified = newEps
        evts = append(evts, watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        })
    }
    return evts
}

In addition, if the endpoints (as modified by topology awareness) change, a watch event is constructed and passed to the endpoints handler (interceptEndpointsRequest) for processing.

2. ServiceEventHandler

The storageCache.servicesMap map is keyed by the service name (namespace/name), with a serviceContainer as the value, containing the following data (a field sketch follows the list):

  • svc: the service object
  • keys: the service topologyKeys
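A minimal sketch of serviceContainer, inferred from how its fields are used in the handler code in this article; the actual definition in the SuperEdge source may differ, and the package name is hypothetical.

package wrapper // hypothetical package name for this sketch

import (
    v1 "k8s.io/api/core/v1"
)

// serviceContainer: field sketch inferred from the usage of svc and keys above and below
type serviceContainer struct {
    svc  *v1.Service // the cached service object
    keys []string    // topologyKeys parsed from the service's topologyKeys annotation
}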

For changes to service resources, the Update event is used here as an illustration:

func (sh *serviceHandler) update(service *v1.Service) {
    sc := sh.cache

    sc.mu.Lock()
    serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    klog.Infof("Updating service %v", serviceKey)

    newTopologyKeys := getTopologyKeys(&service.ObjectMeta)
    serviceContainer, found := sc.servicesMap[serviceKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("update non-existed service, %v", serviceKey)
        return
    }

    sc.serviceChan <- watch.Event{
        Type:   watch.Modified,
        Object: service,
    }

    serviceContainer.svc = service
    // return directly when topologyKeys of service stay unchanged
    if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {
        sc.mu.Unlock()
        return
    }

    serviceContainer.keys = newTopologyKeys

    // update endpoints
    changedEps := sc.rebuildEndpointsMap()
    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

The logic is as follows:

  • Get the service topologyKeys
  • Construct a service Modified watch event
  • Compare the new service topologyKeys with the existing ones for differences
  • If they differ, update the topologyKeys and call rebuildEndpointsMap to refresh the endpoints corresponding to the service; if the endpoints change, construct an endpoints watch event and pass it to the endpoints handler (interceptEndpointsRequest) for processing

3. EndpointsEventHandler

The storageCache.endpointsMap map is keyed by the endpoints name (namespace/name), with an endpointsContainer as the value, containing the following data (a field sketch follows the list):

  • endpoints: the endpoints before topology modification
  • modified: the endpoints after topology modification
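A minimal sketch of endpointsContainer, inferred from how its fields are used in rebuildEndpointsMap and the endpoints handler; the actual definition in the SuperEdge source may differ, and the package name is hypothetical.

package wrapper // hypothetical package name for this sketch

import (
    v1 "k8s.io/api/core/v1"
)

// endpointsContainer: field sketch inferred from the usage of endpoints and modified above and below
type endpointsContainer struct {
    endpoints *v1.Endpoints // the endpoints as received from the apiserver
    modified  *v1.Endpoints // the endpoints after topology-aware pruning
}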

For changes to endpoints resources, the Update event is used here as an illustration:

func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {
    sc := eh.cache

    sc.mu.Lock()
    endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
    klog.Infof("Updating endpoints %v", endpointsKey)

    endpointsContainer, found := sc.endpointsMap[endpointsKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("Updating non-existed endpoints %v", endpointsKey)
        return
    }
    endpointsContainer.endpoints = endpoints
    newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)
    changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)
    if changed {
        endpointsContainer.modified = newEps
    }
    sc.mu.Unlock()

    if changed {
        sc.endpointsChan <- watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        }
    }
}

The logic is as follows:

  • Update endpointsContainer.endpoints to the new endpoints object
  • Call pruneEndpoints to get the topology-refreshed endpoints
  • Compare endpointsContainer.modified with the newly refreshed endpoints
  • If they differ, update endpointsContainer.modified and construct an endpoints watch event, which is passed to the endpoints handler (interceptEndpointsRequest) for processing

Having analyzed NodeEventHandler, ServiceEventHandler, and EndpointsEventHandler, we return to the concrete List&Watch handling logic of the http handlers, taking endpoints as an example:

func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {
            handler.ServeHTTP(w, r)
            return
        }

        queries := r.URL.Query()
        acceptType := r.Header.Get("Accept")
        info, found := s.parseAccept(acceptType, s.mediaSerializer)
        if !found {
            klog.Errorf("can't find %s serializer", acceptType)
            w.WriteHeader(http.StatusBadRequest)
            return
        }

        encoder := scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)

        // list request
        if queries.Get("watch") == "" {
            w.Header().Set("Content-Type", info.MediaType)
            allEndpoints := s.cache.GetEndpoints()
            epsItems := make([]v1.Endpoints, 0, len(allEndpoints))
            for _, eps := range allEndpoints {
                epsItems = append(epsItems, *eps)
            }

            epsList := &v1.EndpointsList{
                Items: epsItems,
            }

            err := encoder.Encode(epsList, w)
            if err != nil {
                klog.Errorf("can't marshal endpoints list, %v", err)
                w.WriteHeader(http.StatusInternalServerError)
                return
            }

            return
        }

        // watch request
        timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")
        timeout := time.Minute
        if timeoutSecondsStr != "" {
            timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))
        }

        timer := time.NewTimer(timeout)
        defer timer.Stop()

        flusher, ok := w.(http.Flusher)
        if !ok {
            klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }

        e := restclientwatch.NewEncoder(
            streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),
                scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),
            encoder)
        if info.MediaType == runtime.ContentTypeProtobuf {
            w.Header().Set("Content-Type", runtime.ContentTypeProtobuf+";stream=watch")
        } else {
            w.Header().Set("Content-Type", runtime.ContentTypeJSON)
        }

        w.Header().Set("Transfer-Encoding", "chunked")
        w.WriteHeader(http.StatusOK)
        flusher.Flush()
        for {
            select {
            case <-r.Context().Done():
                return
            case <-timer.C:
                return
            case evt := <-s.endpointsWatchCh:
                klog.V(4).Infof("Send endpoint watch event: %+#v", evt)
                err := e.Encode(&evt)
                if err != nil {
                    klog.Errorf("can't encode watch event, %v", err)
                    return
                }

                if len(s.endpointsWatchCh) == 0 {
                    flusher.Flush()
                }
            }
        }
    })
}

The logic is as follows:

  • For a List request, call GetEndpoints to get the list of topology-modified endpoints and return it
func (sc *storageCache) GetEndpoints() []*v1.Endpoints {
    sc.mu.RLock()
    defer sc.mu.RUnlock()

    epList := make([]*v1.Endpoints, 0, len(sc.endpointsMap))
    for _, v := range sc.endpointsMap {
        epList = append(epList, v.modified)
    }
    return epList
}
  • For a Watch request, continuously receive watch events from the storageCache.endpointsWatchCh channel and stream them back (see the client sketch after this list)
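As a rough illustration of what kube-proxy's requests look like from the wrapper's point of view, here is a minimal client sketch issuing a List and a Watch request; the wrapper address and port are assumptions for illustration, and authentication/TLS are omitted.

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Hypothetical wrapper address; the real value comes from the wrapper's bindAddress flag.
    base := "http://127.0.0.1:51006"

    // List request: served by interceptEndpointsRequest via s.cache.GetEndpoints()
    resp, err := http.Get(base + "/api/v1/endpoints")
    if err != nil {
        panic(err)
    }
    body, _ := io.ReadAll(resp.Body)
    resp.Body.Close()
    fmt.Printf("list: %d bytes, status %s\n", len(body), resp.Status)

    // Watch request: the wrapper streams events taken from endpointsWatchCh until timeoutSeconds expires
    watchResp, err := http.Get(base + "/api/v1/endpoints?watch=true&timeoutSeconds=30")
    if err != nil {
        panic(err)
    }
    defer watchResp.Body.Close()
    io.Copy(io.Discard, watchResp.Body) // each chunk is an encoded watch event
}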

The interceptServiceRequest logic is the same as interceptEndpointsRequest and is not repeated here.

Summary

  • SuperEdge service group uses application-grid-wrapper to implement topology awareness, enabling closed-loop service access within the same nodeunit
  • Compared with the native upstream Kubernetes implementation, the topology awareness implemented by service group differs as follows:

    • The service group topology key is customizable, namely gridUniqKey, which is more flexible to use; the upstream implementation currently offers only three choices: "kubernetes.io/hostname", "topology.kubernetes.io/zone", and "topology.kubernetes.io/region"
    • service group accepts only a single topology key, i.e. only valid endpoints within the local topology domain can be accessed, with no access to endpoints in other topology domains; upstream can fall back to endpoints in other topology domains via the topologyKeys list and "*"
  • The ServiceGrid Controller is responsible for generating the corresponding service from the ServiceGrid (including the topologyKeys annotation built from serviceGrid.Spec.GridUniqKey); its logic is largely the same as that of the DeploymentGrid Controller, as follows:

    • Create and maintain the CRDs required by service group (including ServiceGrid)
    • Watch ServiceGrid events and enqueue ServiceGrids into the work queue; in a loop, dequeue each ServiceGrid, parse it, and create and maintain the corresponding service
    • Watch service events and enqueue the related ServiceGrid into the work queue for the processing above, assisting the logic above to achieve overall reconciliation
  • To achieve zero intrusion into Kubernetes, a wrapper layer needs to be added between kube-proxy and the apiserver, with the following call chain: kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
  • application-grid-wrapper is an http server that accepts requests from kube-proxy while maintaining a resource cache; its handler functions, from outermost to innermost, are as follows:

    • debug: accepts debug requests and returns the wrapper's pprof runtime information
    • logger: logs requests
    • node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns node information
    • event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver
    • service: accepts kube-proxy service List&Watch (/api/v1/services) requests and responds based on the storageCache contents (GetServices).
    • endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and responds based on the storageCache contents (GetEndpoints).
  • To implement topology awareness, the wrapper maintains a resource cache of nodes, services, and endpoints, and registers the corresponding event handler functions. The core topology algorithm is: call filterConcernedAddresses to filter endpoint.Subsets Addresses and NotReadyAddresses, keeping only the endpoints within the same topology domain defined by the service topologyKeys. In addition, if the edge node the wrapper is deployed on does not carry the label referenced by the service topologyKeys, that service cannot be accessed from the node either.
  • The wrapper accepts List&Watch requests from kube-proxy for endpoints and services. Taking endpoints as an example: for a List request it calls GetEndpoints to get the topology-modified endpoints list and returns it; for a Watch request it continuously receives watch events from the storageCache.endpointsWatchCh channel and streams them back. The service logic is the same as for endpoints.

Outlook

The topology algorithm currently implemented by SuperEdge service group is more flexible and convenient; how it should relate to the upstream Kubernetes service topology awareness feature is worth exploring, and it is suggested that the SuperEdge topology algorithm be pushed upstream to the community.

Refs

  • duyanghao kubernetes-reading-notes