乐趣区

使用clientgo包访问Kubernetes-CRD

Kubernetes API 服务器可通过自定义资源定义轻松扩展。但是,用 client-go 库访问这些资源有点麻烦,官方也没有完整的文档。如 kubebuilder operator-framework 都能很方便的帮助我们去创建实现一个 controller,但是封装的过于好导致我们并不清楚内部是怎么调用 client-go 的,很多场景我们是需要自己去调用接口操作 CRD 的而不是在 controller 中去访问 CRD。

安装 kubernetes

最简单的方式当然是使用 sealos 不管是单机还是生产环境还是云上都可以使用 sealos。

wget https://github.com/fanux/sealos/releases/download/v2.0.7/sealos && chmod +x sealos && mv sealos /usr/bin 

sealos init --passwd YOUR_SERVER_PASSWD \
    --master 192.168.0.2  --master 192.168.0.3  --master 192.168.0.4  \
    --node 192.168.0.5 \
    --pkg-url https://sealyun.oss-cn-beijing.aliyuncs.com/cf6bece970f6dab3d8dc8bc5b588cc18-1.16.0/kube1.16.0.tar.gz \
    --version v1.16.0

以上两条命令就搞定了一个 kubernetes 1.16.0 集群,其它版本也一样

本文非常实用,比较全比较完善的相关文档也比较难找到。

举个栗子:

我们在实现虚拟机 CRD 时,节点上 agent 需要查询虚拟机 CRD,这种情况显然我们不会通过 controller 进行操作,此时我们就需要知道怎么直接用 client-go 操作 CRD。
<!–more–>

创建 CRD

apiVersion: "apiextensions.k8s.io/v1beta1"
kind: "CustomResourceDefinition"
metadata:
  name: "projects.example.sealyun.com"
spec:
  group: "example.sealyun.com"
  version: "v1alpha1"
  scope: "Namespaced"
  names:
    plural: "projects"
    singular: "project"
    kind: "Project"
  validation:
    openAPIV3Schema:
      required: ["spec"]
      properties:
        spec:
          required: ["replicas"]
          properties:
            replicas:
              type: "integer"
              minimum: 1

这个可以使用 kubebuilder 或者 operator-framework 生成, 自己写太累

要定义自定义资源定义,您需要考虑 API 组名称(在本例中 example.sealyun.com)。按照惯例,这通常是您控制的域的域名(例如,您组织的域),以防止命名冲突。然后 CRD 的名称遵循模式 <plural-resource-name>.<api-group-name>,因此在这种情况下 projects.example.sealyun.com。

通常,您希望根据特定架构验证用户在自定义资源中存储的数据。这就是 spec.validation.openAPIV3Schema 它的用途:它包含一个描述资源应具有的格式的 JSON 模式。

使用 kubectl 创建资源定义, 如果用 kubebuilder 可以直接 make && make deploy:

> kubectl apply -f projects-crd.yaml
customresourcedefinition "projects.example.sealyun.com" created

可以创建此新资源类型的实例:

apiVersion: "example.sealyun.com/v1alpha1"
kind: "Project"
metadata:
  name: "example-project"
  namespace: "default"
spec:
  replicas: 1
> kubectl apply -f project.yaml
project "example-project" created
> kubectl get projects
NAME               AGE
example-project    2m

创建 Golang 客户端

接下来,我们将使用 client-go 包来访问这些自定义资源。

定义类型

kubebuilder 等都会自动为您生成,我这里为了讲清楚所有的东西也加上这块的相关说明

首先定义自定义资源的类型。通过 API 组版本组织这些类型是一个很好的做法; 例如,api/types/v1alpha1/project.go:

package v1alpha1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

type ProjectSpec struct {Replicas int `json:"replicas"`}

type Project struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec ProjectSpec `json:"spec"`
}

