共计 18816 个字符,预计需要花费 48 分钟才能阅读完成。
前言
常常有人问:“浏览器输出 url 后产生了什么”,这个问题看似简略,然而却能全面的考查一个人对系统的理解水平。如果把这个问题引申到 k8s 畛域,就能够问出“K8sClient 提交 Yaml 后产生了什么”这样相似的问题。同样的,要答复这个问题,须要咱们对 k8s 的设计有一个比拟残缺的理解。本文就试图答复这个问题,带着大家体验一下一份 Yaml 的 K8S 之旅。
k8s 的设计
k8s 能够说是松耦合设计的一个典型,如下图所示,各个组件都和 ApiServer 进行通信,只有 ApiServer 能够写 Etcd,这样做的的益处有许多:各个组件解耦了,能够独立倒退;各组件也能够散布在不同的机器上,防止单机忙碌,甚至对某些要害组件能够多实例部署,加强性能和可用性;因为数据库 Etcd 保护了集群的外围元数据和状态,由 ApiServer 对立验证鉴权更正当;等等。
理解了 k8s 的整体设计后,上面咱们以 Service 这个利用最关怀的资源的 Yaml 文件提交后的成果进行剖析,其它资源也是大同小异。
以 Service 为例
零碎的整体解决流程图大抵如下,首先在 k8s 启动后,各个组件包含 CoreDNS、各个 Controller 都会连贯到 ApiServer(list/watch),在 client 如 kubectl 提交 yaml 后,API server 会把相干资源存储到 Ectd 中并告诉各个组件,各个组件而后各自进行本人的相干操作,最初产生了一个能够对外提供服务的 service。
理解了整体流程后,咱们来对流程中波及的各个组件进行粗疏的剖析。
ApiServer
ApiServer 会启动一个 httpsserver,并把相干端点注册到具体的 storage,其中以“api”结尾的属于 legacy,其注册的局部常见端点有:
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"services": serviceRest,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
......
}
首先咱们来看看通用的 storage
type Store struct {
// NewFunc returns a new instance of the type this registry returns for a
// GET of a single object, e.g.:
//
// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
NewFunc func() runtime.Object
// NewListFunc returns a new list of the type this registry; it is the
// type returned when the resource is listed, e.g.:
//
// curl GET /apis/group/version/namespaces/my-ns/myresource
NewListFunc func() runtime.Object}
// 资源创立办法
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
// 校验资源合法性
if createValidation != nil {if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {return nil, err}
}
name, err := e.ObjectNameFunc(obj)
key, err := e.KeyFunc(ctx, name)
qualifiedResource := e.qualifiedResourceFromContext(ctx)
ttl, err := e.calculateTTL(obj, 0, false)
out := e.NewFunc()
// 最终调用的要么是 dryrun,要么是 etcd3
// https://github.com/kubernetes/kubernetes/blob/7f7378eddfe7a817c47fc75c220a729f4b78b913/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L144
if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {err = storeerr.InterpretCreateError(err, qualifiedResource, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
// 资源已存在则能够原谅
if !apierrors.IsAlreadyExists(err) {return nil, err}
// 创立后没法取得则不能原谅
if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {return nil, err}
}
// 切面
if e.AfterCreate != nil {if err := e.AfterCreate(out); err != nil {return nil, err}
}
return out, nil
}
从 restStorageMap 可见解决逻辑是 Service 和 Endpoints 对象都要被存入 etcd,其中 Service 还有一些非凡的逻辑(如调配 IP,健康检查等),而 Endpoint 没啥额定的逻辑,间接应用通用的 storage 即可。
// service 创立逻辑
func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {service := obj.(*api.Service)
// 切面
if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil {return nil, err}
// 是否须要开释 IP,相似于事务,如果调配出错的话,把 ip 还给资源池
releaseServiceIP := false
defer func() {
if releaseServiceIP {if helper.IsServiceIPSet(service) {allocator := rs.getAllocatorByClusterIP(service)
allocator.Release(net.ParseIP(service.Spec.ClusterIP))
}
}
}()
var err error
if !dryrun.IsDryRun(options.DryRun) {
// 对于不是 ExternalName 类型的 service 才调配 IP
if service.Spec.Type != api.ServiceTypeExternalName {
// 这个 分配器 实际上基于 etcd
allocator := rs.getAllocatorBySpec(service)
if releaseServiceIP, err = initClusterIP(service, allocator); err != nil {return nil, err}
}
}
// 由 分配器 调配端口
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
// 同样须要判断是否须要回收
defer nodePortOp.Finish()
// 对于 NodePort 和 LoadBalance 类型的 service 都要调配端口
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {if err := initNodePorts(service, nodePortOp); err != nil {return nil, err}
}
// 对于须要健康检查的 service 调配专门的端口
// 至于 loadbalance 类型且 ExternalTrafficPolicy 为 Local 的才须要调配
if apiservice.NeedsHealthCheck(service) {if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {return nil, errors.NewInternalError(err)
}
}
// 理论创立
out, err := rs.services.Create(ctx, service, createValidation, options)
if err != nil {err = rest.CheckGeneratedNameError(registry.Strategy, err, service)
}
}
// 基于 etcd 的 ip 分配器
serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
if err != nil {return nil, err}
serviceClusterIPRegistry = etcd
return etcd, nil
})
if err != nil {return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
对于 Pod 对象来说,除了存储外,还要将 pod 绑定到特定的机器下来:
func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMachine, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {podKey, err := r.store.KeyFunc(ctx, podID)
if err != nil {return nil, err}
err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {pod, ok := obj.(*api.Pod)
// 调配到机器
pod.Spec.NodeName = machine
// 设置注解
if pod.Annotations == nil {pod.Annotations = make(map[string]string)
}
for k, v := range annotations {pod.Annotations[k] = v
}
// condition,表明已被调度
podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
Type: api.PodScheduled,
Status: api.ConditionTrue,
})
finalPod = pod
return pod, nil
}), dryRun)
return finalPod, err
}
EndpointController
EndpointController 监听 Service 和 Pod 的变动事件,并注册回调函数,通过 Informer 实现。同时利用 Informer 缓存最新的 endpoint 到本地,然而并不注册回调事件,因为 endpoint 基本上是最底层的概念,不须要额定的解决逻辑。
// 监听 service
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// 增删改
AddFunc: e.onServiceUpdate,
UpdateFunc: func(old, cur interface{}) {e.onServiceUpdate(cur)
},
DeleteFunc: e.onServiceDelete,
})
...
// 监听 pod
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
})
// 利用 Informer 能够获取最新的 endpoint 情况
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced
收到相干资源减少事件后,把须要解决的 service 退出队列
func (e *EndpointController) onServiceUpdate(obj interface{}) {
// 取得 service key 可能是 name 或则 namespace/name
key, err := controller.KeyFunc(obj)
if err != nil {utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
// 更新 service 的 selector
_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
// 将 service 退出待处理队列
e.queue.Add(key)
}
func (e *EndpointController) addPod(obj interface{}) {pod := obj.(*v1.Pod)
// 取得该 pod 相干的 service,这些 service 的 selector 蕴含这个 pod
services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
if err != nil {utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
return
}
// 将该 pod 相干的 service 退出待处理队列
for key := range services {e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}
另一方面,当 EndpointController Run 起来过后,其实是循环解决队列中的 service,解决内容包含批改 Service 自身和其对应的 Endpoints
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
...
// 能够启动多个 goroutine,来解决 endpoint 变动
for i := 0; i < workers; i++ {go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
}
go func() {defer utilruntime.HandleCrash()
// 解决无主(没有对应 service)的 endpoint,相似垃圾回收,// 当然这个办法只是遍历 service 的 key 并退出队列,理论解决由 syncService 实现
e.checkLeftoverEndpoints()}()}
// 具体解决办法
func (e *EndpointController) syncService(key string) error {
// 取得 service
service, err := e.serviceLister.Services(namespace).Get(name)
if err != nil {
// 不是没找到,返回谬误
if !errors.IsNotFound(err) {return err}
// 没有这个 service,删掉相应的 endpoint。这两者由 key 关联
err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
return nil
}
if service.Spec.Selector == nil {
// 没有 selector 的 service,其 endpoint 只能是手动创立的,与本 Controller 无关,间接返回
// https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors
return nil
}
// 取得相应 pod
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
// 遍历这些 pod,把适合的 pod 的 ip 退出该 service 的 endpoints 汇合
for _, pod := range pods {
// 返回这个 pod 的端点地址,须要解决 v4 v6 两类状况
ep, err := podToEndpointAddressForService(service, pod)
// headless service 能够不指定端口.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
}
} else {
// 针对每个 port 映射,生成端点地址
for i := range service.Spec.Ports {servicePort := &service.Spec.Ports[i]
portNum, err := podutil.FindPort(pod, servicePort)
epp := endpointPortFromServicePort(servicePort, portNum)
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
}
}
}
// 检测 service 是否真的有变动
// 首先取得最新的端点情况
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
// 不存在该 endpoint 就创立
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {return err}
}
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
// 如果不是新创建的 endpoint,则比拟是否雷同,雷同则阐明不须要批改
if !createEndpoints &&
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
newEndpoints := currentEndpoints.DeepCopy()
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
// 调用 go client,让 APIservier 创立 / 更新 endpoint 对象
if createEndpoints {
// No previous endpoints, create them
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
} else {
// Pre-existing
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
}
return nil
}
ServiceController
ServiceController 监听 Service 和 Node 的变动事件,原理与 EndpointController 统一,都是利用 Informer,Informer 的事件回调办法次要也是把须要解决的 service 退出队列,以及解决 node。
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{AddFunc: func(cur interface{}) {svc, ok := cur.(*v1.Service)
// 将须要调配负载均衡器或者清理的 service 退出待处理队列
if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) {s.enqueueService(cur)
}
},
UpdateFunc: func(old, cur interface{}) {oldSvc, ok1 := old.(*v1.Service)
curSvc, ok2 := cur.(*v1.Service)
if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) {s.enqueueService(cur)
}
},
},
serviceSyncPeriod,
)
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{AddFunc: func(cur interface{}) {s.nodeSyncLoop()
},
UpdateFunc: func(old, cur interface{}) {s.nodeSyncLoop()
},
DeleteFunc: func(old interface{}) {s.nodeSyncLoop()
},
其中解决 node 的办法 nodeSyncLoop,次要工作是比照最新节点和原有节点,若有变动则更新对应的 service。
func (s *Controller) nodeSyncLoop() {
// 最新且 ready 的所有节点
// 要所有节点是因为 loadbalancer 可能须要挂载到所有节点
// 取决于具体策略 externalTrafficPolicy,不同云厂商实现大同小异
// https://aws.amazon.com/cn/blogs/opensource/network-load-balancer-support-in-kubernetes-1-9/
// https://help.aliyun.com/document_detail/86531.html#title-cn3-euk-ij6
newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
// 节点未变动,原本不须要变动,然而能够在这里解决上次解决失败的 service
if nodeSlicesEqualForLB(newHosts, s.knownHosts) {s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
return
}
// 取得所有 service
s.servicesToUpdate = s.cache.allServices()
// 解决 service,保留本次解决失败的 service 留给下次解决
s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
// 更新本地 service
s.knownHosts = newHosts
}
// 解决 service,保留本次解决失败的 service
func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
for _, service := range services {func() {if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {servicesToRetry = append(servicesToRetry, service)
}
}()}
return servicesToRetry
}
func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
// 只解决 loadbalance 类型的 service
if !wantsLoadBalancer(service) {return nil}
// 由云厂商实现 loadBalancer 的调配,比方 aws aliyun 等
err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts)
if err == nil {return nil}
if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil {runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err))
} else if !exists {return nil}
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
return err
}
另一方面,当 ServiceController Run 起来过后,其实是循环解决队列中的 service 和 node,次要实现的工作是 LoadBalancer 类型的 service 与后端 node 的映射关系的保护。
func (s *Controller) Run(stopCh <-chan struct{}, workers int) {
// 启动多个协程来解决 service
for i := 0; i < workers; i++ {go wait.Until(s.worker, time.Second, stopCh)
}
// 解决节点,也就是说不仅有事件触发,也有被动循环来解决节点变动
go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
}
// 具体解决 service 的办法
func (s *Controller) syncService(key string) error {
// 由 key 取得命名空间和 service name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// 最新的 service
service, err := s.serviceLister.Services(namespace).Get(name)
switch {case errors.IsNotFound(err):
// 没找到,阐明该删除这个 service 了
err = s.processServiceDeletion(key)
case err != nil:
runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
default:
// 创立或者更新 service
err = s.processServiceCreateOrUpdate(service, key)
}
return err
}
CoreDNS
CoreDNS 的 kubernetes 插件配置好并启动后,以 service 的模式 (名字就叫 kube-dns 兼容之前的 dns 插件名称) 运行在 k8s 集群中,DNSController 监听 namespace、service 和 pod(可选)、endpoint(可选)的变动,并通过 Informer 机制缓存在本地。
func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
// 获取配置
config, err := k.getClientConfig()
// 根据配置取得 client
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
}
k.opts.initPodCache = k.podMode == podModeVerified
// controller 中监听各资源
k.APIConn = newdnsController(ctx, kubeClient, k.opts)
return err
}
// 在 controller 中监听各个资源的变动,并存储在本地
dns.svcLister, dns.svcController = object.NewIndexerInformer(
&cache.ListWatch{ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Service{},)
dns.nsLister, dns.nsController = cache.NewInformer(
&cache.ListWatch{ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector),
WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector),
},
&api.Namespace{},)
pod 和 endpoint 可选
...
CoreDNS 解决域名查问通过每一个插件的 ServeDNS 办法实现,在 kubernetes 插件 中如下:
// ServeDNS implements the plugin.Handler interface.
func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {state := request.Request{W: w, Req: r}
// 解决多种申请类型
switch state.QType() {
case dns.TypeAXFR, dns.TypeIXFR:
k.Transfer(ctx, state)
case dns.TypeA:
records, err = plugin.A(ctx, &k, zone, state, nil, plugin.Options{})
case dns.TypeAAAA:
records, err = plugin.AAAA(ctx, &k, zone, state, nil, plugin.Options{})
case dns.TypeTXT:
records, err = plugin.TXT(ctx, &k, zone, state, nil, plugin.Options{})
case dns.TypeCNAME:
records, err = plugin.CNAME(ctx, &k, zone, state, plugin.Options{})
case dns.TypePTR:
records, err = plugin.PTR(ctx, &k, zone, state, plugin.Options{})
....
default:
// Do a fake A lookup, so we can distinguish between NODATA and NXDOMAIN
fake := state.NewWithQuestion(state.QName(), dns.TypeA)
fake.Zone = state.Zone
_, err = plugin.A(ctx, &k, zone, fake, nil, plugin.Options{})
}
return dns.RcodeSuccess, nil
}
该办法解决具体申请时,是通过 informer 查找存在本地的 service 或者 pod 的 endpoints 信息实现域名和 ip 的映射。
func (k *Kubernetes) Services(ctx context.Context, state request.Request, exact bool, opt plugin.Options) (svcs []msg.Service, err error) {
// 非凡 dns 申请类型间接能够返回
switch state.QType() {
case dns.TypeTXT:
return []msg.Service{svc}, nil
case dns.TypeNS:
return svcs, nil
}
if isDefaultNS(state.Name(), state.Zone) {return svcs, nil}
// 其余类型须要查问 k8s 返回记录
s, e := k.Records(ctx, state, false)
internal := []msg.Service{}
for _, svc := range s {if t, _ := svc.HostType(); t != dns.TypeCNAME {internal = append(internal, svc)
}
}
return internal, e
}
// records 办法解析申请,做些验证,而后查找 k8s 中的相应记录
func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) {r, e := parseRequest(state.Name(), state.Zone)
...
if r.podOrSvc == Pod {
// 解决 pod 申请
pods, err := k.findPods(r, state.Zone)
return pods, err
}
// 解决 service 申请
services, err := k.findServices(r, state.Zone)
return services, err
}
// 这个办法解决 pod 的 dns 申请
func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) {
// 个性敞开,返回空
if k.podMode == podModeDisabled {return nil, errNoItems}
podname := r.service
// pod 的 name 间接能够解析 ip,辨别 ipv4 ipv6
if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") {ip = strings.Replace(podname, "-", ".", -1)
} else {ip = strings.Replace(podname, "-", ":", -1)
}
if k.podMode == podModeInsecure {
// 不需查看模式,间接返回记录
return []msg.Service{{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl}}, err
}
// 须要查看的模式,只返回存在的 pod 的记录
for _, p := range k.APIConn.PodIndex(ip) {
// check for matching ip and namespace
if ip == p.PodIP && match(namespace, p.Namespace) {s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl}
pods = append(pods, s)
}
}
return pods, err
}
// 这个办法解决各个类型的 service 申请,从本地 cache 中读取相应记录
func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) {
// 确定是否是含糊查问
if wildcard(r.service) || wildcard(r.namespace) {
// 返回所有状态正确的 service
serviceList = k.APIConn.ServiceList()
endpointsListFunc = func() []*object.Endpoints {return k.APIConn.EndpointsList() }
} else {
// 只返回 name + "." + namespace 类型的 service
idx := object.ServiceKey(r.service, r.namespace)
serviceList = k.APIConn.SvcIndex(idx)
endpointsListFunc = func() []*object.Endpoints {return k.APIConn.EpIndex(idx) }
}
zonePath := msg.Path(zone, coredns)
for _, svc := range serviceList {
// service name 和 namespace 都要匹配才行
if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) {continue}
// 如果是含糊查问,须要 namespace 被裸露才行
if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) {continue}
// 解决 endpoint 或者 headless service,这两类申请都需遍历 endpoint
if svc.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
for _, ep := range endpointsList {
if ep.Name != svc.Name || ep.Namespace != svc.Namespace {continue}
// 遍历 endpoint 的每一个 ip 和 port
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
for _, p := range eps.Ports {s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr, k.endpointNameMode)}, "/")
err = nil
services = append(services, s)
}
}
}
}
continue
}
// 解决 External service
if svc.Type == api.ServiceTypeExternalName {
// 如 cluster.local/svc/namespace/service
s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.ExternalName, TTL: k.ttl}
if t, _ := s.HostType(); t == dns.TypeCNAME {
// 只有 cname 记录
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
services = append(services, s)
err = nil
}
continue
}
// 解决 ClusterIP service
for _, p := range svc.Ports {if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {continue}
s := msg.Service{Host: svc.ClusterIP, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
services = append(services, s)
}
}
return services, err
}
KubeProxy
KubeProxy 的次要工作是监听 Service 和 Endpoints 等的变动,并把路由规定(如何依据 service 的域名或者 ip 取得后端实在 pod ip)刷新到节点上。
这样,每个 pod 在拜访 service 时,就会向 CoreDNS 要到对应的 service ip 或者间接是每个 backend pod 的 ip(如 headless service),对于前者由本地路由规定将 service ip 的流量疏导至真正的 pod ip。咱们的这份 yaml 也终于成了一个能够对外提供服务的 service。
KubeProxy 这部分因为通过了多个版本的迭代,目前包含三种类型,限于篇幅本文不开展,且待下回分解。
参考
- 服务、负载平衡和联网 – Kubernetes: https://kubernetes.io/zh/docs/concepts/services-networking/
- How to Add Plugins to CoreDNS: https://coredns.io/2017/03/01/how-to-add-plugins-to-coredns/