本文次要钻研一下eventhorizon的Aggregate

Aggregate

eventhorizon/aggregate.go

type Aggregate interface {    // Entity provides the ID of the aggregate.    Entity    // AggregateType returns the type name of the aggregate.    // AggregateType() string    AggregateType() AggregateType    // CommandHandler is used to handle commands.    CommandHandler}type Entity interface {    // EntityID returns the ID of the entity.    EntityID() uuid.UUID}type CommandHandler interface {    HandleCommand(context.Context, Command) error}// AggregateType is the type of an aggregate.type AggregateType string// String returns the string representation of an aggregate type.func (at AggregateType) String() string {    return string(at)}
Aggregate接口内嵌了Entity及CommandHandler,定义了AggregateType办法

AggregateStore

eventhorizon/aggregate.go

// AggregateStore is responsible for loading and saving aggregates.type AggregateStore interface {    // Load loads the most recent version of an aggregate with a type and id.    Load(context.Context, AggregateType, uuid.UUID) (Aggregate, error)    // Save saves the uncommittend events for an aggregate.    Save(context.Context, Aggregate) error}
AggregateStore定义了Load、Save办法

AggregateStore

eventhorizon/aggregatestore/model/aggregatestore.go

// AggregateStore is an aggregate store that uses a read write repo for// loading and saving aggregates.type AggregateStore struct {    repo         eh.ReadWriteRepo    eventHandler eh.EventHandler}var (    // ErrInvalidRepo is when a dispatcher is created with a nil repo.    ErrInvalidRepo = errors.New("invalid repo")    // ErrInvalidAggregate occurs when a loaded aggregate is not an aggregate.    ErrInvalidAggregate = errors.New("invalid aggregate"))// NewAggregateStore creates an aggregate store with a read write repo and an// event handler that can handle any resulting events (for example by publishing// them on an event bus).func NewAggregateStore(repo eh.ReadWriteRepo, eventHandler eh.EventHandler) (*AggregateStore, error) {    if repo == nil {        return nil, ErrInvalidRepo    }    d := &AggregateStore{        repo:         repo,        eventHandler: eventHandler,    }    return d, nil}// Load implements the Load method of the eventhorizon.AggregateStore interface.func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateType, id uuid.UUID) (eh.Aggregate, error) {    item, err := r.repo.Find(ctx, id)    if errors.Is(err, eh.ErrEntityNotFound) {        // Create the aggregate.        if item, err = eh.CreateAggregate(aggregateType, id); err != nil {            return nil, err        }    } else if err != nil {        return nil, err    }    aggregate, ok := item.(eh.Aggregate)    if !ok {        return nil, ErrInvalidAggregate    }    return aggregate, nil}// Save implements the Save method of the eventhorizon.AggregateStore interface.func (r *AggregateStore) Save(ctx context.Context, aggregate eh.Aggregate) error {    if err := r.repo.Save(ctx, aggregate); err != nil {        return err    }    // Handle any events optionally provided by the aggregate.    if a, ok := aggregate.(eh.EventSource); ok && r.eventHandler != nil {        for _, e := range a.Events() {            if err := r.eventHandler.HandleEvent(ctx, e); err != nil {                return err            }        }    }    return nil}
AggregateStore定义了eh.ReadWriteRepo、eh.EventHandler属性;其Load、Save办法均委托给了eh.ReadWriteRepo

ReadWriteRepo

eventhorizon/repo.go

type ReadWriteRepo interface {    ReadRepo    WriteRepo}type ReadRepo interface {    // Parent returns the parent read repository, if there is one.    // Useful for iterating a wrapped set of repositories to get a specific one.    Parent() ReadRepo    // Find returns an entity for an ID.    Find(context.Context, uuid.UUID) (Entity, error)    // FindAll returns all entities in the repository.    FindAll(context.Context) ([]Entity, error)}type WriteRepo interface {    // Save saves a entity in the storage.    Save(context.Context, Entity) error    // Remove removes a entity by ID from the storage.    Remove(context.Context, uuid.UUID) error}
ReadWriteRepo接口组合了ReadRepo、WriteRepo接口;其中ReadRepo接口定义了Parent、Find、FindAll办法;WriteRepo接口定义了Save、Remove办法

小结

eventhorizon的Aggregate接口内嵌了Entity及CommandHandler,定义了AggregateType办法;AggregateStore定义了Load、Save办法;AggregateStore的实现类定义了eh.ReadWriteRepo、eh.EventHandler属性,其Load、Save办法均委托给了eh.ReadWriteRepo。

doc

  • eventhorizon