手撸golang GO与微服务 ES-CQRS模式之2
缘起
最近浏览 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee:
- https://gitee.com/ioly/learning.gooop
ES-CQRS模式
ES(Event Sourcing)事件溯源非常好理解,指的是将每次的事件都记录下来,而不是去记录对象的状态。比如新建、修改等都会作为事件记录下来,当需要最新的状态时,通过事件的叠加来计算最新的状态。按照事件溯源的模式进行架构设计,就是事件驱动架构(Event Driven Architecture, EDA)。命令查询职责分离(CQRS)最早来自Bertrand Meyer写的Object-Oriented Software Construction一书,指的是命令查询分离(Command Query Separation, CQS)。其基本思想是任何一个对象的方法都可以分为以下两大类:
▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。
▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。
CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。
目标(Day 2)
- 根据ES-CQRS模式, 大幅重构Day 1的设计, 并进行单元测试
设计
- TodoDTO: 待办事宜数值对象
- OperationTag: todo写入事件的类型标记
- TodoEvent: todo写入事件
- ClassTag: json序列化的类型标记
- tJSONData: json序列化的数据容器
- IEventBus: 事件总线接口
- iTodoEventSerializer: 事件序列化到JSON数据的接口
- iTodoReader: todo读取接口
- iTodoWriter: todo写入接口
- iJSONStore: json文件读写接口
- ITodoService: todo待办事宜服务接口
- tEventBus: 事件总线的实现
- tTodoEventSerializer: 事件序列化到JSON的实现
- tTodoWriter: 事件写入器的实现, 监听write指令, 并持久化到json存储
- tMockJSONStore: 虚拟的JSON文件读写实现
- tTodoReader: 待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态
- tMockTodoService: 待办事宜服务的实现
单元测试
todo_app_test.go
package es_cqrs

import (
	td "learning/gooop/es_cqrs/todo_app"
	"testing"
)

// fnAssertTrue aborts the running test with msg when b is false.
// It is marked as a test helper so the failure is reported at the caller's line.
func fnAssertTrue(t *testing.T, b bool, msg string) {
	t.Helper()
	if !b {
		t.Fatal(msg)
	}
}

// Test_TodoApp drives the package-level MockTodoService through a
// create / update / delete cycle and checks GetAll after each step.
func Test_TodoApp(t *testing.T) {
	// Keyed fields keep the literal valid if TodoDTO gains or reorders fields.
	t1 := &td.TodoDTO{NO: 1, Title: "title-1", Content: "content-1"}

	td.MockTodoService.Create(t1)
	all := td.MockTodoService.GetAll()
	fnAssertTrue(t, len(all) == 1, "expecting 1 item")
	fnAssertTrue(t, all[0].Title == t1.Title, "expecting "+t1.Title)
	t.Log("pass creating")

	t1.Content = "content-1 updated"
	t1.Title = "title-1 updated"
	td.MockTodoService.Update(t1)
	all = td.MockTodoService.GetAll()
	fnAssertTrue(t, len(all) == 1, "expecting 1 item")
	fnAssertTrue(t, all[0].Content == t1.Content, "expecting "+t1.Content)
	t.Log("pass updating")

	td.MockTodoService.Delete(t1)
	all = td.MockTodoService.GetAll()
	fnAssertTrue(t, len(all) == 0, "expecting 0 items")
	t.Log("pass deleting")
}
测试输出
$ go test -v todo_app_test.go
=== RUN   Test_TodoApp
22:38:08.180382833 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180533659 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}
22:38:08.180539669 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180552255 tTodoReader.items: [&{1 title-1 content-1}]
22:38:08.180557245 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180560995 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
    todo_app_test.go:21: pass creating
22:38:08.180580644 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180604465 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
22:38:08.180612665 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180618512 tTodoReader.items: [&{1 title-1 updated content-1 updated}]
22:38:08.18062244 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180626445 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
    todo_app_test.go:29: pass updating
