This article takes a look at eventhorizon's EventBus.

EventBus

eventhorizon/eventbus.go

type EventBus interface {
    EventHandler

    // AddHandler adds a handler for an event. Returns an error if either the
    // matcher or handler is nil, the handler is already added or there was some
    // other problem adding the handler (for networked handlers for example).
    AddHandler(context.Context, EventMatcher, EventHandler) error

    // Errors returns an error channel where async handling errors are sent.
    Errors() <-chan EventBusError

    // Wait wait for all handlers to be cancelled by their context.
    Wait()
}

type EventHandler interface {
    // HandlerType is the type of the handler.
    HandlerType() EventHandlerType

    // HandleEvent handles an event.
    HandleEvent(context.Context, Event) error
}

type EventMatcher interface {
    // Match returns true if the matcher matches an event.
    Match(Event) bool
}

The EventBus interface embeds the EventHandler interface and defines the AddHandler, Errors, and Wait methods. Because of the embedding, a bus is itself an EventHandler: publishing an event means calling HandleEvent on the bus.
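To make the embedding concrete, here is a minimal sketch that uses only the standard library. All types in it (Event, matchType, recorder, syncBus) are simplified stand-ins invented for illustration, not the eventhorizon types, and the toy bus dispatches synchronously, which the real buses may not do. It only shows how a type satisfies an interface that embeds EventHandler and how AddHandler can reject nil arguments and duplicate handlers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event is a simplified stand-in (assumption) for the eventhorizon Event.
type Event struct{ Type string }

// EventHandler and EventMatcher mirror the shape of the interfaces above,
// with a plain string instead of EventHandlerType.
type EventHandler interface {
	HandlerType() string
	HandleEvent(context.Context, Event) error
}

type EventMatcher interface {
	Match(Event) bool
}

// EventBus embeds EventHandler, so every bus is also a handler.
type EventBus interface {
	EventHandler
	AddHandler(context.Context, EventMatcher, EventHandler) error
}

// matchType is a hypothetical matcher that accepts one event type.
type matchType string

func (m matchType) Match(e Event) bool { return e.Type == string(m) }

// recorder is a hypothetical handler that remembers what it handled.
type recorder struct {
	name string
	seen []string
}

func (r *recorder) HandlerType() string { return r.name }

func (r *recorder) HandleEvent(_ context.Context, e Event) error {
	r.seen = append(r.seen, e.Type)
	return nil
}

type registration struct {
	m EventMatcher
	h EventHandler
}

// syncBus is a toy bus that calls matching handlers in the caller's goroutine.
type syncBus struct{ regs []registration }

func (b *syncBus) HandlerType() string { return "syncBus" }

func (b *syncBus) AddHandler(_ context.Context, m EventMatcher, h EventHandler) error {
	if m == nil || h == nil {
		return errors.New("missing matcher or handler")
	}
	for _, r := range b.regs {
		if r.h.HandlerType() == h.HandlerType() {
			return fmt.Errorf("handler already added: %s", h.HandlerType())
		}
	}
	b.regs = append(b.regs, registration{m, h})
	return nil
}

func (b *syncBus) HandleEvent(ctx context.Context, e Event) error {
	for _, r := range b.regs {
		if !r.m.Match(e) {
			continue // ignore non-matching events
		}
		if err := r.h.HandleEvent(ctx, e); err != nil {
			return err
		}
	}
	return nil
}

// Compile-time check that syncBus satisfies the embedded interface.
var _ EventBus = (*syncBus)(nil)

// demo registers one handler, tries to register it again, then publishes.
func demo() (seen []string, dupErr error) {
	ctx := context.Background()
	var bus EventBus = &syncBus{}
	rec := &recorder{name: "recorder"}
	if err := bus.AddHandler(ctx, matchType("created"), rec); err != nil {
		return nil, err
	}
	dupErr = bus.AddHandler(ctx, matchType("deleted"), rec) // same handler type
	_ = bus.HandleEvent(ctx, Event{Type: "created"})
	_ = bus.HandleEvent(ctx, Event{Type: "deleted"})
	return rec.seen, dupErr
}

func main() {
	seen, dupErr := demo()
	fmt.Println(seen, dupErr)
}
```

The second AddHandler call fails because the handler type is already registered, so only the "created" event reaches the recorder.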

EventBus

eventhorizon/eventbus/local/eventbus.go

type EventBus struct {
    group        *Group
    registered   map[eh.EventHandlerType]struct{}
    registeredMu sync.RWMutex
    errCh        chan eh.EventBusError
    wg           sync.WaitGroup
    codec        eh.EventCodec
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
    data, err := b.codec.MarshalEvent(ctx, event)
    if err != nil {
        return fmt.Errorf("could not marshal event: %w", err)
    }

    return b.group.publish(ctx, data)
}

The local EventBus struct has the fields group, registered, registeredMu, errCh, wg, and codec. Its HandleEvent method first serializes the event with the codec and then publishes the resulting bytes through group.publish.
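The marshal-then-publish shape of HandleEvent can be sketched with the standard library alone. This is an illustration under assumptions: encoding/json stands in for the eh.EventCodec, a buffered channel stands in for group.publish, and the event and toyBus types are hypothetical. It also shows why the %w verb matters: the caller can still recover the underlying marshal error with errors.As.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// event is a hypothetical payload. Data is an interface so that a value
// encoding/json cannot encode can be used to force a marshal failure.
type event struct {
	Type string      `json:"type"`
	Data interface{} `json:"data"`
}

// toyBus stands in for the local EventBus; out stands in for group.publish.
type toyBus struct {
	out chan []byte
}

// handleEvent serializes first and publishes only the bytes, wrapping any
// marshal error with %w so the cause stays inspectable.
func (b *toyBus) handleEvent(e event) error {
	data, err := json.Marshal(e)
	if err != nil {
		return fmt.Errorf("could not marshal event: %w", err)
	}
	b.out <- data
	return nil
}

// demo publishes one valid event and one that cannot be marshaled.
func demo() (published string, wrapped bool) {
	b := &toyBus{out: make(chan []byte, 1)}
	if err := b.handleEvent(event{Type: "created", Data: 1}); err != nil {
		return "", false
	}
	published = string(<-b.out)

	// Channels are not supported by encoding/json, so this call fails.
	err := b.handleEvent(event{Type: "broken", Data: make(chan int)})
	var unsupported *json.UnsupportedTypeError
	wrapped = errors.As(err, &unsupported)
	return published, wrapped
}

func main() {
	published, wrapped := demo()
	fmt.Println(published, wrapped)
}
```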

Group

eventhorizon/eventbus/local/eventbus.go

type Group struct {
    bus   map[string]chan []byte
    busMu sync.RWMutex
}

// NewGroup creates a Group.
func NewGroup() *Group {
    return &Group{
        bus: map[string]chan []byte{},
    }
}

func (g *Group) publish(ctx context.Context, b []byte) error {
    g.busMu.RLock()
    defer g.busMu.RUnlock()

    for _, ch := range g.bus {
        // Marshal and unmarshal the context to both simulate only sending data
        // that would be sent over a network bus and also break any relationship
        // with the old context.
        select {
        case ch <- b:
        default:
            log.Printf("eventhorizon: publish queue full in local event bus")
        }
    }

    return nil
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
    defer b.wg.Done()

    for {
        select {
        case data := <-ch:
            // Artificial delay to simulate network.
            time.Sleep(10 * time.Millisecond)

            event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
            if err != nil {
                err = fmt.Errorf("could not unmarshal event: %w", err)
                select {
                case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
                default:
                    log.Printf("eventhorizon: missed error in local event bus: %s", err)
                }
                return
            }

            // Ignore non-matching events.
            if !m.Match(event) {
                continue
            }

            // Handle the event if it did match.
            if err := h.HandleEvent(ctx, event); err != nil {
                err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
                select {
                case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
                default:
                    log.Printf("eventhorizon: missed error in local event bus: %s", err)
                }
            }
        case <-ctx.Done():
            return
        }
    }
}

Group's publish method iterates over the channels in bus and writes the serialized event to each one inside a select; because of the default branch, a full channel is skipped with a log message instead of blocking the publisher. The handle method loops on a select that reads data from its channel, unmarshals it, and uses m.Match(event) to check whether the event matches; if it does, it calls h.HandleEvent. Handling errors are sent to errCh without blocking, an unmarshal error ends the loop, and so does cancellation of ctx.
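The non-blocking fan-out in publish is easy to try in isolation. The sketch below is a cut-down stand-in, not the library code: the lowercase group type, the channel helper with its size parameter, and the dropped counter are assumptions added for illustration (the publish shown above logs and returns nil instead of counting). It shows that a subscriber with a full buffer misses the event while the others still receive it.

```go
package main

import (
	"fmt"
	"sync"
)

// group is a cut-down stand-in for Group: one buffered channel per subscriber.
type group struct {
	mu  sync.RWMutex
	bus map[string]chan []byte
}

func newGroup() *group {
	return &group{bus: map[string]chan []byte{}}
}

// channel returns the channel for id, creating it with the given buffer size
// on first use. This is a hypothetical helper for the sketch.
func (g *group) channel(id string, size int) <-chan []byte {
	g.mu.Lock()
	defer g.mu.Unlock()
	if ch, ok := g.bus[id]; ok {
		return ch
	}
	ch := make(chan []byte, size)
	g.bus[id] = ch
	return ch
}

// publish offers b to every subscriber without blocking. Unlike the code
// above, it reports how many sends were dropped rather than logging them.
func (g *group) publish(b []byte) (dropped int) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	for _, ch := range g.bus {
		select {
		case ch <- b:
		default:
			dropped++ // buffer full: skip this subscriber
		}
	}
	return dropped
}

// demo publishes twice to a subscriber with room for two events and to one
// with room for a single event, without anyone reading in between.
func demo() (fastLen, slowLen, dropped int) {
	g := newGroup()
	fast := g.channel("fast", 2)
	slow := g.channel("slow", 1)
	for i := 0; i < 2; i++ {
		dropped += g.publish([]byte("evt"))
	}
	return len(fast), len(slow), dropped
}

func main() {
	fastLen, slowLen, dropped := demo()
	fmt.Println(fastLen, slowLen, dropped)
}
```

The trade-off of this design is that a slow consumer loses events rather than slowing down the publisher, which is worth keeping in mind when relying on a bus built this way.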

Summary

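eventhorizon's EventBus interface embeds the EventHandler interface and defines the AddHandler, Errors, and Wait methods. The local implementation serializes each event in HandleEvent, fans the bytes out to per-handler channels through Group.publish, and runs a handle loop per handler that unmarshals, matches, and dispatches the event.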

doc

  • eventhorizon