关于kafka:如何借助Kafka持久化存储K8S事件数据

51次阅读

共计 4434 个字符,预计需要花费 12 分钟才能阅读完成。

大家应该对 Kubernetes Events 并不生疏,特地是当你应用 kubectl describe 命令或 Event API 资源来理解集群中的故障时。
 

$ kubectl get events

15m         Warning   FailedCreate                                                                                                      replicaset/ml-pipeline-visualizationserver-865c7865bc    

Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found

 

只管这些信息非常有用,但它只是长期的,保留工夫最长为 30 天。如果出于审计或是故障诊断等目标,你可能想要把这些信息保留得更久,比方保留在像 Kafka 这样更长久、高效的存储中。而后你能够借助其余工具(如 Argo Events)或本人的应用程序订阅 Kafka 主题来对某些事件做出响应。
 

构建 K8s 事件处理链路

咱们将构建一整套 Kubernetes 事件处理链路,其次要形成为:

  • Eventrouter,开源的 Kubernetes event 处理器,它能够将所有集群事件整合汇总到某个 Kafka 主题中。
  • Strimzi Operator,在 Kubernetes 中轻松治理 Kafka broker。
  • 自定义 Go 二进制文件以将事件散发到相应的 Kafka 主题中。
     

为什么要把事件散发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相干的 Kubernetes 资产,那么在应用这些资产之前你当然心愿将相干事件隔离开。
 

本示例中所有的配置、源代码和具体设置批示都曾经放在以下代码仓库中:
[](https://github.com/esys/kube-events-kafka)
 

 

创立 Kafka broker 和主题

我抉择应用 Strimzi([strimzi.io/]())将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创立和更新 Kafka broker 和主题的。你能够在官网文档中找到如何装置该 Operator 的具体阐明:
[](https://strimzi.io/docs/operators/latest/overview.html)
 

首先,创立一个新的 Kafka 集群:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kube-events
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 3
      log.message.format.version: "2.6"
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
    - name: plain
      port: 9092
      tls: false
      type: internal
    - name: tls
      port: 9093
      tls: true
      type: internal
    replicas: 3
    storage:
      type: jbod
      volumes:
      - deleteClaim: false
        id: 0
        size: 10Gi
        type: persistent-claim
    version: 2.6.0
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: false
      size: 10Gi
      type: persistent-claim

 

而后创立 Kafka 主题来接管咱们的事件:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cluster-events
spec:
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
  partitions: 1
  replicas: 1

 

设置 EventRouter

在本教程中应用 kubectl apply 命令即可,咱们须要编辑 router 的配置,以指明咱们的 Kafka 端点和要应用的主题:

apiVersion: v1
data:
  config.json: |-
    {
      "sink": "kafka",
      "kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",
      "kafkaTopic": "cluster-events"
    }
kind: ConfigMap
metadata:
  name: eventrouter-cm

 

验证设置是否失常工作

咱们的 cluster-events Kafka 的主题当初应该收到所有的事件。最简略的办法是在主题上运行一个 consumer 来测验是否如此。为了不便期间,咱们应用咱们的一个 Kafka broker pods,它曾经有了所有必要的工具,你能够看到事件流:

kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \
  --bootstrap-server kube-events-kafka-bootstrap:9092 \
  --topic kube-events \
  --from-beginning
{"verb":"ADDED","event":{...}}
{"verb":"ADDED","event":{...}}
...

 

编写 Golang 消费者

当初咱们想将咱们的 Kubernetes 事件根据其所在的命名空间散发到多个主题中。咱们将编写一个 Golang 消费者和生产者来实现这一逻辑:

  • 消费者局部在 cluster-events 主题上监听传入的集群事件
  • 生产者局部写入与事件的命名空间相匹配的 Kafka 主题中
     

如果为 Kafka 配置了适当的选项(默认状况),就不须要顺便创立新的主题,因为 Kafka 会默认为你创立主题。这是 Kafka 客户端 API 的一个十分酷的性能。

p, err := kafka.NewProducer(cfg.Endpoint)
if err != nil {sugar.Fatal("cannot create producer")
}
defer p.Close()

c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)
if err != nil {sugar.Fatal("cannot create consumer")
}
defer c.Close()

run := true
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
        sig := <-sigs
        sugar.Infof("signal %s received, terminating", sig)
        run = false
}()

var wg sync.WaitGroup
go func() {wg.Add(1)
        for run {data, err := c.Read()
                if err != nil {sugar.Errorf("read event error: %v", err)
                        time.Sleep(5 * time.Second)
                        continue
                }
                if data == nil {continue}
                msg, err := event.CreateDestinationMessage(data)
                if err != nil {sugar.Errorf("cannot create destination event: %v", err)
                }
                p.Write(msg.Topic, msg.Message)
        }
        sugar.Info("worker thread done")
        wg.Done()}()

wg.Wait()

 

残缺代码在此处:
[](https://github.com/esys/kube-events-kafka/blob/master/events-fanout/cmd/main.go)
 

当然还有更高性能的抉择,这取决于预计的事件量和扇出(fanout)逻辑的复杂性。对于一个更弱小的实现,应用 Spark Structured Streaming 的消费者将是一个很好的抉择。
 

部署消费者

构建并将二进制文件推送到 Docker 镜像之后,咱们将它封装为 Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: events-fanout
  name: events-fanout
spec:
  replicas: 1
  selector:
    matchLabels:
      app: events-fanout
  template:
    metadata:
      labels:
        app: events-fanout
    spec:
      containers:
        - image: emmsys/events-fanout:latest
          name: events-fanout
          command: ["./events-fanout"]
          args:
            - -logLevel=info
          env:
            - name: ENDPOINT
              value: kube-events-kafka-bootstrap:9092
            - name: TOPIC
              value: cluster-events

 

查看指标主题是否创立

当初,新的主题曾经创立实现:

kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name

kafkatopic.kafka.strimzi.io/cluster-events
kafkatopic.kafka.strimzi.io/kube-system
kafkatopic.kafka.strimzi.io/default
kafkatopic.kafka.strimzi.io/kafka
kafkatopic.kafka.strimzi.io/kube-events

 

你会发现你的事件依据其命名空间参差地存储在这些主题中。
 

总结

拜访 Kubernetes 历史事件日志能够使你对 Kubernetes 零碎的状态有了更好的理解,但这单靠 kubectl 比拟难做到。更重要的是,它能够通过对事件做出反馈来实现集群或利用运维自动化,并以此来构建牢靠、反馈灵活的软件。
 

原文链接:
https://hackernoon.com/monitor-your-kubernetes-cluster-events…

正文完
 0