关于kubernetes:源码剖析KEDA是如何工作的

KEDA 在2020年11月4号release了2.0版本,蕴含了一些新的比拟有用的个性,比方ScaledObject/ScaledJob中反对多触发器、反对HPA原始的CPU、Memory scaler等。

具体的装置应用请参考上一篇文章应用keda实现基于事件的弹性伸缩,这篇文章次要深刻的看下KEDA外部机制以及是如何工作的。

咱们先提出几个问题,带着问题去看代码,不便咱们了解整个机制:

  • KEDA是如何获取到多种事件的指标,以及如何判断扩缩容的?
  • KEDA是如何做到将利用的正本数缩容0,根据是什么?

代码构造

对一些次要目录阐明,其余一些MD文件次要是文字说明:

├── BRANDING.md
├── BUILD.md    //如何在本地编译和运行
├── CHANGELOG.md
├── CONTRIBUTING.md   //如何参加奉献次我的项目
├── CREATE-NEW-SCALER.md
├── Dockerfile
├── Dockerfile.adapter
├── GOVERNANCE.md
├── LICENSE
├── MAINTAINERS.md
├── Makefile    // 构建编译相干命令
├── PROJECT
├── README.md
├── RELEASE-PROCESS.MD
├── adapter  // keda-metrics-apiserver 组件入口
├── api // 自定义资源定义,例如ScaledObject的定义
├── bin 
├── config //组件yaml资源,通过kustomization工具生成
├── controllers //kubebuilder 中controller 代码管制crd资源
├── go.mod
├── go.sum
├── hack
├── images
├── main.go //keda-operator  controller入口
├── pkg   //蕴含组件外围代码实现
├── tests //e2e测试
├── tools  
├── vendor
└── version

keda中次要是两个组件keda-operator以及keda-metrics-apiserver

  • keda-operator : 负责创立/更新HPA以及通过Loop管制利用正本数
  • keda-metrics-apiserver:实现external-metrics接口,以对接给HPA的external类型的指标查问(比方各种prometheus指标,mysql等)

keda-operator

我的项目中用到了kubebuilder SDK,用来实现这个Operator的编写。

对于k8s中的自定义controller不理解的能够看看这边文章:如何在Kubernetes中创立一个自定义Controller?。

keda controller的次要流程,画了幅图:

组件启动入口在于main.go文件中:

通过controller-runtime组件启动两个自定义controller:ScaledObjectReconciler,ScaledJobReconciler:

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        HealthProbeBindAddress: ":8081",
        Port:                   9443,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "operator.keda.sh",
    })
...

// Add readiness probe 
err = mgr.AddReadyzCheck("ready-ping", healthz.Ping)
...
// Add liveness probe
err = mgr.AddHealthzCheck("health-ping", healthz.Ping)
....
//注册 ScaledObject 解决的controller
if err = (&controllers.ScaledObjectReconciler{
    Client: mgr.GetClient(),
    Log:    ctrl.Log.WithName("controllers").WithName("ScaledObject"),
    Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
    os.Exit(1)
}
////注册 ScaledJob 解决的controller
if err = (&controllers.ScaledJobReconciler{
    Client: mgr.GetClient(),
    Log:    ctrl.Log.WithName("controllers").WithName("ScaledJob"),
    Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
    os.Exit(1)
}

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    setupLog.Error(err, "problem running manager")
    os.Exit(1)
}

ScaledObjectReconciler 解决

咱们次要关注Reconcile办法,当ScaledObject发生变化时将会触发该办法:
办法外部次要性能实现:

...
// 解决删除ScaledObject的状况
if scaledObject.GetDeletionTimestamp() != nil {
      //进入垃圾回收(比方进行goroutine中Loop,复原原有正本数)
    return ctrl.Result{}, r.finalizeScaledObject(reqLogger, scaledObject)
}

// 给ScaledObject资源加上Finalizer:finalizer.keda.sh
if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil {
    return ctrl.Result{}, err
}
...


// 真正解决ScaledObject资源
msg, err := r.reconcileScaledObject(reqLogger, scaledObject)
// 设置Status字段阐明
conditions := scaledObject.Status.Conditions.DeepCopy()
if err != nil {
    reqLogger.Error(err, msg)
    conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
    conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
} else {
    reqLogger.V(1).Info(msg)
    conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
}
kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions)
return ctrl.Result{}, err

r.reconcileScaledObject办法:

这个办法中次要两个动作:

  • ensureHPAForScaledObjectExists创立HPA资源
  • 进入requestScaleLoop(一直的检测scaler 是否active,进行正本数的批改)

ensureHPAForScaledObjectExists

通过跟踪进入到newHPAForScaledObject办法:

scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
...省略代码

hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{
    Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{
        MinReplicas: getHPAMinReplicas(scaledObject),
        MaxReplicas: getHPAMaxReplicas(scaledObject),
        Metrics:     scaledObjectMetricSpecs,
        Behavior:    behavior,
        ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{
            Name:       scaledObject.Spec.ScaleTargetRef.Name,
            Kind:       gvkr.Kind,
            APIVersion: gvkr.GroupVersion().String(),
        }},
    ObjectMeta: metav1.ObjectMeta{
        Name:      getHPAName(scaledObject),
        Namespace: scaledObject.Namespace,
        Labels:    labels,
    },
    TypeMeta: metav1.TypeMeta{
        APIVersion: "v2beta2",
    },
}

能够看到创立ScalerObject其实最终也是创立了HPA,其实还是通过HPA自身的个性来管制利用的弹性伸缩。

其中getScaledObjectMetricSpecs办法中就是获取到triggers中的metrics指标。

