手撸golang GO与微服务 ES-CQRS模式之1
缘起
最近浏览 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之
git地址: https://gitee.com/ioly/learning.gooop
ES-CQRS模式
ES(Event Sourcing)事件溯源十分好了解,指的是将每次的事件都记录下来,而不是去记录对象的状态。比方新建、批改等都会作为事件记录下来,当须要最新的状态时,通过事件的重叠来计算最新的状态。依照事件溯源的模式进行架构设计,就是事件驱动架构(Event DrivenArchitecture, EDA)。命令查问职责拆散(CQRS)最早来自Betrand Meyer写的Object-OrientedSoftware Construction一书,指的是命令查问拆散(Command Query Separation,CQS)。其根本思维是任何一个对象的办法都能够分为以下两大类:▪ 命令(Command):不返回任何后果(void),但会扭转对象的状态。▪ 查问(Query):返回后果,然而不会扭转对象的状态,对系统没有副作用。CQRS的外围出发点就是把零碎分为读和写两局部,从而不便别离进行优化。
指标(Day 1)
- 依据ES-CQRS模式, 设计"TODO - 待办事宜"程序
设计
- TodoDTO: 待办事宜数值对象
- TodoCreatedEvent: 创立todo事件
- TodoUpdatedEvent: 批改todo事件
- TodoRemovedEvent: 删除todo事件
- IEventBus: 事件总线接口
- iTodoEventSerializer: 事件序列化到JSON数据的接口
- iTodoReader: todo读取接口
- iTodoWriter: todo写入接口
- iJSONStore: json文件读写接口
- tEventBus: 事件总线的实现
- tTodoEventSerializer: 事件序列化到JSON的实现
- tTodoWriter: 事件写入器的实现
- tMockJSONStore: 虚构的JSON文件读写实现
- tTodoReader: 未实现
TodoDTO.go
待办事宜数值对象
package todo_apptype TodoDTO struct { NO int Title string Content string}
TodoCreatedEvent.go
todo事项创立事件
package todo_apptype TodoCreatedEvent struct { Data *TodoDTO}
TodoUpdatedEvent.go
todo事项批改事件
package todo_apptype TodoUpdatedEvent struct { Data *TodoDTO}
TodoRemovedEvent.go
todo事项删除事件
package todo_apptype TodoRemovedEvent struct { NO int}
IEventBus.go
事件总线接口
package todo_apptype EventHandleFunc func(e string, args interface{})type EventHandler struct { ID string Handler EventHandleFunc}type IEventBus interface { Pub(e string, args interface{}) Sub(e string, id string, handleFunc EventHandleFunc) Unsub(e string, id string)}const EventTodoCreated = "todo.created"const EventTodoUpdated = "todo.updated"const EventTodoRemoved = "todo.removed"
iTodoEventSerializer.go
事件序列化到JSON数据的接口
package todo_apptype iTodoEventSerializer interface { SerializeCreatedEvent(it *TodoCreatedEvent) *tJSONData SerializeUpdatedEvent(it *TodoUpdatedEvent) *tJSONData SerializeRemovedEvent(it *TodoRemovedEvent) *tJSONData}
iTodoReader.go
todo读取接口
package todo_apptype iTodoReader interface { All() []*TodoDTO}
iTodoWriter.go
todo写入接口
package todo_apptype iTodoWriter interface { HandleCreated(e *TodoCreatedEvent) HandleUpdated(e *TodoUpdatedEvent) HandleRemoved(e *TodoRemovedEvent)}
iJSONStore.go
json文件读写接口
package todo_apptype iJSONStore interface { Load() Append(it *tJSONData)}
tEventBus.go
事件总线的实现
package todo_appimport ( "learning/gooop/saga/mqs/logger" "sync")type tEventBus struct { rwmutex *sync.RWMutex items map[string][]*EventHandler}func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler { return &EventHandler{ id, handleFunc, }}func newEventBus() IEventBus { it := new(tEventBus) it.init() return it}func (me *tEventBus) init() { me.rwmutex = new(sync.RWMutex) me.items = make(map[string][]*EventHandler)}func (me *tEventBus) Pub(e string, args interface{}) { me.rwmutex.RLock() defer me.rwmutex.RUnlock() handlers,ok := me.items[e] if ok { for _,it := range handlers { logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID) go it.Handler(e, args) } }}func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) { me.rwmutex.Lock() defer me.rwmutex.Unlock() handler := newEventHandler(id, handleFunc) handlers,ok := me.items[e] if ok { me.items[e] = append(handlers, handler) } else { me.items[e] = []*EventHandler{handler } }}func (me *tEventBus) Unsub(e string, id string) { me.rwmutex.Lock() defer me.rwmutex.Unlock() handlers,ok := me.items[e] if ok { for i,it := range handlers { if it.ID == id { lastI := len(handlers) - 1 if i != lastI { handlers[i], handlers[lastI] = handlers[lastI], handlers[i] } me.items[e] = handlers[:lastI] } } }}var GlobalEventBus = newEventBus()
tTodoEventSerializer.go
事件序列化到JSON的实现
package todo_apptype tTodoEventSerializer struct {}func newEventSeiralizer() iTodoEventSerializer { it := new(tTodoEventSerializer) return it}func (me *tTodoEventSerializer) serializeWithTag(tag int, v interface{}) *tJSONData { it := new(tJSONData) err := it.Set(TagCreated, v) if err != nil { return nil } return it}func (me *tTodoEventSerializer) SerializeCreatedEvent(e *TodoCreatedEvent) *tJSONData { return me.serializeWithTag(TagCreated, e)}func (me *tTodoEventSerializer) SerializeUpdatedEvent(e *TodoUpdatedEvent) *tJSONData { return me.serializeWithTag(TagUpdated, e)}func (me *tTodoEventSerializer) SerializeRemovedEvent(e *TodoRemovedEvent) *tJSONData { return me.serializeWithTag(TagRemoved, e)}const TagCreated = 1const TagUpdated = 2const TagRemoved = 3var gDefaultEventSerializer = newEventSeiralizer()
tTodoWriter.go
事件写入器的实现
package todo_apptype tTodoWriter struct {}func newTodoWriter() iTodoWriter { it := new(tTodoWriter) it.init() return it}func (me *tTodoWriter) init() { GlobalEventBus.Sub("todo.created", "", me.handleEvent)}func (me *tTodoWriter) handleEvent(e string, args interface{}) { switch e { case EventTodoCreated: if it,ok := args.(*TodoCreatedEvent);ok { me.HandleCreated(it) } break case EventTodoUpdated: if it,ok := args.(*TodoUpdatedEvent);ok { me.HandleUpdated(it) } break case EventTodoRemoved: if it,ok := args.(*TodoRemovedEvent);ok { me.HandleRemoved(it) } break }}func (me *tTodoWriter) HandleCreated(e *TodoCreatedEvent) { j := gDefaultEventSerializer.SerializeCreatedEvent(e) if j != nil { MockJSONStore.Append(j) }}func (me *tTodoWriter) HandleUpdated(e *TodoUpdatedEvent) { j := gDefaultEventSerializer.SerializeUpdatedEvent(e) if j != nil { MockJSONStore.Append(j) }}func (me *tTodoWriter) HandleRemoved(e *TodoRemovedEvent) { j := gDefaultEventSerializer.SerializeRemovedEvent(e) if j != nil { MockJSONStore.Append(j) }}
tMockJSONStore.go
虚构的JSON文件读写实现
package todo_appimport "sync"type tMockJSONStore struct { rwmutex *sync.RWMutex once sync.Once items []*tJSONData}func newMockJSONStore() iJSONStore { it := new(tMockJSONStore) it.init() return it}func (me *tMockJSONStore) init() { me.rwmutex = new(sync.RWMutex) me.items = []*tJSONData{}}func (me *tMockJSONStore) Load() { me.once.Do(func() { me.rwmutex.RLock() defer me.rwmutex.RUnlock() for _,it := range me.items { switch it.Tag { case TagCreated: v := new(TodoCreatedEvent) e := it.Get(v) if e == nil { GlobalEventBus.Pub(EventTodoCreated, e) } break case TagUpdated: v := new(TodoUpdatedEvent) e := it.Get(v) if e == nil { GlobalEventBus.Pub(EventTodoUpdated, e) } break case TagRemoved: v := new(TodoRemovedEvent) e := it.Get(v) if e == nil { GlobalEventBus.Pub(EventTodoRemoved, e) } break } } })}func (me *tMockJSONStore) Append(it *tJSONData) { me.rwmutex.Lock() defer me.rwmutex.Unlock() me.items = append(me.items, it)}var MockJSONStore = newMockJSONStore()
(未完待续)