关于kubernetes:Kubernetes学习笔记之Calico-CNI-Plugin源码解析二

2次阅读

共计 12540 个字符,预计需要花费 32 分钟才能阅读完成。

Overview

calico 插件代码仓库在 projectcalico/cni-plugin,并且会编译两个二进制文件:calico 和 calico-ipam,其中 calico 会为 sandbox container 创立 route 和虚构网卡 virtual interface,以及 veth pair 等网络资源,并且会把相干数据写入 calico datastore 数据库里;calico-ipam 会为以后 pod 从以后节点的 pod 网段内调配 ip 地址,当然以后节点还没有 pod 网段,会从集群网段 cluster cidr 中先调配出该节点的 pod cidr,并把相干数据写入 calico datastore 数据库里,这里 cluster cidr 是用户本人定义的,曾经提前写入了 calico datastore,并且从 cluster cidr 中划分的 block size 也是能够自定义的 (新版本 calico/node 容器能够反对自定义,老版本 calico 不反对),能够参考官网文档 change-block-size

接下来重点看下 calico 二进制插件具体是如何工作的,后续再看 calico-ipam 二进制插件如何调配 ip 地址的。

calico plugin 源码解析

calico 插件是遵循 cni 标准接口,实现了 ADDDEL 命令,这里重点看看 ADD 命令时如何实现的。calico 首先会注册 ADDDEL 命令,代码在 L614-L677


func Main(version string) {
    // ...
    err := flagSet.Parse(os.Args[1:])
    // ...
    // 注册 `ADD` 和 `DEL` 命令
    skel.PluginMain(cmdAdd, nil, cmdDel,
        cniSpecVersion.PluginSupports("0.1.0", "0.2.0", "0.3.0", "0.3.1"),
        "Calico CNI plugin"+version)
}

ADD 命令里,次要做了三个逻辑:

  • 查问 calico datastore 里有没有 WorkloadEndpoint 对象和以后的 pod 名字匹配,没有匹配,则会创立新的 WorkloadEndpoint 对象,该对象内次要保留该 pod 在 host network namespace 内的网卡名字和 pod ip 地址,以及 container network namespace 的网卡名字等等信息,对象示例如下。
  • 创立一个 veth pair,并把其中一个网卡置于宿主机端网络命名空间,另一个置于容器端网络命名空间。在 container network namespace 内创立网卡如 eth0,并通过调用 calico-ipam 取得的 IP 地址赋值给该 eth0 网卡;在 host network namespace 内创立网卡,网卡名格局为 "cali" + sha1(namespace.pod)[:11],并设置 MAC 地址 ”ee:ee:ee:ee:ee:ee”。
  • 在容器端和宿主机端创立路由。在容器端,设置默认网关为 169.254.1.1,该网关地址代码写死的;在宿主机端,增加路由如 10.217.120.85 dev calid0bda9976d5 scope link,其中 10.217.120.85 是 pod ip 地址,calid0bda9976d5 是该 pod 在宿主机端的网卡,也就是 veth pair 在宿主机这端的 virtual ethernet interface 虚构网络设备。

一个 WorkloadEndpoint 对象示例如下,一个 k8s pod 对象对应着 calico 中的一个 workloadendpoint 对象,能够通过 calicoctl get wep -o wide 查看所有 workloadendpoint。
记得配置 calico datastore 为 kubernetes 的,为不便能够在 ~/.zshrc 里配置环境变量:

# calico
export CALICO_DATASTORE_TYPE=kubernetes
export  CALICO_KUBECONFIG=~/.kube/config

apiVersion: projectcalico.org/v3
kind: WorkloadEndpoint
metadata:
  creationTimestamp: "2021-01-09T08:38:56Z"
  generateName: nginx-demo-1-7f67f8bdd8-
  labels:
    app: nginx-demo-1
    pod-template-hash: 7f67f8bdd8
    projectcalico.org/namespace: default
    projectcalico.org/orchestrator: k8s
    projectcalico.org/serviceaccount: default
  name: minikube-k8s-nginx--demo--1--7f67f8bdd8--d5wsc-eth0
  namespace: default
  resourceVersion: "557760"
  uid: 85d1d33f-f55f-4f28-a89d-0a55394311db
