序
本文主要研究一下 eventhorizon 的 EventBus
EventBus
eventhorizon/eventbus.go
// EventBus is an EventHandler to which further handlers can be attached.
// Errors from asynchronous handling are exposed through the Errors channel.
type EventBus interface {
	EventHandler

	// AddHandler registers a handler for the events selected by the matcher.
	// It returns an error if the matcher or the handler is nil, if the
	// handler has already been added, or if adding it failed for some other
	// reason (for example with networked handlers).
	AddHandler(context.Context, EventMatcher, EventHandler) error

	// Errors returns the channel on which asynchronous handling errors are
	// delivered.
	Errors() <-chan EventBusError

	// Wait waits for all handlers to be cancelled by their context.
	Wait()
}
// EventHandler is anything that can identify itself by a handler type and
// process events.
type EventHandler interface {
	// HandlerType returns the type of the handler.
	HandlerType() EventHandlerType

	// HandleEvent processes a single event and reports any failure as an
	// error.
	HandleEvent(context.Context, Event) error
}
// EventMatcher decides whether an event is of interest.
type EventMatcher interface {
	// Match reports whether the matcher matches the given event.
	Match(Event) bool
}
EventBus 接口内嵌了 EventHandler 接口,定义了 AddHandler、Errors、Wait 方法
EventBus
eventhorizon/eventbus/local/eventbus.go
// EventBus is the local event bus. It serializes events with a codec and
// publishes the resulting bytes through a Group.
type EventBus struct {
	// group is where HandleEvent publishes serialized events.
	group *Group
	// registered is a set of handler types.
	// NOTE(review): presumably used to reject duplicate handlers — confirm
	// against AddHandler, which is not shown here.
	registered map[eh.EventHandlerType]struct{}
	// NOTE(review): presumably guards registered — confirm against its users.
	registeredMu sync.RWMutex
	// errCh carries errors raised while handling events asynchronously.
	errCh chan eh.EventBusError
	// wg tracks running handle goroutines; each calls Done when it exits.
	wg sync.WaitGroup
	// codec converts events to and from their serialized form.
	codec eh.EventCodec
}
// HandleEvent satisfies eventhorizon.EventHandler. It serializes the event
// with the bus codec and publishes the resulting bytes to the group.
// A serialization failure is returned wrapped; otherwise the result of the
// publish call is returned.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
	payload, err := b.codec.MarshalEvent(ctx, event)
	if err != nil {
		return fmt.Errorf("could not marshal event: %w", err)
	}

	return b.group.publish(ctx, payload)
}
EventBus 定义了 group、registered、registeredMu、errCh、wg、codec 属性;HandleEvent 方法先序列化 event,然后通过 group.publish 发布 event
Group
eventhorizon/eventbus/local/eventbus.go
// Group is a collection of byte-slice channels, keyed by string, that
// published data is fanned out to.
type Group struct {
	// bus maps a key to the channel that receives published data.
	bus map[string]chan []byte
	// busMu is held for reading while publish iterates over bus.
	busMu sync.RWMutex
}

// NewGroup creates a Group with an empty, non-nil channel map.
func NewGroup() *Group {
	return &Group{
		bus: make(map[string]chan []byte),
	}
}

// publish offers b to every channel currently in the group. Sends never
// block: if a channel cannot accept the data immediately, the data is dropped
// for that channel and a message is logged. The same slice is handed to every
// channel. ctx is not used here, and the returned error is always nil.
func (g *Group) publish(ctx context.Context, b []byte) error {
	g.busMu.RLock()
	defer g.busMu.RUnlock()

	for _, queue := range g.bus {
		select {
		case queue <- b:
			// Delivered.
		default:
			// The receiver is not keeping up; drop rather than stall the
			// publisher.
			log.Printf("eventhorizon: publish queue full in local event bus")
		}
	}

	return nil
}
// handle consumes serialized events from ch and dispatches each one that m
// matches to h. It calls b.wg.Done when it exits.
//
// The loop ends when ctx is cancelled or when an event cannot be
// unmarshaled. A handler error does not end the loop. Both kinds of failure
// are offered to b.errCh without blocking; if that send cannot proceed
// immediately, the error is logged instead.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
	defer b.wg.Done()

	for {
		select {
		case data := <-ch:
			// Artificial delay to simulate network.
			time.Sleep(10 * time.Millisecond)

			// Use a separate name for the decoded context so the ctx
			// parameter that controls this loop is not shadowed.
			event, eventCtx, err := b.codec.UnmarshalEvent(ctx, data)
			if err != nil {
				err = fmt.Errorf("could not unmarshal event: %w", err)
				select {
				case b.errCh <- eh.EventBusError{Err: err, Ctx: eventCtx}:
				default:
					log.Printf("eventhorizon: missed error in local event bus: %s", err)
				}

				return
			}

			// Ignore non-matching events.
			if !m.Match(event) {
				continue
			}

			// Handle the event if it did match.
			if err := h.HandleEvent(eventCtx, event); err != nil {
				// Wrap with %w so the handler's error stays reachable via
				// errors.Is / errors.As; the message text is unchanged.
				err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
				select {
				case b.errCh <- eh.EventBusError{Err: err, Ctx: eventCtx, Event: event}:
				default:
					log.Printf("eventhorizon: missed error in local event bus: %s", err)
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
Group 的 publish 方法遍历 bus 中的 channel,通过 select 非阻塞地写入序列化后的 event(队列已满时丢弃并打印日志);EventBus 的 handle 方法循环 select 读取 event,然后通过 m.Match(event) 判断是否匹配,匹配的话执行 h.HandleEvent
小结
eventhorizon 的 EventBus 接口内嵌了 EventHandler 接口,定义了 AddHandler、Errors、Wait 方法。
doc
- eventhorizon