22:38:08.180642172 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180656612 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}, {"Tag":3,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
22:38:08.180669129 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180672774 tTodoReader.items: []
22:38:08.180675952 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180679309 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
    todo_app_test.go:34: pass deleting
--- PASS: Test_TodoApp (0.00s)
PASS
ok      command-line-arguments  0.002s
TodoDTO.go
待办事宜数值对象
package todo_apptype TodoDTO struct { NO int Title string Content string}func (me *TodoDTO) Clone() *TodoDTO { return &TodoDTO{ me.NO, me.Title, me.Content, }}
OperationTag.go
todo写入事件的类型标记
package todo_apptype OperationTag intconst OPCreated OperationTag = 1const OPUpdated OperationTag = 2const OPDeleted OperationTag = 3
TodoEvent.go
todo写入事件
package todo_apptype TodoEvent struct { Tag OperationTag Data *TodoDTO}
ClassTag.go
json序列化的类型标记
package todo_apptype ClassTag intconst TodoEventClass ClassTag = 1
tJSONData.go
json序列化的数据容器
package todo_app

import "encoding/json"

// tJSONData is the container for one serialized record: a class tag plus
// the JSON encoding of the value.
type tJSONData struct {
	Tag     ClassTag
	Content []byte
}

// Set stores the JSON encoding of it together with tag.
//
// The value is marshalled before any field is touched, so when marshalling
// fails the receiver keeps its previous Tag and Content instead of ending up
// with a new tag paired with stale content. The marshalling error is returned.
func (me *tJSONData) Set(tag ClassTag, it interface{}) error {
	j, e := json.Marshal(it)
	if e != nil {
		return e
	}
	me.Tag = tag
	me.Content = j
	return nil
}

// Get decodes the stored JSON content into it, which must be a pointer
// accepted by json.Unmarshal. It returns the decoding error, if any.
func (me *tJSONData) Get(it interface{}) error {
	return json.Unmarshal(me.Content, it)
}
IEventBus.go
事件总线接口
package todo_apptype EventHandleFunc func(e string, args interface{})type EventHandler struct { ID string Handler EventHandleFunc}type IEventBus interface { Pub(e string, args interface{}) Sub(e string, id string, handleFunc EventHandleFunc) Unsub(e string, id string)}const EventWriteTodoCmd = "todo.write.cmd"const EventReadTodoCmd = "todo.read.cmd"const EventReadTodoRet = "todo.read.ret"const EventLoadTodoCmd = "todo.load.cmd"
iTodoEventSerializer.go
事件序列化到JSON数据的接口
package todo_apptype iTodoEventSerializer interface { Serialize(it *TodoEvent) *tJSONData}
iTodoReader.go
todo读取接口
package todo_apptype iTodoReader interface { All() []*TodoDTO HandleTodoEvent(e *TodoEvent)}
iTodoWriter.go
todo写入接口
package todo_apptype iTodoWriter interface { HandleTodoEvent(e *TodoEvent)}
iJSONStore.go
json文件读写接口
package todo_apptype iJSONStore interface { Load() Append(it *tJSONData)}
ITodoService.go
todo待办事宜服务接口
package todo_apptype ITodoService interface { Create(it *TodoDTO) Update(it *TodoDTO) Delete(it *TodoDTO) GetAll() []*TodoDTO}
tEventBus.go
事件总线的实现
package todo_app

import (
	"learning/gooop/saga/mqs/logger"
	"sync"
)

// tEventBus is an in-process, synchronous implementation of IEventBus.
// Subscriptions are kept per event name and guarded by rwmutex.
type tEventBus struct {
	rwmutex *sync.RWMutex
	items   map[string][]*EventHandler
}

// newEventHandler builds an EventHandler from a subscriber id and its callback.
func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
	return &EventHandler{
		ID:      id,
		Handler: handleFunc,
	}
}

// newEventBus creates an initialized, empty event bus.
func newEventBus() IEventBus {
	it := new(tEventBus)
	it.init()
	return it
}

// init allocates the lock and the subscription table.
func (me *tEventBus) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = make(map[string][]*EventHandler)
}

// Pub synchronously invokes every handler subscribed to event e, passing args.
//
// The handler list is copied under the read lock and the lock is released
// before any handler runs. Handlers in this package publish further events
// from inside their callbacks; holding the read lock across the callbacks
// would make that a recursive read lock (prohibited by sync.RWMutex) and
// would deadlock any handler that calls Sub or Unsub.
func (me *tEventBus) Pub(e string, args interface{}) {
	me.rwmutex.RLock()
	handlers := make([]*EventHandler, len(me.items[e]))
	copy(handlers, me.items[e])
	me.rwmutex.RUnlock()

	for _, it := range handlers {
		logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
		it.Handler(e, args)
	}
}

// Sub registers handleFunc for event e under the subscriber id.
func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	// append on a missing (nil) entry creates the list.
	me.items[e] = append(me.items[e], newEventHandler(id, handleFunc))
}

// Unsub removes every handler registered under id for event e.
// The relative order of the remaining handlers is preserved.
func (me *tEventBus) Unsub(e string, id string) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	handlers, ok := me.items[e]
	if !ok {
		return
	}

	// Build a new slice instead of editing in place, so the result does not
	// depend on mutating the slice that is being iterated.
	kept := make([]*EventHandler, 0, len(handlers))
	for _, it := range handlers {
		if it.ID != id {
			kept = append(kept, it)
		}
	}
	me.items[e] = kept
}

