关于腾讯云:一文读懂-SuperEdge-分布式健康检查-边端

37次阅读

共计 22691 个字符,预计需要花费 57 分钟才能阅读完成。

作者:杜杨浩,腾讯云高级工程师,热衷于开源、容器和 Kubernetes。目前次要从事镜像仓库、Kubernetes 集群高可用 & 备份还原,以及边缘计算相干研发工作。

前言

SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该零碎把云原生能力扩大到边缘侧,很好的实现了云端对边缘端的治理和管制,极大简化了利用从云端部署到边缘端的过程。同时 SuperEdge 设计了分布式健康检查机制躲避了云边网络不稳固造成的大量 pod 迁徙和重建,保障了服务的稳固。

边缘计算场景下,边缘节点与云端的网络环境十分复杂,连贯并不牢靠,在原生 Kubernetes 集群中,会造成 apiserver 和节点连贯的中断,节点状态的异样,最终导致 pod 的驱赶和 endpoint 的缺失,造成服务的中断和稳定,具体来说原生 Kubernetes 解决如下:

  • 失联的节点被置为 ConditionUnknown 状态,并被增加 NoSchedule 和 NoExecute 的 taints
  • 失联的节点上的 pod 被驱赶,并在其余节点上进行重建
  • 失联的节点上的 pod 从 Service 的 Endpoint 列表中移除

因而,边缘计算场景仅仅依赖边端和 apiserver 的连贯状况是不足以判断节点是否异样的,会因为网络的不牢靠造成误判,影响失常服务。而相较于云端和边缘端的连贯,显然边端节点之间的连贯更为稳固,具备更高的参考价值,因而 superedge 提出了边缘分布式健康检查机制。该机制中节点状态断定除了要思考 apiserver 的因素外,还引入了节点的评估因素,进而对节点进行更为全面的状态判断。通过这个性能,可能防止因为云边网络不牢靠造成的大量的 pod 迁徙和重建,保障服务的稳固

具体来说,次要通过如下三个层面加强节点状态判断的准确性:

  • 每个节点定期探测其余节点衰弱状态
  • 集群内所有节点定期投票决定各节点的状态
  • 云端和边端节点独特决定节点状态

而分布式健康检查最终的判断解决如下:

edge-health-daemon 源码剖析

在深刻源码之前先介绍一下分布式健康检查的实现原理,其架构图如下所示:

Kubernetes 每个 node 在 kube-node-lease namespace 下会对应一个 Lease object,kubelet 每隔 node-status-update-frequency 工夫 (默认 10s) 会更新对应 node 的 Lease object

node-controller 会每隔 node-monitor-period 工夫 (默认 5s) 查看 Lease object 是否更新,如果超过 node-monitor-grace-period 工夫 (默认 40s) 没有产生过更新,则认为这个 node 不衰弱,会更新 NodeStatus(ConditionUnknown)

而当节点心跳超时 (ConditionUnknown) 之后,node controller 会给该 node 增加如下 taints:

spec:
  ...
  taints:
  - effect: NoSchedule
    key: node.kubernetes.io/unreachable
    timeAdded: "2020-07-02T03:50:47Z"
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    timeAdded: "2020-07-02T03:50:53Z"

同时,endpoint controller 会从 endpoint backend 中踢掉该母机上的所有 pod

对于打上 NoSchedule taint 的母机,Scheduler 不会调度新的负载在该 node 上了;而对于打上 NoExecute(node.kubernetes.io/unreachable) taint 的母机,node controller 会在节点心跳超时之后一段时间 (默认 5mins) 驱赶该节点上的 pod

分布式健康检查边端的 edge-health-daemon 组件会对同区域边缘节点执行分布式健康检查,并向 apiserver 发送衰弱状态投票后果(给 node 打 annotation)

此外,为了实现在云边断连且分布式健康检查状态失常的状况下:

  • 失联的节点上的 pod 不会从 Service 的 Endpoint 列表中移除
  • 失联的节点上的 pod 不会被驱赶

还须要在云端运行 edge-health-admission(Kubernetes mutating admission webhook),一直依据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉 NoExecute taint)以及 endpoints (将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses 中),从而实现云端和边端独特决定节点状态

