共计 8358 个字符,预计需要花费 21 分钟才能阅读完成。
kubernetes 通过 GVK 惟一标识一个 API 对象类型,比方 HPA:
- G=autoscaling
- V=v1/v2beta1/v2beta2/v2…
- Kind=HorizontalPodAutoscaler
随着 kubernetes 版本的迭代,产生了不同的 Version,这就引发如下的问题:
- 当使用不同的 Version 创建资源对象时,最终存储在 etcd 的是哪个 Version?
- 当使用不同的 Version Get 资源对象时,在 apiserver 中是如何转换的?
一. APIVersion 的流转
APIVersion 包含以下几种:
-
External Version:
- apiserver 对外暴露的 version,比如 v1/v2beta1 等;
-
Internal Version:
- 内部 version,是所有 version 字段的超集,通常是 latest external version;
-
Storage Version:
- 保存到 etcd 的 version,通常是 external version 中 stable 的 version,比如 v1/v2 等;
ApiServer 中各个 version 之间不能直接转换,通常使用 Internal version 进行中转,即:
v1-->internal version-->v2beta1
APIVersion 的流转过程:
- 通过 kubectl create 创建资源对象时,client 端使用 External Version,即 apiserver 暴露的 version 和 scheme;
-
在 apiserver 中,对于 External Version 的对象:
- 首先,被转换为 Internal version;
- 然后,将 Internal version 转换为 storage version;
-
通过 kubectl get 查询资源对象时,根据 client 使用的 Version:
- 首先,将 Storage Version 对象转换为 Internal version;
- 然后,将 Internal version 转换为 External version,返回给 client;
二. storageVersion
1. 选择入口
选择 etcd 保存 Version 的入口代码:
- 通过 storageVersioner 选择 version;
- storageVersioner 是 storageProvider 对象的一个属性;
// vendor/k8s.io/apiserver/pkg/endpoints/installer.go
// registerResourceHandlers (excerpt; omitted code is marked with "...").
// The visible part checks whether storage implements rest.StorageVersionProvider and,
// when the StorageVersionHash feature gate is enabled and a non-nil storage
// versioner is reported, resolves the storage GVK and stores its hash in
// apiResource.StorageVersionHash.
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
...
// Type-assert the storage to see whether it can report a storage version.
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
...
var apiResource metav1.APIResource
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionHash) &&
isStorageVersionProvider &&
storageVersionProvider.StorageVersion() != nil {versioner := storageVersionProvider.StorageVersion() // the storage versioner selects the version
gvk, err := getStorageVersionKind(versioner, storage, a.group.Typer) // obtain the GVK that gets stored
if err != nil {return nil, nil, err}
// Record the hash computed from the storage group, version and kind.
apiResource.StorageVersionHash = discovery.StorageVersionHash(gvk.Group, gvk.Version, gvk.Kind)
}
...
}
storageProvider 即 Store,storageVersioner 是其中的一个属性,其属性的构建:
- 由 storageConfig.EncodeVersioner 得来;
// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// CompleteWithOptions (excerpt; omitted code is marked with "...").
// In the visible part, when e.Storage.Storage is nil the store's StorageVersioner
// is taken from opts.StorageConfig.EncodeVersioner (opts is defined in omitted code).
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
...
if e.Storage.Storage == nil {
...
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
...
}
...
}
storageConfig.EncodeVersioner 的构建:
- 由 codecConfig 构建而来;
- 而 codecConfig.StorageVersion 保存了存储到 etcd 的 version;
// vendor/k8s.io/apiserver/pkg/server/storage/storage_factory.go
// NewConfig (excerpt; omitted code is marked with "..." / "..").
// The visible part asks s.ResourceEncodingConfig for the storage version of
// chosenStorageResource, stores it in codecConfig.StorageVersion, and then calls
// s.newStorageCodecFn(codecConfig) to obtain storageConfig.Codec and
// storageConfig.EncodeVersioner.
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
...
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
..
storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
...
}
最终,根据 scheme.PrioritizedVersions,得到该 Group 下的第 0 号 version,作为存储到 etcd 的 version:
// staging/src/k8s.io/apiserver/pkg/server/storage/resource_encoding_config.go
// StorageEncodingFor returns the GroupVersion used to encode the given resource.
// It returns an error when the resource's group is not registered in the scheme,
// the ExternalResourceEncoding of a per-resource override when one exists in
// o.resources, and otherwise the first entry of the scheme's prioritized versions
// for the group.
func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {if !o.scheme.IsGroupRegistered(resource.Group) {return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
}
// A per-resource override takes precedence over the group-wide priority list.
resourceOverride, resourceExists := o.resources[resource]
if resourceExists {return resourceOverride.ExternalResourceEncoding, nil}
// return the most preferred external version for the group
return o.scheme.PrioritizedVersionsForGroup(resource.Group)[0], nil
}
2.version 构建
上面看到,存储到 etcd 的 version = scheme.PrioritizedVersionsForGroup() 返回集合的第 0 号元素,即第一个元素;
而 scheme.PrioritizedVersionsForGroup() 返回的是:versionPriority+observedVersions 集合;
// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
// PrioritizedVersionsForGroup returns the GroupVersions known for group.
// The result starts with the versions listed in s.versionPriority[group], in that
// order, followed by every entry of s.observedVersions that belongs to the same
// group and is not already in the result.
func (s *Scheme) PrioritizedVersionsForGroup(group string) []schema.GroupVersion {ret := []schema.GroupVersion{}
// Explicitly prioritized versions come first.
for _, version := range s.versionPriority[group] {ret = append(ret, schema.GroupVersion{Group: group, Version: version})
}
// Then append observed versions of this group that were not prioritized.
for _, observedVersion := range s.observedVersions {
if observedVersion.Group != group {continue}
found := false
for _, existing := range ret {
if existing == observedVersion {
found = true
break
}
}
if !found {ret = append(ret, observedVersion)
}
}
return ret
}
重点看一下 versionPriority:
- 由 SetVersionPriority() 函数赋值;
- s.versionPriority[group]=[]string{…},其顺序 == 传入的 versions 的顺序;
// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
// SetVersionPriority (excerpt; omitted code is marked with "...") records the
// priority order of versions for a single group.
// It collects the group of every argument and the version names in argument order,
// returns an error unless exactly one distinct group was seen, and otherwise stores
// the ordered version names in s.versionPriority under that group.
func (s *Scheme) SetVersionPriority(versions ...schema.GroupVersion) error {groups := sets.String{}
order := []string{}
for _, version := range versions {
...
groups.Insert(version.Group)
order = append(order, version.Version)
}
// All versions passed in one call must belong to the same group.
if len(groups) != 1 {return fmt.Errorf("must register versions for exactly one group: %v", strings.Join(groups.List(), ","))
}
s.versionPriority[groups.List()[0]] = order
return nil
}
举个例子,autoscaling 这个 Group 下,如何调用 SetVersionPriority() 函数:
- autoscaling 中调用 SetVersionPriority 传入参数 =(v1,v2,v2beta1,v2beta2);
- 因为选择第一个 version 作为存储到 etcd 的 version,故对于 hpa,存储在 etcd 的 version=v1;
// pkg/apis/autoscaling/install/install.go
// init installs the autoscaling API group into legacyscheme.Scheme when the
// package is loaded.
func init() {Install(legacyscheme.Scheme)
}
// Install registers the API group and adds types to a scheme.
// It calls AddToScheme for the autoscaling, v2beta2, v2beta1 and v1 packages and
// then sets the group's version priority to v1, v2, v2beta1, v2beta2, so v1 is
// the first entry of the priority list.
// NOTE(review): v2.SchemeGroupVersion appears in the priority list, but no
// v2.AddToScheme call is visible in this excerpt — confirm against the upstream file.
func Install(scheme *runtime.Scheme) {utilruntime.Must(autoscaling.AddToScheme(scheme))
utilruntime.Must(v2beta2.AddToScheme(scheme))
utilruntime.Must(v2beta1.AddToScheme(scheme))
utilruntime.Must(v1.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v2.SchemeGroupVersion, v2beta1.SchemeGroupVersion, v2beta2.SchemeGroupVersion))
}
三. 不同版本之间的转换
对于 hpa,在 pkg/apis/autoscaling 下定义了 internal version 与各个 version 之间的转换函数;
对于 hpa:
- 存储在 etcd 的 version=v1;
-
通过 kubectl get hpa.v2beta2.autoscaling 的时候:
- apiserver 首先将 v1 转换为 internal version;
- 而后将 internal version 转换为 v2beta2;
各个 version 之间的转换函数:
- pkg/apis/autoscaling/v1/conversion.go
- pkg/apis/autoscaling/v2/conversion.go
- pkg/apis/autoscaling/v2beta1/conversion.go
- pkg/apis/autoscaling/v2beta2/conversion.go
比如 v1 的 ObjectMetricSource 类型,被转换为 internal version 中的 ObjectMetricSource:
// pkg/apis/autoscaling/v1/conversion.go
// Convert_v1_ObjectMetricSource_To_autoscaling_ObjectMetricSource fills out from in.
// The target type is ValueMetricType when in.AverageValue is nil and
// AverageValueMetricType otherwise. The s parameter is not used in this body and
// the function always returns nil.
func Convert_v1_ObjectMetricSource_To_autoscaling_ObjectMetricSource(in *autoscalingv1.ObjectMetricSource, out *autoscaling.ObjectMetricSource, s conversion.Scope) error {
var metricType autoscaling.MetricTargetType
if in.AverageValue == nil {metricType = autoscaling.ValueMetricType} else {metricType = autoscaling.AverageValueMetricType}
// Value points at the input's TargetValue field rather than at a copy.
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: &in.TargetValue,
AverageValue: in.AverageValue,
}
// The v1 Target reference becomes DescribedObject on the output.
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
// The v1 MetricName and Selector are grouped into a MetricIdentifier.
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
1. 注册 conversion 函数
对于 hpa v2beta2,注册了一堆的 conversion 函数
// pkg/apis/autoscaling/v2beta2/zz_generated.conversion.go
// init registers RegisterConversions with localSchemeBuilder when the package is
// loaded.
func init() {localSchemeBuilder.Register(RegisterConversions)
}
// RegisterConversions adds conversion functions to the given scheme.
// Public to allow building arbitrary schemes.
// (Excerpt; omitted registrations are marked with "...".) Each registration passes
// typed nil pointers for the source and destination types plus a wrapper that
// type-asserts its arguments and calls the typed Convert_* function; the first
// registration error is returned.
func RegisterConversions(s *runtime.Scheme) error {if err := s.AddGeneratedConversionFunc((*v2beta2.ContainerResourceMetricSource)(nil), (*autoscaling.ContainerResourceMetricSource)(nil), func(a, b interface{}, scope conversion.Scope) error {return Convert_v2beta2_ContainerResourceMetricSource_To_autoscaling_ContainerResourceMetricSource(a.(*v2beta2.ContainerResourceMetricSource), b.(*autoscaling.ContainerResourceMetricSource), scope)
}); err != nil {return err}
...
// autoscaling.ObjectMetricSource ----> v2beta2.ObjectMetricSource
if err := s.AddGeneratedConversionFunc((*autoscaling.ObjectMetricSource)(nil), (*v2beta2.ObjectMetricSource)(nil), func(a, b interface{}, scope conversion.Scope) error {return Convert_autoscaling_ObjectMetricSource_To_v2beta2_ObjectMetricSource(a.(*autoscaling.ObjectMetricSource), b.(*v2beta2.ObjectMetricSource), scope)
}); err != nil {return err}
...
}
2. 由 convertor 对象进行转换
// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
// NewScheme creates a Scheme with all of its lookup maps initialised to empty
// maps, a schemeName obtained from naming.GetNameFromCallsite, and a converter
// built with conversion.NewConverter. It then registers the embedded and string
// conversions before returning the scheme.
func NewScheme() *Scheme {
s := &Scheme{gvkToType: map[schema.GroupVersionKind]reflect.Type{},
typeToGVK: map[reflect.Type][]schema.GroupVersionKind{},
unversionedTypes: map[reflect.Type]schema.GroupVersionKind{},
unversionedKinds: map[string]reflect.Type{},
fieldLabelConversionFuncs: map[schema.GroupVersionKind]FieldLabelConversionFunc{},
defaulterFuncs: map[reflect.Type]func(interface{}){},
versionPriority: map[string][]string{},
schemeName: naming.GetNameFromCallsite(internalPackages...),
}
// The converter is created after the struct literal because it needs s.nameFunc.
s.converter = conversion.NewConverter(s.nameFunc)
// Enable couple default conversions by default.
utilruntime.Must(RegisterEmbeddedConversions(s))
utilruntime.Must(RegisterStringConversions(s))
return s
}
执行转换:
// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
// ConvertToVersion converts in to the version selected by target by delegating to
// convertToVersion with copy set to true.
func (s *Scheme) ConvertToVersion(in Object, target GroupVersioner) (Object, error) {return s.convertToVersion(true, in, target)
}
// convertToVersion (excerpt; omitted code is marked with "...").
// The visible part builds conversion metadata for in, sets target as its Context,
// runs s.converter.Convert from in to out, sets the target kind on out, and
// returns out. out and gvk are defined in the omitted code.
func (s *Scheme) convertToVersion(copy bool, in Object, target GroupVersioner) (Object, error) {
...
meta := s.generateConvertMeta(in)
meta.Context = target
if err := s.converter.Convert(in, out, meta); err != nil {return nil, err}
setTargetKind(out, gvk)
return out, nil
}
参考:
- https://segmentfault.com/a/1190000042657668