In-memory Channel 是当前 Knative Eventing 中默认的 Channel, 也是一般刚接触 Knative Eventing 首先了解到的 Channel。本文通过分析 In-memory Channel 来进一步了解 Knative Eventing 中 Broker/Trigger 事件处理机制。
事件处理概览
我们先整体看一下 Knative Eventing 工作机制示意图:
通过 namespace 创建默认 Broker 如果不指定 Channel,会使用默认的 Inmemory Channel。
下面我们从数据平面开始分析 Event 事件是如何通过 In-memory Channel 分发到 Knative Service
Ingress
Ingress 是事件进入 Channel 前的第一级过滤,但目前的功能仅仅是接收事件然后转发到 Channel。过滤功能处理 TODO 状态。
func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {tctx := cloudevents.HTTPTransportContextFrom(ctx)
if tctx.Method != http.MethodPost {
resp.Status = http.StatusMethodNotAllowed
return nil
}
// tctx.URI is actually the path...
if tctx.URI != "/" {
resp.Status = http.StatusNotFound
return nil
}
ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))
defer func() {stats.Record(ctx, MeasureEventsTotal.M(1))
}()
send := h.decrementTTL(&event)
if !send {ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL"))
return nil
}
// TODO Filter.
ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))
return h.sendEvent(ctx, tctx, event)
}
In-memory Channel
Broker 字面意思为代理者,那么它代理的是谁呢?是 Channel。为什么要代理 Channel 呢,而不直接发给访问 Channel。这个其实涉及到 Broker/Trigger 设计的初衷:对事件过滤处理。我们知道 Channel(消息通道)负责事件传递,Subscription(订阅)负责订阅事件,通常这二者的模型如下:
这里就涉及到消息队列和订阅分发的实现。那么在 In-memory Channel 中如何实现的呢?
其实 In-memory 的核心处理在 Fanout Handler 中,它负责将接收到的事件分发到不同的 Subscription。
In-memory Channel 处理示意图:
事件接收并分发核心代码如下:
func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
if f.config.AsyncHandler {go func() {// Any returned error is already logged in f.dispatch().
_ = f.dispatch(m)
}()
return nil
}
return f.dispatch(m)
}
}
当前分发机制默认是异步机制(可通过 AsyncHandler 参数控制分发机制)。
消息分发机制:
// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out
// requests return successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {errorCh := make(chan error, len(f.config.Subscriptions))
for _, sub := range f.config.Subscriptions {go func(s eventingduck.SubscriberSpec) {errorCh <- f.makeFanoutRequest(*msg, s)
}(sub)
}
for range f.config.Subscriptions {
select {
case err := <-errorCh:
if err != nil {f.logger.Error("Fanout had an error", zap.Error(err))
return err
}
case <-time.After(f.timeout):
f.logger.Error("Fanout timed out")
return errors.New("fanout timed out")
}
}
// All Subscriptions returned err = nil.
return nil
}
通过这里的代码,我们可以看到分发处理超时机制。默认为 60s。也就是说如果分发的请求响应超过 60s,那么 In-memory 会报错:Fanout timed out。
Filter
一般的消息分发会将消息发送给订阅的服务,但在 Broker/Trigger 模型中需要对事件进行过滤处理,这个处理的地方就是在 Filter 中。
- 根据请求获取 Trigger 信息。Filter 中会根据请求的信息拿到 Trigger 名称,然后通过获取 Trigger 对应的资源信息拿到过滤规则
- 根据 Trigger 过滤规则进行事件的过滤处理
- 最后将满足过滤规则的分发到 Kservice
其中过滤规则处理代码如下:
func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {
if ts.Filter == nil || ts.Filter.SourceAndType == nil {r.logger.Error("No filter specified")
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))
return false
}
// Record event count and filtering time
startTS := time.Now()
defer func() {filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)
stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))
}()
filterType := ts.Filter.SourceAndType.Type
if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() {r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type()))
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))
return false
}
filterSource := ts.Filter.SourceAndType.Source
s := event.Context.AsV01().Source
actualSource := s.String()
if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource {r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource))
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))
return false
}
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass"))
return true
}
当前的机制是所有的订阅事件都会通过 Filter 集中进行事件过滤,如果一个 Broker 有大量的订阅 Trigger,是否会给 Filter 带来性能上的压力?这个在实际场景 Broker/Trigger 的运用中需要考虑到这个问题。
结论
作为内置的默认 Channel 实现,In-memory 可以说很好的完成了事件接收并转发的使命,并且 Knative Eventing 在接下来的迭代中会支持部署时指定设置默认的 Channel。
本文作者:元毅
阅读原文
本文为云栖社区原创内容,未经允许不得转载。