概述

当咱们将容器的日志收集到音讯服务器之后,咱们该如何解决这些日志?部署一个专用的日志解决工作负载可能会消耗多余的老本,而当日志体量骤增、骤降时亦难以评估日志解决工作负载的待机数量。本文提供了一种基于 Serverless 的日志解决思路,能够在升高该工作链路老本的同时进步其灵活性。

咱们的大体设计是应用 Kafka 服务器作为日志的接收器,之后以输出 Kafka 服务器的日志作为事件,驱动 Serverless 工作负载对日志进行解决。据此的大抵步骤为:

  1. 搭建 Kafka 服务器作为 Kubernetes 集群的日志接收器
  2. 部署 OpenFunction 为日志解决工作负载提供 Serverless 能力
  3. 编写日志处理函数,抓取特定的日志生成告警音讯
  4. 配置 Notification Manager 将告警发送至 Slack

在这个场景中,咱们会利用到 OpenFunction 带来的 Serverless 能力。

OpenFunction 是 KubeSphere 社区开源的一个 FaaS(Serverless)我的项目,旨在让用户专一于他们的业务逻辑,而不用关怀底层运行环境和基础设施。该我的项目以后具备以下要害能力:

  • 反对通过 dockerfile 或 buildpacks 形式构建 OCI 镜像
  • 反对应用 Knative Serving 或 OpenFunctionAsync ( KEDA + Dapr ) 作为 runtime 运行 Serverless 工作负载
  • 自带事件驱动框架

应用 Kafka 作为日志接收器

首先,咱们为 KubeSphere 平台开启 logging 组件(能够参考 启用可插拔组件 获取更多信息)。而后咱们应用 strimzi-kafka-operator 搭建一个最小化的 Kafka 服务器。

  1. 在 default 命名空间中装置 strimzi-kafka-operator :

    helm repo add strimzi https://strimzi.io/charts/helm install kafka-operator -n default strimzi/strimzi-kafka-operator
  2. 运行以下命令在 default 命名空间中创立 Kafka 集群和 Kafka Topic,该命令所创立的 Kafka 和 Zookeeper 集群的存储类型为 ephemeral,应用 emptyDir 进行演示。

    留神,咱们此时创立了一个名为 “logs” 的 topic,后续会用到它
    cat <<EOF | kubectl apply -f -apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata:  name: kafka-logs-receiver  namespace: defaultspec:  kafka:    version: 2.8.0    replicas: 1    listeners:      - name: plain        port: 9092        type: internal        tls: false      - name: tls        port: 9093        type: internal        tls: true    config:      offsets.topic.replication.factor: 1      transaction.state.log.replication.factor: 1      transaction.state.log.min.isr: 1      log.message.format.version: '2.8'      inter.broker.protocol.version: "2.8"    storage:      type: ephemeral  zookeeper:    replicas: 1    storage:      type: ephemeral  entityOperator:    topicOperator: {}    userOperator: {}---apiVersion: kafka.strimzi.io/v1beta1kind: KafkaTopicmetadata:  name: logs  namespace: default  labels:    strimzi.io/cluster: kafka-logs-receiverspec:  partitions: 10  replicas: 3  config:    retention.ms: 7200000    segment.bytes: 1073741824EOF
  3. 运行以下命令查看 Pod 状态,并期待 Kafka 和 Zookeeper 运行并启动。

    $ kubectl get poNAME                                                   READY   STATUS        RESTARTS   AGEkafka-logs-receiver-entity-operator-568957ff84-nmtlw   3/3     Running       0          8m42skafka-logs-receiver-kafka-0                            1/1     Running       0          9m13skafka-logs-receiver-zookeeper-0                        1/1     Running       0          9m46sstrimzi-cluster-operator-687fdd6f77-cwmgm              1/1     Running       0          11m

    运行以下命令查看 Kafka 集群的元数据:

    # 启动一个工具 pod$ kubectl run utils --image=arunvelsriram/utils -i --tty --rm# 查看 Kafka 集群的元数据$ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092

