Preface
This article takes a look at eventhorizon's EventBus.
EventBus
eventhorizon/eventbus.go
```go
type EventBus interface {
	EventHandler

	// AddHandler adds a handler for an event. Returns an error if either the
	// matcher or handler is nil, the handler is already added or there was some
	// other problem adding the handler (for networked handlers for example).
	AddHandler(context.Context, EventMatcher, EventHandler) error

	// Errors returns an error channel where async handling errors are sent.
	Errors() <-chan EventBusError

	// Wait wait for all handlers to be cancelled by their context.
	Wait()
}

type EventHandler interface {
	// HandlerType is the type of the handler.
	HandlerType() EventHandlerType

	// HandleEvent handles an event.
	HandleEvent(context.Context, Event) error
}

type EventMatcher interface {
	// Match returns true if the matcher matches an event.
	Match(Event) bool
}
```
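The EventBus interface embeds the EventHandler interface and additionally defines the AddHandler, Errors, and Wait methods. Because of the embedding, a bus is itself an EventHandler: publishing an event means calling HandleEvent on the bus.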
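To make the matcher/handler contract concrete, here is a minimal sketch that does not import eventhorizon. The `Event` struct, the string-based `HandlerType`, and the `typeMatcher`, `recorder`, and `dispatch` names are simplified stand-ins invented for illustration, not the library's own types.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a simplified, hypothetical stand-in for eventhorizon's Event.
type Event struct {
	Type string
	Data string
}

// EventMatcher mirrors the shape of the interface quoted above.
type EventMatcher interface {
	Match(Event) bool
}

// EventHandler mirrors the shape quoted above; HandlerType is a plain
// string here instead of the library's EventHandlerType.
type EventHandler interface {
	HandlerType() string
	HandleEvent(context.Context, Event) error
}

// typeMatcher matches events whose Type equals the matcher's value.
type typeMatcher string

func (m typeMatcher) Match(e Event) bool { return e.Type == string(m) }

// recorder remembers the data of every event it handles.
type recorder struct {
	seen []string
}

func (r *recorder) HandlerType() string { return "recorder" }

func (r *recorder) HandleEvent(_ context.Context, e Event) error {
	r.seen = append(r.seen, e.Data)
	return nil
}

// dispatch feeds events to h, skipping those that m does not match.
func dispatch(m EventMatcher, h EventHandler, events []Event) error {
	for _, e := range events {
		if !m.Match(e) {
			continue
		}
		if err := h.HandleEvent(context.Background(), e); err != nil {
			return fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
		}
	}
	return nil
}

// demoDispatch returns the data recorded for "created" events only.
func demoDispatch() []string {
	r := &recorder{}
	events := []Event{
		{Type: "created", Data: "a"},
		{Type: "deleted", Data: "b"},
		{Type: "created", Data: "c"},
	}
	if err := dispatch(typeMatcher("created"), r, events); err != nil {
		return nil
	}
	return r.seen
}

func main() {
	fmt.Println(demoDispatch())
}
```

In the real library the matcher and handler are registered once through AddHandler and the bus performs the filtering; the synchronous `dispatch` loop here only illustrates the division of responsibility.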
EventBus
eventhorizon/eventbus/local/eventbus.go
```go
type EventBus struct {
	group        *Group
	registered   map[eh.EventHandlerType]struct{}
	registeredMu sync.RWMutex
	errCh        chan eh.EventBusError
	wg           sync.WaitGroup
	codec        eh.EventCodec
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
	data, err := b.codec.MarshalEvent(ctx, event)
	if err != nil {
		return fmt.Errorf("could not marshal event: %w", err)
	}

	return b.group.publish(ctx, data)
}
```
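The local EventBus struct has the fields group, registered, registeredMu, errCh, wg, and codec. Its HandleEvent method first marshals the event with the codec and then publishes the resulting bytes via group.publish.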
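The marshal-then-publish step can be sketched as a round trip through a byte channel. This is only an illustration: `encoding/json` is used here as an assumed stand-in for the bus's `eh.EventCodec`, and the `Event` struct and `roundTrip` function are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a simplified, hypothetical stand-in for eventhorizon's Event.
type Event struct {
	Type string `json:"type"`
	Data string `json:"data"`
}

// roundTrip marshals e, passes the bytes through a buffered channel the way
// a publisher would, and unmarshals them again on the receiving side.
func roundTrip(e Event) (Event, error) {
	data, err := json.Marshal(e)
	if err != nil {
		return Event{}, fmt.Errorf("could not marshal event: %w", err)
	}

	ch := make(chan []byte, 1) // stands in for one subscriber's queue
	ch <- data

	var out Event
	if err := json.Unmarshal(<-ch, &out); err != nil {
		return Event{}, fmt.Errorf("could not unmarshal event: %w", err)
	}
	return out, nil
}

func main() {
	out, err := roundTrip(Event{Type: "created", Data: "a"})
	fmt.Println(out, err)
}
```

Sending bytes rather than the event value means the receiver gets an independent copy, which matches the intent stated in the library's own comment about simulating a network bus.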
Group
eventhorizon/eventbus/local/eventbus.go
```go
type Group struct {
	bus   map[string]chan []byte
	busMu sync.RWMutex
}

// NewGroup creates a Group.
func NewGroup() *Group {
	return &Group{
		bus: map[string]chan []byte{},
	}
}

func (g *Group) publish(ctx context.Context, b []byte) error {
	g.busMu.RLock()
	defer g.busMu.RUnlock()

	for _, ch := range g.bus {
		// Marshal and unmarshal the context to both simulate only sending data
		// that would be sent over a network bus and also break any relationship
		// with the old context.
		select {
		case ch <- b:
		default:
			log.Printf("eventhorizon: publish queue full in local event bus")
		}
	}

	return nil
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
	defer b.wg.Done()

	for {
		select {
		case data := <-ch:
			// Artificial delay to simulate network.
			time.Sleep(10 * time.Millisecond)

			event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
			if err != nil {
				err = fmt.Errorf("could not unmarshal event: %w", err)
				select {
				case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
				default:
					log.Printf("eventhorizon: missed error in local event bus: %s", err)
				}
				return
			}

			// Ignore non-matching events.
			if !m.Match(event) {
				continue
			}

			// Handle the event if it did match.
			if err := h.HandleEvent(ctx, event); err != nil {
				err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
				select {
				case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
				default:
					log.Printf("eventhorizon: missed error in local event bus: %s", err)
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
```
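Group's publish method iterates over the channels in bus and writes the event bytes to each one with a non-blocking select; if a channel's queue is full, the event is dropped for that subscriber and only a log message is printed. The EventBus handle method loops on a select: it reads event bytes from the channel, unmarshals them, uses m.Match(event) to check whether the event matches, and if so calls h.HandleEvent. Handling errors are sent to errCh without blocking, an unmarshal error ends the loop, and the loop also exits when ctx is cancelled.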
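The two patterns described above, a non-blocking fan-out and a consumer loop that stops on context cancellation, can be sketched with the standard library alone. The `publish` and `consume` functions below are simplified illustrations with hypothetical signatures (for example, `publish` returns a drop count instead of logging), not the library's code.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// publish tries to send b to every subscriber channel without blocking and
// returns how many subscribers were skipped because their queue was full.
func publish(subs map[string]chan []byte, b []byte) (dropped int) {
	for _, ch := range subs {
		select {
		case ch <- b:
		default:
			dropped++
		}
	}
	return dropped
}

// consume reads from ch until ctx is cancelled, passing matching items to handle.
func consume(ctx context.Context, ch <-chan []byte, match func([]byte) bool, handle func([]byte)) {
	for {
		select {
		case data := <-ch:
			if !match(data) {
				continue // ignore non-matching items
			}
			handle(data)
		case <-ctx.Done():
			return
		}
	}
}

// demoPublish publishes twice to queues of capacity 1 and 2 and returns
// the number of dropped sends for each call.
func demoPublish() (first, second int) {
	subs := map[string]chan []byte{
		"a": make(chan []byte, 1),
		"b": make(chan []byte, 2),
	}
	first = publish(subs, []byte("e1"))  // both queues have room
	second = publish(subs, []byte("e2")) // queue "a" is already full
	return first, second
}

// demoConsume runs consume in a goroutine and returns the handled items.
func demoConsume() []string {
	ch := make(chan []byte, 3)
	for _, s := range []string{"keep-1", "skip", "keep-2"} {
		ch <- []byte(s)
	}

	handled := make(chan string)
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		consume(ctx, ch,
			func(b []byte) bool { return strings.HasPrefix(string(b), "keep") },
			func(b []byte) { handled <- string(b) })
	}()

	first := <-handled
	second := <-handled
	cancel()  // ask the consumer loop to stop
	wg.Wait() // analogous to the bus's Wait method
	return []string{first, second}
}

func main() {
	fmt.Println(demoPublish())
	fmt.Println(demoConsume())
}
```

The trade-off of the `default` branch is that a slow subscriber never blocks the publisher, at the cost of silently losing events once its queue fills up.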
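Summary
eventhorizon's EventBus interface embeds the EventHandler interface and defines the AddHandler, Errors, and Wait methods. The local implementation marshals each event and fans the bytes out to per-handler channels through Group.publish; each handler's handle loop then unmarshals, matches, and handles the event.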
doc
- eventhorizon