本章将次要介绍 edge-health-daemon 原理,如下为 edge-health-daemon 的相干数据结构:

type EdgeHealthMetadata struct {
    *NodeMetadata
    *CheckMetadata
}
type NodeMetadata struct {NodeList []v1.Node
    sync.RWMutex
}
type CheckMetadata struct {CheckInfo            map[string]map[string]CheckDetail // Checker ip:{Checked ip:Check detail}
    CheckPluginScoreInfo map[string]map[string]float64     // Checked ip:{Plugin name:Check score}
    sync.RWMutex
}
type CheckDetail struct {
    Normal bool
    Time   time.Time
}
type CommunInfo struct {
    SourceIP    string                 // ClientIP,Checker ip
    CheckDetail map[string]CheckDetail // Checked ip:Check detail
    Hmac        string
}

含意如下:

  • NodeMetadata:为了实现分区域分布式健康检查机制而保护的边缘节点 cache,其中蕴含该区域内的所有边缘节点列表 NodeList
  • CheckMetadata:寄存健康检查的后果,具体来说包含两个数据结构:

    • CheckPluginScoreInfo:为 Checked ip:{Plugin name:Check score} 组织模式。第一级 key 示意:被查看的 ip;第二级 key 示意:查看插件的名称;value 示意:查看分数
    • CheckInfo:为 Checker ip:{Checked ip:Check detail} 组织模式。第一级 key 示意:执行查看的 ip;第二级 key 示意:被查看的 ip;value 示意查看后果 CheckDetail
  • CheckDetail:代表健康检查的后果

    • Normal:Normal 为 true 示意查看后果失常;false 示意异样
    • Time:示意得出该后果时的工夫,用于后果有效性的判断(超过一段时间没有更新的后果将有效)
  • CommunInfo:边缘节点向其它节点发送健康检查后果时应用的数据,其中包含:

    • SourceIP:示意执行查看的 ip
    • CheckDetail:为 Checked ip:Check detail 组织模式,蕴含被查看的 ip 以及查看后果
    • Hmac:SourceIP 以及 CheckDetail 进行 hmac 失去,用于边缘节点通信过程中判断传输数据的有效性(是否被篡改)

edge-health-daemon 主体逻辑包含四局部性能:

  • SyncNodeList:依据边缘节点所在的 zone 刷新 node cache,同时更新 CheckMetadata 相干数据
  • ExecuteCheck:对每个边缘节点执行若干品种的健康检查插件(ping,kubelet 等),并将各插件查看分数汇总,依据用户设置的基准线得出节点是否衰弱的后果
  • Commun:将本节点对其它各节点健康检查的后果发送给其它节点
  • Vote:对所有节点健康检查的后果分类,如果某个节点被大多数 (>1/2) 节点断定为失常,则对该节点增加superedgehealth/node-health:true annotation,表明该节点分布式健康检查后果为失常;否则,对该节点增加superedgehealth/node-health:false annotation,表明该节点分布式健康检查后果为异样

上面顺次对上述性能进行源码剖析:

1、SyncNodeList

SyncNodeList 每隔 HealthCheckPeriod 秒 (health-check-period 选项) 执行一次,会依照如下状况分类刷新 node cache:

  • 如果 kube-system namespace 下不存在名为 edge-health-zone-config 的 configmap,则没有开启多地区探测,因而会获取所有边缘节点列表并刷新 node cache
  • 否则,如果 edge-health-zone-config 的 configmap 数据局部 TaintZoneAdmission 为 false,则没有开启多地区探测,因而会获取所有边缘节点列表并刷新 node cache
  • 如果 TaintZoneAdmission 为 true,且 node 有 ”superedgehealth/topology-zone” 标签(标示区域),则获取 ”superedgehealth/topology-zone” label value 雷同的节点列表并刷新 node cache
  • 如果 node 没有 ”superedgehealth/topology-zone” label,则只会将边缘节点自身增加到分布式健康检查节点列表中并刷新 node cache(only itself)