咱们将这个 Kafka 服务器增加为日志接收器。

  1. admin 身份登录 KubeSphere 的 Web 控制台。点击左上角的平台治理,而后抉择集群治理

    如果您启用了多集群性能,您能够抉择一个集群。
  2. 集群治理页面,抉择集群设置下的日志收集
  3. 点击增加日志接收器并抉择 Kafka。输出 Kafka 代理地址和端口信息,而后点击确定持续。

  1. 运行以下命令验证 Kafka 集群是否能从 Fluent Bit 接管日志:

    # 启动一个工具 pod$ kubectl run utils --image=arunvelsriram/utils -i --tty --rm # 查看 logs topic 中的日志状况$ kafkacat -C -b kafka-logs-receiver-kafka-0.kafka-logs-receiver-kafka-brokers.default.svc:9092 -t logs

部署 OpenFunction

依照概述中的设计,咱们须要先部署 OpenFunction。OpenFunction 我的项目援用了很多第三方的我的项目,如 Knative、Tekton、ShipWright、Dapr、KEDA 等,手动装置较为繁琐,举荐应用 Prerequisites 文档 中的办法,一键部署 OpenFunction 的依赖组件。

其中 --with-shipwright 示意部署 shipwright 作为函数的构建驱动
--with-openFuncAsync 示意部署 OpenFuncAsync Runtime 作为函数的负载驱动
而当你的网络在拜访 Github 及 Google 受限时,能够加上 --poor-network 参数用于下载相干的组件
sh hack/deploy.sh --with-shipwright --with-openFuncAsync --poor-network

部署 OpenFunction:

此处抉择装置最新的稳固版本,你也能够应用开发版本,参考 Install 文档

为了能够失常应用 ShipWright ,咱们提供了默认的构建策略,能够应用以下命令设置该策略:

kubectl apply -f https://raw.githubusercontent.com/OpenFunction/OpenFunction/main/config/strategy/openfunction.yaml
kubectl apply -f https://github.com/OpenFunction/OpenFunction/releases/download/v0.3.0/bundle.yaml

编写日志处理函数

咱们以 创立并部署 WordPress 为例,搭建一个 WordPress 利用作为日志的生产者。该利用的工作负载所在的命名空间为 “demo-project”,Pod 名称为 “wordpress-v1-f54f697c5-hdn2z”。

当申请后果为 404 时,咱们收到的日志内容如下:

{"@timestamp":1629856477.226758,"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","time":"2021-08-25T01:54:37.226757612Z","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project","container_name":"container-nrdsp1","docker_id":"bb7b48e2883be0c05b22c04b1d1573729dd06223ae0b1676e33a4fac655958a5","container_image":"wordpress:4.8-apache"}}

咱们的需要是:当一个申请后果为 404 时,发送一个告警告诉给接收器(能够依据 配置 Slack 告诉 配置一个 Slack 告警接收器),并记录命名空间、Pod 名称、申请门路、申请办法等信息。依照这个需要,咱们编写一个简略的处理函数:

