Preface

People often ask, "What happens after you type a URL into the browser?" The question looks simple, yet it thoroughly tests how well someone understands the whole system. Carried over to the Kubernetes world, the analogous question is "What happens after a Kubernetes client submits a YAML?" Answering it likewise requires a fairly complete picture of how Kubernetes is designed. This article tries to answer that question and walk you through a YAML's journey through Kubernetes.

The design of k8s

Kubernetes is a textbook example of loosely coupled design. As the diagram below shows, every component talks to the ApiServer, and only the ApiServer writes to etcd. This brings many benefits: the components are decoupled and can evolve independently; they can run on different machines to avoid overloading a single node, and critical components can even be deployed with multiple replicas for better performance and availability; and since etcd holds the cluster's core metadata and state, it makes sense to have the ApiServer perform authentication and validation in one place; and so on.
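To make that list/watch pattern concrete, here is a minimal sketch, not taken from the Kubernetes source, of how a component could watch Service changes through the ApiServer using client-go. The in-cluster config is an assumption, and real components use shared informers built on top of exactly this mechanism.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // assumption: running inside the cluster, the way a control-plane component or add-on would
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }
    // everything goes through the ApiServer; no component reads or writes etcd directly
    w, err := client.CoreV1().Services(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for ev := range w.ResultChan() {
        fmt.Printf("service event: %s %T\n", ev.Type, ev.Object)
    }
}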

With the overall design in mind, let's analyze what happens after the YAML for a Service, the resource applications care about most, is submitted; other resources work in much the same way.

Taking Service as an example

The overall flow is roughly as follows. After Kubernetes starts, the components, including CoreDNS and the various controllers, connect to the ApiServer (list/watch). When a client such as kubectl submits a YAML, the ApiServer stores the corresponding resources in etcd and notifies the components; each component then does its own part of the work, and in the end we get a Service that can actually serve traffic.
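As a hedged illustration of what "a client submits a YAML" boils down to on the wire, the sketch below creates an equivalent Service object through the ApiServer with client-go. The name my-svc, the default namespace, and the app: demo selector are placeholders, not anything defined above.

package demo

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
    "k8s.io/client-go/kubernetes"
)

// createDemoService shows the single API call a YAML submission reduces to: the ApiServer
// validates the object, persists it to etcd, and the watching components take it from there.
func createDemoService(ctx context.Context, client kubernetes.Interface) error {
    svc := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{Name: "my-svc", Namespace: "default"},
        Spec: corev1.ServiceSpec{
            Selector: map[string]string{"app": "demo"},
            Ports:    []corev1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080)}},
        },
    }
    created, err := client.CoreV1().Services("default").Create(ctx, svc, metav1.CreateOptions{})
    if err != nil {
        return err
    }
    // the cluster IP is filled in by the ApiServer's Service storage, as discussed below
    fmt.Println("assigned cluster IP:", created.Spec.ClusterIP)
    return nil
}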

With the overall flow clear, let's look closely at each component involved.

ApiServer

The ApiServer starts an HTTPS server and registers the endpoints to their concrete storage backends. Paths prefixed with "api" belong to the legacy (core) group; some of the common endpoints registered there are:

restStorageMap := map[string]rest.Storage{
    "pods":      podStorage.Pod,
    "services":  serviceRest,
    "endpoints": endpointsStorage,
    "nodes":     nodeStorage.Node,
    ......
}
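For orientation, the following hypothetical client-side call hits the kind of legacy "/api" path that the "services" entry above ends up serving; the namespace "default" and service name "my-svc" are placeholders.

package demo

import (
    "context"
    "fmt"

    "k8s.io/client-go/kubernetes"
)

// getServiceRaw issues a GET against one of the legacy "/api" paths served by the
// "services" rest.Storage registered in restStorageMap.
func getServiceRaw(ctx context.Context, clientset *kubernetes.Clientset) error {
    raw, err := clientset.CoreV1().RESTClient().
        Get().
        AbsPath("/api/v1/namespaces/default/services/my-svc").
        DoRaw(ctx)
    if err != nil {
        return err
    }
    fmt.Println(string(raw))
    return nil
}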

Let's first look at the generic storage:

type Store struct {
    // NewFunc returns a new instance of the type this registry returns for a
    // GET of a single object, e.g.:
    //
    // curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
    NewFunc func() runtime.Object

    // NewListFunc returns a new list of the type this registry; it is the
    // type returned when the resource is listed, e.g.:
    //
    // curl GET /apis/group/version/namespaces/my-ns/myresource
    NewListFunc func() runtime.Object
}

// generic resource creation method
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
    // validate the resource
    if createValidation != nil {
        if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
            return nil, err
        }
    }
    name, err := e.ObjectNameFunc(obj)
    key, err := e.KeyFunc(ctx, name)
    qualifiedResource := e.qualifiedResourceFromContext(ctx)
    ttl, err := e.calculateTTL(obj, 0, false)

    out := e.NewFunc()
    // the final call goes either to dryrun or to the etcd3 storage
    // https://github.com/kubernetes/kubernetes/blob/7f7378eddfe7a817c47fc75c220a729f4b78b913/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L144
    if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
        err = storeerr.InterpretCreateError(err, qualifiedResource, name)
        err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
        // an already-exists error is tolerable
        if !apierrors.IsAlreadyExists(err) {
            return nil, err
        }
        // but failing to fetch the object right after creation is not
        if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
            return nil, err
        }
    }
    // post-create hook
    if e.AfterCreate != nil {
        if err := e.AfterCreate(out); err != nil {
            return nil, err
        }
    }
    return out, nil
}

From restStorageMap we can see that both Service and Endpoints objects end up stored in etcd. Service has some extra logic of its own (allocating a cluster IP, health-check ports, and so on), while Endpoints needs no additional logic and simply uses the generic storage.

// Service creation logic
func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
    service := obj.(*api.Service)
    // pre-create hook
    if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil {
        return nil, err
    }
    // whether the IP needs to be released; transaction-like: if allocation fails, the IP goes back to the pool
    releaseServiceIP := false
    defer func() {
        if releaseServiceIP {
            if helper.IsServiceIPSet(service) {
                allocator := rs.getAllocatorByClusterIP(service)
                allocator.Release(net.ParseIP(service.Spec.ClusterIP))
            }
        }
    }()
    var err error
    if !dryrun.IsDryRun(options.DryRun) {
        // only services that are not of type ExternalName get a cluster IP
        if service.Spec.Type != api.ServiceTypeExternalName {
            // this allocator is actually backed by etcd
            allocator := rs.getAllocatorBySpec(service)
            if releaseServiceIP, err = initClusterIP(service, allocator); err != nil {
                return nil, err
            }
        }
    }
    // ports are handed out by the allocator
    nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
    // likewise, decide at the end whether the ports need to be reclaimed
    defer nodePortOp.Finish()
    // NodePort and LoadBalancer services both need node ports
    if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
        if err := initNodePorts(service, nodePortOp); err != nil {
            return nil, err
        }
    }
    // allocate a dedicated health-check node port for services that need one
    // (only LoadBalancer services with ExternalTrafficPolicy set to Local need it)
    if apiservice.NeedsHealthCheck(service) {
        if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
            return nil, errors.NewInternalError(err)
        }
    }
    // the actual creation
    out, err := rs.services.Create(ctx, service, createValidation, options)
    if err != nil {
        err = rest.CheckGeneratedNameError(registry.Strategy, err, service)
    }
}

// the etcd-backed IP allocator
serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
    mem := allocator.NewAllocationMap(max, rangeSpec)
    // TODO etcdallocator package to return a storage interface via the storageFactory
    etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
    if err != nil {
        return nil, err
    }
    serviceClusterIPRegistry = etcd
    return etcd, nil
})
if err != nil {
    return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry

For Pod objects, besides being stored, the pod also has to be bound to a specific machine:

func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMachine, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
    podKey, err := r.store.KeyFunc(ctx, podID)
    if err != nil {
        return nil, err
    }
    err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
        pod, ok := obj.(*api.Pod)
        // assign the pod to the machine
        pod.Spec.NodeName = machine
        // set the annotations
        if pod.Annotations == nil {
            pod.Annotations = make(map[string]string)
        }
        for k, v := range annotations {
            pod.Annotations[k] = v
        }
        // set the condition marking the pod as scheduled
        podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
            Type:   api.PodScheduled,
            Status: api.ConditionTrue,
        })
        finalPod = pod
        return pod, nil
    }), dryRun)
    return finalPod, err
}

EndpointController

The EndpointController watches change events of Services and Pods and registers callbacks for them, all through Informers. It also uses an Informer to cache the latest Endpoints locally, but registers no callbacks for them, because Endpoints are essentially the lowest-level concept and need no extra handling.

// watch services
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    // add / update / delete
    AddFunc: e.onServiceUpdate,
    UpdateFunc: func(old, cur interface{}) {
        e.onServiceUpdate(cur)
    },
    DeleteFunc: e.onServiceDelete,
})
...
// watch pods
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    e.addPod,
    UpdateFunc: e.updatePod,
    DeleteFunc: e.deletePod,
})
// the Informer gives access to the latest Endpoints state
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced

After receiving events for the relevant resources, the services that need processing are added to a work queue:

func (e *EndpointController) onServiceUpdate(obj interface{}) {
    // get the service key, which may be name or namespace/name
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
        return
    }
    // update the cached selector for this service
    _ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
    // add the service to the work queue
    e.queue.Add(key)
}

func (e *EndpointController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)
    // get the services related to this pod, i.e. those whose selectors match it
    services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
        return
    }
    // enqueue every service related to this pod
    for key := range services {
        e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
    }
}

On the other side, once the EndpointController is running, it keeps pulling services off the queue and processing them, which amounts to keeping each Service's corresponding Endpoints object in sync:

func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
    ...
    // several worker goroutines can be started to handle endpoint changes
    for i := 0; i < workers; i++ {
        go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
    }
    go func() {
        defer utilruntime.HandleCrash()
        // clean up orphaned endpoints (those without a corresponding service), similar to garbage collection;
        // this method only iterates over the keys and enqueues them, the real work is done by syncService
        e.checkLeftoverEndpoints()
    }()
}

// the actual per-service sync method
func (e *EndpointController) syncService(key string) error {
    // fetch the service
    service, err := e.serviceLister.Services(namespace).Get(name)
    if err != nil {
        // any error other than not-found is returned
        if !errors.IsNotFound(err) {
            return err
        }
        // the service is gone, so delete the corresponding endpoints; the two are linked by the key
        err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
        return nil
    }
    if service.Spec.Selector == nil {
        // a service without a selector has manually created endpoints; they are none of this controller's business, so return
        // https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors
        return nil
    }
    // list the matching pods
    pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())

    // iterate over the pods and add the suitable pods' IPs to this service's endpoint subsets
    for _, pod := range pods {
        // build the endpoint address for this pod, handling both IPv4 and IPv6
        ep, err := podToEndpointAddressForService(service, pod)
        // a headless service may specify no ports
        if len(service.Spec.Ports) == 0 {
            if service.Spec.ClusterIP == api.ClusterIPNone {
                subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
            }
        } else {
            // generate an endpoint entry for every port mapping
            for i := range service.Spec.Ports {
                servicePort := &service.Spec.Ports[i]
                portNum, err := podutil.FindPort(pod, servicePort)
                epp := endpointPortFromServicePort(servicePort, portNum)
                subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
            }
        }
    }

    // check whether the service really changed
    // first fetch the current endpoints
    currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
    if err != nil {
        // if the endpoints object does not exist, prepare to create it
        if errors.IsNotFound(err) {
            currentEndpoints = &v1.Endpoints{
                ObjectMeta: metav1.ObjectMeta{
                    Name:   service.Name,
                    Labels: service.Labels,
                },
            }
        } else {
            return err
        }
    }

    createEndpoints := len(currentEndpoints.ResourceVersion) == 0
    // if the endpoints object is not newly created, compare; if equal, no update is needed
    if !createEndpoints &&
        apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
        apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
        klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
        return nil
    }

    newEndpoints := currentEndpoints.DeepCopy()
    newEndpoints.Subsets = subsets
    newEndpoints.Labels = service.Labels

    // call the Go client so the ApiServer creates/updates the Endpoints object
    if createEndpoints {
        // No previous endpoints, create them
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
    } else {
        // Pre-existing
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
    }
    return nil
}

ServiceController

The ServiceController watches change events of Services and Nodes. The mechanism is the same as the EndpointController's: Informers are used, and their event callbacks mainly enqueue the services that need processing and handle node changes.

serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
    cache.ResourceEventHandlerFuncs{
        AddFunc: func(cur interface{}) {
            svc, ok := cur.(*v1.Service)
            // enqueue services that need a load balancer allocated or need cleanup
            if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) {
                s.enqueueService(cur)
            }
        },
        UpdateFunc: func(old, cur interface{}) {
            oldSvc, ok1 := old.(*v1.Service)
            curSvc, ok2 := cur.(*v1.Service)
            if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) {
                s.enqueueService(cur)
            }
        },
    },
    serviceSyncPeriod,
)

nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
    cache.ResourceEventHandlerFuncs{
        AddFunc: func(cur interface{}) {
            s.nodeSyncLoop()
        },
        UpdateFunc: func(old, cur interface{}) {
            s.nodeSyncLoop()
        },
        DeleteFunc: func(old interface{}) {
            s.nodeSyncLoop()
        },
    },
    ...
)

The node-handling method, nodeSyncLoop, mainly compares the latest set of nodes with the previously known set and, if anything changed, updates the affected services.

func (s *Controller) nodeSyncLoop() {
    // all nodes that are current and ready
    // all nodes are needed because the load balancer may have to attach to every node,
    // depending on externalTrafficPolicy; cloud providers implement this in broadly similar ways
    // https://aws.amazon.com/cn/blogs/opensource/network-load-balancer-support-in-kubernetes-1-9/
    // https://help.aliyun.com/document_detail/86531.html#title-cn3-euk-ij6
    newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
    // the nodes did not change, so nothing would normally need updating,
    // but this is a chance to retry the services that failed last time
    if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
        s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
        return
    }
    // get all services
    s.servicesToUpdate = s.cache.allServices()
    // process the services, keeping the ones that failed this round for the next one
    s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
    // update the locally known set of hosts
    s.knownHosts = newHosts
}

// process services, collecting those that failed for retry
func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
    for _, service := range services {
        func() {
            if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
                servicesToRetry = append(servicesToRetry, service)
            }
        }()
    }
    return servicesToRetry
}

func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
    // only LoadBalancer-type services are handled
    if !wantsLoadBalancer(service) {
        return nil
    }
    // the actual load balancer provisioning is implemented by the cloud provider, e.g. AWS or Aliyun
    err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts)
    if err == nil {
        return nil
    }
    if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil {
        runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err))
    } else if !exists {
        return nil
    }
    s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
    return err
}

On the other side, once the ServiceController is running, it loops over the services in the queue and over the nodes; its main job is maintaining the mapping between LoadBalancer-type services and the backend nodes.

func (s *Controller) Run(stopCh <-chan struct{}, workers int) {
    // start several goroutines to process services
    for i := 0; i < workers; i++ {
        go wait.Until(s.worker, time.Second, stopCh)
    }
    // process nodes as well, i.e. node changes are handled not only by event triggers but also by a periodic loop
    go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
}

// the actual per-service sync method
func (s *Controller) syncService(key string) error {
    // derive the namespace and service name from the key
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // the latest service
    service, err := s.serviceLister.Services(namespace).Get(name)
    switch {
    case errors.IsNotFound(err):
        // not found, which means this service should be cleaned up
        err = s.processServiceDeletion(key)
    case err != nil:
        runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
    default:
        // create or update the service's load balancer
        err = s.processServiceCreateOrUpdate(service, key)
    }
    return err
}

CoreDNS

Once CoreDNS's kubernetes plugin is configured and started, CoreDNS runs in the cluster as a Service (named kube-dns for compatibility with the previous DNS add-on). Its dnsController watches changes to namespaces, services, and optionally pods and endpoints, caching them locally through the Informer mechanism.

func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
    // load the client config
    config, err := k.getClientConfig()
    // build the client from the config
    kubeClient, err := kubernetes.NewForConfig(config)
    if err != nil {
        return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
    }
    k.opts.initPodCache = k.podMode == podModeVerified
    // the controller watches the various resources
    k.APIConn = newdnsController(ctx, kubeClient, k.opts)
    return err
}

// inside the controller, watch each resource for changes and store them locally
dns.svcLister, dns.svcController = object.NewIndexerInformer(
    &cache.ListWatch{
        ListFunc:  serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
        WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
    },
    &api.Service{},
)
dns.nsLister, dns.nsController = cache.NewInformer(
    &cache.ListWatch{
        ListFunc:  namespaceListFunc(ctx, dns.client, dns.namespaceSelector),
        WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector),
    },
    &api.Namespace{},
)
// pods and endpoints are optional
...

CoreDNS handles DNS queries through each plugin's ServeDNS method; in the kubernetes plugin it looks like this:

// ServeDNS implements the plugin.Handler interface.
func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
    state := request.Request{W: w, Req: r}
    // handle the various query types
    switch state.QType() {
    case dns.TypeAXFR, dns.TypeIXFR:
        k.Transfer(ctx, state)
    case dns.TypeA:
        records, err = plugin.A(ctx, &k, zone, state, nil, plugin.Options{})
    case dns.TypeAAAA:
        records, err = plugin.AAAA(ctx, &k, zone, state, nil, plugin.Options{})
    case dns.TypeTXT:
        records, err = plugin.TXT(ctx, &k, zone, state, nil, plugin.Options{})
    case dns.TypeCNAME:
        records, err = plugin.CNAME(ctx, &k, zone, state, plugin.Options{})
    case dns.TypePTR:
        records, err = plugin.PTR(ctx, &k, zone, state, plugin.Options{})
    ....
    default:
        // Do a fake A lookup, so we can distinguish between NODATA and NXDOMAIN
        fake := state.NewWithQuestion(state.QName(), dns.TypeA)
        fake.Zone = state.Zone
        _, err = plugin.A(ctx, &k, zone, fake, nil, plugin.Options{})
    }
    return dns.RcodeSuccess, nil
}

When handling a concrete query, this method maps the domain name to IPs by looking up the locally cached service or pod/endpoints information populated by the informers.

func (k *Kubernetes) Services(ctx context.Context, state request.Request, exact bool, opt plugin.Options) (svcs []msg.Service, err error) {
    // special query types can be answered directly
    switch state.QType() {
    case dns.TypeTXT:
        return []msg.Service{svc}, nil
    case dns.TypeNS:
        return svcs, nil
    }
    if isDefaultNS(state.Name(), state.Zone) {
        return svcs, nil
    }
    // other types need a lookup against the k8s records
    s, e := k.Records(ctx, state, false)
    internal := []msg.Service{}
    for _, svc := range s {
        if t, _ := svc.HostType(); t != dns.TypeCNAME {
            internal = append(internal, svc)
        }
    }
    return internal, e
}

// the Records method parses the request, does some validation, then looks up the matching k8s records
func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) {
    r, e := parseRequest(state.Name(), state.Zone)
    ...
    if r.podOrSvc == Pod {
        // handle pod queries
        pods, err := k.findPods(r, state.Zone)
        return pods, err
    }
    // handle service queries
    services, err := k.findServices(r, state.Zone)
    return services, err
}

// this method handles DNS queries for pods
func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) {
    // feature disabled, return nothing
    if k.podMode == podModeDisabled {
        return nil, errNoItems
    }
    podname := r.service
    // the pod "name" encodes the IP directly; distinguish IPv4 from IPv6
    if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") {
        ip = strings.Replace(podname, "-", ".", -1)
    } else {
        ip = strings.Replace(podname, "-", ":", -1)
    }
    if k.podMode == podModeInsecure {
        // insecure mode: return the record without verification
        return []msg.Service{{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl}}, err
    }
    // verified mode: only return records for pods that actually exist
    for _, p := range k.APIConn.PodIndex(ip) {
        // check for matching ip and namespace
        if ip == p.PodIP && match(namespace, p.Namespace) {
            s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl}
            pods = append(pods, s)
        }
    }
    return pods, err
}

// this method handles the various kinds of service queries, reading the records from the local cache
func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) {
    // determine whether this is a wildcard query
    if wildcard(r.service) || wildcard(r.namespace) {
        // return all services in a valid state
        serviceList = k.APIConn.ServiceList()
        endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EndpointsList() }
    } else {
        // only return services matching name + "." + namespace
        idx := object.ServiceKey(r.service, r.namespace)
        serviceList = k.APIConn.SvcIndex(idx)
        endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EpIndex(idx) }
    }
    zonePath := msg.Path(zone, coredns)
    for _, svc := range serviceList {
        // both the service name and the namespace must match
        if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) {
            continue
        }
        // for a wildcard query, the namespace must be exposed
        if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) {
            continue
        }
        // handle endpoint or headless-service queries; both require iterating over the endpoints
        if svc.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
            for _, ep := range endpointsList {
                if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
                    continue
                }
                // iterate over every IP and port of the endpoints
                for _, eps := range ep.Subsets {
                    for _, addr := range eps.Addresses {
                        for _, p := range eps.Ports {
                            s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl}
                            s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr, k.endpointNameMode)}, "/")
                            err = nil
                            services = append(services, s)
                        }
                    }
                }
            }
            continue
        }
        // handle ExternalName services
        if svc.Type == api.ServiceTypeExternalName {
            // e.g. cluster.local/svc/namespace/service
            s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.ExternalName, TTL: k.ttl}
            if t, _ := s.HostType(); t == dns.TypeCNAME {
                // only a CNAME record
                s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
                services = append(services, s)
                err = nil
            }
            continue
        }
        // handle ClusterIP services
        for _, p := range svc.Ports {
            if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
                continue
            }
            s := msg.Service{Host: svc.ClusterIP, Port: int(p.Port), TTL: k.ttl}
            s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
            services = append(services, s)
        }
    }
    return services, err
}

KubeProxy

KubeProxy's main job is to watch changes to Services, Endpoints, and related resources, and to refresh the routing rules on each node, i.e. the rules that turn a service's name or IP into the IPs of the real backend pods.

This way, when a pod accesses a service, it asks CoreDNS and gets back the corresponding service IP, or directly the IPs of the backend pods (as with a headless service); in the former case the local routing rules steer traffic for the service IP to the real pod IPs. Our YAML has finally become a Service that can serve traffic.
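To round things off, here is a small, hypothetical sketch of what that looks like from inside a pod: resolving a ClusterIP service yields the virtual service IP that KubeProxy's rules handle, while a headless service resolves straight to pod IPs. The service names used here (my-svc and my-headless-svc in namespace default) are assumptions, not anything defined earlier.

package main

import (
    "context"
    "fmt"
    "net"
)

func main() {
    r := net.DefaultResolver

    // a ClusterIP service resolves to its virtual service IP; KubeProxy's rules on the node
    // then steer traffic for that IP to one of the real backend pods
    ips, err := r.LookupHost(context.TODO(), "my-svc.default.svc.cluster.local")
    fmt.Println("ClusterIP service:", ips, err)

    // a headless service resolves directly to the backend pod IPs taken from its Endpoints
    ips, err = r.LookupHost(context.TODO(), "my-headless-svc.default.svc.cluster.local")
    fmt.Println("headless service:", ips, err)
}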

KubeProxy has gone through several iterations and currently comes in three modes; for lack of space this article does not go into them, to be continued in a follow-up.

References

  • Services, Load Balancing, and Networking - Kubernetes: https://kubernetes.io/zh/docs/concepts/services-networking/
  • How to Add Plugins to CoreDNS: https://coredns.io/2017/03/01/how-to-add-plugins-to-coredns/