Kubernetes: How Resource Objects Are Stored and Converted Across Group Versions (GV)


Kubernetes uniquely identifies an API object type by its GVK (Group, Version, Kind). Take HPA as an example:

  • G=autoscaling
  • V=v1/v2beta1/v2beta2/v2…
  • Kind=HorizontalPodAutoscaler
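A GVK can be pictured as a three-field value. The sketch below is only an illustration: `GroupVersionKind` here is a simplified stand-in written for this article, not the real `schema.GroupVersionKind` type from k8s.io/apimachinery, and the `APIVersion` helper is a hypothetical name.

```go
package main

import "fmt"

// GroupVersionKind is a simplified, hypothetical stand-in for
// schema.GroupVersionKind in k8s.io/apimachinery.
type GroupVersionKind struct {
	Group   string
	Version string
	Kind    string
}

// APIVersion renders the apiVersion string used in manifests:
// "group/version", or just "version" for the core (empty) group.
func (gvk GroupVersionKind) APIVersion() string {
	if gvk.Group == "" {
		return gvk.Version
	}
	return gvk.Group + "/" + gvk.Version
}

func main() {
	hpaV1 := GroupVersionKind{Group: "autoscaling", Version: "v1", Kind: "HorizontalPodAutoscaler"}
	hpaV2 := GroupVersionKind{Group: "autoscaling", Version: "v2", Kind: "HorizontalPodAutoscaler"}

	fmt.Println(hpaV1.APIVersion(), hpaV1.Kind)
	fmt.Println(hpaV2.APIVersion(), hpaV2.Kind)
	// Same Group and Kind but a different Version: two distinct GVKs.
	fmt.Println(hpaV1 == hpaV2)
}
```

The point of the comparison at the end is that the Version is part of the identity: the same Kind served under two Versions is two different API types.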

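As Kubernetes evolves, an API group accumulates multiple Versions, which raises two questions:

  • When a resource object is created through different Versions, which Version ends up stored in etcd?
  • When a resource object is read (Get) through different Versions, how does the apiserver convert between them?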

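I. How the APIVersion flows

There are three kinds of APIVersion:

  • External Version:

    • the version the apiserver exposes to clients, such as v1 or v2beta1;
  • Internal Version:

    • the in-memory version used inside the apiserver; it holds a superset of the fields of all versions and usually mirrors the latest external version;
  • Storage Version:

    • the version persisted to etcd, usually a stable external version such as v1 or v2;

The apiserver does not convert directly between two external versions; it uses the internal version as an intermediate hub, for example: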

v1-->internal version-->v2beta1
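The hub-style conversion above can be sketched with toy types. Everything in this block (the struct shapes, field names and function names) is hypothetical and far simpler than the real autoscaling API types; it only illustrates that a v1 to v2beta1 conversion is composed of two hops through the internal type.

```go
package main

import "fmt"

// All types below are hypothetical toy shapes, not the real autoscaling API types.

// internalHPA is the hub: it can hold every field any external version has.
type internalHPA struct {
	MaxReplicas int32
	CPUPercent  *int32 // nil when no CPU target is set
}

// hpaV1 and hpaV2beta1 are two external "spokes" with different layouts.
type hpaV1 struct {
	MaxReplicas                    int32
	TargetCPUUtilizationPercentage *int32
}

type metricV2beta1 struct {
	Resource      string
	TargetPercent int32
}

type hpaV2beta1 struct {
	MaxReplicas int32
	Metrics     []metricV2beta1
}

// v1ToInternal is the first hop: external v1 -> internal.
func v1ToInternal(in hpaV1) internalHPA {
	return internalHPA{MaxReplicas: in.MaxReplicas, CPUPercent: in.TargetCPUUtilizationPercentage}
}

// internalToV2beta1 is the second hop: internal -> external v2beta1.
func internalToV2beta1(in internalHPA) hpaV2beta1 {
	out := hpaV2beta1{MaxReplicas: in.MaxReplicas}
	if in.CPUPercent != nil {
		out.Metrics = append(out.Metrics, metricV2beta1{Resource: "cpu", TargetPercent: *in.CPUPercent})
	}
	return out
}

// convertV1ToV2beta1 never maps one external type onto another directly;
// it always goes through the internal hub.
func convertV1ToV2beta1(in hpaV1) hpaV2beta1 {
	return internalToV2beta1(v1ToInternal(in))
}

func main() {
	cpu := int32(80)
	out := convertV1ToV2beta1(hpaV1{MaxReplicas: 5, TargetCPUUtilizationPercentage: &cpu})
	fmt.Printf("%+v\n", out)
}
```

With N external versions, this design needs only 2N conversion functions (to and from the hub) instead of one function per ordered pair of versions.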

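The APIVersion flows as follows:

  • When a resource object is created with kubectl create, the client uses an External Version, i.e. a version and scheme exposed by the apiserver;
  • Inside the apiserver, the External Version object is:

    • first converted to the Internal version;
    • then converted from the Internal version to the Storage version;
  • When a resource object is queried with kubectl get, depending on the Version the client requests:

    • the Storage Version object is first converted to the Internal version;
    • the Internal version is then converted to the requested External version and returned to the client;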
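The write and read paths above can be traced with a small in-memory model. This is a sketch under heavy simplification: `apiServer`, `object`, `newAPIServer` and the `hops` trace are hypothetical names, a Go map stands in for etcd, and "conversion" only relabels the version instead of reshaping fields.

```go
package main

import (
	"errors"
	"fmt"
)

const internalVersion = "__internal"

// object is a hypothetical toy resource: a version label plus one payload field.
type object struct {
	Name     string
	Version  string
	Replicas int
}

// apiServer is a toy model of the version flow, not the real apiserver.
type apiServer struct {
	served         map[string]bool
	storageVersion string
	etcd           map[string]object // stand-in for etcd
	hops           []string          // trace of every conversion performed
}

func newAPIServer(storageVersion string, served ...string) *apiServer {
	a := &apiServer{served: map[string]bool{}, storageVersion: storageVersion, etcd: map[string]object{}}
	for _, v := range served {
		a.served[v] = true
	}
	return a
}

// convert relabels the object and records the hop.
func (a *apiServer) convert(in object, target string) object {
	a.hops = append(a.hops, in.Version+"->"+target)
	in.Version = target
	return in
}

// create: external -> internal -> storage version, then persist.
func (a *apiServer) create(in object) error {
	if !a.served[in.Version] {
		return errors.New("version not served: " + in.Version)
	}
	internal := a.convert(in, internalVersion)
	a.etcd[in.Name] = a.convert(internal, a.storageVersion)
	return nil
}

// get: storage version -> internal -> requested external version.
func (a *apiServer) get(name, version string) (object, error) {
	if !a.served[version] {
		return object{}, errors.New("version not served: " + version)
	}
	stored, ok := a.etcd[name]
	if !ok {
		return object{}, errors.New("not found: " + name)
	}
	return a.convert(a.convert(stored, internalVersion), version), nil
}

func main() {
	srv := newAPIServer("v1", "v1", "v2", "v2beta2")
	if err := srv.create(object{Name: "web", Version: "v2beta2", Replicas: 3}); err != nil {
		panic(err)
	}
	got, err := srv.get("web", "v2")
	if err != nil {
		panic(err)
	}
	fmt.Println("stored as:", srv.etcd["web"].Version)
	fmt.Println("returned as:", got.Version)
	fmt.Println("hops:", srv.hops)
}
```

Whatever version the client writes or reads, the stored copy keeps a single version, and every request passes through the internal version exactly once in each direction.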

二. storageVersion

1. Selection entry point

The entry point that selects the Version saved to etcd:

  • the version is selected through storageVersioner;
  • storageVersioner is a field of the storageProvider object;
// vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
    ...
    storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
    ...
    var apiResource metav1.APIResource
    if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionHash) &&
        isStorageVersionProvider &&
        storageVersionProvider.StorageVersion() != nil {
        versioner := storageVersionProvider.StorageVersion()                 // select the version through storageVersioner
        gvk, err := getStorageVersionKind(versioner, storage, a.group.Typer) // resolve the GVK that is stored
        if err != nil {
            return nil, nil, err
        }
        apiResource.StorageVersionHash = discovery.StorageVersionHash(gvk.Group, gvk.Version, gvk.Kind)
    }
    ...
}
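The excerpt above feeds the storage GVK into `discovery.StorageVersionHash`. The sketch below shows one plausible way such a hash can be derived: a truncated SHA-256 of "group/version/kind", base64-encoded. Treat this as an assumption about the scheme rather than a copy of the upstream function; `storageVersionHashSketch` is a hypothetical name, and the exact upstream implementation should be checked in the Kubernetes source.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// storageVersionHashSketch is a hypothetical illustration of hashing a storage GVK.
// Assumption: the hash is the first 8 bytes of SHA-256("group/version/kind"),
// base64-encoded. Verify against the upstream code before relying on it.
func storageVersionHashSketch(group, version, kind string) string {
	sum := sha256.Sum256([]byte(group + "/" + version + "/" + kind))
	return base64.StdEncoding.EncodeToString(sum[:8])
}

func main() {
	v1 := storageVersionHashSketch("autoscaling", "v1", "HorizontalPodAutoscaler")
	v2 := storageVersionHashSketch("autoscaling", "v2", "HorizontalPodAutoscaler")
	// The hash changes whenever the storage version changes, so a client can
	// detect a storage version change without knowing the version itself.
	fmt.Println(v1, v2, v1 != v2)
}
```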

The storageProvider is the Store, and StorageVersioner is one of its fields. It is populated as follows:

  • it comes from storageConfig.EncodeVersioner;
// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    ...
    if e.Storage.Storage == nil {
        ...
        e.StorageVersioner = opts.StorageConfig.EncodeVersioner
        ...
    }
    ...
}

storageConfig.EncodeVersioner is built as follows:

  • it is built from codecConfig;
  • codecConfig.StorageVersion holds the version that is stored in etcd;
// vendor/k8s.io/apiserver/pkg/server/storage/storage_factory.go
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
    ...
    codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
    ...
    storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
    ...
}

Finally, unless a per-resource override is configured, the first version (index 0) in the scheme's prioritized versions for the Group is used as the version stored in etcd:

// staging/src/k8s.io/apiserver/pkg/server/storage/resource_encoding_config.go
func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
    if !o.scheme.IsGroupRegistered(resource.Group) {
        return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
    }
    resourceOverride, resourceExists := o.resources[resource]
    if resourceExists {
        return resourceOverride.ExternalResourceEncoding, nil
    }
    // return the most preferred external version for the group
    return o.scheme.PrioritizedVersionsForGroup(resource.Group)[0], nil
}
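The selection rule in StorageEncodingFor (use the per-resource override if there is one, otherwise the most preferred version of the group) can be exercised in isolation. The types and names below (`encodingConfig`, `groupResource`, `groupVersion`, `storageEncodingFor`) are hypothetical simplifications, not the apiserver types, and the override to v2 is an invented example.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for schema.GroupResource and schema.GroupVersion.
type groupResource struct{ Group, Resource string }
type groupVersion struct{ Group, Version string }

// encodingConfig is a toy model of the storage encoding decision.
type encodingConfig struct {
	prioritized map[string][]groupVersion      // per group, most preferred first
	overrides   map[groupResource]groupVersion // optional per-resource storage version
}

func (c encodingConfig) storageEncodingFor(r groupResource) (groupVersion, error) {
	versions, ok := c.prioritized[r.Group]
	if !ok || len(versions) == 0 {
		return groupVersion{}, fmt.Errorf("group %q is not registered", r.Group)
	}
	// A per-resource override wins over the group default.
	if gv, ok := c.overrides[r]; ok {
		return gv, nil
	}
	// Otherwise take the most preferred version of the group.
	return versions[0], nil
}

func main() {
	hpa := groupResource{Group: "autoscaling", Resource: "horizontalpodautoscalers"}
	cfg := encodingConfig{
		prioritized: map[string][]groupVersion{
			"autoscaling": {{"autoscaling", "v1"}, {"autoscaling", "v2"}},
		},
		overrides: map[groupResource]groupVersion{},
	}

	gv, _ := cfg.storageEncodingFor(hpa)
	fmt.Println("default:", gv.Version)

	// Invented override, only to show the precedence.
	cfg.overrides[hpa] = groupVersion{"autoscaling", "v2"}
	gv, _ = cfg.storageEncodingFor(hpa)
	fmt.Println("with override:", gv.Version)
}
```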

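2. Building the version list

As shown above, the version stored in etcd is element 0, i.e. the first element, of the slice returned by scheme.PrioritizedVersionsForGroup();

scheme.PrioritizedVersionsForGroup() returns the versions in versionPriority, followed by any versions of the group in observedVersions that are not already in the list;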

// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
func (s *Scheme) PrioritizedVersionsForGroup(group string) []schema.GroupVersion {
    ret := []schema.GroupVersion{}
    for _, version := range s.versionPriority[group] {
        ret = append(ret, schema.GroupVersion{Group: group, Version: version})
    }
    for _, observedVersion := range s.observedVersions {
        if observedVersion.Group != group {
            continue
        }
        found := false
        for _, existing := range ret {
            if existing == observedVersion {
                found = true
                break
            }
        }
        if !found {
            ret = append(ret, observedVersion)
        }
    }
    return ret
}
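To see the merge order concretely, here is a stdlib-only toy version of the same logic with sample data. `toyScheme` and `groupVersion` are hypothetical stand-ins, and the contents of `versionPriority` and `observedVersions` are invented for the example.

```go
package main

import "fmt"

// groupVersion is a hypothetical stand-in for schema.GroupVersion.
type groupVersion struct{ Group, Version string }

// toyScheme keeps only the two fields the merge needs.
type toyScheme struct {
	versionPriority  map[string][]string
	observedVersions []groupVersion
}

// prioritizedVersionsForGroup returns the explicitly prioritized versions first,
// then any observed versions of the group that were not listed, in observation order.
func (s *toyScheme) prioritizedVersionsForGroup(group string) []groupVersion {
	ret := []groupVersion{}
	for _, v := range s.versionPriority[group] {
		ret = append(ret, groupVersion{Group: group, Version: v})
	}
	for _, observed := range s.observedVersions {
		if observed.Group != group {
			continue
		}
		found := false
		for _, existing := range ret {
			if existing == observed {
				found = true
				break
			}
		}
		if !found {
			ret = append(ret, observed)
		}
	}
	return ret
}

func main() {
	s := &toyScheme{
		versionPriority: map[string][]string{"autoscaling": {"v1", "v2"}},
		observedVersions: []groupVersion{
			{"autoscaling", "v2beta2"},
			{"apps", "v1"},
			{"autoscaling", "v1"},
			{"autoscaling", "v2beta1"},
		},
	}
	for _, gv := range s.prioritizedVersionsForGroup("autoscaling") {
		fmt.Println(gv.Group + "/" + gv.Version)
	}
}
```

Versions that were registered but never given a priority still appear in the result, only after all prioritized ones, so they can never become element 0 while a priority list exists.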

Now look at versionPriority:

  • it is assigned by SetVersionPriority();
  • s.versionPriority[group] = []string{…}, in the same order as the versions passed in;
// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
func (s *Scheme) SetVersionPriority(versions ...schema.GroupVersion) error {
    groups := sets.String{}
    order := []string{}
    for _, version := range versions {
        ...
        groups.Insert(version.Group)
        order = append(order, version.Version)
    }
    if len(groups) != 1 {
        return fmt.Errorf("must register versions for exactly one group: %v", strings.Join(groups.List(), ","))
    }
    s.versionPriority[groups.List()[0]] = order
    return nil
}

For example, here is how the autoscaling Group calls SetVersionPriority():

  • autoscaling calls SetVersionPriority with the arguments (v1, v2, v2beta1, v2beta2);
  • because the first version is chosen as the version stored in etcd, HPA objects are stored in etcd as v1;
// pkg/apis/autoscaling/install/install.go
func init() {
    Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
    utilruntime.Must(autoscaling.AddToScheme(scheme))
    utilruntime.Must(v2beta2.AddToScheme(scheme))
    utilruntime.Must(v2beta1.AddToScheme(scheme))
    utilruntime.Must(v1.AddToScheme(scheme))
    utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v2.SchemeGroupVersion, v2beta1.SchemeGroupVersion, v2beta2.SchemeGroupVersion))
}
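The effect of the argument order can be checked with a toy scheme. This is a sketch: `toyScheme`, `setVersionPriority` and `storageVersion` are hypothetical names, a plain map replaces the `sets.String` used upstream, and the "v2 first" ordering at the end is an invented variation to show the consequence of reordering.

```go
package main

import "fmt"

// groupVersion is a hypothetical stand-in for schema.GroupVersion.
type groupVersion struct{ Group, Version string }

type toyScheme struct {
	versionPriority map[string][]string
}

// setVersionPriority records the versions in call order and insists
// that all of them belong to exactly one group.
func (s *toyScheme) setVersionPriority(versions ...groupVersion) error {
	groups := map[string]struct{}{}
	order := []string{}
	for _, v := range versions {
		groups[v.Group] = struct{}{}
		order = append(order, v.Version)
	}
	if len(groups) != 1 {
		return fmt.Errorf("must register versions for exactly one group, got %d", len(groups))
	}
	s.versionPriority[versions[0].Group] = order
	return nil
}

// storageVersion returns the first prioritized version of the group, or "" if none.
func (s *toyScheme) storageVersion(group string) string {
	if order := s.versionPriority[group]; len(order) > 0 {
		return order[0]
	}
	return ""
}

func main() {
	s := &toyScheme{versionPriority: map[string][]string{}}

	// Same order as the Install() excerpt: v1 comes first.
	_ = s.setVersionPriority(
		groupVersion{"autoscaling", "v1"},
		groupVersion{"autoscaling", "v2"},
		groupVersion{"autoscaling", "v2beta1"},
		groupVersion{"autoscaling", "v2beta2"},
	)
	fmt.Println("storage version:", s.storageVersion("autoscaling"))

	// Invented variation: moving v2 to the front would change the storage version.
	_ = s.setVersionPriority(groupVersion{"autoscaling", "v2"}, groupVersion{"autoscaling", "v1"})
	fmt.Println("storage version:", s.storageVersion("autoscaling"))

	// Mixing groups in one call is rejected.
	fmt.Println(s.setVersionPriority(groupVersion{"autoscaling", "v1"}, groupVersion{"apps", "v1"}))
}
```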

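III. Conversion between versions

For HPA, the conversion functions between the internal version and each external version are defined under pkg/apis/autoscaling;

For HPA:

  • the version stored in etcd is v1;
  • on kubectl get hpa.v2beta2.autoscaling:

    • the apiserver first converts v1 to the internal version;
    • then converts the internal version to v2beta2;

The conversion functions for each version live in: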

  • pkg/apis/autoscaling/v1/conversion.go
  • pkg/apis/autoscaling/v2/conversion.go
  • pkg/apis/autoscaling/v2beta1/conversion.go
  • pkg/apis/autoscaling/v2beta2/conversion.go

For example, the v1 ObjectMetricSource is converted to the internal version's ObjectMetricSource:

// pkg/apis/autoscaling/v1/conversion.go
func Convert_v1_ObjectMetricSource_To_autoscaling_ObjectMetricSource(in *autoscalingv1.ObjectMetricSource, out *autoscaling.ObjectMetricSource, s conversion.Scope) error {
    var metricType autoscaling.MetricTargetType
    if in.AverageValue == nil {
        metricType = autoscaling.ValueMetricType
    } else {
        metricType = autoscaling.AverageValueMetricType
    }
    out.Target = autoscaling.MetricTarget{
        Type:         metricType,
        Value:        &in.TargetValue,
        AverageValue: in.AverageValue,
    }
    out.DescribedObject = autoscaling.CrossVersionObjectReference{
        Kind:       in.Target.Kind,
        Name:       in.Target.Name,
        APIVersion: in.Target.APIVersion,
    }
    out.Metric = autoscaling.MetricIdentifier{
        Name:     in.MetricName,
        Selector: in.Selector,
    }
    return nil
}
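The interesting part of this conversion is that the internal type needs a target Type that v1 does not carry, so it is inferred from whether AverageValue is set. A stripped-down sketch of that inference follows; the struct shapes are hypothetical, quantities are reduced to plain strings, and the constant values are local to this example.

```go
package main

import "fmt"

// Local, hypothetical constants for the inferred target type.
const (
	valueMetricType        = "Value"
	averageValueMetricType = "AverageValue"
)

// objectMetricV1 is a toy version of the v1 shape: no explicit target type.
type objectMetricV1 struct {
	MetricName   string
	TargetValue  string  // quantity simplified to a string
	AverageValue *string // optional
}

// metricTarget is a toy version of the internal shape: the type is explicit.
type metricTarget struct {
	Type         string
	Value        *string
	AverageValue *string
}

// toInternalTarget infers the explicit Type from the presence of AverageValue.
func toInternalTarget(in *objectMetricV1) metricTarget {
	metricType := valueMetricType
	if in.AverageValue != nil {
		metricType = averageValueMetricType
	}
	return metricTarget{
		Type:         metricType,
		Value:        &in.TargetValue,
		AverageValue: in.AverageValue,
	}
}

func main() {
	plain := &objectMetricV1{MetricName: "requests", TargetValue: "100"}
	fmt.Println(toInternalTarget(plain).Type)

	avg := "10"
	averaged := &objectMetricV1{MetricName: "requests", TargetValue: "100", AverageValue: &avg}
	fmt.Println(toInternalTarget(averaged).Type)
}
```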

1. Registering conversion functions

For HPA v2beta2, a whole set of conversion functions is registered:

// pkg/apis/autoscaling/v2beta2/zz_generated.conversion.go
func init() {
    localSchemeBuilder.Register(RegisterConversions)
}

// RegisterConversions adds conversion functions to the given scheme.
// Public to allow building arbitrary schemes.
func RegisterConversions(s *runtime.Scheme) error {
    if err := s.AddGeneratedConversionFunc((*v2beta2.ContainerResourceMetricSource)(nil), (*autoscaling.ContainerResourceMetricSource)(nil), func(a, b interface{}, scope conversion.Scope) error {
        return Convert_v2beta2_ContainerResourceMetricSource_To_autoscaling_ContainerResourceMetricSource(a.(*v2beta2.ContainerResourceMetricSource), b.(*autoscaling.ContainerResourceMetricSource), scope)
    }); err != nil {
        return err
    }
    ...
    // internal ObjectMetricSource ----> v2beta2 ObjectMetricSource
    if err := s.AddGeneratedConversionFunc((*autoscaling.ObjectMetricSource)(nil), (*v2beta2.ObjectMetricSource)(nil), func(a, b interface{}, scope conversion.Scope) error {
        return Convert_autoscaling_ObjectMetricSource_To_v2beta2_ObjectMetricSource(a.(*autoscaling.ObjectMetricSource), b.(*v2beta2.ObjectMetricSource), scope)
    }); err != nil {
        return err
    }
    ...
}
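The registration pattern above passes typed nil pointers such as `(*T)(nil)` purely to name the source and destination types. A minimal registry built on that idea might look as follows; `registry`, `typePair`, `add` and `convert` are hypothetical names and this is not how the real converter in k8s.io/apimachinery is structured internally.

```go
package main

import (
	"fmt"
	"reflect"
)

// typePair identifies a conversion by its (source, destination) pointer types.
type typePair struct{ src, dst reflect.Type }

type conversionFunc func(a, b interface{}) error

// registry is a toy lookup table of conversion functions.
type registry struct {
	funcs map[typePair]conversionFunc
}

// add registers fn for the types of a and b. Typed nil pointers are enough,
// because only their static types are inspected.
func (r *registry) add(a, b interface{}, fn conversionFunc) error {
	ta, tb := reflect.TypeOf(a), reflect.TypeOf(b)
	if ta == nil || tb == nil || ta.Kind() != reflect.Ptr || tb.Kind() != reflect.Ptr {
		return fmt.Errorf("both arguments must be pointers, got %T and %T", a, b)
	}
	r.funcs[typePair{ta, tb}] = fn
	return nil
}

// convert looks up the function registered for this exact direction.
func (r *registry) convert(in, out interface{}) error {
	fn, ok := r.funcs[typePair{reflect.TypeOf(in), reflect.TypeOf(out)}]
	if !ok {
		return fmt.Errorf("no conversion registered from %T to %T", in, out)
	}
	return fn(in, out)
}

// Hypothetical toy types standing in for an internal and an external shape.
type internalSource struct{ MetricName string }
type externalSource struct{ Name string }

func main() {
	r := &registry{funcs: map[typePair]conversionFunc{}}
	err := r.add((*internalSource)(nil), (*externalSource)(nil), func(a, b interface{}) error {
		b.(*externalSource).Name = a.(*internalSource).MetricName
		return nil
	})
	if err != nil {
		panic(err)
	}

	out := &externalSource{}
	fmt.Println(r.convert(&internalSource{MetricName: "requests"}, out), out.Name)

	// The reverse direction was never registered, so it fails.
	fmt.Println(r.convert(&externalSource{Name: "x"}, &internalSource{}))
}
```

Each direction is a separate entry, which matches the excerpt above: one function converts an external type to the internal type and another converts it back.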

2. Conversion is performed by the converter object

The converter is created together with the Scheme in NewScheme():

// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
func NewScheme() *Scheme {
    s := &Scheme{
        gvkToType:                 map[schema.GroupVersionKind]reflect.Type{},
        typeToGVK:                 map[reflect.Type][]schema.GroupVersionKind{},
        unversionedTypes:          map[reflect.Type]schema.GroupVersionKind{},
        unversionedKinds:          map[string]reflect.Type{},
        fieldLabelConversionFuncs: map[schema.GroupVersionKind]FieldLabelConversionFunc{},
        defaulterFuncs:            map[reflect.Type]func(interface{}){},
        versionPriority:           map[string][]string{},
        schemeName:                naming.GetNameFromCallsite(internalPackages...),
    }
    s.converter = conversion.NewConverter(s.nameFunc)

    // Enable couple default conversions by default.
    utilruntime.Must(RegisterEmbeddedConversions(s))
    utilruntime.Must(RegisterStringConversions(s))
    return s
}

Performing the conversion:

// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
func (s *Scheme) ConvertToVersion(in Object, target GroupVersioner) (Object, error) {
    return s.convertToVersion(true, in, target)
}

func (s *Scheme) convertToVersion(copy bool, in Object, target GroupVersioner) (Object, error) {
    ...
    meta := s.generateConvertMeta(in)
    meta.Context = target
    if err := s.converter.Convert(in, out, meta); err != nil {
        return nil, err
    }
    setTargetKind(out, gvk)
    return out, nil
}

Reference:

  1. https://segmentfault.com/a/1190000042657668