func (ehd *EdgeHealthDaemon) SyncNodeList() {
    // Only sync nodes when self-located found
    var host *v1.Node
    if host = ehd.metadata.GetNodeByName(ehd.cfg.Node.HostName); host == nil {klog.Errorf("Self-hostname %s not found", ehd.cfg.Node.HostName)
        return
    }
    // Filter cloud nodes and retain edge ones
    masterRequirement, err := labels.NewRequirement(common.MasterLabel, selection.DoesNotExist, []string{})
    if err != nil {klog.Errorf("New masterRequirement failed %+v", err)
        return
    }
    masterSelector := labels.NewSelector()
    masterSelector = masterSelector.Add(*masterRequirement)
    if mrc, err := ehd.cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.TaintZoneConfigMap); err != nil {if apierrors.IsNotFound(err) { // multi-region configmap not found
            if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {klog.Errorf("Multi-region configmap not found and get nodes err %+v", err)
                return
            } else {ehd.metadata.SetByNodeList(NodeList)
            }
        } else {klog.Errorf("Get multi-region configmap err %+v", err)
            return
        }
    } else { // multi-region configmap found
        mrcv := mrc.Data[common.TaintZoneConfigMapKey]
        klog.V(4).Infof("Multi-region value is %s", mrcv)
        if mrcv == "false" { // close multi-region check
            if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {klog.Errorf("Multi-region configmap exist but disabled and get nodes err %+v", err)
                return
            } else {ehd.metadata.SetByNodeList(NodeList)
            }
        } else { // open multi-region check
            if hostZone, existed := host.Labels[common.TopologyZone]; existed {klog.V(4).Infof("Host %s has HostZone %s", host.Name, hostZone)
                zoneRequirement, err := labels.NewRequirement(common.TopologyZone, selection.Equals, []string{hostZone})
                if err != nil {klog.Errorf("New masterZoneRequirement failed: %+v", err)
                    return
                }
                masterZoneSelector := labels.NewSelector()
                masterZoneSelector = masterZoneSelector.Add(*masterRequirement, *zoneRequirement)
                if nodeList, err := ehd.nodeLister.List(masterZoneSelector); err != nil {klog.Errorf("TopologyZone label for hostname %s but get nodes err: %+v", host.Name, err)
                    return
                } else {ehd.metadata.SetByNodeList(nodeList)
                }
            } else { // Only check itself if there is no TopologyZone label
                klog.V(4).Infof("Only check itself since there is no TopologyZone label for hostname %s", host.Name)
                ehd.metadata.SetByNodeList([]*v1.Node{host})
            }
        }
    }
    // Init check plugin score
    ipList := make(map[string]struct{})
    for _, node := range ehd.metadata.Copy() {
        for _, addr := range node.Status.Addresses {
            if addr.Type == v1.NodeInternalIP {ipList[addr.Address] = struct{}{}
                ehd.metadata.InitCheckPluginScore(addr.Address)
            }
        }
    }
    // Delete redundant check plugin score
    for _, checkedIp := range ehd.metadata.CopyCheckedIp() {if _, existed := ipList[checkedIp]; !existed {ehd.metadata.DeleteCheckPluginScore(checkedIp)
        }
    }
    // Delete redundant check info
    for checkerIp := range ehd.metadata.CopyAll() {if _, existed := ipList[checkerIp]; !existed {ehd.metadata.DeleteByIp(ehd.cfg.Node.LocalIp, checkerIp)
        }
    }
    klog.V(4).Infof("SyncNodeList check info %+v successfully", ehd.metadata)
}
...
func (cm *CheckMetadata) DeleteByIp(localIp, ip string) {cm.Lock()
    defer cm.Unlock()
    delete(cm.CheckInfo[localIp], ip)
    delete(cm.CheckInfo, ip)
}

在依照如上逻辑更新 node cache 之后,会初始化 CheckMetadata.CheckPluginScoreInfo,将节点 ip 赋值给 CheckPluginScoreInfo key(Checked ip:被查看的 ip)

另外,会删除 CheckMetadata.CheckPluginScoreInfo 以及 CheckMetadata.CheckInfo 中多余的 items(不属于该边缘节点查看范畴)

2、ExecuteCheck