你能够从 OpenFunction Context Spec 处理解 openfunction-context 的应用办法,这是 OpenFunction 提供给用户编写函数的工具库
你能够通过 OpenFunction Samples 理解更多的 OpenFunction 函数案例
package logshandlerimport (    "encoding/json"    "fmt"    "log"    "regexp"    "time"    ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"    alert "github.com/prometheus/alertmanager/template")const (    HTTPCodeNotFound = "404"    Namespace        = "demo-project"    PodName          = "wordpress-v1-[A-Za-z0-9]{9}-[A-Za-z0-9]{5}"    AlertName        = "404 Request"    Severity         = "warning")// LogsHandler ctx 参数提供了用户函数在集群语境中的上下文句柄,如 ctx.SendTo 用于将数据发送至指定的目的地// LogsHandler in 参数用于将输出源中的数据(如有)以 bytes 的形式传递给函数func LogsHandler(ctx *ofctx.OpenFunctionContext, in []byte) int {    content := string(in)    // 这里咱们设置了三个正则表达式,别离用于匹配 HTTP 返回码、资源命名空间、资源 Pod 名称    matchHTTPCode, _ := regexp.MatchString(fmt.Sprintf(" %s ", HTTPCodeNotFound), content)    matchNamespace, _ := regexp.MatchString(fmt.Sprintf("namespace_name\":\"%s", Namespace), content)    matchPodName := regexp.MustCompile(fmt.Sprintf(`(%s)`, PodName)).FindStringSubmatch(content)    if matchHTTPCode && matchNamespace && matchPodName != nil {        log.Printf("Match log - Content: %s", content)        // 如果上述三个正则表达式同时命中,那么咱们须要提取日志内容中的一些信息,用于填充至告警信息中        // 这些信息为:404 申请的申请形式(HTTP Method)、申请门路(HTTP Path)以及 Pod 名称        match := regexp.MustCompile(`([A-Z]+) (/\S*) HTTP`).FindStringSubmatch(content)        if match == nil {            return 500        }        path := match[len(match)-1]        method := match[len(match)-2]        podName := matchPodName[len(matchPodName)-1]        // 收集到要害信息后,咱们应用 altermanager 的 Data 构造体组装告警信息        notify := &alert.Data{            Receiver:          "notification_manager",            Status:            "firing",            Alerts:            alert.Alerts{},            GroupLabels:       alert.KV{"alertname": AlertName, "namespace": Namespace},            CommonLabels:      alert.KV{"alertname": AlertName, "namespace": Namespace, "severity": Severity},            CommonAnnotations: alert.KV{},            ExternalURL:       "",        }        alt := alert.Alert{            Status: "firing",            Labels: alert.KV{                "alertname": AlertName,                "namespace": Namespace,                "severity":  Severity,                "pod":       podName,                "path":      path,                "method":    method,            },            Annotations:  alert.KV{},            StartsAt:     time.Now(),            EndsAt:       time.Time{},            GeneratorURL: "",            Fingerprint:  "",        }        notify.Alerts = append(notify.Alerts, alt)        notifyBytes, _ := json.Marshal(notify)        // 应用 ctx.SendTo 将内容发送给名为 "notification-manager" 的输入端(你能够在之后的函数配置 logs-handler-function.yaml 中找到它的定义)        if err := ctx.SendTo(notifyBytes, "notification-manager"); err != nil {            panic(err)        }        log.Printf("Send log to notification manager.")    }    return 200}

咱们将这个函数上传到代码仓库中,记录代码仓库的地址以及代码在仓库中的目录门路,在上面的创立函数步骤中咱们将应用到这两个值。

你能够在 OpenFunction Samples 中找到这个案例。

创立函数

接下来咱们将应用 OpenFunction 构建上述的函数。首先设置一个用于拜访镜像仓库的秘钥文件 push-secret(在应用代码构建出 OCI 镜像后,OpenFunction 会将该镜像上传到用户的镜像仓库中,用于后续的负载启动):

REGISTRY_SERVER=https://index.docker.io/v1/ REGISTRY_USER=<your username> REGISTRY_PASSWORD=<your password>kubectl create secret docker-registry push-secret \    --docker-server=$REGISTRY_SERVER \    --docker-username=$REGISTRY_USER \    --docker-password=$REGISTRY_PASSWORD

利用函数 logs-handler-function.yaml

函数定义中蕴含了对两个要害组件的应用:

Dapr 对应用程序屏蔽了简单的中间件,使得 logs-handler 能够非常容易地解决 Kafka 中的事件

KEDA 通过监控音讯服务器中的事件流量来驱动 logs-handler 函数的启动,并且依据 Kafka 中音讯的生产延时动静扩大 logs-handler 实例

apiVersion: core.openfunction.io/v1alpha1kind: Functionmetadata:  name: logs-handlerspec:  version: "v1.0.0"  # 这里定义了构建后的镜像的上传门路  image: openfunctiondev/logs-async-handler:v1  imageCredentials:    name: push-secret  build:    builder: openfunctiondev/go115-builder:v0.2.0    env:      FUNC_NAME: "LogsHandler"    # 这里定义了源代码的门路    # url 为下面提到的代码仓库地址    # sourceSubPath 为代码在仓库中的目录门路    srcRepo:      url: "https://github.com/OpenFunction/samples.git"      sourceSubPath: "functions/OpenFuncAsync/logs-handler-function/"  serving:    # OpenFuncAsync 是 OpenFunction 通过 KEDA+Dapr 实现的一种由事件驱动的异步函数运行时    runtime: "OpenFuncAsync"    openFuncAsync:      # 此处定义了函数的输出(kafka-receiver)和输入(notification-manager),与上面 components 中的定义对应关联      dapr:        inputs:          - name: kafka-receiver            type: bindings        outputs:          - name: notification-manager            type: bindings            params:              operation: "post"              type: "bindings"        annotations:          dapr.io/log-level: "debug"        # 这里实现了上述输出端和输入端的具体定义(即 Dapr Components)        components:          - name: kafka-receiver            type: bindings.kafka            version: v1            metadata:              - name: brokers                value: "kafka-logs-receiver-kafka-brokers:9092"              - name: authRequired                value: "false"              - name: publishTopic                value: "logs"              - name: topics                value: "logs"              - name: consumerGroup                value: "logs-handler"          # 此处为 KubeSphere 的 notification-manager 地址          - name: notification-manager            type: bindings.http            version: v1            metadata:              - name: url                value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts      keda:        scaledObject:          pollingInterval: 15          minReplicaCount: 0          maxReplicaCount: 10          cooldownPeriod: 30          # 这里定义了函数的触发器,即 Kafka 服务器的 “logs” topic          # 同时定义了音讯沉积阈值(此处为 10),即当音讯沉积量超过 10,logs-handler 实例个数就会主动扩大          triggers:            - type: kafka              metadata:                topic: logs                bootstrapServers: kafka-logs-receiver-kafka-brokers.default.svc.cluster.local:9092                consumerGroup: logs-handler                lagThreshold: "10"

后果演示

咱们先敞开 Kafka 日志接收器:在日志收集页面,点击进入 Kafka 日志接收器详情页面,而后点击更多操作并抉择更改状态,将其设置为敞开

停用后一段时间,咱们能够察看到 logs-handler 函数实例曾经膨胀到 0 了。

再将 Kafka 日志接收器激活,logs-handler 随之启动。

~# kubectl get po --watchNAME                                                     READY   STATUS        RESTARTS   AGEkafka-logs-receiver-entity-operator-568957ff84-tdrrx     3/3     Running       0          7m27skafka-logs-receiver-kafka-0                              1/1     Running       0          7m48skafka-logs-receiver-zookeeper-0                          1/1     Running       0          8m12slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          34sstrimzi-cluster-operator-687fdd6f77-kc8cv                1/1     Running       0          10mlogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          36slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          37slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          0slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          2slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   1/2     Running             0          4slogs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   2/2     Running             0          11s

接着咱们向 WordPress 利用一个不存在的门路发动申请:

curl http://<wp-svc-address>/notfound

能够看到 Slack 中曾经收到了这条音讯(与之比照的是,当咱们失常拜访该 WordPress 站点时, Slack 中并不会收到告警音讯):

进一步摸索

  • 同步函数的解决方案
> 为了能够失常应用 Knative Serving ,咱们须要设置其网关的负载均衡器地址。(你能够应用本机地址作为 workaround)>> 将上面的 "1.2.3.4" 替换为理论场景中的地址。>> ```shell> kubectl patch svc -n kourier-system kourier \> -p '{"spec": {"type": "LoadBalancer", "externalIPs": ["1.2.3.4"]}}'> > kubectl patch configmap/config-domain -n knative-serving \> --type merge --patch '{"data":{"1.2.3.4.sslip.io":""}}'> ```>

除了间接由 Kafka 服务器驱动函数运作(异步形式),OpenFunction 还反对应用自带的事件框架对接 Kafka 服务器,之后以 Sink 的形式驱动 Knative 函数运作。能够参考 OpenFunction Samples 中的案例。

在该计划中,同步函数的处理速度较之异步函数有所升高,当然咱们同样能够借助 KEDA 来触发 Knative Serving 的 concurrency 机制,但总体而言不足异步函数的便捷性。(后续的阶段中咱们会优化 OpenFunction 的事件框架来解决同步函数这方面的缺点)

由此可见,不同类型的 Serverless 函数有其善于的工作场景,如一个有序的控制流函数就须要由同步函数而非异步函数来解决。

综述

Serverless 带来了咱们所冀望的对业务场景疾速拆解重构的能力。

如本案例所示,OpenFunction 岂但以 Serverless 的形式晋升了日志解决、告警告诉链路的灵便度,还通过函数框架将通常对接 Kafka 时简单的配置步骤简化为语义明确的代码逻辑。同时,咱们也在一直演进 OpenFunction,将在之后版本中实现由本身的 Serverless 能力驱动本身的组件运作。

本文由博客一文多发平台 OpenWrite 公布!