spec:
  endpoint: eth0
  interfaceName: calife8e5922caa
  ipNetworks:
  - 10.217.120.84/32
  node: minikube
  orchestrator: k8s
  pod: nginx-demo-1-7f67f8bdd8-d5wsc
  profiles:
  - kns.default
  - ksa.default.default

依据以上三个次要逻辑,看下 cmdAdd 函数代码:


func cmdAdd(args *skel.CmdArgs) (err error) {
    // ...
    // 从 args.StdinData 里加载配置数据,这些配置数据其实就是
    // `--cni-conf-dir` 传进来的文件内容,即 cni 配置参数,见第一篇文章
    // types.NetConf 构造体数据结构也对应着 cni 配置文件里的数据
    conf := types.NetConf{}
    if err := json.Unmarshal(args.StdinData, &conf); err != nil {return fmt.Errorf("failed to load netconf: %v", err)
    }
    // 这里能够通过 cni 参数设置,把 calico 插件的日志落地到宿主机文件内
    // "log_level": "debug", "log_file_path": "/var/log/calico/cni/cni.log",
    utils.ConfigureLogging(conf)
    
    // ...
    
    // 能够在 cni 文件内设置 MTU,即 Max Transmit Unit 最大传输单元,配置网卡时须要
    if mtu, err := utils.MTUFromFile("/var/lib/calico/mtu"); err != nil {return fmt.Errorf("failed to read MTU file: %s", err)
    } else if conf.MTU == 0 && mtu != 0 {conf.MTU = mtu}

    // 结构一个 WEPIdentifiers 对象,并赋值
    nodename := utils.DetermineNodename(conf)
    wepIDs, err := utils.GetIdentifiers(args, nodename)
    calicoClient, err := utils.CreateClient(conf)
    
    // 查看 datastore 是否曾经 ready 了,能够 `calicoctl get clusterinformation default -o yaml` 查看
    ci, err := calicoClient.ClusterInformation().Get(ctx, "default", options.GetOptions{})
    if !*ci.Spec.DatastoreReady {return}

    // list 出前缀为 wepPrefix 的 workloadEndpoint,一个 pod 对应一个 workloadEndpoint,如果数据库里能匹配出 workloadEndpoint,就应用这个 workloadEndpoint
    // 否则最初创立完 pod network 资源后,会往 calico 数据库里写一个 workloadEndpoint
    wepPrefix, err := wepIDs.CalculateWorkloadEndpointName(true)
    endpoints, err := calicoClient.WorkloadEndpoints().List(ctx, options.ListOptions{Name: wepPrefix, Namespace: wepIDs.Namespace, Prefix: true})
    if err != nil {return}

    // 对于新建的 pod,最初会在 calico datastore 里写一个对应的新的 workloadendpoint 对象
    var endpoint *api.WorkloadEndpoint

    // 这里因为咱们是新建的 pod,数据库里也不会有对应的 workloadEndpoint 对象,所以 endpoints 必然是 nil 的
    if len(endpoints.Items) > 0 {// ...}

    // 既然 endpoint 是 nil,则填充 WEPIdentifiers 对象默认值,这里 args.IfName 是 kubelet 那边传过来的,就是容器端网卡名字,个别是 eth0
    // 这里 WEPName 的格局为:{node_name}-k8s-{strings.replace(pod_name, "-", "--")}-{wepIDs.Endpoint},比方上文
    // minikube-k8s-nginx--demo--1--7f67f8bdd8--d5wsc-eth0 WorkloadEndpoint 对象
    if endpoint == nil {
        wepIDs.Endpoint = args.IfName
        wepIDs.WEPName, err = wepIDs.CalculateWorkloadEndpointName(false)
    }

    // Orchestrator 是 k8s
    if wepIDs.Orchestrator == api.OrchestratorKubernetes {
        // k8s.CmdAddK8s 函数里做以上三个逻辑工作
        if result, err = k8s.CmdAddK8s(ctx, args, conf, *wepIDs, calicoClient, endpoint); err != nil {return}
    } else {// ...}

    // 咱们的配置文件里 policy.type 是 k8s,可见上文配置文件
    if conf.Policy.PolicyType == "" {// ...}

    // Print result to stdout, in the format defined by the requested cniVersion.
    err = cnitypes.PrintResult(result, conf.CNIVersion)
    return
}

以上 cmdAdd() 函数根本构造合乎 cni 规范里的函数构造,最初会把后果打印到 stdout。看下 k8s.CmdAddK8s() 函数的次要逻辑:

// 次要做三件事:// 1. 往 calico store 里写个 WorkloadEndpoint 对象,和 pod 对应
// 2. 创立 veth pair,一端在容器端,并赋值 IP/MAC 地址;一端在宿主机端,赋值 MAC 地址
// 3. 创立路由,容器端创立默认网关路由;宿主机端创立该 pod ip/mac 的路由
func CmdAddK8s(ctx context.Context, args *skel.CmdArgs, conf types.NetConf, epIDs utils.WEPIdentifiers, calicoClient calicoclient.Interface, endpoint *api.WorkloadEndpoint) (*current.Result, error) {
    // ...
    // 这里依据操作系统生成不同的数据立体 data plane,这里是 linuxDataplane 对象
    d, err := dataplane.GetDataplane(conf, logger)
    // 创立 k8s client
    client, err := NewK8sClient(conf, logger)
    
    // 咱们的配置文件里 ipam.type=calico-ipam
    if conf.IPAM.Type == "host-local" {// ...}

    // ...
    
    // 这里会查看该 pod 和 namespace 的 annotation: cni.projectcalico.org/ipv4pools
    // 咱们没有设置,这里逻辑跳过
    if conf.Policy.PolicyType == "k8s" {annotNS, err := getK8sNSInfo(client, epIDs.Namespace)
        labels, annot, ports, profiles, generateName, err = getK8sPodInfo(client, epIDs.Pod, epIDs.Namespace)
        // ...
        if conf.IPAM.Type == "calico-ipam" {
            var v4pools, v6pools string
            // Sets  the Namespace annotation for IP pools as default
            v4pools = annotNS["cni.projectcalico.org/ipv4pools"]
            v6pools = annotNS["cni.projectcalico.org/ipv6pools"]
            // Gets the POD annotation for IP Pools and overwrites Namespace annotation if it exists
            v4poolpod := annot["cni.projectcalico.org/ipv4pools"]
            if len(v4poolpod) != 0 {v4pools = v4poolpod}
            // ...
        }
    }

    ipAddrsNoIpam := annot["cni.projectcalico.org/ipAddrsNoIpam"]
    ipAddrs := annot["cni.projectcalico.org/ipAddrs"]
    
    switch {
    // 次要走这个逻辑:调用 calico-ipam 插件调配一个 IP 地址
    case ipAddrs == ""&& ipAddrsNoIpam =="":
        // 咱们的 pod 没有设置 annotation "cni.projectcalico.org/ipAddrsNoIpam" 和 "cni.projectcalico.org/ipAddrs" 值
        // 这里调用 calico-ipam 插件获取 pod ip 值
        // 无关 calico-ipam 插件如何调配 pod ip 值,后续有空再学习下
        result, err = utils.AddIPAM(conf, args, logger)
        // ...
    case ipAddrs != ""&& ipAddrsNoIpam !="":
        // Can't have both ipAddrs and ipAddrsNoIpam annotations at the same time.
        e := fmt.Errorf("can't have both annotations: 'ipAddrs' and 'ipAddrsNoIpam' in use at the same time")
        logger.Error(e)
        return nil, e
    case ipAddrsNoIpam != "":
        // ...
    case ipAddrs != "":
        // ...
    }
    
    // 开始创立 WorkloadEndpoint 对象,赋值相干参数
    endpoint.Name = epIDs.WEPName
    endpoint.Namespace = epIDs.Namespace
    endpoint.Labels = labels
    endpoint.GenerateName = generateName
    endpoint.Spec.Endpoint = epIDs.Endpoint
    endpoint.Spec.Node = epIDs.Node
    endpoint.Spec.Orchestrator = epIDs.Orchestrator
    endpoint.Spec.Pod = epIDs.Pod
    endpoint.Spec.Ports = ports
    endpoint.Spec.IPNetworks = []string{}
    if conf.Policy.PolicyType == "k8s" {endpoint.Spec.Profiles = profiles} else {endpoint.Spec.Profiles = []string{conf.Name}
    }

    // calico-ipam 调配的 ip 地址值,写到 endpoint.Spec.IPNetworks 中
    if err = utils.PopulateEndpointNets(endpoint, result); err != nil {// ...}

    // 这里 desiredVethName 网卡名格局为:`"cali" + sha1(namespace.pod)[:11]`,这个网卡为置于宿主机一端
    desiredVethName := k8sconversion.NewConverter().VethNameForWorkload(epIDs.Namespace, epIDs.Pod)
    
    // DoNetworking() 函数很重要,该函数会创立 veth pair 和路由
    // 这里是调用 linuxDataplane 对象的 DoNetworking() 函数
    hostVethName, contVethMac, err := d.DoNetworking(ctx, calicoClient, args, result, desiredVethName, routes, endpoint, annot)
    
    // ...
    mac, err := net.ParseMAC(contVethMac)
    endpoint.Spec.MAC = mac.String()
    endpoint.Spec.InterfaceName = hostVethName
    endpoint.Spec.ContainerID = epIDs.ContainerID

    // ...

    // 创立或更新 WorkloadEndpoint 对象,至此到这里,会依据新建的一个 pod 对象,往 calico datastore 里写一个对应的 workloadendpoint 对象
    if _, err := utils.CreateOrUpdate(ctx, calicoClient, endpoint); err != nil {// ...}

    // Add the interface created above to the CNI result.
    result.Interfaces = append(result.Interfaces, &current.Interface{Name: endpoint.Spec.InterfaceName},
    )

    return result, nil
}

以上代码最初会创立个 workloadendpoint 对象,同时 DoNetworking() 函数很重要,这个函数里会创立路由和 veth pair。
而后看下 linuxDataplane 对象的 DoNetworking() 函数,是如何创立 veth pair 和 routes 的。这里次要调用了 github.com/vishvananda/netlink golang 包来增删改查网卡和路由等操作,等同于执行 ip link add/delete/set xxx 等命令,该 golang 包也是个很好用的包,被很多次要我的项目如 k8s 我的项目应用,在学习 linux 网络相干常识时能够利用这个包写一写相干 demo,效率也高很多。这里看看 calico 如何应用 netlink 这个包来创立 routes 和 veth pair 的:


func (d *linuxDataplane) DoNetworking(
    ctx context.Context,
    calicoClient calicoclient.Interface,
    args *skel.CmdArgs,
    result *current.Result,
    desiredVethName string,
    routes []*net.IPNet,
    endpoint *api.WorkloadEndpoint,
    annotations map[string]string,
) (hostVethName, contVethMAC string, err error) {// 这里 desiredVethName 网卡名格局为:`"cali" + sha1(namespace.pod)[:11]`,这个网卡为置于宿主机一端
    hostVethName = desiredVethName
    // 容器这端网卡名个别为 eth0
    contVethName := args.IfName

    err = ns.WithNetNSPath(args.Netns, func(hostNS ns.NetNS) error {
        veth := &netlink.Veth{
            LinkAttrs: netlink.LinkAttrs{
                Name: contVethName,
                MTU:  d.mtu,
            },
            PeerName: hostVethName,
        }
        // 创立 veth peer,容器端网卡名是 eth0,宿主机端网卡名是 "cali" + sha1(namespace.pod)[:11]
        // 等于 ip link add xxx type veth peer name xxx 命令
        if err := netlink.LinkAdd(veth); err != nil { }
        hostVeth, err := netlink.LinkByName(hostVethName)
        if mac, err := net.ParseMAC("EE:EE:EE:EE:EE:EE"); err != nil { } else {
            // 设置宿主机端网卡的 mac 地址,为 ee:ee:ee:ee:ee:ee
            if err = netlink.LinkSetHardwareAddr(hostVeth, mac); err != nil {d.logger.Warnf("failed to Set MAC of %q: %v. Using kernel generated MAC.", hostVethName, err)
            }
        }

        // ...
        hasIPv4 = true

        // ip link set up 起来宿主机端这边的网卡
        if err = netlink.LinkSetUp(hostVeth); err != nil { }
        // ip link set up 起来容器端这边的网卡
        contVeth, err := netlink.LinkByName(contVethName)
        if err = netlink.LinkSetUp(contVeth); err != nil { }
        // Fetch the MAC from the container Veth. This is needed by Calico.
        contVethMAC = contVeth.Attrs().HardwareAddr.String()
        if hasIPv4 {
            // 容器端这边增加默认网关路由,如:// default via 169.254.1.1 dev eth0
            // 169.254.1.1 dev eth0 scope link
            gw := net.IPv4(169, 254, 1, 1)
            gwNet := &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)}
            err := netlink.RouteAdd(
                &netlink.Route{LinkIndex: contVeth.Attrs().Index,
                    Scope:     netlink.SCOPE_LINK,
                    Dst:       gwNet,
                },
            )
        }

        // 把从 calico-ipam 插件调配来的 pod ip 地址赋值给容器端这边的网卡
        for _, addr := range result.IPs {if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil {return fmt.Errorf("failed to add IP addr to %q: %v", contVeth, err)
            }
        }
        // ...
        // 切换到宿主机端 network namespace
        if err = netlink.LinkSetNsFd(hostVeth, int(hostNS.Fd())); err != nil {return fmt.Errorf("failed to move veth to host netns: %v", err)
        }

        return nil
    })

    // 设置 veth pair 宿主机端的网卡 sysctls 配置,设置这个网卡能够转发和 arp_proxy
    err = d.configureSysctls(hostVethName, hasIPv4, hasIPv6)

    // ip link set up 起来宿主机这端的 veth pair 的网卡
    hostVeth, err := netlink.LinkByName(hostVethName)
    if err = netlink.LinkSetUp(hostVeth); err != nil {return "","", fmt.Errorf("failed to set %q up: %v", hostVethName, err)
    }

    // 配置宿主机这端的路由
    err = SetupRoutes(hostVeth, result)

    return hostVethName, contVethMAC, err
}

func SetupRoutes(hostVeth netlink.Link, result *current.Result) error {
    // 配置宿主机端这边的路由,但凡目标地址为 pod ip 10.217.120.85,数据包进入 calid0bda9976d5 网卡,路由如:// 10.217.120.85 dev calid0bda9976d5 scope link
    for _, ipAddr := range result.IPs {
        route := netlink.Route{LinkIndex: hostVeth.Attrs().Index,
            Scope:     netlink.SCOPE_LINK,
            Dst:       &ipAddr.Address,
        }
        err := netlink.RouteAdd(&route)
        // ...
    }
    return nil
}

// 这里英文就不翻译解释了,英文备注说的更具体通透。// configureSysctls configures necessary sysctls required for the host side of the veth pair for IPv4 and/or IPv6.
func (d *linuxDataplane) configureSysctls(hostVethName string, hasIPv4, hasIPv6 bool) error {
  var err error
  if hasIPv4 {
    // Normally, the kernel has a delay before responding to proxy ARP but we know
    // that's not needed in a Calico network so we disable it.
    if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/proxy_delay", hostVethName), "0"); err != nil {return fmt.Errorf("failed to set net.ipv4.neigh.%s.proxy_delay=0: %s", hostVethName, err)
    }
    
    // Enable proxy ARP, this makes the host respond to all ARP requests with its own
    // MAC. We install explicit routes into the containers network
    // namespace and we use a link-local address for the gateway.  Turing on proxy ARP
    // means that we don't need to assign the link local address explicitly to each
    // host side of the veth, which is one fewer thing to maintain and one fewer
    // thing we may clash over.
    if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/proxy_arp", hostVethName), "1"); err != nil {return fmt.Errorf("failed to set net.ipv4.conf.%s.proxy_arp=1: %s", hostVethName, err)
    }
    
    // Enable IP forwarding of packets coming _from_ this interface.  For packets to
    // be forwarded in both directions we need this flag to be set on the fabric-facing
    // interface too (or for the global default to be set).
    if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/forwarding", hostVethName), "1"); err != nil {return fmt.Errorf("failed to set net.ipv4.conf.%s.forwarding=1: %s", hostVethName, err)
    }
  }

  if hasIPv6 {// ...}
  
  return nil
}

总结

至此,calico 二进制插件就为一个 sandbox container 创立好了网络资源,即创立了一个 veth pair,并别离为宿主机端和容器端网卡设置好对应 MAC 地址,以及为容器段配置好了 IP 地址,同时还在容器端配置好了路由默认网关,以及宿主机端配置好路由,让指标地址是 sandbox container ip 的进入宿主机端 veth pair 网卡,同时还为宿主机端网卡配置 arp proxy 和 packet forwarding 性能,
最初,会依据这些网络数据生成一个 workloadendpoint 对象存入 calico datastore 里。

然而,还是短少了一个要害逻辑,calico-ipam 是如何调配 IP 地址的,后续有空在学习记录。

正文完
 0