ExecuteCheck 也是每隔 HealthCheckPeriod 秒 (health-check-period 选项) 执行一次,会对每个边缘节点执行若干品种的健康检查插件 (ping,kubelet 等),并将各插件查看分数汇总,依据用户设置的基准线 HealthCheckScoreLine(health-check-scoreline 选项) 得出节点是否衰弱的后果

func (ehd *EdgeHealthDaemon) ExecuteCheck() {util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)
    })
    klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)
    for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {
        totalScore := 0.0
        for _, score := range pluginScores {totalScore += score}
        if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})
        } else {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})
        }
    }
    klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}

这里会调用 ParallelizeUntil 并发执行各查看插件,edge-health 目前反对 ping 以及 kubelet 两种查看插件,在 checkplugin 目录 (github.com/superedge/superedge/pkg/edge-health/checkplugin),通过 Register 注册到 PluginInfo 单例(plugin 列表) 中,如下:

// TODO: handle flag parse errors
func (pcp *PingCheckPlugin) Set(s string) error {
    var err error
    for _, para := range strings.Split(s, ",") {if len(para) == 0 {continue}
        arr := strings.Split(para, "=")
        trimKey := strings.TrimSpace(arr[0])
        switch trimKey {
        case "timeout":
            timeout, _ := strconv.Atoi(strings.TrimSpace(arr[1]))
            pcp.HealthCheckoutTimeOut = timeout
        case "retries":
            retries, _ := strconv.Atoi(strings.TrimSpace(arr[1]))
            pcp.HealthCheckRetries = retries
        case "weight":
            weight, _ := strconv.ParseFloat(strings.TrimSpace(arr[1]), 64)
            pcp.Weight = weight
        case "port":
            port, _ := strconv.Atoi(strings.TrimSpace(arr[1]))
            pcp.Port = port
        }
    }
    PluginInfo = NewPlugin()
    PluginInfo.Register(pcp)
    return err
}
func (p *Plugin) Register(plugin CheckPlugin) {p.Plugins = append(p.Plugins, plugin)
    klog.V(4).Info("Register check plugin: %+v", plugin)
}
...
var (
    PluginOnce sync.Once
    PluginInfo Plugin
)
type Plugin struct {Plugins []CheckPlugin
}
func NewPlugin() Plugin {PluginOnce.Do(func() {
        PluginInfo = Plugin{Plugins: []CheckPlugin{},}
    })
    return PluginInfo
}

每种插件具体执行健康检查的逻辑封装在 CheckExecute 中,这里以 ping plugin 为例:

// github.com/superedge/superedge/pkg/edge-health/checkplugin/pingcheck.go
func (pcp *PingCheckPlugin) CheckExecute(checkMetadata *metadata.CheckMetadata) {copyCheckedIp := checkMetadata.CopyCheckedIp()
    util.ParallelizeUntil(context.TODO(), 16, len(copyCheckedIp), func(index int) {checkedIp := copyCheckedIp[index]
        var err error
        for i := 0; i < pcp.HealthCheckRetries; i++ {if _, err := net.DialTimeout("tcp", checkedIp+":"+strconv.Itoa(pcp.Port), time.Duration(pcp.HealthCheckoutTimeOut)*time.Second); err == nil {break}
        }
        if err == nil {klog.V(4).Infof("Edge ping health check plugin %s for ip %s succeed", pcp.Name(), checkedIp)
            checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMax)
        } else {klog.Warning("Edge ping health check plugin %s for ip %s failed, possible reason %s", pcp.Name(), checkedIp, err.Error())
            checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMin)
        }
    })
}
// CheckPluginScoreInfo relevant functions
func (cm *CheckMetadata) SetByPluginScore(checkedIp, pluginName string, weight float64, score int) {cm.Lock()
    defer cm.Unlock()
    if _, existed := cm.CheckPluginScoreInfo[checkedIp]; !existed {cm.CheckPluginScoreInfo[checkedIp] = make(map[string]float64)
    }
    cm.CheckPluginScoreInfo[checkedIp][pluginName] = float64(score) * weight
}

CheckExecute 会对同区域每个节点执行 ping 探测(net.DialTimeout),如果失败,则给该节点打 CheckScoreMin 分(0);否则,打 CheckScoreMax 分(100)

