KEDA 在 2020 年 11 月 4 号 release 了 2.0 版本,蕴含了一些新的比拟有用的个性,比方 ScaledObject/ScaledJob
中反对多触发器、反对 HPA 原始的 CPU、Memory scaler 等。
具体的装置应用请参考上一篇文章应用 keda 实现基于事件的弹性伸缩,这篇文章次要深刻的看下 KEDA 外部机制以及是如何工作的。
咱们先提出几个问题,带着问题去看代码,不便咱们了解整个机制:
- KEDA 是如何获取到多种事件的指标,以及如何判断扩缩容的?
- KEDA 是如何做到将利用的正本数缩容 0,根据是什么?
代码构造
对一些次要目录阐明,其余一些 MD 文件次要是文字说明:
├── BRANDING.md
├── BUILD.md // 如何在本地编译和运行
├── CHANGELOG.md
├── CONTRIBUTING.md // 如何参加奉献次我的项目
├── CREATE-NEW-SCALER.md
├── Dockerfile
├── Dockerfile.adapter
├── GOVERNANCE.md
├── LICENSE
├── MAINTAINERS.md
├── Makefile // 构建编译相干命令
├── PROJECT
├── README.md
├── RELEASE-PROCESS.MD
├── adapter // keda-metrics-apiserver 组件入口
├── api // 自定义资源定义,例如 ScaledObject 的定义
├── bin
├── config // 组件 yaml 资源,通过 kustomization 工具生成
├── controllers //kubebuilder 中 controller 代码管制 crd 资源
├── go.mod
├── go.sum
├── hack
├── images
├── main.go //keda-operator controller 入口
├── pkg // 蕴含组件外围代码实现
├── tests //e2e 测试
├── tools
├── vendor
└── version
keda 中次要是两个组件 keda-operator
以及keda-metrics-apiserver
。
- keda-operator:负责创立 / 更新 HPA 以及通过 Loop 管制利用正本数
- keda-metrics-apiserver:实现 external-metrics 接口,以对接给 HPA 的 external 类型的指标查问(比方各种 prometheus 指标,mysql 等)
keda-operator
我的项目中用到了 kubebuilder SDK,用来实现这个 Operator 的编写。
对于 k8s 中的自定义 controller 不理解的能够看看这边文章:如何在 Kubernetes 中创立一个自定义 Controller?。
keda controller 的次要流程,画了幅图:
组件启动入口在于 main.go
文件中:
通过 controller-runtime
组件启动两个自定义 controller:ScaledObjectReconciler
,ScaledJobReconciler
:
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
HealthProbeBindAddress: ":8081",
Port: 9443,
LeaderElection: enableLeaderElection,
LeaderElectionID: "operator.keda.sh",
})
...
// Add readiness probe
err = mgr.AddReadyzCheck("ready-ping", healthz.Ping)
...
// Add liveness probe
err = mgr.AddHealthzCheck("health-ping", healthz.Ping)
....
// 注册 ScaledObject 解决的 controller
if err = (&controllers.ScaledObjectReconciler{Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"),
Scheme: mgr.GetScheme(),}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
}
//// 注册 ScaledJob 解决的 controller
if err = (&controllers.ScaledJobReconciler{Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"),
Scheme: mgr.GetScheme(),}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {setupLog.Error(err, "problem running manager")
os.Exit(1)
}
ScaledObjectReconciler 解决
咱们次要关注 Reconcile
办法,当 ScaledObject
发生变化时将会触发该办法:
办法外部次要性能实现:
...
// 解决删除 ScaledObject 的状况
if scaledObject.GetDeletionTimestamp() != nil {
// 进入垃圾回收(比方进行 goroutine 中 Loop,复原原有正本数)return ctrl.Result{}, r.finalizeScaledObject(reqLogger, scaledObject)
}
// 给 ScaledObject 资源加上 Finalizer:finalizer.keda.sh
if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil {return ctrl.Result{}, err
}
...
// 真正解决 ScaledObject 资源
msg, err := r.reconcileScaledObject(reqLogger, scaledObject)
// 设置 Status 字段阐明
conditions := scaledObject.Status.Conditions.DeepCopy()
if err != nil {reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
} else {reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
}
kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions)
return ctrl.Result{}, err
r.reconcileScaledObject 办法:
这个办法中次要两个动作:
ensureHPAForScaledObjectExists
创立 HPA 资源- 进入
requestScaleLoop
(一直的检测 scaler 是否 active,进行正本数的批改)
ensureHPAForScaledObjectExists
通过跟踪进入到 newHPAForScaledObject 办法:
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
... 省略代码
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{
Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{MinReplicas: getHPAMinReplicas(scaledObject),
MaxReplicas: getHPAMaxReplicas(scaledObject),
Metrics: scaledObjectMetricSpecs,
Behavior: behavior,
ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{
Name: scaledObject.Spec.ScaleTargetRef.Name,
Kind: gvkr.Kind,
APIVersion: gvkr.GroupVersion().String(),
}},
ObjectMeta: metav1.ObjectMeta{Name: getHPAName(scaledObject),
Namespace: scaledObject.Namespace,
Labels: labels,
},
TypeMeta: metav1.TypeMeta{APIVersion: "v2beta2",},
}
能够看到创立 ScalerObject 其实最终也是创立了 HPA,其实还是通过 HPA 自身的个性来管制利用的弹性伸缩。
其中 getScaledObjectMetricSpecs 办法中就是获取到 triggers
中的 metrics 指标。
这里有辨别一下 External 的 metrics 和 resource metrics,因为 CPU/Memory scaler 是通过 resource metrics 来获取的。
requestScaleLoop
requestScaleLoop 办法中用来循环 check Scaler 中的 IsActive 状态并作出对应的解决,比方批改正本数,间接来看最终的解决吧:
这里有两种模型来触发RequestScale
:
- Pull 模型:即被动的调用 scaler 中的
IsActive
办法 - Push 模型:由 Scaler 来触发,
PushScaler
多了一个 Run 办法,通过 channel 传入 active 状态。
IsActive
是由 Scaler 实现的,比方对于 prometheus 来说,可能指标为 0 则为 false
这个具体的 scaler 实现后续再讲,咱们来看看 RequestScale 做了什么事:
// 以后正本数为 0,并是所有 scaler 属于 active 状态,则批改正本数为 MinReplicaCount 或 1
if currentScale.Spec.Replicas == 0 && isActive {e.scaleFromZero(ctx, logger, scaledObject, currentScale)
} else if !isActive &&
currentScale.Spec.Replicas > 0 &&
(scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0) {
// 所有 scaler 都解决 not active 状态,并且以后正本数大于 0,且 MinReplicaCount 设定为 0
// 则缩容正本数为 0
e.scaleToZero(ctx, logger, scaledObject, currentScale)
} else if !isActive &&
scaledObject.Spec.MinReplicaCount != nil &&
currentScale.Spec.Replicas < *scaledObject.Spec.MinReplicaCount {
// 所有 scaler 都解决 not active 状态,并且以后正本数小于 MinReplicaCount,则批改为 MinReplicaCount
currentScale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount
err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale)
....
} else if isActive {
// 解决 active 状态,并且正本数大于 0,则更新 LastActiveTime
e.updateLastActiveTime(ctx, logger, scaledObject)
} else {
// 不解决
logger.V(1).Info("ScaleTarget no change")
}
ScaledJobReconciler 解决
ScaledJobReconciler
相比 ScalerObject
少了创立 HPA 的步骤,其余的步骤次要是通过 checkScaledJobScalers,RequestJobScale 两个办法来判断 Job 创立:
checkScaledJobScalers
办法,用于计算 isActive,maxValue 的值RequestJobScale
办法,用于负责创立 Job,外面还波及到三种扩容策略
这里间接看代码吧,不贴代码了。
如何进行 Loop
这里有个问题就是 startPushScalers
和startScaleLoop
都是在 Goroutine 中解决的,所以当 ScaleObject/ScalerJob 被删除的时候,这里须要可能被删除,这里就用到了 context.Cancel
办法,在 Goroutine 启动的时候就将,context 保留在 scaleLoopContexts *sync.Map 中(如果曾经有了,就先 Cancel 一次),在删除资源的时候,进行删除:
func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {withTriggers, err := asDuckWithTriggers(scalableObject)
if err != nil {h.logger.Error(err, "error duck typing object into withTrigger")
return err
}
key := generateKey(withTriggers)
result, ok := h.scaleLoopContexts.Load(key)
if ok {cancel, ok := result.(context.CancelFunc)
if ok {cancel()
}
h.scaleLoopContexts.Delete(key)
} else {h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
}
return nil
}
ps: 这里的妙啊,学到了
keda-metrics-apiserver
keda-metrics-apiserver 实现了 ExternalMetricsProvider
接口:
type ExternalMetricsProvider interface {GetExternalMetric(namespace string, metricSelector labels.Selector, info ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error)
ListAllExternalMetrics() []ExternalMetricInfo
}
- GetExternalMetric 用于返回 Scaler 的指标,调用
scaler.GetMetrics
办法 - ListAllExternalMetrics 返回所有反对的 external metrics,例如 prometheus,mysql 等
当代码写好之后,再通过 apiservice 注册到 apiservier 上(当然还波及到鉴权,这里不啰嗦了):
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
app.kubernetes.io/name: v1beta1.external.metrics.k8s.io
app.kubernetes.io/version: latest
app.kubernetes.io/part-of: keda-operator
name: v1beta1.external.metrics.k8s.io
spec:
service:
name: keda-metrics-apiserver
namespace: keda
group: external.metrics.k8s.io
version: v1beta1
insecureSkipTLSVerify: true
groupPriorityMinimum: 100
versionPriority: 100
实现一个 Scaler
其实有两种 Scaler,即下面将的一个 pull,一个 push 的模型,PushScaler 多了一个 Run 办法:
实现一个 Scaler,次要实现以下接口:
// Scaler interface
type Scaler interface {
// 返回 external_metrics.ExternalMetricValue 对象,其实就是用于 keda-metrics-apiserver 中获取到 scaler 的指标
GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error)
// 返回 v2beta2.MetricSpec 构造,次要用于 ScalerObject 形容创立 HPA 的类型和 Target 指标等
GetMetricSpecForScaling() []v2beta2.MetricSpec
// 返回该 Scaler 是否 Active,可能会影响 Loop 中间接批改正本数
IsActive(ctx context.Context) (bool, error)
// 调用完一次下面的办法就会调用一次 Close
Close() error}
// PushScaler interface
type PushScaler interface {
Scaler
// 通过 scaler 实现 Run 办法,往 active channel 中,写入值,而非下面的间接调用 IsActive 放回
Run(ctx context.Context, active chan<- bool)
}
总结
回过头来咱们解答下在结尾留下的问题:
- KEDA 是如何获取到多种事件的指标,以及如何判断扩缩容的?
答:keda controler 中生成了 external 类型的 hpa,并且实现了 external metrics 的 api
- KEDA 是如何做到将利用的正本数缩容 0,根据是什么?
答:keda 外部有个 loop,一直的 check isActive 状态,会被动的批改利用正本
原文地址:https://silenceper.com/blog/2…