共计 4266 个字符,预计需要花费 11 分钟才能阅读完成。
序
本文主要研究一下 go.cqrs 的 DomainRepository
DomainRepository
// DomainRepository is the interface that all domain repositories should implement.
//
// It describes the two operations a repository offers for aggregates:
// loading one by type name and ID, and saving one.
type DomainRepository interface {
// Load returns the aggregate identified by the given aggregate type name
// and aggregate ID, or an error if it could not be loaded.
Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)
// Save persists the given aggregate. expectedVersion is a pointer, so nil
// is a possible argument; how nil is interpreted is left to the
// implementation.
Save(aggregate AggregateRoot, expectedVersion *int) error
}
DomainRepository 定义了 Load、Save 方法
GetEventStoreCommonDomainRepo
// GetEventStoreCommonDomainRepo is an implementation of the DomainRepository
// that uses GetEventStore for persistence.
//
// Load and Save check some of these collaborators for nil before use and
// return an error when one is missing.
type GetEventStoreCommonDomainRepo struct {
// eventStore is the client used to create stream readers (Load) and
// stream writers (Save).
eventStore *goes.Client
// eventBus receives every saved event via PublishEvent once Save has
// written the events and cleared the aggregate's changes.
eventBus EventBus
// streamNameDelegate maps an aggregate type and ID to a stream name.
streamNameDelegate StreamNamer
// aggregateFactory creates the aggregate instance that Load applies
// events to.
aggregateFactory AggregateFactory
// eventFactory creates the event value for a given event type name so
// that Load can scan stored event data into it.
eventFactory EventFactory
}
// Load rebuilds an aggregate of the given type by replaying its event stream.
//
// The aggregate type and id are handed to the configured StreamNamer to
// resolve the stream name. A fresh aggregate is requested from the aggregate
// factory, then every event read from the stream is wrapped in an event
// message (carrying the stored event number and any metadata headers),
// applied to the aggregate, and followed by a version increment.
//
// An error is returned when the aggregate factory, stream name delegate or
// event factory is not configured, when the factory yields no aggregate for
// aggregateType, when the stream name cannot be resolved, or when reading or
// scanning an event fails. Reader errors are translated into the repository
// error types: ErrRepositoryUnavailable, ErrUnauthorized,
// ErrAggregateNotFound and ErrUnexpected. Running out of events is not an
// error; the aggregate built so far is returned.
func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error) {
	// Refuse to run without the collaborators this method relies on.
	switch {
	case r.aggregateFactory == nil:
		return nil, fmt.Errorf("The common domain repository has no Aggregate Factory.")
	case r.streamNameDelegate == nil:
		return nil, fmt.Errorf("The common domain repository has no stream name delegate.")
	case r.eventFactory == nil:
		return nil, fmt.Errorf("The common domain has no Event Factory.")
	}

	root := r.aggregateFactory.GetAggregate(aggregateType, id)
	if root == nil {
		return nil, fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s", aggregateType)
	}

	name, nameErr := r.streamNameDelegate.GetStreamName(aggregateType, id)
	if nameErr != nil {
		return nil, nameErr
	}

	reader := r.eventStore.NewStreamReader(name)
	for reader.Next() {
		// Translate the reader's error state into repository-level errors.
		switch readErr := reader.Err().(type) {
		case nil:
			// An event is available; carry on and apply it below.
		case *goes.ErrNoMoreEvents:
			// End of stream: the aggregate is fully replayed.
			return root, nil
		case *url.Error:
			return nil, &ErrRepositoryUnavailable{}
		case *goes.ErrTemporarilyUnavailable:
			return nil, &ErrRepositoryUnavailable{}
		case *goes.ErrUnauthorized:
			return nil, &ErrUnauthorized{}
		case *goes.ErrNotFound:
			return nil, &ErrAggregateNotFound{AggregateType: aggregateType, AggregateID: id}
		default:
			return nil, &ErrUnexpected{Err: readErr}
		}

		payload := r.eventFactory.GetEvent(reader.EventResponse().Event.EventType)

		// TODO: No test for meta
		headers := map[string]string{}
		reader.Scan(payload, &headers)
		if reader.Err() != nil {
			return nil, reader.Err()
		}

		message := NewEventMessage(id, payload, Int(reader.EventResponse().Event.EventNumber))
		for key, value := range headers {
			message.SetHeader(key, value)
		}

		root.Apply(message, false)
		root.IncrementVersion()
	}

	return root, nil
}
// Save persists the uncommitted changes of an aggregate and publishes them.
//
// The aggregate's pending changes are converted into store events (each one
// tagged with an "AggregateID" header) and appended to the aggregate's stream
// with expectedVersion passed through to the stream writer. After the append,
// or when there was nothing to append, the aggregate's changes are cleared and
// every change is published on the event bus. When expectedVersion is nil the
// original event message is published as-is; otherwise a new event message is
// built whose version is *expectedVersion plus the event's 1-based position.
//
// An error is returned when no stream name delegate is configured, when the
// stream name cannot be resolved, or when the append fails. Append errors are
// translated into ErrConcurrencyViolation, ErrUnauthorized,
// ErrRepositoryUnavailable or ErrUnexpected. On error nothing is cleared or
// published.
func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {
	if r.streamNameDelegate == nil {
		return fmt.Errorf("The common domain repository has no stream name delagate.")
	}

	pending := aggregate.GetChanges()

	name, nameErr := r.streamNameDelegate.GetStreamName(typeOf(aggregate), aggregate.AggregateID())
	if nameErr != nil {
		return nameErr
	}

	if len(pending) > 0 {
		outgoing := make([]*goes.Event, 0, len(pending))
		for _, change := range pending {
			// TODO: There is no test for this code
			change.SetHeader("AggregateID", aggregate.AggregateID())
			outgoing = append(outgoing, goes.NewEvent("", change.EventType(), change.Event(), change.GetHeaders()))
		}

		writer := r.eventStore.NewStreamWriter(name)
		// Translate store-level failures into repository-level errors.
		switch appendErr := writer.Append(expectedVersion, outgoing...).(type) {
		case nil:
			// Append succeeded; continue to clearing and publishing.
		case *goes.ErrConcurrencyViolation:
			return &ErrConcurrencyViolation{Aggregate: aggregate, ExpectedVersion: expectedVersion, StreamName: name}
		case *goes.ErrUnauthorized:
			return &ErrUnauthorized{}
		case *goes.ErrTemporarilyUnavailable:
			return &ErrRepositoryUnavailable{}
		default:
			return &ErrUnexpected{Err: appendErr}
		}
	}

	aggregate.ClearChanges()

	for offset, change := range pending {
		if expectedVersion != nil {
			// Re-wrap the event so it carries its position-based version.
			versioned := NewEventMessage(change.AggregateID(), change.Event(), Int(*expectedVersion+offset+1))
			r.eventBus.PublishEvent(versioned)
			continue
		}
		r.eventBus.PublishEvent(change)
	}

	return nil
}
GetEventStoreCommonDomainRepo 定义了 eventStore、eventBus、streamNameDelegate、aggregateFactory、eventFactory 属性;其 Load 方法先通过 r.aggregateFactory.GetAggregate 获取 aggregate,再通过 r.streamNameDelegate.GetStreamName(aggregateType, id) 获取 streamName,然后通过 r.eventStore.NewStreamReader 去遍历 event,挨个执行 aggregate.Apply(em, false) 及 aggregate.IncrementVersion();其 Save 方法先通过 aggregate.GetChanges() 获取 resultEvents,再遍历 resultEvents 构造 goes.Event,之后通过 streamWriter.Append 写入,然后执行 aggregate.ClearChanges(),最后执行 r.eventBus.PublishEvent
小结
go.cqrs 的 DomainRepository 定义了 Load、Save 方法;GetEventStoreCommonDomainRepo 实现了 DomainRepository 接口,其 Load 方法主要是读取 event,然后挨个执行 aggregate.Apply;其 Save 方法主要是将 aggregate.GetChanges() 转换为 event,然后通过 streamWriter.Append 写入,然后执行 aggregate.ClearChanges(),最后执行 r.eventBus.PublishEvent。
doc
- go.cqrs