每种查看插件会有一个 Weight 参数,示意了该查看插件分数的权重值,所有权重参数之和应该为 1,对应基准分数线 HealthCheckScoreLine 范畴 0 -100。因而这里在设置分数时,会乘以权重

回到 ExecuteCheck 函数,在调用各插件执行健康检查得出权重分数 (CheckPluginScoreInfo) 后,还须要将该分数与基准线 HealthCheckScoreLine 比照:如果高于 (>=) 分数线,则认为该节点本次查看失常;否则异样

func (ehd *EdgeHealthDaemon) ExecuteCheck() {util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)
    })
    klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)
    for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {
        totalScore := 0.0
        for _, score := range pluginScores {totalScore += score}
        if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})
        } else {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})
        }
    }
    klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}

3、Commun

在对同区域各边缘节点执行健康检查后,须要将查看的后果传递给其它各节点,这也就是 commun 模块负责的事件:

func (ehd *EdgeHealthDaemon) Run(stopCh <-chan struct{}) {
    // Execute edge health prepare and check
    ehd.PrepareAndCheck(stopCh)
    // Execute vote
    vote := vote.NewVoteEdge(&ehd.cfg.Vote)
    go vote.Vote(ehd.metadata, ehd.cfg.Kubeclient, ehd.cfg.Node.LocalIp, stopCh)
    // Execute communication
    communEdge := commun.NewCommunEdge(&ehd.cfg.Commun)
    communEdge.Commun(ehd.metadata.CheckMetadata, ehd.cmLister, ehd.cfg.Node.LocalIp, stopCh)
    <-stopCh
}

既然是相互传递后果给其它节点,则必然会有承受和发送模块:

func (c *CommunEdge) Commun(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string, stopCh <-chan struct{}) {go c.communReceive(checkMetadata, cmLister, stopCh)
    wait.Until(func() {c.communSend(checkMetadata, cmLister, localIp)
    }, time.Duration(c.CommunPeriod)*time.Second, stopCh)
}

其中 communSend 负责向其它节点发送本节点对它们的查看后果;而 communReceive 负责承受其它边缘节点的查看后果。上面顺次剖析:

func (c *CommunEdge) communSend(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string) {copyLocalCheckDetail := checkMetadata.CopyLocal(localIp)
    var checkedIps []string
    for checkedIp := range copyLocalCheckDetail {checkedIps = append(checkedIps, checkedIp)
    }
    util.ParallelizeUntil(context.TODO(), 16, len(checkedIps), func(index int) {// Only send commun information to other edge nodes(excluding itself)
        dstIp := checkedIps[index]
        if dstIp == localIp {return}
        // Send commun information
        communInfo := metadata.CommunInfo{SourceIP: localIp, CheckDetail: copyLocalCheckDetail}
        if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {log.Errorf("communSend: generateHmac err %+v", err)
            return
        } else {communInfo.Hmac = hmac}
        commonInfoBytes, err := json.Marshal(communInfo)
        if err != nil {log.Errorf("communSend: json.Marshal commun info err %+v", err)
            return
        }
        commonInfoReader := bytes.NewReader(commonInfoBytes)
        for i := 0; i < c.CommunRetries; i++ {req, err := http.NewRequest("PUT", "http://"+dstIp+":"+strconv.Itoa(c.CommunServerPort)+"/result", commonInfoReader)
            if err != nil {log.Errorf("communSend: NewRequest for remote edge node %s err %+v", dstIp, err)
                continue
            }
            if err = util.DoRequestAndDiscard(c.client, req); err != nil {log.Errorf("communSend: DoRequestAndDiscard for remote edge node %s err %+v", dstIp, err)
            } else {log.V(4).Infof("communSend: put commun info %+v to remote edge node %s successfully", communInfo, dstIp)
                break
            }
        }
    })
}

发送逻辑如下:

  • 构建 CommunInfo 构造体,包含:

    • SourceIP:示意执行查看的 ip
    • CheckDetail:为 Checked ip:Check detail 组织模式,蕴含被查看的 ip 以及查看后果
  • 调用 GenerateHmac 构建 Hmac:实际上是以 kube-system 下的 hmac-config configmap hmackey 字段为 key,对 SourceIP 以及 CheckDetail 进行 hmac 失去,用于判断传输数据的有效性(是否被篡改)
