Now that we understand how an etcd cluster is assembled and how to deploy one in a Kubernetes cluster, developing a corresponding Operator is really just a matter of implementing that series of etcd operations in code — in other words, translating the startup script in the StatefulSet into Go code. We will develop this incrementally across several versions. For the first version we will build the simplest possible Operator: one that directly generates the StatefulSet template from the previous section.

Project Initialization
As before, we need to design our CRD resource object before developing the Operator. For example, we want to create the corresponding etcd cluster from a CR resource like this:

apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
  name: demo
spec:
  size: 3 # number of replicas
  image: cnych/etcd:v3.4.13 # image
Since all other information is derived by the startup script, the size and image fields are basically enough to describe what an etcd cluster deployment looks like. So our first version is very simple: as long as we can write a correct startup script, the Operator only needs to assemble a StatefulSet and a headless Service from the EtcdCluster CR defined above.

First, initialize the project; here we use kubebuilder to generate the scaffold:

➜ kubebuilder init --domain ydzs.io --owner cnych --repo github.com/cnych/etcd-operator
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api
Once the project scaffold has been created, define the resource API:

➜ kubebuilder create api --group etcd --version v1alpha1 --kind EtcdCluster
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/etcdcluster_types.go
controllers/etcdcluster_controller.go
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
With that, the project is initialized. The overall code layout looks like this:

➜ etcd-operator tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── api
│ └── v1alpha1
├── bin
│ └── manager
├── config
│ ├── certmanager
│ ├── crd
│ ├── default
│ ├── manager
│ ├── prometheus
│ ├── rbac
│ ├── samples
│ └── webhook
├── controllers
│ ├── etcdcluster_controller.go
│ └── suite_test.go
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── main.go

14 directories, 10 files
Next, edit the Operator's types according to the EtcdCluster object we designed above, modifying the EtcdClusterSpec struct in api/v1alpha1/etcdcluster_types.go:

// api/v1alpha1/etcdcluster_types.go

// EtcdClusterSpec defines the desired state of EtcdCluster
type EtcdClusterSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	Size  *int32 `json:"size"`
	Image string `json:"image"`
}
Note that after every change you need to run make to regenerate the code:

➜ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next we can go to the controller's Reconcile function and implement our business logic.

Business Logic
First create a resource.go file under the controllers directory, which generates the corresponding StatefulSet and headless Service objects from the EtcdCluster object we defined.

// controllers/resource.go

package controllers

import (
	"strconv"

	"github.com/cnych/etcd-operator/api/v1alpha1"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
	EtcdClusterLabelKey       = "etcd.ydzs.io/cluster"
	EtcdClusterCommonLabelKey = "app"
	EtcdDataDirName           = "datadir"
)

func MutateStatefulSet(cluster *v1alpha1.EtcdCluster, sts *appsv1.StatefulSet) {
	sts.Labels = map[string]string{
		EtcdClusterCommonLabelKey: "etcd",
	}
	sts.Spec = appsv1.StatefulSetSpec{
		Replicas:    cluster.Spec.Size,
		ServiceName: cluster.Name,
		Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
			EtcdClusterLabelKey: cluster.Name,
		}},
		Template: corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					EtcdClusterLabelKey:       cluster.Name,
					EtcdClusterCommonLabelKey: "etcd",
				},
			},
			Spec: corev1.PodSpec{
				Containers: newContainers(cluster),
			},
		},
		VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
			{
				ObjectMeta: metav1.ObjectMeta{
					Name: EtcdDataDirName,
				},
				Spec: corev1.PersistentVolumeClaimSpec{
					AccessModes: []corev1.PersistentVolumeAccessMode{
						corev1.ReadWriteOnce,
					},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceStorage: resource.MustParse("1Gi"),
						},
					},
				},
			},
		},
	}
}

func newContainers(cluster *v1alpha1.EtcdCluster) []corev1.Container {
	return []corev1.Container{
		{
			Name:  "etcd",
			Image: cluster.Spec.Image,
			Ports: []corev1.ContainerPort{
				{
					Name:          "peer",
					ContainerPort: 2380,
				},
				{
					Name:          "client",
					ContainerPort: 2379,
				},
			},
			Env: []corev1.EnvVar{
				{
					Name:  "INITIAL_CLUSTER_SIZE",
					Value: strconv.Itoa(int(*cluster.Spec.Size)),
				},
				{
					Name:  "SET_NAME",
					Value: cluster.Name,
				},
				{
					Name: "POD_IP",
					ValueFrom: &corev1.EnvVarSource{
						FieldRef: &corev1.ObjectFieldSelector{
							FieldPath: "status.podIP",
						},
					},
				},
				{
					Name: "MY_NAMESPACE",
					ValueFrom: &corev1.EnvVarSource{
						FieldRef: &corev1.ObjectFieldSelector{
							FieldPath: "metadata.namespace",
						},
					},
				},
			},
			VolumeMounts: []corev1.VolumeMount{
				{
					Name:      EtcdDataDirName,
					MountPath: "/var/run/etcd",
				},
			},
			Command: []string{
				"/bin/sh", "-ec",
				"HOSTNAME=$(hostname)\n\n              ETCDCTL_API=3\n\n              eps() {\n                  EPS=\"\"\n                  for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                      EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n                  done\n                  echo ${EPS}\n              }\n\n              member_hash() {\n                  etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n              }\n\n              initial_peers() {\n                  PEERS=\"\"\n                  for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                    PEERS=\"${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380\"\n                  done\n                  echo ${PEERS}\n              }\n\n              # etcd-SET_ID\n              SET_ID=${HOSTNAME##*-}\n\n              # adding a new member to existing cluster (assuming all initial pods are available)\n              if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n                  # export ETCDCTL_ENDPOINTS=$(eps)\n                  # member already added?\n\n                  MEMBER_HASH=$(member_hash)\n                  if [ -n \"${MEMBER_HASH}\" ]; then\n                      # the member hash exists but for some reason etcd failed\n                      # as the datadir has not been created, we can remove the member\n                      # and retrieve new hash\n                      echo \"Remove member ${MEMBER_HASH}\"\n                      etcdctl --endpoints=$(eps) member remove ${MEMBER_HASH}\n                  fi\n\n                  echo \"Adding new member\"\n\n                  etcdctl member --endpoints=$(eps) add ${HOSTNAME} --peer-urls=http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 | grep \"^ETCD_\" > /var/run/etcd/new_member_envs\n\n                  if [ $? -ne 0 ]; then\n                      echo \"member add ${HOSTNAME} error.\"\n                      rm -f /var/run/etcd/new_member_envs\n                      exit 1\n                  fi\n\n                  echo \"==> Loading env vars of existing cluster...\"\n                  sed -ie \"s/^/export /\" /var/run/etcd/new_member_envs\n                  cat /var/run/etcd/new_member_envs\n                  . /var/run/etcd/new_member_envs\n\n                  exec etcd --listen-peer-urls http://${POD_IP}:2380 \\\n                      --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n                      --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n                      --data-dir /var/run/etcd/default.etcd\n              fi\n\n              for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                  while true; do\n                      echo \"Waiting for ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local to come up\"\n                      ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local > /dev/null && break\n                      sleep 1s\n                  done\n              done\n\n              echo \"join member ${HOSTNAME}\"\n              # join member\n              exec etcd --name ${HOSTNAME} \\\n                  --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 \\\n                  --listen-peer-urls http://${POD_IP}:2380 \\\n                  --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n                  --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n                  --initial-cluster-token etcd-cluster-1 \\\n                  --data-dir /var/run/etcd/default.etcd \\\n                  --initial-cluster $(initial_peers) \\\n                  --initial-cluster-state new",
			},
			Lifecycle: &corev1.Lifecycle{
				PreStop: &corev1.Handler{
					Exec: &corev1.ExecAction{
						Command: []string{
							"/bin/sh", "-ec",
							"HOSTNAME=$(hostname)\n\n                    member_hash() {\n                        etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{ print $1}'\n                    }\n\n                    eps() {\n                        EPS=\"\"\n                        for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n                            EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n                        done\n                        echo ${EPS}\n                    }\n\n                    export ETCDCTL_ENDPOINTS=$(eps)\n                    SET_ID=${HOSTNAME##*-}\n\n                    # Removing member from cluster\n                    if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n                        echo \"Removing ${HOSTNAME} from etcd cluster\"\n                        etcdctl member remove $(member_hash)\n                        if [ $? -eq 0 ]; then\n                            # Remove everything otherwise the cluster will no longer scale-up\n                            rm -rf /var/run/etcd/*\n                        fi\n                    fi",
						},
					},
				},
			},
		},
	}
}

func MutateHeadlessSvc(cluster *v1alpha1.EtcdCluster, svc *corev1.Service) {
	svc.Labels = map[string]string{
		EtcdClusterCommonLabelKey: "etcd",
	}
	svc.Spec = corev1.ServiceSpec{
		ClusterIP: corev1.ClusterIPNone,
		Selector: map[string]string{
			EtcdClusterLabelKey: cluster.Name,
		},
		Ports: []corev1.ServicePort{
			{
				Name: "peer",
				Port: 2380,
			},
			{
				Name: "client",
				Port: 2379,
			},
		},
	}
}
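The eps() and initial_peers() shell helpers baked into the container command above only build comma-separated URL lists from the headless Service's DNS names. As a taste of how that script logic could later move into Go, here is a stdlib-only sketch (the function names are ours, not part of the project):

```go
package main

import (
	"fmt"
	"strings"
)

// clientEndpoints mirrors the script's eps() helper: client URLs on
// port 2379, one per pod ordinal behind the headless Service.
// setName and namespace correspond to the SET_NAME and MY_NAMESPACE
// env vars injected into the container.
func clientEndpoints(size int, setName, namespace string) string {
	eps := make([]string, 0, size)
	for i := 0; i < size; i++ {
		eps = append(eps, fmt.Sprintf("http://%s-%d.%s.%s.svc.cluster.local:2379",
			setName, i, setName, namespace))
	}
	return strings.Join(eps, ",")
}

// initialPeers mirrors initial_peers(): name=peer-URL pairs on port 2380,
// suitable for etcd's --initial-cluster flag.
func initialPeers(size int, setName, namespace string) string {
	peers := make([]string, 0, size)
	for i := 0; i < size; i++ {
		peers = append(peers, fmt.Sprintf("%s-%d=http://%s-%d.%s.%s.svc.cluster.local:2380",
			setName, i, setName, i, setName, namespace))
	}
	return strings.Join(peers, ",")
}

func main() {
	fmt.Println(clientEndpoints(3, "etcd-sample", "default"))
	fmt.Println(initialPeers(3, "etcd-sample", "default"))
}
```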
Although the code above is long, the logic is simple: construct the StatefulSet and headless Service resource objects from our EtcdCluster. With those in place, when an EtcdCluster is created we can process it in the controller's Reconcile function. The simple approach from the earlier examples works here too:

// controllers/etcdcluster_controller.go

func (r *EtcdClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("etcdcluster", req.NamespacedName)

	// First, fetch the EtcdCluster instance
	var etcdCluster etcdv1alpha1.EtcdCluster
	if err := r.Client.Get(ctx, req.NamespacedName, &etcdCluster); err != nil {
		// EtcdCluster was deleted, ignore
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Once we have the EtcdCluster, create or update the corresponding
	// StatefulSet and Service: compare the observed state against the
	// desired state and converge the two — that is the reconciliation.

	// CreateOrUpdate the headless Service
	var svc corev1.Service
	svc.Name = etcdCluster.Name
	svc.Namespace = etcdCluster.Namespace
	or, err := ctrl.CreateOrUpdate(ctx, r, &svc, func() error {
		// all mutation must happen inside this callback
		MutateHeadlessSvc(&etcdCluster, &svc)
		return controllerutil.SetControllerReference(&etcdCluster, &svc, r.Scheme)
	})
	if err != nil {
		return ctrl.Result{}, err
	}
	log.Info("CreateOrUpdate", "Service", or)

	// CreateOrUpdate the StatefulSet
	var sts appsv1.StatefulSet
	sts.Name = etcdCluster.Name
	sts.Namespace = etcdCluster.Namespace
	or, err = ctrl.CreateOrUpdate(ctx, r, &sts, func() error {
		// all mutation must happen inside this callback
		MutateStatefulSet(&etcdCluster, &sts)
		return controllerutil.SetControllerReference(&etcdCluster, &sts, r.Scheme)
	})
	if err != nil {
		return ctrl.Result{}, err
	}
	log.Info("CreateOrUpdate", "StatefulSet", or)

	return ctrl.Result{}, nil
}
Here we simply reconcile the EtcdCluster object, creating or updating the corresponding StatefulSet and headless Service. The logic is straightforward, and with that the first version of our etcd-operator is complete.
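For intuition: ctrl.CreateOrUpdate fetches the object, runs the mutate callback, then creates the object if it is absent or issues an update if the callback changed anything, reporting which of the three happened. A toy model of that decision flow (a deliberate simplification, not the controller-runtime implementation):

```go
package main

import (
	"fmt"
	"reflect"
)

// object and store stand in for a Kubernetes object and the API server.
type object map[string]string
type store map[string]object

// createOrUpdate models the semantics of ctrl.CreateOrUpdate: the mutate
// callback expresses the desired state, and the return value reports
// whether anything had to change.
func createOrUpdate(s store, name string, mutate func(object)) string {
	existing, ok := s[name]
	if !ok {
		obj := object{}
		mutate(obj)
		s[name] = obj
		return "created"
	}
	// Apply the mutation to a copy and compare against what is stored.
	desired := object{}
	for k, v := range existing {
		desired[k] = v
	}
	mutate(desired)
	if reflect.DeepEqual(existing, desired) {
		return "unchanged"
	}
	s[name] = desired
	return "updated"
}

func main() {
	s := store{}
	want3 := func(o object) { o["replicas"] = "3" }
	fmt.Println(createOrUpdate(s, "etcd-sample", want3)) // created
	fmt.Println(createOrUpdate(s, "etcd-sample", want3)) // unchanged: already converged
	want5 := func(o object) { o["replicas"] = "5" }
	fmt.Println(createOrUpdate(s, "etcd-sample", want5)) // updated: spec drifted
}
```

This is why the Reconcile function above can stay so small: each call simply re-declares the desired Service and StatefulSet and lets CreateOrUpdate work out the delta.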

Debugging
Next, install the CRD so that the Kubernetes system recognizes our EtcdCluster object:

➜ make install
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/etcdclusters.etcd.ydzs.io configured
Then run the controller:

➜ make run
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2020-11-20T17:44:48.222+0800 INFO controller-runtime.metrics metrics server is starting to listen {"addr": ":8080"}
2020-11-20T17:44:48.223+0800 INFO setup starting manager
2020-11-20T17:44:48.223+0800 INFO controller-runtime.manager starting metrics server {"path": "/metrics"}
2020-11-20T17:44:48.223+0800 INFO controller-runtime.controller Starting EventSource {"controller": "etcdcluster", "source": "kind source: /, Kind="}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting Controller {"controller": "etcdcluster"}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting workers {"controller": "etcdcluster", "worker count": 1}
Once the controller has started successfully, we can create our etcd cluster. Change the sample CR manifest to the following YAML:

apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
  name: etcd-sample
spec:
  size: 3
  image: cnych/etcd:v3.4.13
Open another terminal and create the resource object above:

➜ kubectl apply -f config/samples/etcd_v1alpha1_etcdcluster.yaml
etcdcluster.etcd.ydzs.io/etcd-sample created
After creation we can check the corresponding EtcdCluster object:

➜ kubectl get etcdcluster
NAME AGE
etcd-sample 2m35s
The corresponding StatefulSet and Service resources are also created automatically:

➜ kubectl get all -l app=etcd
NAME READY STATUS RESTARTS AGE
pod/etcd-sample-0 1/1 Running 0 85s
pod/etcd-sample-1 1/1 Running 0 71s
pod/etcd-sample-2 1/1 Running 0 66s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/etcd-sample ClusterIP None <none> 2380/TCP,2379/TCP 86s

NAME READY AGE
statefulset.apps/etcd-sample 3/3 87s
At this point our etcd cluster is up and running — we implemented an etcd-operator with just a few lines of code.


Of course, many details remain unhandled. For example, we have not yet added RBAC permission declarations for the StatefulSet and headless Service, nor set up Watches for changes to those two resource objects; we covered both earlier, so try completing those pieces yourself. Note also that this way of implementing an etcd operator is a bit of a shortcut: we had to write the startup script ahead of time, which is not really a conventional approach. But now that we know how to bootstrap an etcd cluster, the same logic can later be implemented in Go code — this version is just a stepping stone.
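For example, the script's SET_ID=${HOSTNAME##*-} trick, compared against INITIAL_CLUSTER_SIZE, decides whether a pod is an initial member or one joining an existing cluster — a natural first piece to port to Go. A stdlib-only sketch (helper names are hypothetical, not part of the project):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// podOrdinal extracts the StatefulSet ordinal from a pod hostname,
// the Go equivalent of SET_ID=${HOSTNAME##*-} in the shell script.
func podOrdinal(hostname string) (int, error) {
	idx := strings.LastIndex(hostname, "-")
	if idx < 0 {
		return 0, fmt.Errorf("no ordinal suffix in %q", hostname)
	}
	return strconv.Atoi(hostname[idx+1:])
}

// isNewMember mirrors the script's `[ "${SET_ID}" -ge ${INITIAL_CLUSTER_SIZE} ]`
// check: ordinals at or beyond the initial cluster size are pods that must
// be added to an already-running cluster via `etcdctl member add`.
func isNewMember(hostname string, initialClusterSize int) bool {
	ord, err := podOrdinal(hostname)
	if err != nil {
		return false
	}
	return ord >= initialClusterSize
}

func main() {
	fmt.Println(isNewMember("etcd-sample-3", 3)) // true: joins the existing cluster
	fmt.Println(isNewMember("etcd-sample-1", 3)) // false: part of the initial bootstrap
}
```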