// GlobalEventBus is the package-wide event bus instance.
var GlobalEventBus = newEventBus()
tTodoEventSerializer.go
事件序列化到JSON的实现
package todo_apptype tTodoEventSerializer struct {}func newEventSeiralizer() iTodoEventSerializer { it := new(tTodoEventSerializer) return it}func (me *tTodoEventSerializer) Serialize(e *TodoEvent) *tJSONData { it := new(tJSONData) err := it.Set(TodoEventClass, e) if err != nil { return nil } return it}var gDefaultEventSerializer = newEventSeiralizer()
tTodoWriter.go
事件写入器的实现, 监听write指令, 并持久化到json存储
package todo_app

import (
	"fmt"
	"sync/atomic"
)

// tTodoWriter listens for write commands on GlobalEventBus and appends the
// serialized events to MockJSONStore.
type tTodoWriter struct {
	id string
}

// newTodoWriter creates a writer that is already subscribed to the bus.
func newTodoWriter() iTodoWriter {
	w := &tTodoWriter{}
	w.init()
	return w
}

// init assigns the next sequential writer id and subscribes to EventWriteTodoCmd.
func (me *tTodoWriter) init() {
	seq := atomic.AddInt32(&gWriterCounter, 1)
	me.id = fmt.Sprintf("tTodoWriter.%d", seq)
	GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleWriteTodoCmd)
}

// handleWriteTodoCmd is the bus callback. Only EventWriteTodoCmd events whose
// payload is a *TodoEvent are forwarded to HandleTodoEvent.
func (me *tTodoWriter) handleWriteTodoCmd(e string, args interface{}) {
	if e != EventWriteTodoCmd {
		return
	}
	evt, ok := args.(*TodoEvent)
	if !ok {
		return
	}
	me.HandleTodoEvent(evt)
}

// HandleTodoEvent serializes e and appends the record to MockJSONStore.
// Events the serializer cannot encode (nil result) are skipped.
func (me *tTodoWriter) HandleTodoEvent(e *TodoEvent) {
	record := gDefaultEventSerializer.Serialize(e)
	if record == nil {
		return
	}
	MockJSONStore.Append(record)
}

// gWriterCounter is the sequence used to number writer ids.
var gWriterCounter int32
tMockJSONStore.go
虚拟的JSON文件读写实现
package todo_app

import (
	"fmt"
	"learning/gooop/saga/mqs/logger"
	"strings"
	"sync"
)

// tMockJSONStore is an in-memory stand-in for a JSON file store.
// Records are kept in items and guarded by rwmutex.
type tMockJSONStore struct {
	rwmutex *sync.RWMutex
	once    sync.Once
	items   []*tJSONData
}

// newMockJSONStore creates an initialized, empty store.
func newMockJSONStore() iJSONStore {
	it := new(tMockJSONStore)
	it.init()
	return it
}

// init allocates the lock and the empty record list.
func (me *tMockJSONStore) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = []*tJSONData{}
}

// Load replays the stored records: every record tagged TodoEventClass is
// decoded into a TodoEvent and published on GlobalEventBus as
// EventLoadTodoCmd. Records that fail to decode are skipped. The replay runs
// at most once per store (guarded by sync.Once).
func (me *tMockJSONStore) Load() {
	me.once.Do(func() {
		me.rwmutex.RLock()
		defer me.rwmutex.RUnlock()

		for _, it := range me.items {
			switch it.Tag {
			case TodoEventClass:
				v := new(TodoEvent)
				if err := it.Get(v); err != nil {
					continue
				}
				// Publish the decoded event. The original passed the nil
				// error value here, so subscribers never received an event.
				GlobalEventBus.Pub(EventLoadTodoCmd, v)
			}
		}
	})
}

// Append adds it to the store and logs the content of every stored record.
func (me *tMockJSONStore) Append(it *tJSONData) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	me.items = append(me.items, it)

	lines := make([]string, 0, len(me.items))
	for _, it := range me.items {
		lines = append(lines, string(it.Content))
	}
	logger.Logf("tMockJSONStore.items: %s", strings.Join(lines, ", "))
}

// MockJSONStore is the package-wide store instance.
var MockJSONStore = newMockJSONStore()

// Keep fmt referenced: the file-level import is preserved as-is.
var _ = fmt.Sprintf
tTodoReader.go
待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态
package todo_app

import (
	"fmt"
	"learning/gooop/saga/mqs/logger"
	"strings"
	"sync"
	"sync/atomic"
)

// tTodoReader is the read model of the to-do list. It listens for write and
// load events on GlobalEventBus, applies them to items, and answers read
// commands by publishing the current list.
type tTodoReader struct {
	id      string
	rwmutex *sync.RWMutex
	items   []*TodoDTO
}