这里有辨别一下External的metrics和resource metrics,因为CPU/Memory scaler是通过resource metrics 来获取的。

requestScaleLoop

requestScaleLoop办法中用来循环check Scaler中的IsActive状态并作出对应的解决,比方批改正本数,间接来看最终的解决吧:
这里有两种模型来触发RequestScale

  • Pull模型:即被动的调用scaler 中的IsActive办法
  • Push模型:由Scaler来触发,PushScaler多了一个Run办法,通过channel传入active状态。

IsActive是由Scaler实现的,比方对于prometheus来说,可能指标为0则为false

这个具体的scaler实现后续再讲,咱们来看看RequestScale做了什么事:

//以后正本数为0,并是所有scaler属于active状态,则批改正本数为MinReplicaCount 或 1
if currentScale.Spec.Replicas == 0 && isActive {
    e.scaleFromZero(ctx, logger, scaledObject, currentScale)
} else if !isActive &&
    currentScale.Spec.Replicas > 0 &&
    (scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0) {
   // 所有scaler都解决not active状态,并且以后正本数大于0,且MinReplicaCount设定为0
   // 则缩容正本数为0
    e.scaleToZero(ctx, logger, scaledObject, currentScale)
} else if !isActive &&
    scaledObject.Spec.MinReplicaCount != nil &&
    currentScale.Spec.Replicas < *scaledObject.Spec.MinReplicaCount {
  // 所有scaler都解决not active状态,并且以后正本数小于MinReplicaCount,则批改为MinReplicaCount
    currentScale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount

    err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale)
    ....
} else if isActive {
//  解决active状态,并且正本数大于0,则更新LastActiveTime
    e.updateLastActiveTime(ctx, logger, scaledObject)
} else {
// 不解决
    logger.V(1).Info("ScaleTarget no change")
}

ScaledJobReconciler 解决

ScaledJobReconciler相比ScalerObject少了创立HPA的步骤,其余的步骤次要是通过checkScaledJobScalers,RequestJobScale两个办法来判断Job创立:

  • checkScaledJobScalers 办法,用于计算isActive,maxValue的值
  • RequestJobScale 办法,用于负责创立Job,外面还波及到三种扩容策略

这里间接看代码吧,不贴代码了。

如何进行Loop

这里有个问题就是startPushScalersstartScaleLoop都是在Goroutine中解决的,所以当ScaleObject/ScalerJob被删除的时候,这里须要可能被删除,这里就用到了context.Cancel办法,在Goroutine启动的时候就将,context保留在scaleLoopContexts *sync.Map中(如果曾经有了,就先Cancel一次),在删除资源的时候,进行删除:

func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
    withTriggers, err := asDuckWithTriggers(scalableObject)
    if err != nil {
        h.logger.Error(err, "error duck typing object into withTrigger")
        return err
    }

    key := generateKey(withTriggers)

    result, ok := h.scaleLoopContexts.Load(key)
    if ok {
        cancel, ok := result.(context.CancelFunc)
        if ok {
            cancel()
        }
        h.scaleLoopContexts.Delete(key)
    } else {
        h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
    }

    return nil
}

ps: 这里的妙啊,学到了

keda-metrics-apiserver

keda-metrics-apiserver实现了ExternalMetricsProvider接口:

type ExternalMetricsProvider interface {
   GetExternalMetric(namespace string, metricSelector labels.Selector, info ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error)

   ListAllExternalMetrics() []ExternalMetricInfo
}
  • GetExternalMetric 用于返回Scaler的指标,调用scaler.GetMetrics办法
  • ListAllExternalMetrics 返回所有反对的external metrics,例如prometheus,mysql等

当代码写好之后,再通过apiservice注册到apiservier上(当然还波及到鉴权,这里不啰嗦了):

apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  labels:
    app.kubernetes.io/name: v1beta1.external.metrics.k8s.io
    app.kubernetes.io/version: latest
    app.kubernetes.io/part-of: keda-operator
  name: v1beta1.external.metrics.k8s.io
spec:
  service:
    name: keda-metrics-apiserver
    namespace: keda
  group: external.metrics.k8s.io
  version: v1beta1
  insecureSkipTLSVerify: true
  groupPriorityMinimum: 100
  versionPriority: 100

实现一个Scaler

其实有两种Scaler,即下面将的一个pull,一个push的模型,PushScaler多了一个Run办法:

实现一个Scaler,次要实现以下接口:

// Scaler interface
type Scaler interface {

    // 返回external_metrics.ExternalMetricValue对象,其实就是用于 keda-metrics-apiserver中获取到scaler的指标
    GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error)

    // 返回v2beta2.MetricSpec 构造,次要用于ScalerObject形容创立HPA的类型和Target指标等
    GetMetricSpecForScaling() []v2beta2.MetricSpec
      // 返回该Scaler是否Active,可能会影响Loop中间接批改正本数
    IsActive(ctx context.Context) (bool, error)

    //调用完一次下面的办法就会调用一次Close 
    Close() error
}

// PushScaler interface
type PushScaler interface {
    Scaler

    // 通过scaler实现Run办法,往active channel中,写入值,而非下面的间接调用IsActive放回
    Run(ctx context.Context, active chan<- bool)
}

总结

回过头来咱们解答下在结尾留下的问题:

  • KEDA是如何获取到多种事件的指标,以及如何判断扩缩容的?

    答:keda controler中生成了external 类型的hpa,并且实现了external metrics 的api

  • KEDA是如何做到将利用的正本数缩容0,根据是什么?

    答: keda 外部有个loop,一直的check isActive状态,会被动的批改利用正本

原文地址:https://silenceper.com/blog/2…

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理