本文主要研究一下go.cqrs的DomainRepository

DomainRepository

// DomainRepository is the interface that all domain repositories should implement.type DomainRepository interface {    //Loads an aggregate of the given type and ID    Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)    //Saves the aggregate.    Save(aggregate AggregateRoot, expectedVersion *int) error}
DomainRepository定义了Load、Save方法

GetEventStoreCommonDomainRepo

// GetEventStoreCommonDomainRepo is an implementation of the DomainRepository// that uses GetEventStore for persistencetype GetEventStoreCommonDomainRepo struct {    eventStore         *goes.Client    eventBus           EventBus    streamNameDelegate StreamNamer    aggregateFactory   AggregateFactory    eventFactory       EventFactory}// Load will load all events from a stream and apply those events to an aggregate// of the type specified.//// The aggregate type and id will be passed to the configured StreamNamer to// get the stream name.func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error) {    if r.aggregateFactory == nil {        return nil, fmt.Errorf("The common domain repository has no Aggregate Factory.")    }    if r.streamNameDelegate == nil {        return nil, fmt.Errorf("The common domain repository has no stream name delegate.")    }    if r.eventFactory == nil {        return nil, fmt.Errorf("The common domain has no Event Factory.")    }    aggregate := r.aggregateFactory.GetAggregate(aggregateType, id)    if aggregate == nil {        return nil, fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s", aggregateType)    }    streamName, err := r.streamNameDelegate.GetStreamName(aggregateType, id)    if err != nil {        return nil, err    }    stream := r.eventStore.NewStreamReader(streamName)    for stream.Next() {        switch err := stream.Err().(type) {        case nil:            break        case *url.Error, *goes.ErrTemporarilyUnavailable:            return nil, &ErrRepositoryUnavailable{}        case *goes.ErrNoMoreEvents:            return aggregate, nil        case *goes.ErrUnauthorized:            return nil, &ErrUnauthorized{}        case *goes.ErrNotFound:            return nil, &ErrAggregateNotFound{AggregateType: aggregateType, AggregateID: id}        default:            return nil, &ErrUnexpected{Err: err}        }        event := 
r.eventFactory.GetEvent(stream.EventResponse().Event.EventType)        //TODO: No test for meta        meta := make(map[string]string)        stream.Scan(event, &meta)        if stream.Err() != nil {            return nil, stream.Err()        }        em := NewEventMessage(id, event, Int(stream.EventResponse().Event.EventNumber))        for k, v := range meta {            em.SetHeader(k, v)        }        aggregate.Apply(em, false)        aggregate.IncrementVersion()    }    return aggregate, nil}// Save persists an aggregatefunc (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {    if r.streamNameDelegate == nil {        return fmt.Errorf("The common domain repository has no stream name delagate.")    }    resultEvents := aggregate.GetChanges()    streamName, err := r.streamNameDelegate.GetStreamName(typeOf(aggregate), aggregate.AggregateID())    if err != nil {        return err    }    if len(resultEvents) > 0 {        evs := make([]*goes.Event, len(resultEvents))        for k, v := range resultEvents {            //TODO: There is no test for this code            v.SetHeader("AggregateID", aggregate.AggregateID())            evs[k] = goes.NewEvent("", v.EventType(), v.Event(), v.GetHeaders())        }        streamWriter := r.eventStore.NewStreamWriter(streamName)        err := streamWriter.Append(expectedVersion, evs...)        
switch e := err.(type) {        case nil:            break        case *goes.ErrConcurrencyViolation:            return &ErrConcurrencyViolation{Aggregate: aggregate, ExpectedVersion: expectedVersion, StreamName: streamName}        case *goes.ErrUnauthorized:            return &ErrUnauthorized{}        case *goes.ErrTemporarilyUnavailable:            return &ErrRepositoryUnavailable{}        default:            return &ErrUnexpected{Err: e}        }    }    aggregate.ClearChanges()    for k, v := range resultEvents {        if expectedVersion == nil {            r.eventBus.PublishEvent(v)        } else {            em := NewEventMessage(v.AggregateID(), v.Event(), Int(*expectedVersion+k+1))            r.eventBus.PublishEvent(em)        }    }    return nil}
GetEventStoreCommonDomainRepo定义了eventStore、eventBus、streamNameDelegate、aggregateFactory、eventFactory属性,其Load方法先通过r.aggregateFactory.GetAggregate获取aggregate,再通过r.streamNameDelegate.GetStreamName(aggregateType, id)获取streamName,然后通过r.eventStore.NewStreamReader去遍历event,挨个执行aggregate.Apply(em, false)及aggregate.IncrementVersion();其Save方法先通过aggregate.GetChanges()获取resultEvents,再遍历resultEvents构造goes.Event,之后通过streamWriter.Append写入,然后执行aggregate.ClearChanges(),最后执行r.eventBus.PublishEvent

小结

go.cqrs的DomainRepository定义了Load、Save方法;GetEventStoreCommonDomainRepo实现了DomainRepository接口,其Load方法主要是读取event,然后挨个执行aggregate.Apply;其Save方法主要是将aggregate.GetChanges()转换为event,然后通过streamWriter.Append写入,然后执行aggregate.ClearChanges(),最后执行r.eventBus.PublishEvent。

doc

  • go.cqrs