Earlier we looked at how an etcd cluster is bootstrapped and how to deploy an etcd cluster in Kubernetes. Developing a corresponding Operator really just means implementing that series of etcd operational tasks in code; in plain terms, translating the startup script in the StatefulSet into our Go code. We will develop this incrementally in versions: for the first version we build the simplest possible Operator, one that directly generates the StatefulSet template from before.
Project Initialization
As usual, before developing the Operator we need to design our CRD resource object up front. For example, we want to create the corresponding etcd cluster from a CR resource like this:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
  name: demo
spec:
  size: 3 # number of replicas
  image: cnych/etcd:v3.4.13 # image
Since the script derives everything else on its own, the size and image fields are basically enough to pin down what an etcd cluster deployment looks like. Our first version is therefore very simple: as long as we can write a correct deployment script, the Operator only has to assemble a StatefulSet and a headless Service from the EtcdCluster CR defined above.
First initialize the project; here we use kubebuilder to build our scaffold:
➜ kubebuilder init --domain ydzs.io --owner cnych --repo github.com/cnych/etcd-operator
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api
With the project scaffold created, define the resource API next:
➜ kubebuilder create api --group etcd --version v1alpha1 --kind EtcdCluster
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/etcdcluster_types.go
controllers/etcdcluster_controller.go
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Our project is now initialized; the overall code structure looks like this:
➜ etcd-operator tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── api
│ └── v1alpha1
├── bin
│ └── manager
├── config
│ ├── certmanager
│ ├── crd
│ ├── default
│ ├── manager
│ ├── prometheus
│ ├── rbac
│ ├── samples
│ └── webhook
├── controllers
│ ├── etcdcluster_controller.go
│ └── suite_test.go
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── main.go
14 directories, 10 files
Then edit the Operator's structs to match the EtcdCluster object we designed above, modifying the EtcdClusterSpec struct in api/v1alpha1/etcdcluster_types.go:
// api/v1alpha1/etcdcluster_types.go
// EtcdClusterSpec defines the desired state of EtcdCluster
type EtcdClusterSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	Size  *int32 `json:"size"`
	Image string `json:"image"`
}
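One optional refinement, not in the original post: kubebuilder validation markers can enforce constraints on these fields at the CRD level. A minimal sketch, assuming we want at least one replica and a non-empty image (the marker choices here are illustrative):

// a sketch with illustrative validation markers (not part of the original scaffold)
type EtcdClusterSpec struct {
	// +kubebuilder:validation:Minimum=1
	Size *int32 `json:"size"`

	// +kubebuilder:validation:MinLength=1
	Image string `json:"image"`
}

After adding markers, running make manifests regenerates the CRD YAML with the corresponding OpenAPI validation.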
Note that after every modification to this file you need to run make to regenerate the code:
➜ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next we can implement our own business logic in the controller's Reconcile function.
Business Logic
First create a resource.go file under the controllers directory; it builds the corresponding StatefulSet and headless Service objects from our EtcdCluster object.
// controllers/resource.go
package controllers
import (
	"strconv"

	"github.com/cnych/etcd-operator/api/v1alpha1"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
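// Label keys and the data-volume name shared by the StatefulSet and the headless Service built below.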
var (
	EtcdClusterLabelKey       = "etcd.ydzs.io/cluster"
	EtcdClusterCommonLabelKey = "app"
	EtcdDataDirName           = "datadir"
)
func MutateStatefulSet(cluster *v1alpha1.EtcdCluster, sts *appsv1.StatefulSet) {
	sts.Labels = map[string]string{
		EtcdClusterCommonLabelKey: "etcd",
	}
	sts.Spec = appsv1.StatefulSetSpec{
		Replicas:    cluster.Spec.Size,
		ServiceName: cluster.Name,
		Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
			EtcdClusterLabelKey: cluster.Name,
		}},
		Template: corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					EtcdClusterLabelKey:       cluster.Name,
					EtcdClusterCommonLabelKey: "etcd",
				},
			},
			Spec: corev1.PodSpec{
				Containers: newContainers(cluster),
			},
		},
		VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
			{
				ObjectMeta: metav1.ObjectMeta{Name: EtcdDataDirName},
				Spec: corev1.PersistentVolumeClaimSpec{
					AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceStorage: resource.MustParse("1Gi"),
						},
					},
				},
			},
		},
	}
}
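// newContainers builds the single etcd container: ports, environment variables,
// the data volume mount, and the bootstrap/pre-stop shell scripts carried over
// from the StatefulSet example.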
func newContainers(cluster *v1alpha1.EtcdCluster) []corev1.Container {
	return []corev1.Container{
		{
			Name:  "etcd",
			Image: cluster.Spec.Image,
			Ports: []corev1.ContainerPort{
				{
					Name:          "peer",
					ContainerPort: 2380,
				},
				{
					Name:          "client",
					ContainerPort: 2379,
				},
			},
			Env: []corev1.EnvVar{
				{
					Name:  "INITIAL_CLUSTER_SIZE",
					Value: strconv.Itoa(int(*cluster.Spec.Size)),
				},
				{
					Name:  "SET_NAME",
					Value: cluster.Name,
				},
				{
					Name: "POD_IP",
					ValueFrom: &corev1.EnvVarSource{
						FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.podIP"},
					},
				},
				{
					Name: "MY_NAMESPACE",
					ValueFrom: &corev1.EnvVarSource{
						FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"},
					},
				},
			},
			VolumeMounts: []corev1.VolumeMount{
				{
					Name:      EtcdDataDirName,
					MountPath: "/var/run/etcd",
				},
			},
			Command: []string{
				"/bin/sh", "-ec",
				"HOSTNAME=$(hostname)\n\n ETCDCTL_API=3\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{print $1}'\n }\n\n initial_peers() {\n PEERS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n PEERS=\"${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380\"\n done\n echo ${PEERS}\n }\n\n # etcd-SET_ID\n SET_ID=${HOSTNAME##*-}\n\n # adding a new member to existing cluster (assuming all initial pods are available)\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n # export ETCDCTL_ENDPOINTS=$(eps)\n # member already added?\n\n MEMBER_HASH=$(member_hash)\n if [ -n \"${MEMBER_HASH}\" ]; then\n # the member hash exists but for some reason etcd failed\n # as the datadir has not been created, we can remove the member\n # and retrieve new hash\n echo \"Remove member ${MEMBER_HASH}\"\n etcdctl --endpoints=$(eps) member remove ${MEMBER_HASH}\n fi\n\n echo \"Adding new member\"\n\n etcdctl member --endpoints=$(eps) add ${HOSTNAME} --peer-urls=http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 | grep \"^ETCD_\" > /var/run/etcd/new_member_envs\n\n if [ $? -ne 0 ]; then\n echo \"member add ${HOSTNAME} error.\"\n rm -f /var/run/etcd/new_member_envs\n exit 1\n fi\n\n echo \"==> Loading env vars of existing cluster...\"\n sed -ie \"s/^/export /\" /var/run/etcd/new_member_envs\n cat /var/run/etcd/new_member_envs\n . /var/run/etcd/new_member_envs\n\n exec etcd --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --data-dir /var/run/etcd/default.etcd\n fi\n\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n while true; do\n echo \"Waiting for ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local to come up\"\n ping -W 1 -c 1 ${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local > /dev/null && break\n sleep 1s\n done\n done\n\n echo \"join member ${HOSTNAME}\"\n # join member\n exec etcd --name ${HOSTNAME} \\\n --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2380 \\\n --listen-peer-urls http://${POD_IP}:2380 \\\n --listen-client-urls http://${POD_IP}:2379,http://127.0.0.1:2379 \\\n --advertise-client-urls http://${HOSTNAME}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379 \\\n --initial-cluster-token etcd-cluster-1 \\\n --data-dir /var/run/etcd/default.etcd \\\n --initial-cluster $(initial_peers) \\\n --initial-cluster-state new",
			},
			Lifecycle: &corev1.Lifecycle{
				PreStop: &corev1.Handler{
					Exec: &corev1.ExecAction{Command: []string{
						"/bin/sh", "-ec",
						"HOSTNAME=$(hostname)\n\n member_hash() {\n etcdctl member list | grep -w \"$HOSTNAME\" | awk '{ print $1}' | awk -F \",\" '{print $1}'\n }\n\n eps() {\n EPS=\"\"\n for i in $(seq 0 $((${INITIAL_CLUSTER_SIZE} - 1))); do\n EPS=\"${EPS}${EPS:+,}http://${SET_NAME}-${i}.${SET_NAME}.${MY_NAMESPACE}.svc.cluster.local:2379\"\n done\n echo ${EPS}\n }\n\n export ETCDCTL_ENDPOINTS=$(eps)\n SET_ID=${HOSTNAME##*-}\n\n # Removing member from cluster\n if [ \"${SET_ID}\" -ge ${INITIAL_CLUSTER_SIZE} ]; then\n echo \"Removing ${HOSTNAME} from etcd cluster\"\n etcdctl member remove $(member_hash)\n if [ $? -eq 0 ]; then\n # Remove everything otherwise the cluster will no longer scale-up\n rm -rf /var/run/etcd/*\n fi\n fi",
					}},
				},
			},
		},
	}
}
func MutateHeadlessSvc(cluster *v1alpha1.EtcdCluster, svc *corev1.Service) {
	svc.Labels = map[string]string{
		EtcdClusterCommonLabelKey: "etcd",
	}
	svc.Spec = corev1.ServiceSpec{
		ClusterIP: corev1.ClusterIPNone,
		Selector: map[string]string{
			EtcdClusterLabelKey: cluster.Name,
		},
		Ports: []corev1.ServicePort{
			{
				Name: "peer",
				Port: 2380,
			},
			{
				Name: "client",
				Port: 2379,
			},
		},
	}
}
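Since both helpers only fill in in-memory objects, they are easy to exercise in isolation before wiring them into the controller. A minimal sketch, e.g. in a hypothetical controllers/resource_test.go (the Example-test layout and names are assumptions, not part of the original project):

package controllers

import (
	"fmt"

	"github.com/cnych/etcd-operator/api/v1alpha1"
	corev1 "k8s.io/api/core/v1"
)

// ExampleMutateHeadlessSvc shows the Service the helper fills in for a cluster named "demo".
func ExampleMutateHeadlessSvc() {
	cluster := &v1alpha1.EtcdCluster{}
	cluster.Name = "demo"

	var svc corev1.Service
	MutateHeadlessSvc(cluster, &svc)

	fmt.Println(svc.Spec.ClusterIP, svc.Spec.Ports[0].Name, svc.Spec.Ports[0].Port)
	// Output: None peer 2380
}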
The code above is long, but the logic is simple: construct the StatefulSet and headless Service resource objects from our EtcdCluster. With those builders in place, when an EtcdCluster is created we can process it in the controller's Reconcile function. We can reuse the simple approach from the earlier examples, as shown below:
// controllers/etcdcluster_controller.go
func (r *EtcdClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("etcdcluster", req.NamespacedName)

	// First, fetch the EtcdCluster instance
	var etcdCluster etcdv1alpha1.EtcdCluster
	if err := r.Client.Get(ctx, req.NamespacedName, &etcdCluster); err != nil {
		// EtcdCluster was deleted, ignore it
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// With the EtcdCluster in hand, create or update the corresponding
	// StatefulSet and Service via CreateOrUpdate: reconciling simply means
	// fetching the observed current state and diffing it against the desired state.

	// CreateOrUpdate the Service
	var svc corev1.Service
	svc.Name = etcdCluster.Name
	svc.Namespace = etcdCluster.Namespace
	or, err := ctrl.CreateOrUpdate(ctx, r, &svc, func() error {
		// all mutations for the reconciliation must happen inside this function
		MutateHeadlessSvc(&etcdCluster, &svc)
		return controllerutil.SetControllerReference(&etcdCluster, &svc, r.Scheme)
	})
	if err != nil {
		return ctrl.Result{}, err
	}
	log.Info("CreateOrUpdate", "Service", or)

	// CreateOrUpdate the StatefulSet
	var sts appsv1.StatefulSet
	sts.Name = etcdCluster.Name
	sts.Namespace = etcdCluster.Namespace
	or, err = ctrl.CreateOrUpdate(ctx, r, &sts, func() error {
		// all mutations for the reconciliation must happen inside this function
		MutateStatefulSet(&etcdCluster, &sts)
		return controllerutil.SetControllerReference(&etcdCluster, &sts, r.Scheme)
	})
	if err != nil {
		return ctrl.Result{}, err
	}
	log.Info("CreateOrUpdate", "StatefulSet", or)

	return ctrl.Result{}, nil
}
All we do here is reconcile the EtcdCluster object, creating or updating the corresponding StatefulSet and headless Service. CreateOrUpdate first fetches the object: if it does not exist, it runs our mutate function and creates it; otherwise it runs the mutate function and only issues an update when something actually changed, returning whether it created, updated, or left the object alone. The logic is simple, and with that the first version of our etcd-operator is complete.
Debugging
Next, install our CRD object first, so the Kubernetes cluster recognizes our EtcdCluster object:
➜ make install
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/etcdclusters.etcd.ydzs.io configured
Then run the controller:
➜ make run
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/ych/devs/projects/go/bin/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2020-11-20T17:44:48.222+0800 INFO controller-runtime.metrics metrics server is starting to listen {"addr": ":8080"}
2020-11-20T17:44:48.223+0800 INFO setup starting manager
2020-11-20T17:44:48.223+0800 INFO controller-runtime.manager starting metrics server {"path": "/metrics"}
2020-11-20T17:44:48.223+0800 INFO controller-runtime.controller Starting EventSource {"controller": "etcdcluster", "source": "kind source: /, Kind="}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting Controller {"controller": "etcdcluster"}
2020-11-20T17:44:48.326+0800 INFO controller-runtime.controller Starting workers {"controller": "etcdcluster", "worker count": 1}
Once the controller starts successfully we can create our etcd cluster. Change the sample CR manifest to the following YAML:
apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdCluster
metadata:
  name: etcd-sample
spec:
  size: 3
  image: cnych/etcd:v3.4.13
Open another terminal and create the resource object above:
➜ kubectl apply -f config/samples/etcd_v1alpha1_etcdcluster.yaml
etcdcluster.etcd.ydzs.io/etcd-sample created
After creation we can inspect the corresponding EtcdCluster object:
➜ kubectl get etcdcluster
NAME AGE
etcd-sample 2m35s
The matching StatefulSet and Service manifests are created automatically as well:
➜ kubectl get all -l app=etcd
NAME READY STATUS RESTARTS AGE
pod/etcd-sample-0 1/1 Running 0 85s
pod/etcd-sample-1 1/1 Running 0 71s
pod/etcd-sample-2 1/1 Running 0 66s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/etcd-sample ClusterIP None <none> 2380/TCP,2379/TCP 86s
NAME READY AGE
statefulset.apps/etcd-sample 3/3 87s
At this point our etcd cluster is up and running: with just a few simple lines of code we have implemented an etcd-operator.
Of course many details are still unhandled. For example, we have not added RBAC permission declarations for the StatefulSet and headless Service, nor Watches for changes to those two resource objects; we covered both earlier, so you can try completing them yourself (a sketch follows below). Admittedly, the way we implemented this etcd operator is a bit of a shortcut: we had to write the startup script in advance, which is certainly not the conventional approach. But now that we know how to bring up an etcd cluster, the same steps can later be implemented directly in Go code; this is just one stage of the process.
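For reference, here is a minimal sketch of that completion, assuming the default kubebuilder scaffold; the marker placement and verbs are illustrative, not taken from the original project:

// controllers/etcdcluster_controller.go (sketch)
// RBAC markers for the child resources; make manifests turns these into role rules.
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete

func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&etcdv1alpha1.EtcdCluster{}).
		// Owns() watches the StatefulSet and Service this controller creates and
		// enqueues their owner EtcdCluster on changes; this works because we call
		// SetControllerReference on both objects in Reconcile.
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		Complete(r)
}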