KEDA 在2020年11月4号release了2.0版本,蕴含了一些新的比拟有用的个性,比方ScaledObject/ScaledJob
中反对多触发器、反对HPA原始的CPU、Memory scaler等。
具体的装置应用请参考上一篇文章应用keda实现基于事件的弹性伸缩,这篇文章次要深刻的看下KEDA外部机制以及是如何工作的。
咱们先提出几个问题,带着问题去看代码,不便咱们了解整个机制:
- KEDA是如何获取到多种事件的指标,以及如何判断扩缩容的?
- KEDA是如何做到将利用的正本数缩容0,根据是什么?
代码构造
对一些次要目录阐明,其余一些MD文件次要是文字说明:
├── BRANDING.md├── BUILD.md //如何在本地编译和运行├── CHANGELOG.md├── CONTRIBUTING.md //如何参加奉献次我的项目├── CREATE-NEW-SCALER.md├── Dockerfile├── Dockerfile.adapter├── GOVERNANCE.md├── LICENSE├── MAINTAINERS.md├── Makefile // 构建编译相干命令├── PROJECT├── README.md├── RELEASE-PROCESS.MD├── adapter // keda-metrics-apiserver 组件入口├── api // 自定义资源定义,例如ScaledObject的定义├── bin ├── config //组件yaml资源,通过kustomization工具生成├── controllers //kubebuilder 中controller 代码管制crd资源├── go.mod├── go.sum├── hack├── images├── main.go //keda-operator controller入口├── pkg //蕴含组件外围代码实现├── tests //e2e测试├── tools ├── vendor└── version
keda中次要是两个组件keda-operator
以及keda-metrics-apiserver
。
- keda-operator : 负责创立/更新HPA以及通过Loop管制利用正本数
- keda-metrics-apiserver:实现external-metrics接口,以对接给HPA的external类型的指标查问(比方各种prometheus指标,mysql等)
keda-operator
我的项目中用到了kubebuilder SDK,用来实现这个Operator的编写。
对于k8s中的自定义controller不理解的能够看看这边文章:如何在Kubernetes中创立一个自定义Controller?。
keda controller的次要流程,画了幅图:
组件启动入口在于main.go
文件中:
通过controller-runtime
组件启动两个自定义controller:ScaledObjectReconciler
,ScaledJobReconciler
:
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, HealthProbeBindAddress: ":8081", Port: 9443, LeaderElection: enableLeaderElection, LeaderElectionID: "operator.keda.sh", })...// Add readiness probe err = mgr.AddReadyzCheck("ready-ping", healthz.Ping)...// Add liveness probeerr = mgr.AddHealthzCheck("health-ping", healthz.Ping)....//注册 ScaledObject 解决的controllerif err = (&controllers.ScaledObjectReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"), Scheme: mgr.GetScheme(),}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ScaledObject") os.Exit(1)}////注册 ScaledJob 解决的controllerif err = (&controllers.ScaledJobReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"), Scheme: mgr.GetScheme(),}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ScaledJob") os.Exit(1)}if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1)}
ScaledObjectReconciler 解决
咱们次要关注Reconcile
办法,当ScaledObject
发生变化时将会触发该办法:
办法外部次要性能实现:
...// 解决删除ScaledObject的状况if scaledObject.GetDeletionTimestamp() != nil { //进入垃圾回收(比方进行goroutine中Loop,复原原有正本数) return ctrl.Result{}, r.finalizeScaledObject(reqLogger, scaledObject)}// 给ScaledObject资源加上Finalizer:finalizer.keda.shif err := r.ensureFinalizer(reqLogger, scaledObject); err != nil { return ctrl.Result{}, err}...// 真正解决ScaledObject资源msg, err := r.reconcileScaledObject(reqLogger, scaledObject)// 设置Status字段阐明conditions := scaledObject.Status.Conditions.DeepCopy()if err != nil { reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")} else { reqLogger.V(1).Info(msg) conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)}kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions)return ctrl.Result{}, err
r.reconcileScaledObject办法:
这个办法中次要两个动作:
ensureHPAForScaledObjectExists
创立HPA资源- 进入
requestScaleLoop
(一直的检测scaler 是否active,进行正本数的批改)
ensureHPAForScaledObjectExists
通过跟踪进入到newHPAForScaledObject办法:
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject)...省略代码hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{ Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{ MinReplicas: getHPAMinReplicas(scaledObject), MaxReplicas: getHPAMaxReplicas(scaledObject), Metrics: scaledObjectMetricSpecs, Behavior: behavior, ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{ Name: scaledObject.Spec.ScaleTargetRef.Name, Kind: gvkr.Kind, APIVersion: gvkr.GroupVersion().String(), }}, ObjectMeta: metav1.ObjectMeta{ Name: getHPAName(scaledObject), Namespace: scaledObject.Namespace, Labels: labels, }, TypeMeta: metav1.TypeMeta{ APIVersion: "v2beta2", },}
能够看到创立ScalerObject其实最终也是创立了HPA,其实还是通过HPA自身的个性来管制利用的弹性伸缩。
其中getScaledObjectMetricSpecs办法中就是获取到triggers
中的metrics指标。
这里有辨别一下External的metrics和resource metrics,因为CPU/Memory scaler是通过resource metrics 来获取的。
requestScaleLoop
requestScaleLoop办法中用来循环check Scaler中的IsActive状态并作出对应的解决,比方批改正本数,间接来看最终的解决吧:
这里有两种模型来触发RequestScale
:
- Pull模型:即被动的调用scaler 中的
IsActive
办法 - Push模型:由Scaler来触发,
PushScaler
多了一个Run办法,通过channel传入active状态。
IsActive
是由Scaler实现的,比方对于prometheus来说,可能指标为0则为false
这个具体的scaler实现后续再讲,咱们来看看RequestScale做了什么事:
//以后正本数为0,并是所有scaler属于active状态,则批改正本数为MinReplicaCount 或 1if currentScale.Spec.Replicas == 0 && isActive { e.scaleFromZero(ctx, logger, scaledObject, currentScale)} else if !isActive && currentScale.Spec.Replicas > 0 && (scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0) { // 所有scaler都解决not active状态,并且以后正本数大于0,且MinReplicaCount设定为0 // 则缩容正本数为0 e.scaleToZero(ctx, logger, scaledObject, currentScale)} else if !isActive && scaledObject.Spec.MinReplicaCount != nil && currentScale.Spec.Replicas < *scaledObject.Spec.MinReplicaCount { // 所有scaler都解决not active状态,并且以后正本数小于MinReplicaCount,则批改为MinReplicaCount currentScale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale) ....} else if isActive {// 解决active状态,并且正本数大于0,则更新LastActiveTime e.updateLastActiveTime(ctx, logger, scaledObject)} else {// 不解决 logger.V(1).Info("ScaleTarget no change")}
ScaledJobReconciler 解决
ScaledJobReconciler
相比ScalerObject
少了创立HPA的步骤,其余的步骤次要是通过checkScaledJobScalers,RequestJobScale两个办法来判断Job创立:
checkScaledJobScalers
办法,用于计算isActive,maxValue的值RequestJobScale
办法,用于负责创立Job,外面还波及到三种扩容策略
这里间接看代码吧,不贴代码了。
如何进行Loop
这里有个问题就是startPushScalers
和startScaleLoop
都是在Goroutine中解决的,所以当ScaleObject/ScalerJob被删除的时候,这里须要可能被删除,这里就用到了context.Cancel
办法,在Goroutine启动的时候就将,context保留在scaleLoopContexts *sync.Map中(如果曾经有了,就先Cancel一次),在删除资源的时候,进行删除:
func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error { withTriggers, err := asDuckWithTriggers(scalableObject) if err != nil { h.logger.Error(err, "error duck typing object into withTrigger") return err } key := generateKey(withTriggers) result, ok := h.scaleLoopContexts.Load(key) if ok { cancel, ok := result.(context.CancelFunc) if ok { cancel() } h.scaleLoopContexts.Delete(key) } else { h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key) } return nil}
ps: 这里的妙啊,学到了
keda-metrics-apiserver
keda-metrics-apiserver实现了ExternalMetricsProvider
接口:
type ExternalMetricsProvider interface { GetExternalMetric(namespace string, metricSelector labels.Selector, info ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) ListAllExternalMetrics() []ExternalMetricInfo}
- GetExternalMetric 用于返回Scaler的指标,调用
scaler.GetMetrics
办法 - ListAllExternalMetrics 返回所有反对的external metrics,例如prometheus,mysql等
当代码写好之后,再通过apiservice注册到apiservier上(当然还波及到鉴权,这里不啰嗦了):
apiVersion: apiregistration.k8s.io/v1kind: APIServicemetadata: labels: app.kubernetes.io/name: v1beta1.external.metrics.k8s.io app.kubernetes.io/version: latest app.kubernetes.io/part-of: keda-operator name: v1beta1.external.metrics.k8s.iospec: service: name: keda-metrics-apiserver namespace: keda group: external.metrics.k8s.io version: v1beta1 insecureSkipTLSVerify: true groupPriorityMinimum: 100 versionPriority: 100
实现一个Scaler
其实有两种Scaler,即下面将的一个pull,一个push的模型,PushScaler多了一个Run办法:
实现一个Scaler,次要实现以下接口:
// Scaler interfacetype Scaler interface { // 返回external_metrics.ExternalMetricValue对象,其实就是用于 keda-metrics-apiserver中获取到scaler的指标 GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) // 返回v2beta2.MetricSpec 构造,次要用于ScalerObject形容创立HPA的类型和Target指标等 GetMetricSpecForScaling() []v2beta2.MetricSpec // 返回该Scaler是否Active,可能会影响Loop中间接批改正本数 IsActive(ctx context.Context) (bool, error) //调用完一次下面的办法就会调用一次Close Close() error}// PushScaler interfacetype PushScaler interface { Scaler // 通过scaler实现Run办法,往active channel中,写入值,而非下面的间接调用IsActive放回 Run(ctx context.Context, active chan<- bool)}
总结
回过头来咱们解答下在结尾留下的问题:
- KEDA是如何获取到多种事件的指标,以及如何判断扩缩容的?
答:keda controler中生成了external 类型的hpa,并且实现了external metrics 的api
- KEDA是如何做到将利用的正本数缩容0,根据是什么?
答: keda 外部有个loop,一直的check isActive状态,会被动的批改利用正本
原文地址:https://silenceper.com/blog/2...