func GenerateHmac(communInfo metadata.CommunInfo, cmLister corelisters.ConfigMapLister) (string, error) {addrBytes, err := json.Marshal(communInfo.SourceIP)
    if err != nil {return "", err}
    detailBytes, _ := json.Marshal(communInfo.CheckDetail)
    if err != nil {return "", err}
    hmacBefore := string(addrBytes) + string(detailBytes)
    if hmacConf, err := cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.HmacConfig); err != nil {return "", err} else {return GetHmacCode(hmacBefore, hmacConf.Data[common.HmacKey])
    }
}
func GetHmacCode(s, key string) (string, error) {h := hmac.New(sha256.New, []byte(key))
    if _, err := io.WriteString(h, s); err != nil {return "", err}
    return fmt.Sprintf("%x", h.Sum(nil)), nil
}
  • 发送上述构建的 CommunInfo 给其它边缘节点(DoRequestAndDiscard)

communReceive 逻辑也很清晰:

// TODO: support changeable server listen port
func (c *CommunEdge) communReceive(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, stopCh <-chan struct{}) {svr := &http.Server{Addr: ":" + strconv.Itoa(c.CommunServerPort)}
    svr.ReadTimeout = time.Duration(c.CommunTimeout) * time.Second
    svr.WriteTimeout = time.Duration(c.CommunTimeout) * time.Second
    http.HandleFunc("/debug/flags/v", pkgutil.UpdateLogLevel)
    http.HandleFunc("/result", func(w http.ResponseWriter, r *http.Request) {
        var communInfo metadata.CommunInfo
        if r.Body == nil {http.Error(w, "Invalid commun information", http.StatusBadRequest)
            return
        }
        err := json.NewDecoder(r.Body).Decode(&communInfo)
        if err != nil {http.Error(w, fmt.Sprintf("Invalid commun information %+v", err), http.StatusBadRequest)
            return
        }
        log.V(4).Infof("Received common information from %s : %+v", communInfo.SourceIP, communInfo.CheckDetail)
        if _, err := io.WriteString(w, "Received!\n"); err != nil {log.Errorf("communReceive: send response err %+v", err)
            http.Error(w, fmt.Sprintf("Send response err %+v", err), http.StatusInternalServerError)
            return
        }
        if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {log.Errorf("communReceive: server GenerateHmac err %+v", err)
            http.Error(w, fmt.Sprintf("GenerateHmac err %+v", err), http.StatusInternalServerError)
            return
        } else {
            if hmac != communInfo.Hmac {log.Errorf("communReceive: Hmac not equal, hmac is %s but received commun info hmac is %s", hmac, communInfo.Hmac)
                http.Error(w, "Hmac not match", http.StatusForbidden)
                return
            }
        }
        log.V(4).Infof("communReceive: Hmac match")
        checkMetadata.SetByCommunInfo(communInfo)
        log.V(4).Infof("After communicate, check info is %+v", checkMetadata.CheckInfo)
    })
    go func() {if err := svr.ListenAndServe(); err != http.ErrServerClosed {log.Fatalf("Server: exit with error %+v", err)
        }
    }()
    for {
        select {
        case <-stopCh:
            ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
            defer cancel()
            if err := svr.Shutdown(ctx); err != nil {log.Errorf("Server: program exit, server exit error %+v", err)
            }
            return
        default:
        }
    }
}

负责承受其它边缘节点的查看后果,并写入本身查看后果 CheckInfo,流程如下:

  • 通过 /result 路由承受申请,并将申请内容解析成 CommunInfo
  • 对 CommunInfo 执行 GenerateHmac 获取 hmac 值,并与 CommunInfo.Hmac 字段进行比照,查看承受数据的有效性
  • 最初将 CommunInfo 查看后果写入 CheckInfo,留神:CheckDetail.Time 设置为写入时的工夫

    // CheckInfo relevant functions
    func (cm *CheckMetadata) SetByCommunInfo(c CommunInfo) {cm.Lock()
      defer cm.Unlock()
      if _, existed := cm.CheckInfo[c.SourceIP]; !existed {cm.CheckInfo[c.SourceIP] = make(map[string]CheckDetail)
      }
      for k, detail := range c.CheckDetail {
          // Update time to local timestamp since different machines have different ones
          detail.Time = time.Now()
          c.CheckDetail[k] = detail
      }
      cm.CheckInfo[c.SourceIP] = c.CheckDetail
    }
    
  • 最初在承受到 stopCh 信号时,通过 svr.Shutdown 平滑敞开服务

