golang: A look at eventhorizon's EventBus

This article takes a look at the EventBus in eventhorizon.

EventBus

eventhorizon/eventbus.go

type EventBus interface {
    EventHandler

    // AddHandler adds a handler for an event. Returns an error if either the
    // matcher or handler is nil, the handler is already added or there was some
    // other problem adding the handler (for networked handlers for example).
    AddHandler(context.Context, EventMatcher, EventHandler) error

    // Errors returns an error channel where async handling errors are sent.
    Errors() <-chan EventBusError

    // Wait wait for all handlers to be cancelled by their context.
    Wait()
}

type EventHandler interface {
    // HandlerType is the type of the handler.
    HandlerType() EventHandlerType

    // HandleEvent handles an event.
    HandleEvent(context.Context, Event) error
}

type EventMatcher interface {
    // Match returns true if the matcher matches an event.
    Match(Event) bool
}

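The EventBus interface embeds the EventHandler interface and defines the AddHandler, Errors, and Wait methods. Because it embeds EventHandler, publishing an event to the bus is a call to HandleEvent; AddHandler registers a handler together with an EventMatcher that decides which events reach it.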
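To make the roles of the handler and the matcher concrete, here is a minimal sketch. It does not import eventhorizon: Event, EventHandlerType, and the two interfaces are simplified stand-ins that copy the shapes quoted above, and matchType, recorder, and dispatchAll are hypothetical names introduced only for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a simplified stand-in for eventhorizon's Event
// (an assumption for illustration, not the library's type).
type Event struct {
	Type string
	Data string
}

// EventHandlerType plays the role of eh.EventHandlerType.
type EventHandlerType string

// EventHandler and EventMatcher follow the shapes quoted above,
// but use the stand-in Event type.
type EventHandler interface {
	HandlerType() EventHandlerType
	HandleEvent(context.Context, Event) error
}

type EventMatcher interface {
	Match(Event) bool
}

// matchType is a hypothetical matcher that accepts a single event type.
type matchType string

func (m matchType) Match(e Event) bool { return e.Type == string(m) }

// recorder is a hypothetical handler that collects the data of handled events.
type recorder struct {
	seen []string
}

func (r *recorder) HandlerType() EventHandlerType { return "recorder" }

func (r *recorder) HandleEvent(_ context.Context, e Event) error {
	r.seen = append(r.seen, e.Data)
	return nil
}

// dispatchAll checks each event against the matcher and passes only
// matching events to the handler, stopping at the first handler error.
func dispatchAll(m EventMatcher, h EventHandler, events []Event) error {
	ctx := context.Background()
	for _, e := range events {
		if !m.Match(e) {
			continue // ignore non-matching events
		}
		if err := h.HandleEvent(ctx, e); err != nil {
			return fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
		}
	}
	return nil
}

func main() {
	r := &recorder{}
	events := []Event{
		{Type: "created", Data: "a"},
		{Type: "deleted", Data: "b"},
	}
	if err := dispatchAll(matchType("created"), r, events); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.seen) // only the "created" event reached the handler
}
```

In this sketch the matcher is consulted before the handler, so a handler only ever sees the events it subscribed to.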

EventBus

eventhorizon/eventbus/local/eventbus.go

type EventBus struct {
    group        *Group
    registered   map[eh.EventHandlerType]struct{}
    registeredMu sync.RWMutex
    errCh        chan eh.EventBusError
    wg           sync.WaitGroup
    codec        eh.EventCodec
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
    data, err := b.codec.MarshalEvent(ctx, event)
    if err != nil {
        return fmt.Errorf("could not marshal event: %w", err)
    }

    return b.group.publish(ctx, data)
}

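The local EventBus struct has the fields group, registered, registeredMu, errCh, wg, and codec. Its HandleEvent method first serializes the event with the codec and then publishes the resulting bytes through group.publish.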
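One consequence of serializing before publishing is that each receiver decodes its own copy of the event, much as it would on a networked bus. The sketch below illustrates this with encoding/json. The event type and the marshalEvent, unmarshalEvent, and roundTrip helpers are hypothetical stand-ins for the codec, not eventhorizon's EventCodec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical payload type, not eventhorizon's Event.
type event struct {
	Type string            `json:"type"`
	Tags map[string]string `json:"tags"`
}

// marshalEvent plays the role of the codec's marshal step in this sketch.
func marshalEvent(e event) ([]byte, error) {
	data, err := json.Marshal(e)
	if err != nil {
		return nil, fmt.Errorf("could not marshal event: %w", err)
	}
	return data, nil
}

// unmarshalEvent plays the role of the codec's unmarshal step.
func unmarshalEvent(data []byte) (event, error) {
	var e event
	if err := json.Unmarshal(data, &e); err != nil {
		return event{}, fmt.Errorf("could not unmarshal event: %w", err)
	}
	return e, nil
}

// roundTrip encodes and decodes an event, as happens between
// publishing on the bus and handling on the receiving side.
func roundTrip(e event) (event, error) {
	data, err := marshalEvent(e)
	if err != nil {
		return event{}, err
	}
	return unmarshalEvent(data)
}

func main() {
	orig := event{Type: "created", Tags: map[string]string{"k": "v"}}
	got, err := roundTrip(orig)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The decoded event has its own map, so changing it
	// does not affect the publisher's value.
	got.Tags["k"] = "changed"
	fmt.Println(orig.Tags["k"], got.Tags["k"])
}
```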

Group

eventhorizon/eventbus/local/eventbus.go

type Group struct {
    bus   map[string]chan []byte
    busMu sync.RWMutex
}

// NewGroup creates a Group.
func NewGroup() *Group {
    return &Group{
        bus: map[string]chan []byte{},
    }
}

func (g *Group) publish(ctx context.Context, b []byte) error {
    g.busMu.RLock()
    defer g.busMu.RUnlock()

    for _, ch := range g.bus {
        // Marshal and unmarshal the context to both simulate only sending data
        // that would be sent over a network bus and also break any relationship
        // with the old context.
        select {
        case ch <- b:
        default:
            log.Printf("eventhorizon: publish queue full in local event bus")
        }
    }

    return nil
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
    defer b.wg.Done()

    for {
        select {
        case data := <-ch:
            // Artificial delay to simulate network.
            time.Sleep(10 * time.Millisecond)

            event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
            if err != nil {
                err = fmt.Errorf("could not unmarshal event: %w", err)
                select {
                case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
                default:
                    log.Printf("eventhorizon: missed error in local event bus: %s", err)
                }
                return
            }

            // Ignore non-matching events.
            if !m.Match(event) {
                continue
            }

            // Handle the event if it did match.
            if err := h.HandleEvent(ctx, event); err != nil {
                err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
                select {
                case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
                default:
                    log.Printf("eventhorizon: missed error in local event bus: %s", err)
                }
            }
        case <-ctx.Done():
            return
        }
    }
}

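Group's publish method iterates over the channels in bus and writes the event to each one with a non-blocking select; if a channel's queue is full, the event is not delivered to that channel and a message is logged. The handle method loops on a select that reads from its channel, decodes the event, checks it with m.Match(event), and calls h.HandleEvent if it matches. Handling errors are sent to errCh, or logged if errCh cannot accept them. The loop ends when unmarshalling fails or when ctx is cancelled.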
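The non-blocking fan-out and the consuming loop can be tried in isolation with the sketch below. It is a simplified stand-in, not the library code: the group type mirrors the Group struct above, while subscribe, the dropped counter returned by publish, and consume are hypothetical additions for this illustration (the listing above does not show how the real Group creates its channels).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// group is a simplified stand-in for Group: one buffered channel per subscriber.
type group struct {
	mu  sync.RWMutex
	bus map[string]chan []byte
}

func newGroup() *group {
	return &group{bus: map[string]chan []byte{}}
}

// subscribe registers a buffered channel under id (hypothetical helper).
func (g *group) subscribe(id string, size int) <-chan []byte {
	g.mu.Lock()
	defer g.mu.Unlock()
	ch := make(chan []byte, size)
	g.bus[id] = ch
	return ch
}

// publish offers b to every subscriber without blocking and reports
// how many subscribers had a full queue and therefore missed it.
func (g *group) publish(b []byte) (dropped int) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	for _, ch := range g.bus {
		select {
		case ch <- b:
		default:
			dropped++
		}
	}
	return dropped
}

// consume passes each message from ch to handle until ctx is cancelled.
func consume(ctx context.Context, ch <-chan []byte, handle func([]byte)) {
	for {
		select {
		case data := <-ch:
			handle(data)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	g := newGroup()
	ch := g.subscribe("a", 1)

	fmt.Println("dropped:", g.publish([]byte("first")))  // queue has room
	fmt.Println("dropped:", g.publish([]byte("second"))) // queue is full

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		consume(ctx, ch, func(b []byte) {
			fmt.Println("handled:", string(b))
			cancel() // stop after the first message in this demo
		})
	}()
	wg.Wait()
}
```

The trade-off of the select with a default case is that a slow subscriber never blocks the publisher, at the cost of losing events once that subscriber's queue is full.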

Summary

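eventhorizon's EventBus interface embeds the EventHandler interface and defines the AddHandler, Errors, and Wait methods. The local implementation serializes each event with its codec and fans the bytes out to the channels held by its Group.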

doc

  • eventhorizon