type ProjectList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`

    Items []Project `json:"items"`}

该 metav1.ObjectMeta 类型包含了典型 metadata 的属性

定义 DeepCopy 方法

Kubernetes API(在本例中为 Project 和 ProjectList)提供的每种类型都需要实现该 k8s.io/apimachinery/pkg/runtime.Object 接口。该接口定义了两种方法 GetObjectKind()和 DeepCopyObject()。第一种方法已经由嵌入式 metav1.TypeMeta 结构提供; 第二个你必须自己实现。

该 DeepCopyObject 方法旨在生成对象的深层副本。由于这涉及许多样板代码,因此很多工具通常会自动生成这些方法。为了本文的目的,我们将手动完成。继续向 deepcopy.go 同一个包添加第二个文件:

package v1alpha1

import "k8s.io/apimachinery/pkg/runtime"

// DeepCopyInto copies all properties of this object into another object of the
// same type that is provided as a pointer.
func (in *Project) DeepCopyInto(out *Project) {
    out.TypeMeta = in.TypeMeta
    out.ObjectMeta = in.ObjectMeta
    out.Spec = ProjectSpec{Replicas: in.Spec.Replicas,}
}

// DeepCopyObject returns a generically typed copy of an object
func (in *Project) DeepCopyObject() runtime.Object {out := Project{}
    in.DeepCopyInto(&out)

    return &out
}

// DeepCopyObject returns a generically typed copy of an object
func (in *ProjectList) DeepCopyObject() runtime.Object {out := ProjectList{}
    out.TypeMeta = in.TypeMeta
    out.ListMeta = in.ListMeta

    if in.Items != nil {out.Items = make([]Project, len(in.Items))
        for i := range in.Items {in.Items[i].DeepCopyInto(&out.Items[i])
        }
    }

    return &out
}

注册类型

接下来,您需要使客户端库知道新类型。允许客户端在与 API 服务器通信时自动处理新类型。

为此,register.go 请在包中添加一个新文件:

package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

const GroupName = "example.sealyun.com"
const GroupVersion = "v1alpha1"

var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}

var (SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme   = SchemeBuilder.AddToScheme
)

func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(SchemeGroupVersion,
        &Project{},
        &ProjectList{},)

    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}

此代码实际上并没有做任何事情(除了创建新 runtime.SchemeBuilder 实例)。重要的部分是 AddToScheme 函数,它是 runtime.SchemeBuilder 中创建的类型。一旦 Kubernetes 客户端初始化为注册您的类型,您可以稍后从客户端代码的任何部分调用此函数。

构建 HTTP 客户端

在定义类型并添加方法以在全局方案构建器中注册它们之后,您现在可以创建能够加载自定义资源的 HTTP 客户端。

为此,将以下代码添加到包的 main.go 文件中:

package main

import (
    "flag"
    "log"
    "time"

    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/runtime/serializer"

    "github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

var kubeconfig string

func init() {flag.StringVar(&kubeconfig, "kubeconfig", "","path to Kubernetes config file")
    flag.Parse()}

func main() {
    var config *rest.Config
    var err error

    if kubeconfig == "" {log.Printf("using in-cluster configuration")
        config, err = rest.InClusterConfig()} else {log.Printf("using configuration from'%s'", kubeconfig)
        config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
    }

    if err != nil {panic(err)
    }

    v1alpha1.AddToScheme(scheme.Scheme)

    crdConfig := *config
    crdConfig.ContentConfig.GroupVersion = &schema.GroupVersion{Group: v1alpha1.GroupName, Version: v1alpha1.GroupVersion}
    crdConfig.APIPath = "/apis"
    crdConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
    crdConfig.UserAgent = rest.DefaultKubernetesUserAgent()

    exampleRestClient, err := rest.UnversionedRESTClientFor(&crdConfig)
    if err != nil {panic(err)
    }
}

您现在可以使用第 exampleRestClient 中创建的内容来查询 example.sealyun.com/v1alpha1API 组中的所有自定义资源。示例可能如下所示:

result := v1alpha1.ProjectList{}
err := exampleRestClient.
    Get().
    Resource("projects").
    Do().
    Into(&result)

为了以更加类型安全的方式使用您的 API,通常最好将这些操作包装在您自己的客户端集中。为此,创建一个新的子包 clientset/v1alpha1。首先,实现一个定义 API 组类型的接口,并将配置设置从您的 main 方法移动到该 clientset 的构造函数中(NewForConfig 在下面的示例中):

package v1alpha1

import (
    "github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
)

type ExampleV1Alpha1Interface interface {Projects(namespace string) ProjectInterface 
}

type ExampleV1Alpha1Client struct {restClient rest.Interface}

func NewForConfig(c *rest.Config) (*ExampleV1Alpha1Client, error) {
    config := *c
    config.ContentConfig.GroupVersion = &schema.GroupVersion{Group: v1alpha1.GroupName, Version: v1alpha1.GroupVersion}
    config.APIPath = "/apis"
    config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
    config.UserAgent = rest.DefaultKubernetesUserAgent()

    client, err := rest.RESTClientFor(&config)
    if err != nil {return nil, err}

    return &ExampleV1Alpha1Client{restClient: client}, nil
}

func (c *ExampleV1Alpha1Client) Projects(namespace string) ProjectInterface {
    return &projectClient{
        restClient: c.restClient,
        ns: namespace,
    }
}

以上是对 client 的封装

接下来,您需要实现一个特定的 Project 客户端集来访问自定义资源(请注意,上面的示例已经使用了我们仍需要提供的 ProjectInterface 和 projectClient 类型)。projects.go 在同一个包中创建第二个文件:

package v1alpha1

import (
    "github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
)

type ProjectInterface interface {List(opts metav1.ListOptions) (*v1alpha1.ProjectList, error)
    Get(name string, options metav1.GetOptions) (*v1alpha1.Project, error)
    Create(*v1alpha1.Project) (*v1alpha1.Project, error)
    Watch(opts metav1.ListOptions) (watch.Interface, error)
    // ...
}

type projectClient struct {
    restClient rest.Interface
    ns         string
}

func (c *projectClient) List(opts metav1.ListOptions) (*v1alpha1.ProjectList, error) {result := v1alpha1.ProjectList{}
    err := c.restClient.
        Get().
        Namespace(c.ns).
        Resource("projects").
        VersionedParams(&opts, scheme.ParameterCodec).
        Do().
        Into(&result)

    return &result, err
}

func (c *projectClient) Get(name string, opts metav1.GetOptions) (*v1alpha1.Project, error) {result := v1alpha1.Project{}
    err := c.restClient.
        Get().
        Namespace(c.ns).
        Resource("projects").
        Name(name).
        VersionedParams(&opts, scheme.ParameterCodec).
        Do().
        Into(&result)

    return &result, err
}

func (c *projectClient) Create(project *v1alpha1.Project) (*v1alpha1.Project, error) {result := v1alpha1.Project{}
    err := c.restClient.
        Post().
        Namespace(c.ns).
        Resource("projects").
        Body(project).
        Do().
        Into(&result)

    return &result, err
}

func (c *projectClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
    opts.Watch = true
    return c.restClient.
        Get().
        Namespace(c.ns).
        Resource("projects").
        VersionedParams(&opts, scheme.ParameterCodec).
        Watch()}

上面还缺少一些 Delete Update 方法,照抄就行,或者参考 pod 的实现

再去使用就变的非常简单了:

import clientV1alpha1 "github.com/martin-helmich/kubernetes-crd-example/clientset/v1alpha1"
// ...

func main() {
    // ...

    clientSet, err := clientV1alpha1.NewForConfig(config)
    if err != nil {panic(err)
    }

    projects, err := clientSet.Projects("default").List(metav1.ListOptions{})
    if err != nil {panic(err)
    }

    fmt.Printf("projects found: %+v\n", projects)
}

创建 informer

构建 Kubernetes operator 时,通常希望能够对新创建或更新的事件进行监听。理论上,可以定期调用该 List()方法并检查是否添加了新资源。

大多数情况通过使用初始 List()初始加载资源的所有相关实例,然后使用 Watch()订阅相关事件进行处理。然后,使用从 informer 接收的初始对象列表和更新来构建本地缓存,该缓存允许快速访问任何自定义资源,而无需每次都访问 API 服务器。

这种模式非常普遍,以至于 client-go 库为此提供了一个 cache 包:来自包的 Informerk8s.io/client-go/tools/cache。您可以为自定义资源构建新的 Informer,如下所示:

package main

import (
    "time"

    "github.com/martin-helmich/kubernetes-crd-example/api/types/v1alpha1"
    client_v1alpha1 "github.com/martin-helmich/kubernetes-crd-example/clientset/v1alpha1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/tools/cache"
)

func WatchResources(clientSet client_v1alpha1.ExampleV1Alpha1Interface) cache.Store {
    projectStore, projectController := cache.NewInformer(
        &cache.ListWatch{ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {return clientSet.Projects("some-namespace").List(lo)
            },
            WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {return clientSet.Projects("some-namespace").Watch(lo)
            },
        },
        &v1alpha1.Project{},
        1*time.Minute,
        cache.ResourceEventHandlerFuncs{},)

    go projectController.Run(wait.NeverStop)
    return projectStore
}

该 NewInformer 方法返回两个对象:第二个返回值,控制器控制 List()和 Watch()调用并填充第一个返回值,Store 缓存监听到的一些信息。
您现在可以使用 store 轻松访问 CRD,列出所有 CRD 或通过名称访问它们。store 函数返回 interface{}类型,因此您必须将它们强制转换回 CRD 类型:

store := WatchResource(clientSet)
project := store.GetByKey("some-namespace/some-project").(*v1alpha1.Project)

如此很多情况下就不需要再去调用 apiserver 了,给 apiserver 减轻压力.

在 kubebuilder 中进行访问

通过获取 manager 中的 reader, 但是这里只能读不能写,写的话需要 mgr.GetClient() 但是这个就必须是长时间运行的

更多详情
想加入 apiclient 功能 PR

package main

import (
    "context"
    "fmt"
    "os"

    "k8s.io/apimachinery/pkg/types"

    v1 "github.com/fanux/sealvm/api/v1"
    "github.com/prometheus/common/log"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)

var scheme = runtime.NewScheme()

func init() {v1.AddToScheme(scheme)
    clientgoscheme.AddToScheme(scheme)
}

func main() {mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme,})
    if err != nil {os.Exit(1)
    }
    client := mgr.GetAPIReader() // 如果是长时间运行用 mgr.GetClient()
    ctx := context.Background()
    name := types.NamespacedName{Namespace: "default", Name: "virtulmachine-sample"}

    vm := &v1.VirtulMachine{}
    if err := client.Get(ctx, name, vm); err != nil {log.Error(err, "unable to fetch vm")
    } else {fmt.Println(vm.Spec.CPU, vm.Spec.Memory, vm)
    }
}

推荐做法,直接调用 client:

package main

import (
    "context"
    "fmt"
    "sigs.k8s.io/controller-runtime/pkg/client"

    "k8s.io/apimachinery/pkg/types"

    v1 "github.com/fanux/sealvm/api/v1"
    "github.com/prometheus/common/log"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"

    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)

var scheme = runtime.NewScheme()

func init() {v1.AddToScheme(scheme)
    clientgoscheme.AddToScheme(scheme)
}

func getClient() (client.Client, error){config := ctrl.GetConfigOrDie()
    if config == nil {return nil, fmt.Errorf("config is nil")
    }
    options := ctrl.Options{Scheme:scheme}
    // Set default values for options fields
    //options = setOptionsDefaults(options)
    //mapper, err := options.MapperProvider(config)
    //if err != nil {//    log.Error(err, "Failed to get API Group-Resources")
    //    return nil, err
    //}

    client, err := client.New(config, client.Options{Scheme: options.Scheme})
    if err !=nil {return nil, err}
    return client,nil
}

func main() {client,err := getClient()
    if err != nil {fmt.Println("client is nil",err)
        return
    }

    ctx := context.Background()
    name := types.NamespacedName{Namespace: "default", Name: "virtulmachine-sample"}

    vm := &v1.VirtulMachine{}
    if err = client.Get(ctx, name, vm); err != nil {log.Error(err, "unable to fetch vm")
    } else {fmt.Println(vm.Spec.CPU, vm.Spec.Memory, vm)
    }
}

总结

虽然现在很多工具给我们写 CRD controller 带来了极大的便捷,但是对于 client-go 这些基本的使用还是非常必要的,而官方 client-go 的开发文档和事例真的是少之又少,基本仅包含非常基本的操作。

还有一个 dynamic client 的方式也可以用来访问自定义 CRD,但是文中的方式会更优雅更清晰更适合工程化。
kubernetes 一键 HA

退出移动版