Overview
calico 插件代码仓库在 projectcalico/cni-plugin,并且会编译两个二进制文件:calico 和 calico-ipam,其中 calico 会为 sandbox container 创立 route 和虚构网卡 virtual interface,以及 veth pair 等网络资源,并且会把相干数据写入 calico datastore 数据库里;calico-ipam 会为以后 pod 从以后节点的 pod 网段内调配 ip 地址,当然以后节点还没有 pod 网段,会从集群网段 cluster cidr 中先调配出该节点的 pod cidr,并把相干数据写入 calico datastore 数据库里,这里 cluster cidr 是用户本人定义的,曾经提前写入了 calico datastore,并且从 cluster cidr 中划分的 block size 也是能够自定义的 (新版本 calico/node 容器能够反对自定义,老版本 calico 不反对),能够参考官网文档 change-block-size。
接下来重点看下 calico 二进制插件具体是如何工作的,后续再看 calico-ipam 二进制插件如何调配 ip 地址的。
calico plugin 源码解析
calico 插件是遵循 cni 标准接口,实现了 ADD
和 DEL
命令,这里重点看看 ADD
命令时如何实现的。calico 首先会注册 ADD
和 DEL
命令,代码在 L614-L677:
func Main(version string) {
// ...
err := flagSet.Parse(os.Args[1:])
// ...
// 注册 `ADD` 和 `DEL` 命令
skel.PluginMain(cmdAdd, nil, cmdDel,
cniSpecVersion.PluginSupports("0.1.0", "0.2.0", "0.3.0", "0.3.1"),
"Calico CNI plugin"+version)
}
ADD
命令里,次要做了三个逻辑:
- 查问 calico datastore 里有没有 WorkloadEndpoint 对象和以后的 pod 名字匹配,没有匹配,则会创立新的 WorkloadEndpoint 对象,该对象内次要保留该 pod 在 host network namespace 内的网卡名字和 pod ip 地址,以及 container network namespace 的网卡名字等等信息,对象示例如下。
- 创立一个 veth pair,并把其中一个网卡置于宿主机端网络命名空间,另一个置于容器端网络命名空间。在 container network namespace 内创立网卡如 eth0,并通过调用 calico-ipam 取得的 IP 地址赋值给该 eth0 网卡;在 host network namespace 内创立网卡,网卡名格局为
"cali" + sha1(namespace.pod)[:11]
,并设置 MAC 地址 ”ee:ee:ee:ee:ee:ee”。 - 在容器端和宿主机端创立路由。在容器端,设置默认网关为
169.254.1.1
,该网关地址代码写死的;在宿主机端,增加路由如10.217.120.85 dev calid0bda9976d5 scope link
,其中10.217.120.85
是 pod ip 地址,calid0bda9976d5
是该 pod 在宿主机端的网卡,也就是 veth pair 在宿主机这端的 virtual ethernet interface 虚构网络设备。
一个 WorkloadEndpoint 对象示例如下,一个 k8s pod 对象对应着 calico 中的一个 workloadendpoint 对象,能够通过 calicoctl get wep -o wide
查看所有 workloadendpoint。
记得配置 calico datastore 为 kubernetes 的,为不便能够在 ~/.zshrc
里配置环境变量:
# calico
export CALICO_DATASTORE_TYPE=kubernetes
export CALICO_KUBECONFIG=~/.kube/config
apiVersion: projectcalico.org/v3
kind: WorkloadEndpoint
metadata:
creationTimestamp: "2021-01-09T08:38:56Z"
generateName: nginx-demo-1-7f67f8bdd8-
labels:
app: nginx-demo-1
pod-template-hash: 7f67f8bdd8
projectcalico.org/namespace: default
projectcalico.org/orchestrator: k8s
projectcalico.org/serviceaccount: default
name: minikube-k8s-nginx--demo--1--7f67f8bdd8--d5wsc-eth0
namespace: default
resourceVersion: "557760"
uid: 85d1d33f-f55f-4f28-a89d-0a55394311db
spec:
endpoint: eth0
interfaceName: calife8e5922caa
ipNetworks:
- 10.217.120.84/32
node: minikube
orchestrator: k8s
pod: nginx-demo-1-7f67f8bdd8-d5wsc
profiles:
- kns.default
- ksa.default.default
依据以上三个次要逻辑,看下 cmdAdd 函数代码:
func cmdAdd(args *skel.CmdArgs) (err error) {
// ...
// 从 args.StdinData 里加载配置数据,这些配置数据其实就是
// `--cni-conf-dir` 传进来的文件内容,即 cni 配置参数,见第一篇文章
// types.NetConf 构造体数据结构也对应着 cni 配置文件里的数据
conf := types.NetConf{}
if err := json.Unmarshal(args.StdinData, &conf); err != nil {return fmt.Errorf("failed to load netconf: %v", err)
}
// 这里能够通过 cni 参数设置,把 calico 插件的日志落地到宿主机文件内
// "log_level": "debug", "log_file_path": "/var/log/calico/cni/cni.log",
utils.ConfigureLogging(conf)
// ...
// 能够在 cni 文件内设置 MTU,即 Max Transmit Unit 最大传输单元,配置网卡时须要
if mtu, err := utils.MTUFromFile("/var/lib/calico/mtu"); err != nil {return fmt.Errorf("failed to read MTU file: %s", err)
} else if conf.MTU == 0 && mtu != 0 {conf.MTU = mtu}
// 结构一个 WEPIdentifiers 对象,并赋值
nodename := utils.DetermineNodename(conf)
wepIDs, err := utils.GetIdentifiers(args, nodename)
calicoClient, err := utils.CreateClient(conf)
// 查看 datastore 是否曾经 ready 了,能够 `calicoctl get clusterinformation default -o yaml` 查看
ci, err := calicoClient.ClusterInformation().Get(ctx, "default", options.GetOptions{})
if !*ci.Spec.DatastoreReady {return}
// list 出前缀为 wepPrefix 的 workloadEndpoint,一个 pod 对应一个 workloadEndpoint,如果数据库里能匹配出 workloadEndpoint,就应用这个 workloadEndpoint
// 否则最初创立完 pod network 资源后,会往 calico 数据库里写一个 workloadEndpoint
wepPrefix, err := wepIDs.CalculateWorkloadEndpointName(true)
endpoints, err := calicoClient.WorkloadEndpoints().List(ctx, options.ListOptions{Name: wepPrefix, Namespace: wepIDs.Namespace, Prefix: true})
if err != nil {return}
// 对于新建的 pod,最初会在 calico datastore 里写一个对应的新的 workloadendpoint 对象
var endpoint *api.WorkloadEndpoint
// 这里因为咱们是新建的 pod,数据库里也不会有对应的 workloadEndpoint 对象,所以 endpoints 必然是 nil 的
if len(endpoints.Items) > 0 {// ...}
// 既然 endpoint 是 nil,则填充 WEPIdentifiers 对象默认值,这里 args.IfName 是 kubelet 那边传过来的,就是容器端网卡名字,个别是 eth0
// 这里 WEPName 的格局为:{node_name}-k8s-{strings.replace(pod_name, "-", "--")}-{wepIDs.Endpoint},比方上文
// minikube-k8s-nginx--demo--1--7f67f8bdd8--d5wsc-eth0 WorkloadEndpoint 对象
if endpoint == nil {
wepIDs.Endpoint = args.IfName
wepIDs.WEPName, err = wepIDs.CalculateWorkloadEndpointName(false)
}
// Orchestrator 是 k8s
if wepIDs.Orchestrator == api.OrchestratorKubernetes {
// k8s.CmdAddK8s 函数里做以上三个逻辑工作
if result, err = k8s.CmdAddK8s(ctx, args, conf, *wepIDs, calicoClient, endpoint); err != nil {return}
} else {// ...}
// 咱们的配置文件里 policy.type 是 k8s,可见上文配置文件
if conf.Policy.PolicyType == "" {// ...}
// Print result to stdout, in the format defined by the requested cniVersion.
err = cnitypes.PrintResult(result, conf.CNIVersion)
return
}
以上 cmdAdd() 函数根本构造合乎 cni 规范里的函数构造,最初会把后果打印到 stdout。看下 k8s.CmdAddK8s() 函数的次要逻辑:
// 次要做三件事:// 1. 往 calico store 里写个 WorkloadEndpoint 对象,和 pod 对应
// 2. 创立 veth pair,一端在容器端,并赋值 IP/MAC 地址;一端在宿主机端,赋值 MAC 地址
// 3. 创立路由,容器端创立默认网关路由;宿主机端创立该 pod ip/mac 的路由
func CmdAddK8s(ctx context.Context, args *skel.CmdArgs, conf types.NetConf, epIDs utils.WEPIdentifiers, calicoClient calicoclient.Interface, endpoint *api.WorkloadEndpoint) (*current.Result, error) {
// ...
// 这里依据操作系统生成不同的数据立体 data plane,这里是 linuxDataplane 对象
d, err := dataplane.GetDataplane(conf, logger)
// 创立 k8s client
client, err := NewK8sClient(conf, logger)
// 咱们的配置文件里 ipam.type=calico-ipam
if conf.IPAM.Type == "host-local" {// ...}
// ...
// 这里会查看该 pod 和 namespace 的 annotation: cni.projectcalico.org/ipv4pools
// 咱们没有设置,这里逻辑跳过
if conf.Policy.PolicyType == "k8s" {annotNS, err := getK8sNSInfo(client, epIDs.Namespace)
labels, annot, ports, profiles, generateName, err = getK8sPodInfo(client, epIDs.Pod, epIDs.Namespace)
// ...
if conf.IPAM.Type == "calico-ipam" {
var v4pools, v6pools string
// Sets the Namespace annotation for IP pools as default
v4pools = annotNS["cni.projectcalico.org/ipv4pools"]
v6pools = annotNS["cni.projectcalico.org/ipv6pools"]
// Gets the POD annotation for IP Pools and overwrites Namespace annotation if it exists
v4poolpod := annot["cni.projectcalico.org/ipv4pools"]
if len(v4poolpod) != 0 {v4pools = v4poolpod}
// ...
}
}
ipAddrsNoIpam := annot["cni.projectcalico.org/ipAddrsNoIpam"]
ipAddrs := annot["cni.projectcalico.org/ipAddrs"]
switch {
// 次要走这个逻辑:调用 calico-ipam 插件调配一个 IP 地址
case ipAddrs == ""&& ipAddrsNoIpam =="":
// 咱们的 pod 没有设置 annotation "cni.projectcalico.org/ipAddrsNoIpam" 和 "cni.projectcalico.org/ipAddrs" 值
// 这里调用 calico-ipam 插件获取 pod ip 值
// 无关 calico-ipam 插件如何调配 pod ip 值,后续有空再学习下
result, err = utils.AddIPAM(conf, args, logger)
// ...
case ipAddrs != ""&& ipAddrsNoIpam !="":
// Can't have both ipAddrs and ipAddrsNoIpam annotations at the same time.
e := fmt.Errorf("can't have both annotations: 'ipAddrs' and 'ipAddrsNoIpam' in use at the same time")
logger.Error(e)
return nil, e
case ipAddrsNoIpam != "":
// ...
case ipAddrs != "":
// ...
}
// 开始创立 WorkloadEndpoint 对象,赋值相干参数
endpoint.Name = epIDs.WEPName
endpoint.Namespace = epIDs.Namespace
endpoint.Labels = labels
endpoint.GenerateName = generateName
endpoint.Spec.Endpoint = epIDs.Endpoint
endpoint.Spec.Node = epIDs.Node
endpoint.Spec.Orchestrator = epIDs.Orchestrator
endpoint.Spec.Pod = epIDs.Pod
endpoint.Spec.Ports = ports
endpoint.Spec.IPNetworks = []string{}
if conf.Policy.PolicyType == "k8s" {endpoint.Spec.Profiles = profiles} else {endpoint.Spec.Profiles = []string{conf.Name}
}
// calico-ipam 调配的 ip 地址值,写到 endpoint.Spec.IPNetworks 中
if err = utils.PopulateEndpointNets(endpoint, result); err != nil {// ...}
// 这里 desiredVethName 网卡名格局为:`"cali" + sha1(namespace.pod)[:11]`,这个网卡为置于宿主机一端
desiredVethName := k8sconversion.NewConverter().VethNameForWorkload(epIDs.Namespace, epIDs.Pod)
// DoNetworking() 函数很重要,该函数会创立 veth pair 和路由
// 这里是调用 linuxDataplane 对象的 DoNetworking() 函数
hostVethName, contVethMac, err := d.DoNetworking(ctx, calicoClient, args, result, desiredVethName, routes, endpoint, annot)
// ...
mac, err := net.ParseMAC(contVethMac)
endpoint.Spec.MAC = mac.String()
endpoint.Spec.InterfaceName = hostVethName
endpoint.Spec.ContainerID = epIDs.ContainerID
// ...
// 创立或更新 WorkloadEndpoint 对象,至此到这里,会依据新建的一个 pod 对象,往 calico datastore 里写一个对应的 workloadendpoint 对象
if _, err := utils.CreateOrUpdate(ctx, calicoClient, endpoint); err != nil {// ...}
// Add the interface created above to the CNI result.
result.Interfaces = append(result.Interfaces, ¤t.Interface{Name: endpoint.Spec.InterfaceName},
)
return result, nil
}
以上代码最初会创立个 workloadendpoint 对象,同时 DoNetworking() 函数很重要,这个函数里会创立路由和 veth pair。
而后看下 linuxDataplane 对象的 DoNetworking() 函数,是如何创立 veth pair 和 routes 的。这里次要调用了 github.com/vishvananda/netlink
golang 包来增删改查网卡和路由等操作,等同于执行 ip link add/delete/set xxx
等命令,该 golang 包也是个很好用的包,被很多次要我的项目如 k8s 我的项目应用,在学习 linux 网络相干常识时能够利用这个包写一写相干 demo,效率也高很多。这里看看 calico 如何应用 netlink 这个包来创立 routes 和 veth pair 的:
func (d *linuxDataplane) DoNetworking(
ctx context.Context,
calicoClient calicoclient.Interface,
args *skel.CmdArgs,
result *current.Result,
desiredVethName string,
routes []*net.IPNet,
endpoint *api.WorkloadEndpoint,
annotations map[string]string,
) (hostVethName, contVethMAC string, err error) {// 这里 desiredVethName 网卡名格局为:`"cali" + sha1(namespace.pod)[:11]`,这个网卡为置于宿主机一端
hostVethName = desiredVethName
// 容器这端网卡名个别为 eth0
contVethName := args.IfName
err = ns.WithNetNSPath(args.Netns, func(hostNS ns.NetNS) error {
veth := &netlink.Veth{
LinkAttrs: netlink.LinkAttrs{
Name: contVethName,
MTU: d.mtu,
},
PeerName: hostVethName,
}
// 创立 veth peer,容器端网卡名是 eth0,宿主机端网卡名是 "cali" + sha1(namespace.pod)[:11]
// 等于 ip link add xxx type veth peer name xxx 命令
if err := netlink.LinkAdd(veth); err != nil { }
hostVeth, err := netlink.LinkByName(hostVethName)
if mac, err := net.ParseMAC("EE:EE:EE:EE:EE:EE"); err != nil { } else {
// 设置宿主机端网卡的 mac 地址,为 ee:ee:ee:ee:ee:ee
if err = netlink.LinkSetHardwareAddr(hostVeth, mac); err != nil {d.logger.Warnf("failed to Set MAC of %q: %v. Using kernel generated MAC.", hostVethName, err)
}
}
// ...
hasIPv4 = true
// ip link set up 起来宿主机端这边的网卡
if err = netlink.LinkSetUp(hostVeth); err != nil { }
// ip link set up 起来容器端这边的网卡
contVeth, err := netlink.LinkByName(contVethName)
if err = netlink.LinkSetUp(contVeth); err != nil { }
// Fetch the MAC from the container Veth. This is needed by Calico.
contVethMAC = contVeth.Attrs().HardwareAddr.String()
if hasIPv4 {
// 容器端这边增加默认网关路由,如:// default via 169.254.1.1 dev eth0
// 169.254.1.1 dev eth0 scope link
gw := net.IPv4(169, 254, 1, 1)
gwNet := &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)}
err := netlink.RouteAdd(
&netlink.Route{LinkIndex: contVeth.Attrs().Index,
Scope: netlink.SCOPE_LINK,
Dst: gwNet,
},
)
}
// 把从 calico-ipam 插件调配来的 pod ip 地址赋值给容器端这边的网卡
for _, addr := range result.IPs {if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil {return fmt.Errorf("failed to add IP addr to %q: %v", contVeth, err)
}
}
// ...
// 切换到宿主机端 network namespace
if err = netlink.LinkSetNsFd(hostVeth, int(hostNS.Fd())); err != nil {return fmt.Errorf("failed to move veth to host netns: %v", err)
}
return nil
})
// 设置 veth pair 宿主机端的网卡 sysctls 配置,设置这个网卡能够转发和 arp_proxy
err = d.configureSysctls(hostVethName, hasIPv4, hasIPv6)
// ip link set up 起来宿主机这端的 veth pair 的网卡
hostVeth, err := netlink.LinkByName(hostVethName)
if err = netlink.LinkSetUp(hostVeth); err != nil {return "","", fmt.Errorf("failed to set %q up: %v", hostVethName, err)
}
// 配置宿主机这端的路由
err = SetupRoutes(hostVeth, result)
return hostVethName, contVethMAC, err
}
func SetupRoutes(hostVeth netlink.Link, result *current.Result) error {
// 配置宿主机端这边的路由,但凡目标地址为 pod ip 10.217.120.85,数据包进入 calid0bda9976d5 网卡,路由如:// 10.217.120.85 dev calid0bda9976d5 scope link
for _, ipAddr := range result.IPs {
route := netlink.Route{LinkIndex: hostVeth.Attrs().Index,
Scope: netlink.SCOPE_LINK,
Dst: &ipAddr.Address,
}
err := netlink.RouteAdd(&route)
// ...
}
return nil
}
// 这里英文就不翻译解释了,英文备注说的更具体通透。// configureSysctls configures necessary sysctls required for the host side of the veth pair for IPv4 and/or IPv6.
func (d *linuxDataplane) configureSysctls(hostVethName string, hasIPv4, hasIPv6 bool) error {
var err error
if hasIPv4 {
// Normally, the kernel has a delay before responding to proxy ARP but we know
// that's not needed in a Calico network so we disable it.
if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/proxy_delay", hostVethName), "0"); err != nil {return fmt.Errorf("failed to set net.ipv4.neigh.%s.proxy_delay=0: %s", hostVethName, err)
}
// Enable proxy ARP, this makes the host respond to all ARP requests with its own
// MAC. We install explicit routes into the containers network
// namespace and we use a link-local address for the gateway. Turing on proxy ARP
// means that we don't need to assign the link local address explicitly to each
// host side of the veth, which is one fewer thing to maintain and one fewer
// thing we may clash over.
if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/proxy_arp", hostVethName), "1"); err != nil {return fmt.Errorf("failed to set net.ipv4.conf.%s.proxy_arp=1: %s", hostVethName, err)
}
// Enable IP forwarding of packets coming _from_ this interface. For packets to
// be forwarded in both directions we need this flag to be set on the fabric-facing
// interface too (or for the global default to be set).
if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/forwarding", hostVethName), "1"); err != nil {return fmt.Errorf("failed to set net.ipv4.conf.%s.forwarding=1: %s", hostVethName, err)
}
}
if hasIPv6 {// ...}
return nil
}
总结
至此,calico 二进制插件就为一个 sandbox container 创立好了网络资源,即创立了一个 veth pair,并别离为宿主机端和容器端网卡设置好对应 MAC 地址,以及为容器段配置好了 IP 地址,同时还在容器端配置好了路由默认网关,以及宿主机端配置好路由,让指标地址是 sandbox container ip 的进入宿主机端 veth pair 网卡,同时还为宿主机端网卡配置 arp proxy 和 packet forwarding 性能,
最初,会依据这些网络数据生成一个 workloadendpoint 对象存入 calico datastore 里。
然而,还是短少了一个要害逻辑,calico-ipam 是如何调配 IP 地址的,后续有空在学习记录。