4、Vote

在承受到其它节点的健康检查后果后,vote 模块会对后果进行统计得出最终裁决,并向 apiserver 报告:

func (v *VoteEdge) Vote(edgeHealthMetadata *metadata.EdgeHealthMetadata, kubeclient clientset.Interface,
    localIp string, stopCh <-chan struct{}) {go wait.Until(func() {v.vote(edgeHealthMetadata, kubeclient, localIp, stopCh)
    }, time.Duration(v.VotePeriod)*time.Second, stopCh)
}

首先依据查看后果统计出状态失常以及异样的节点列表:

type votePair struct {
    pros int
    cons int
}
...
var (prosVoteIpList, consVoteIpList []string
    // Init votePair since cannot assign to struct field voteCountMap[checkedIp].pros in map
    vp votePair
)
voteCountMap := make(map[string]votePair) // {"127.0.0.1":{"pros":1,"cons":2}}
copyCheckInfo := edgeHealthMetadata.CopyAll()
// Note that voteThreshold should be calculated by checked instead of checker
// since checked represents the total valid edge health nodes while checker may contain partly ones.
voteThreshold := (edgeHealthMetadata.GetCheckedIpLen() + 1) / 2
for _, checkedDetails := range copyCheckInfo {
    for checkedIp, checkedDetail := range checkedDetails {if !time.Now().After(checkedDetail.Time.Add(time.Duration(v.VoteTimeout) * time.Second)) {if _, existed := voteCountMap[checkedIp]; !existed {voteCountMap[checkedIp] = votePair{0, 0}
            }
            vp = voteCountMap[checkedIp]
            if checkedDetail.Normal {
                vp.pros++
                if vp.pros >= voteThreshold {prosVoteIpList = append(prosVoteIpList, checkedIp)
                }
            } else {
                vp.cons++
                if vp.cons >= voteThreshold {consVoteIpList = append(consVoteIpList, checkedIp)
                }
            }
            voteCountMap[checkedIp] = vp
        }
    }
}
log.V(4).Infof("Vote: voteCountMap is %+v", voteCountMap)
...

其中状态判断的逻辑如下:

  • 如果超过一半 (>) 的节点对该节点的查看后果为失常,则认为该节点状态失常(留神时间差在 VoteTimeout 内)
  • 如果超过一半 (>) 的节点对该节点的查看后果为异样,则认为该节点状态异样(留神时间差在 VoteTimeout 内)
  • 除开上述情况,认为节点状态判断有效,对这些节点不做任何解决(可能存在脑裂的状况)

对状态失常的节点做如下解决:

...
// Handle prosVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(prosVoteIpList), func(index int) {if node := edgeHealthMetadata.GetNodeByAddr(prosVoteIpList[index]); node != nil {log.V(4).Infof("Vote: vote pros to edge node %s begin ...", node.Name)
        nodeCopy := node.DeepCopy()
        needUpdated := false
        if nodeCopy.Annotations == nil {nodeCopy.Annotations = map[string]string{common.NodeHealthAnnotation: common.NodeHealthAnnotationPros,}
            needUpdated = true
        } else {if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {
                if healthy != common.NodeHealthAnnotationPros {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros
                    needUpdated = true
                }
            } else {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros
                needUpdated = true
            }
        }
        if index, existed := admissionutil.TaintExistsPosition(nodeCopy.Spec.Taints, common.UnreachableNoExecuteTaint); existed {nodeCopy.Spec.Taints = append(nodeCopy.Spec.Taints[:index], nodeCopy.Spec.Taints[index+1:]...)
            needUpdated = true
        }
        if needUpdated {if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {log.Errorf("Vote: update pros vote to edge node %s error %+v", nodeCopy.Name, err)
            } else {log.V(2).Infof("Vote: update pros vote to edge node %s successfully", nodeCopy.Name)
            }
        }
    } else {log.Warningf("Vote: edge node addr %s not found", prosVoteIpList[index])
    }
})
...
  • 增加或者更新 ”superedgehealth/node-health” annotation 值为 ”true”,表明分布式健康检查判断该节点状态失常。
  • 如果 node 存在 NoExecute(node.kubernetes.io/unreachable) taint,则将其去掉,并更新 node.

