In the previous sections we learned how an etcd cluster is assembled and how to deploy an etcd cluster in Kubernetes. Developing a corresponding Operator really just means implementing that series of etcd operational tasks in code; in plain terms, translating the startup script in the StatefulSet into Go. We will develop this incrementally across several versions. For the first version we will build the simplest possible Operator: one that directly generates the StatefulSet template from the previous section.
Project Initialization
As usual, before developing the Operator we need to design our CRD resource object up front. For example, we want to create the corresponding etcd cluster from a CR resource like the following:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
name: demo
spec:
size: 3 # number of replicas
image: cnych/etcd:v3.4.13 # image
Since all other information is derived by the startup script, the size and image fields are basically enough to describe what an etcd cluster deployment looks like. So our first version is very simple: as long as we can write a correct deployment script, the Operator only needs to assemble a StatefulSet and a Headless Service object according to the EtcdCluster CR defined above.
First, initialize the project, using kubebuilder to generate our scaffolding:
➜ kubebuilder init --domain ydzs.io --owner cnych --repo github.com/cnych/etcd-operator
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api
Once the project scaffold is created, define the resource API:
➜ kubebuilder create api --group etcd --version v1alpha1 --kind EtcdCluster
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/etcdcluster_types.go
controllers/etcdcluster_controller.go
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
With that, our project is initialized; the overall code structure looks like this:
➜ etcd-operator tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── api
│ └── v1alpha1
├── bin
│ └── manager
├── config
│ ├── certmanager
│ ├── crd
│ ├── default
│ ├── manager
│ ├── prometheus
│ ├── rbac
│ ├── samples
│ └── webhook
├── controllers
│ ├── etcdcluster_controller.go
│ └── suite_test.go
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── main.go
14 directories, 10 files
Then edit the Operator's struct according to the EtcdCluster object we designed above, modifying the EtcdClusterSpec struct in api/v1alpha1/etcdcluster_types.go:
// api/v1alpha1/etcdcluster_types.go

// EtcdClusterSpec defines the desired state of EtcdCluster
type EtcdClusterSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	Size  *int32 `json:"size"`
	Image string `json:"image"`
}
Note that after each modification we need to run make to regenerate the code:
➜ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next we can go into the controller's Reconcile function and implement our own business logic.
Business Logic
First create a resource.go file under the controllers directory, which generates the corresponding StatefulSet and Headless Service objects from the EtcdCluster object we defined.
// controllers/resource.go
package controllers

import (
	"strconv"

	"github.com/cnych/etcd-operator/api/v1alpha1"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
	EtcdClusterLabelKey       = "etcd.ydzs.io/cluster"
	EtcdClusterCommonLabelKey = "app"
	EtcdDataDirName           = "datadir"
)
func MutateStatefulSet(cluster *v1alpha1.EtcdCluster, sts *appsv1.StatefulSet) {
	sts.Labels = map[string]string{
		EtcdClusterCommonLabelKey: "etcd",
	}
	sts.Spec = appsv1.StatefulSetSpec{
		Replicas:    cluster.Spec.Size,
		ServiceName: cluster.Name,
		Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
			EtcdClusterLabelKey: cluster.Name,
		}},
		Template: corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					EtcdClusterLabelKey:       cluster.Name,
					EtcdClusterCommonLabelKey: "etcd",
				},
			},
			Spec: corev1.PodSpec{
				Containers: newContainers(cluster),
			},
		},
		VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
			{
				ObjectMeta: metav1.ObjectMeta{Name: EtcdDataDirName},
				Spec: corev1.PersistentVolumeClaimSpec{
					AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceStorage: resource.MustParse("1Gi"),
						},
					},
				},
			},
		},
	}
}
func newContainers(cluster *v1alpha1.EtcdCluster) []corev1.Container {
	return []corev1.Container{
		{
			Name:  "etcd",
			Image: cluster.Spec.Image,
			Ports: []corev1.ContainerPort{
				{
					Name:          "peer",
					ContainerPort: 2380,
				},
				{
					Name:          "client",
					ContainerPort: 2379,
				},
			},
			Env: []corev1.EnvVar{
				{
					Name:  "INITIAL_CLUSTER_SIZE",
					Value: strconv.Itoa(int(*cluster.Spec.Size)),
				},
				{
					Name:  "SET_NAME",
					Value: cluster.Name,
				},
				{
					Name: "POD_IP",
					ValueFrom: &corev1.EnvVarSource{
						FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.podIP"},
					},
				},
				{
					Name: "MY_NAMESPACE",
					ValueFrom: &corev1.EnvVarSource{
						FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"},
					},
				},
			},
			VolumeMounts: []corev1.VolumeMount{
				{
					Name:      EtcdDataDirName,
					MountPath: "/var/run/etcd",
				},
			},
			Command: []string{
				"/bin/sh", "-ec",
				"HOSTNAME=$(hostname)\n\n ETCDCTL_API=3\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{print $1}'\n }\n\n initial_peers() {\n PEERS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n PEERS=\"${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380\"\n done\n echo ${PEERS}\n }\n\n # etcd-SET_ID\n SET_ID=${HOSTNAME##*-}\n\n # adding a new member to existing cluster (assuming all initial pods are available)\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n # export ETCDCTL_ENDPOINTS=$(eps)\n # member already added?\n\n MEMBER_HASH=$(member_hash)\n if [ -n \"${MEMBER_HASH}\" ]; then\n # the member hash exists but for some reason etcd failed\n # as the datadir has not been created, we can remove the member\n # and retrieve new hash\n echo \"Remove member ${MEMBER_HASH}\"\n etcdctl --endpoints=$(eps) member remove ${MEMBER_HASH}\n fi\n\n echo \"Adding new member\"\n\n etcdctl member --endpoints=$(eps) add ${HOSTNAME} --peer-urls=http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 | grep \"^ETCD_\" > /var/run/etcd/new_member_envs\n\n if [ $? -ne 0 ]; then\n echo \"member add ${HOSTNAME} error.\"\n rm -f /var/run/etcd/new_member_envs\n exit 1\n fi\n\n echo \"==> Loading env vars of existing cluster...\"\n sed -ie \"s/^/export /\" /var/run/etcd/new_member_envs\n cat /var/run/etcd/new_member_envs\n . /var/run/etcd/new_member_envs\n\n exec etcd --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --data-dir /var/run/etcd/default.etcd\n fi\n\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n while true; do\n echo \"Waiting for ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local to come up\"\n ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local > /dev/null && break\n sleep 1s\n done\n done\n\n echo \"join member ${HOSTNAME}\"\n # join member\n exec etcd --name ${HOSTNAME} \\\n --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 \\\n --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --initial-cluster-token etcd-cluster-1 \\\n --data-dir /var/run/etcd/default.etcd \\\n --initial-cluster $(initial_peers) \\\n --initial-cluster-state new",
			},
			Lifecycle: &corev1.Lifecycle{
				PreStop: &corev1.Handler{
					Exec: &corev1.ExecAction{Command: []string{
						"/bin/sh", "-ec",
						"HOSTNAME=$(hostname)\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{print $1}'\n }\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n export ETCDCTL_ENDPOINTS=$(eps)\n SET_ID=${HOSTNAME##*-}\n\n # Removing member from cluster\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n echo \"Removing ${HOSTNAME} from etcd cluster\"\n etcdctl member remove $(member_hash)\n if [ $? -eq 0 ]; then\n # Remove everything otherwise the cluster will no longer scale-up\n rm -rf /var/run/etcd/*\n fi\n fi",
					}},
				},
			},
		},
	}
}
func MutateHeadlessSvc(cluster *v1alpha1.EtcdCluster, svc *corev1.Service) {
	svc.Labels = map[string]string{
		EtcdClusterCommonLabelKey: "etcd",
	}
	svc.Spec = corev1.ServiceSpec{
		ClusterIP: corev1.ClusterIPNone,
		Selector: map[string]string{
			EtcdClusterLabelKey: cluster.Name,
		},
		Ports: []corev1.ServicePort{
			{
				Name: "peer",
				Port: 2380,
			},
			{
				Name: "client",
				Port: 2379,
			},
		},
	}
}
The code above is long, but the logic is simple: it builds the StatefulSet and Headless Service resource objects from our EtcdCluster. With these in place, when an EtcdCluster is created we can process it in the controller's Reconcile function. We can reuse the straightforward approach from the earlier examples, as shown in the code below:
// controllers/etcdcluster_controller.go
func (r *EtcdClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("etcdcluster", req.NamespacedName)

	// first, fetch the EtcdCluster instance
	var etcdCluster etcdv1alpha1.EtcdCluster
	if err := r.Client.Get(ctx, req.NamespacedName, &etcdCluster); err != nil {
		// EtcdCluster was deleted, ignore it
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Once we have the EtcdCluster, create or update the corresponding
	// StatefulSet and Service. CreateOrUpdate does the reconciliation for us:
	// it fetches the current state and converges it toward the desired state.

	// CreateOrUpdate the Service
	var svc corev1.Service
	svc.Name = etcdCluster.Name
	svc.Namespace = etcdCluster.Namespace
	or, err := controllerutil.CreateOrUpdate(ctx, r.Client, &svc, func() error {
		// all reconcile mutations must happen inside this function
		MutateHeadlessSvc(&etcdCluster, &svc)
		return controllerutil.SetControllerReference(&etcdCluster, &svc, r.Scheme)
	})
	if err != nil {
		return ctrl.Result{}, err
	}
	log.Info("CreateOrUpdate", "Service", or)

	// CreateOrUpdate the StatefulSet
	var sts appsv1.StatefulSet
	sts.Name = etcdCluster.Name
	sts.Namespace = etcdCluster.Namespace
	or, err = controllerutil.CreateOrUpdate(ctx, r.Client, &sts, func() error {
		// all reconcile mutations must happen inside this function
		MutateStatefulSet(&etcdCluster, &sts)
		return controllerutil.SetControllerReference(&etcdCluster, &sts, r.Scheme)
	})
	if err != nil {
		return ctrl.Result{}, err
	}
	log.Info("CreateOrUpdate", "StatefulSet", or)

	return ctrl.Result{}, nil
}
Here we simply reconcile our EtcdCluster object, creating or updating the corresponding StatefulSet and Headless Service. The logic is simple, and with it the first version of our etcd-operator is complete.
Debugging
Next, install the CRD so that the Kubernetes cluster recognizes our EtcdCluster object:
➜ make install
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/etcdclusters.etcd.ydzs.io configured
Then run the controller:
➜ make run
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2020-11-20T17:44:48.222+0800 INFO controller-runtime.metrics metrics server is starting to listen {"addr": ":8080"}
2020-11-20T17:44:48.223+0800 INFO setup starting manager
2020-11-20T17:44:48.223+0800 INFO controller-runtime.manager starting metrics server {"path": "/metrics"}
2020-11-20T17:44:48.223+0800 INFO controller-runtime.controller Starting EventSource {"controller": "etcdcluster", "source": "kind source: /, Kind="}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting Controller {"controller": "etcdcluster"}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting workers {"controller": "etcdcluster", "worker count": 1}
Once the controller starts successfully, we can create our etcd cluster. Change the sample CR manifest to the following YAML:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
name: etcd-sample
spec:
size: 3
image: cnych/etcd:v3.4.13
In another terminal, create the resource object above:
➜ kubectl apply -f config/samples/etcd_v1alpha1_etcdcluster.yaml
etcdcluster.etcd.ydzs.io/etcd-sample created
Once it is created, we can look at the corresponding EtcdCluster object:
➜ kubectl get etcdcluster
NAME AGE
etcd-sample 2m35s
The corresponding StatefulSet and Service resources are created automatically as well:
➜ kubectl get all -l app=etcd
NAME READY STATUS RESTARTS AGE
pod/etcd-sample-0 1/1 Running 0 85s
pod/etcd-sample-1 1/1 Running 0 71s
pod/etcd-sample-2 1/1 Running 0 66s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/etcd-sample ClusterIP None <none> 2380/TCP,2379/TCP 86s
NAME READY AGE
statefulset.apps/etcd-sample 3/3 87s
At this point our etcd cluster is up and running: with just a handful of lines of code we have implemented an etcd-operator.
Of course, many details are still missing. For example, we have not yet added RBAC permission declarations for the StatefulSet and Headless Service, nor Watches for changes to those two resource objects; we covered both earlier, so try completing that part yourself. Admittedly, the way we implemented this etcd operator is a bit of a shortcut: we had to write the startup script in advance, which is certainly not a conventional approach. But now that we know how to bootstrap an etcd cluster, the same steps can later be implemented in Go as well; this version is just one step along the way.
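The two missing pieces mentioned above can be sketched briefly. The kubebuilder RBAC markers below grant the controller access to StatefulSets and Services, and the Owns() calls register Watches on the objects the controller creates, so changes to them re-trigger Reconcile. This is only a sketch against the kubebuilder v2 / controller-runtime v0.5 scaffold used in this post (it assumes the scaffolded imports for ctrl, appsv1, corev1, and etcdv1alpha1 are already present):

```go
// controllers/etcdcluster_controller.go (sketch)

// +kubebuilder:rbac:groups=etcd.ydzs.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.ydzs.io,resources=etcdclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete

func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&etcdv1alpha1.EtcdCluster{}).
		// watch the owned objects, so manual edits to them get reconciled back
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		Complete(r)
}
```

After adding the markers, run make manifests to regenerate the RBAC rules under config/rbac.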