// newTodoReader creates a reader that is already subscribed to the bus.
func newTodoReader() iTodoReader {
	it := new(tTodoReader)
	it.init()
	return it
}

// init assigns the next sequential reader id, allocates the lock and
// subscribes to the write, load and read command events.
func (me *tTodoReader) init() {
	me.id = fmt.Sprintf("tTodoReader.%d", atomic.AddInt32(&gReaderCounter, 1))
	me.rwmutex = new(sync.RWMutex)

	GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleEvent)
	GlobalEventBus.Sub(EventLoadTodoCmd, me.id, me.handleEvent)
	GlobalEventBus.Sub(EventReadTodoCmd, me.id, me.handleEvent)
}

// handleEvent is the bus callback. Write and load events carrying a
// *TodoEvent are applied to the list; a read command publishes the list.
func (me *tTodoReader) handleEvent(e string, args interface{}) {
	switch e {
	case EventWriteTodoCmd, EventLoadTodoCmd:
		if v, ok := args.(*TodoEvent); ok {
			me.HandleTodoEvent(v)
		}
	case EventReadTodoCmd:
		me.handleReadTodoList()
	}
}

// handleReadTodoList publishes the current list as EventReadTodoRet.
func (me *tTodoReader) handleReadTodoList() {
	GlobalEventBus.Pub(EventReadTodoRet, me.All())
}

// All returns a new slice holding the current items. The slice is a copy,
// but its elements are the same *TodoDTO pointers the reader holds.
func (me *tTodoReader) All() []*TodoDTO {
	me.rwmutex.RLock()
	defer me.rwmutex.RUnlock()

	lst := make([]*TodoDTO, len(me.items))
	copy(lst, me.items)
	return lst
}

// HandleTodoEvent applies one write event to the list and logs the result.
//
//   - OPCreated appends a clone of e.Data.
//   - OPUpdated replaces the first item with the same NO by a clone of e.Data.
//   - OPDeleted removes the first item with the same NO; the last item is
//     moved into its slot, so item order is not preserved.
//
// A nil event or an event without Data is ignored; dereferencing it would
// otherwise panic while the write lock is held.
func (me *tTodoReader) HandleTodoEvent(e *TodoEvent) {
	if e == nil || e.Data == nil {
		return
	}

	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	switch e.Tag {
	case OPCreated:
		me.items = append(me.items, e.Data.Clone())

	case OPUpdated:
		for i, it := range me.items {
			if it.NO == e.Data.NO {
				me.items[i] = e.Data.Clone()
				break
			}
		}

	case OPDeleted:
		for i, it := range me.items {
			if it.NO == e.Data.NO {
				lastI := len(me.items) - 1
				me.items[i] = me.items[lastI]
				// Clear the vacated tail slot so the removed item is not retained.
				me.items[lastI] = nil
				me.items = me.items[:lastI]
				break
			}
		}
	}

	lines := make([]string, 0, len(me.items))
	for _, it := range me.items {
		lines = append(lines, fmt.Sprintf("%v", it))
	}
	logger.Logf("tTodoReader.items: [%s]", strings.Join(lines, ", "))
}

// gReaderCounter is the sequence used to number reader ids. It starts at 1
// and is incremented before use, so the first reader is "tTodoReader.2".
var gReaderCounter int32 = 1
tMockTodoService.go
待办事宜服务的实现, 提供todo项的CRUD
package todo_apptype tMockTodoService struct { items []*TodoDTO writer iTodoWriter reader iTodoReader}func newMockTodoService() ITodoService { it := new(tMockTodoService) it.init() return it}func (me *tMockTodoService) init() { me.writer = newTodoWriter() me.reader = newTodoReader() GlobalEventBus.Sub(EventReadTodoRet, "tMockTodoService", me.handleReadTodoRet)}func (me *tMockTodoService) handleReadTodoRet(e string, args interface{}) { switch e { case EventReadTodoRet: if it,ok := args.([]*TodoDTO);ok { me.items = it } break }}func (me *tMockTodoService) Create(it *TodoDTO) { GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPCreated, it.Clone() })}func (me *tMockTodoService) Update(it *TodoDTO) { GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPUpdated, it.Clone() })}func (me *tMockTodoService) Delete(it *TodoDTO) { GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPDeleted, it.Clone() })}func (me *tMockTodoService) GetAll() []*TodoDTO { me.items = nil GlobalEventBus.Pub(EventReadTodoCmd, nil) lst := me.items me.items = nil return lst}var MockTodoService = newMockTodoService()
(ES-CQRS end)