而对状态异样的节点会增加或者更新 ”superedgehealth/node-health” annotation 值为 ”false”,表明分布式健康检查判断该节点状态异样:

// Handle consVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(consVoteIpList), func(index int) {if node := edgeHealthMetadata.GetNodeByAddr(consVoteIpList[index]); node != nil {log.V(4).Infof("Vote: vote cons to edge node %s begin ...", node.Name)
        nodeCopy := node.DeepCopy()
        needUpdated := false
        if nodeCopy.Annotations == nil {nodeCopy.Annotations = map[string]string{common.NodeHealthAnnotation: common.NodeHealthAnnotationCons,}
            needUpdated = true
        } else {if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {
                if healthy != common.NodeHealthAnnotationCons {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons
                    needUpdated = true
                }
            } else {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons
                needUpdated = true
            }
        }
        if needUpdated {if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {log.Errorf("Vote: update cons vote to edge node %s error %+v", nodeCopy.Name, err)
            } else {log.V(2).Infof("Vote: update cons vote to edge node %s successfully", nodeCopy.Name)
            }
        }
    } else {log.Warningf("Vote: edge node addr %s not found", consVoteIpList[index])
    }
})

在边端 edge-health-daemon 向 apiserver 发送节点衰弱后果后,云端运行 edge-health-admission(Kubernetes mutating admission webhook),会一直依据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉 NoExecute taint) 以及 endpoints(将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses 中),从而实现即使云边断连,然而分布式健康检查状态失常的状况下:

  • 失联的节点上的 pod 不会从 Service 的 Endpoint 列表中移除
  • 失联的节点上的 pod 不会被驱赶

总结

  • 分布式健康检查对于云边断连状况的解决区别原生 Kubernetes 如下:

    • 原生 Kubernetes:

      • 失联的节点被置为 ConditionUnknown 状态,并被增加 NoSchedule 和 NoExecute 的 taints
      • 失联的节点上的 pod 被驱赶,并在其余节点上进行重建
      • 失联的节点上的 pod 从 Service 的 Endpoint 列表中移除
    • 分布式健康检查:
  • 分布式健康检查次要通过如下三个层面加强节点状态判断的准确性:

    • 每个节点定期探测其余节点衰弱状态
    • 集群内所有节点定期投票决定各节点的状态
    • 云端和边端节点独特决定节点状态
  • 分布式健康检查性能由边端的 edge-health-daemon 以及云端的 edge-health-admission 组成,性能别离如下:

    • edge-health-daemon:对同区域边缘节点执行分布式健康检查,并向 apiserver 发送衰弱状态投票后果(给 node 打 annotation),主体逻辑包含四局部性能:

      • SyncNodeList:依据边缘节点所在的 zone 刷新 node cache,同时更新 CheckMetadata 相干数据
      • ExecuteCheck:对每个边缘节点执行若干品种的健康检查插件(ping,kubelet 等),并将各插件查看分数汇总,依据用户设置的基准线得出节点是否衰弱的后果
      • Commun:将本节点对其它各节点健康检查的后果发送给其它节点
      • Vote:对所有节点健康检查的后果分类,如果某个节点被大多数 (>1/2) 节点断定为失常,则对该节点增加 superedgehealth/node-health:true annotation,表明该节点分布式健康检查后果为失常;否则,对该节点增加 superedgehealth/node-health:false annotation,表明该节点分布式健康检查后果为异样
    • edge-health-admission(Kubernetes mutating admission webhook):一直依据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉 NoExecute taint)以及 endpoints(将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses 中),从而实现云端和边端独特决定节点状态

duyanghao kubernetes-reading-notes

【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!

正文完
 0