在处理数据时,往往会涉及到一个数据需要进行多次加工,这时候我们一般是通过 Pipeline 的方式进行处理。那么在 Knative Eventing 中是否也能支持对一个事件进行分步骤多次处理?这个还真有。从 0.7 版本开始,Knative Eventing 中提供了一个 Sequence 资源模型,可用于事件 Pipeline 处理。
Sequence 定义
首先我们看一下 Sequence Spec 定义:
apiVersion: messaging.knative.dev/v1alpha1
kind: Sequence
metadata:
name: test
spec:
channelTemplate:
apiVersion: messaging.knative.dev/v1alpha1
kind: InMemoryChannel
steps:
- ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: test
reply:
kind: Broker
apiVersion: eventing.knative.dev/v1alpha1
name: test
Sequence Spec 包括 3 个部分:
- steps:在 step 中定义了按照顺序执行的服务,每个服务会对应创建 Subscription。
- channelTemplate:指定了使用具体的那个 Channel
- reply:(可选)定义了最后一个 step 返回结果的响应目标
在 Broker/Trigger 模型中使用 Sequence
我们将创建以下逻辑配置。创建一个 cronjobsource,向 Broker 提供事件,然后创建一个 filter,将这些事件连接到由 3 个 step 组成的 Sequence 中。然后,我们获取最后的 step 返回结果事件发送给给 Broker,并创建另一个 Trigger,该 Trigger 随后将显示事件结果。
对于这个例子,这里设置一个 Broker 程序、一个 InMemoryChannel 以及一个 Knative Service(用于显示事件结果)。示例使用 default namespace。
如果要使用不同类型的 Channel,则需要修改 sequence.spec.channeltemplate
以创建对应的 Channel 资源。
创建 Knative Service
首先创建 3 个 Knative Service,用于 Sequence 中服务处理
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: first
spec:
template:
spec:
containers:
- image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
env:
- name: STEP
value: "0"
---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: second
spec:
template:
spec:
containers:
- image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
env:
- name: STEP
value: "1"
---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: third
spec:
template:
spec:
containers:
- image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700
env:
- name: STEP
value: "2"
---
执行创建命令:
kubectl -n default create -f ./steps.yaml
创建 Sequence
创建 Sequence,这里依次顺序执行 [first->second->third] 这 3 个服务。将最终处理的结果发送到 broker-test 中。
apiVersion: messaging.knative.dev/v1alpha1
kind: Sequence
metadata:
name: sequence
spec:
channelTemplate:
apiVersion: messaging.knative.dev/v1alpha1
kind: InMemoryChannel
steps:
- ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: first
- ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: second
- ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: third
reply:
kind: Broker
apiVersion: eventing.knative.dev/v1alpha1
name: broker-test
执行如下命令:
kubectl -n default create -f ./sequence.yaml
创建 CronJobSource 指向 Broker
这里将创建一个 cronjobsource,它将每 2 分钟发送一个{“message”: “Hello world!”} 信息到 broker-test 中。
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
name: cronjob-source
spec:
schedule: "*/2 * * * *"
data: '{"message":"Hello world!"}'
sink:
apiVersion: eventing.knative.dev/v1alpha1
kind: Broker
name: broker-test
执行命令如下:
kubectl -n default create -f ./cron-source.yaml
为 Sequence 创建 Trigger
创建订阅事件类型为:dev.knative.cronjob.event 的 Trigger, 用于 Sequence 进行消费处理。
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
name: sequence-trigger
spec:
filter:
sourceAndType:
type: dev.knative.cronjob.event
subscriber:
ref:
apiVersion: messaging.knative.dev/v1alpha1
kind: Sequence
name: sequence
执行如下命令:
kubectl -n default create -f ./trigger.yaml
创建结果订阅 Trigger
创建结果订阅 Trigger,订阅samples.http.mod3
的事件类型,对 sequence 执行的结果进行显示
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: sequence-display
spec:
template:
spec:
containers:
- image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display
---
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
name: sequence-trigger
spec:
filter:
sourceAndType:
type: samples.http.mod3
subscriber:
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: sequence-display
---
结论
通过 Sequence 资源模型,我们很容易在 Knative Eventing 中实现事件处理的 Pipeline。对于需要多步骤处理的服务尤为适合。
本文作者:元毅
阅读原文
本文为云栖社区原创内容,未经允许不得转载。