手撸golang GO与微服务 ES-CQRS模式之2

缘起

最近浏览 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之
gitee:

  • https://gitee.com/ioly/learning.gooop

ES-CQRS模式

ES(Event Sourcing)事件溯源十分好了解,指的是将每次的事件都记录下来,而不是去记录对象的状态。比方新建、批改等都会作为事件记录下来,当须要最新的状态时,通过事件的重叠来计算最新的状态。依照事件溯源的模式进行架构设计,就是事件驱动架构(Event DrivenArchitecture, EDA)。命令查问职责拆散(CQRS)最早来自Betrand Meyer写的Object-OrientedSoftware Construction一书,指的是命令查问拆散(Command Query Separation,CQS)。其根本思维是任何一个对象的办法都能够分为以下两大类:▪ 命令(Command):不返回任何后果(void),但会扭转对象的状态。▪ 查问(Query):返回后果,然而不会扭转对象的状态,对系统没有副作用。CQRS的外围出发点就是把零碎分为读和写两局部,从而不便别离进行优化。

指标(Day 2)

  • 依据ES-CQRS模式, 大幅重构Day 1的设计, 并进行单元测试

设计

  • TodoDTO: 待办事宜数值对象
  • OperationTag: todo写入事件的类型标记
  • TodoEvent: todo写入事件
  • ClassTag: json序列化的类型标记
  • tJSONData: json序列化的数据容器
  • IEventBus: 事件总线接口
  • iTodoEventSerializer: 事件序列化到JSON数据的接口
  • iTodoReader: todo读取接口
  • iTodoWriter: todo写入接口
  • iJSONStore: json文件读写接口
  • ITodoService: todo待办事宜服务接口
  • tEventBus: 事件总线的实现
  • tTodoEventSerializer: 事件序列化到JSON的实现
  • tTodoWriter: 事件写入器的实现, 监听write指令, 并长久化到json存储
  • tMockJSONStore: 虚构的JSON文件读写实现
  • tTodoReader: 待办事宜读取器, 监听write和load指令, 并计算todo列表的以后状态
  • tMockTodoService: 待办事宜服务的实现

单元测试

todo_app_test.go

package es_cqrsimport (    td "learning/gooop/es_cqrs/todo_app"    "testing")func fnAssertTrue (t *testing.T, b bool, msg string) {    if !b {        t.Fatal(msg)    }}func Test_TodoApp(t *testing.T) {    t1 := &td.TodoDTO{ 1, "title-1", "content-1" }    td.MockTodoService.Create(t1)    all := td.MockTodoService.GetAll()    fnAssertTrue(t, len(all) == 1, "expecting 1 item")    fnAssertTrue(t, all[0].Title == t1.Title, "expecting " + t1.Title)    t.Log("pass creating")    t1.Content = "content-1 updated"    t1.Title = "title-1 updated"    td.MockTodoService.Update(t1)    all = td.MockTodoService.GetAll()    fnAssertTrue(t, len(all) == 1, "expecting 1 item")    fnAssertTrue(t, all[0].Content == t1.Content, "expecting " + t1.Content)    t.Log("pass updating")    td.MockTodoService.Delete(t1)    all = td.MockTodoService.GetAll()    fnAssertTrue(t, len(all) == 0, "expecting 0 items")    t.Log("pass deleting")}

测试输入

$ go test -v todo_app_test.go === RUN   Test_TodoApp22:38:08.180382833 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.122:38:08.180533659 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}22:38:08.180539669 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.222:38:08.180552255 tTodoReader.items: [&{1 title-1 content-1}]22:38:08.180557245 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.222:38:08.180560995 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService    todo_app_test.go:21: pass creating22:38:08.180580644 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.122:38:08.180604465 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}22:38:08.180612665 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.222:38:08.180618512 tTodoReader.items: [&{1 title-1 updated content-1 updated}]22:38:08.18062244 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.222:38:08.180626445 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService    todo_app_test.go:29: pass updating22:38:08.180642172 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.122:38:08.180656612 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}, {"Tag":3,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}22:38:08.180669129 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.222:38:08.180672774 tTodoReader.items: []22:38:08.180675952 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.222:38:08.180679309 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService    todo_app_test.go:34: pass deleting--- PASS: Test_TodoApp (0.00s)PASSok      command-line-arguments  0.002s

TodoDTO.go

待办事宜数值对象

package todo_apptype TodoDTO struct {    NO      int    Title   string    Content string}func (me *TodoDTO) Clone() *TodoDTO {    return &TodoDTO{        me.NO, me.Title, me.Content,    }}

OperationTag.go

todo写入事件的类型标记

package todo_apptype OperationTag intconst OPCreated OperationTag = 1const OPUpdated OperationTag = 2const OPDeleted OperationTag = 3

TodoEvent.go

todo写入事件

package todo_apptype TodoEvent struct {    Tag OperationTag    Data *TodoDTO}

ClassTag.go

json序列化的类型标记

package todo_apptype ClassTag intconst TodoEventClass ClassTag = 1

tJSONData.go

json序列化的数据容器

package todo_appimport "encoding/json"type tJSONData struct {    Tag     ClassTag    Content []byte}func (me *tJSONData) Set(tag ClassTag, it interface{}) error {    me.Tag = tag    j, e := json.Marshal(it)    if e != nil {        return e    }    me.Content = j    return nil}func (me *tJSONData) Get(it interface{}) error {    return json.Unmarshal(me.Content, it)}

IEventBus.go

事件总线接口

package todo_apptype EventHandleFunc func(e string, args interface{})type EventHandler struct {    ID      string    Handler EventHandleFunc}type IEventBus interface {    Pub(e string, args interface{})    Sub(e string, id string, handleFunc EventHandleFunc)    Unsub(e string, id string)}const EventWriteTodoCmd = "todo.write.cmd"const EventReadTodoCmd = "todo.read.cmd"const EventReadTodoRet = "todo.read.ret"const EventLoadTodoCmd = "todo.load.cmd"

iTodoEventSerializer.go

事件序列化到JSON数据的接口

package todo_apptype iTodoEventSerializer interface {    Serialize(it *TodoEvent) *tJSONData}

iTodoReader.go

todo读取接口

package todo_apptype iTodoReader interface {    All() []*TodoDTO    HandleTodoEvent(e *TodoEvent)}

iTodoWriter.go

todo写入接口

package todo_apptype iTodoWriter interface {    HandleTodoEvent(e *TodoEvent)}

iJSONStore.go

json文件读写接口

package todo_apptype iJSONStore interface {    Load()    Append(it *tJSONData)}

ITodoService.go

todo待办事宜服务接口

package todo_apptype ITodoService interface {    Create(it *TodoDTO)    Update(it *TodoDTO)    Delete(it *TodoDTO)    GetAll() []*TodoDTO}

tEventBus.go

事件总线的实现

package todo_appimport (    "learning/gooop/saga/mqs/logger"    "sync")type tEventBus struct {    rwmutex *sync.RWMutex    items   map[string][]*EventHandler}func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {    return &EventHandler{        id, handleFunc,    }}func newEventBus() IEventBus {    it := new(tEventBus)    it.init()    return it}func (me *tEventBus) init() {    me.rwmutex = new(sync.RWMutex)    me.items = make(map[string][]*EventHandler)}func (me *tEventBus) Pub(e string, args interface{}) {    me.rwmutex.RLock()    defer me.rwmutex.RUnlock()    handlers, ok := me.items[e]    if ok {        for _, it := range handlers {            logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)            it.Handler(e, args)        }    }}func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {    me.rwmutex.Lock()    defer me.rwmutex.Unlock()    handler := newEventHandler(id, handleFunc)    handlers, ok := me.items[e]    if ok {        me.items[e] = append(handlers, handler)    } else {        me.items[e] = []*EventHandler{handler}    }}func (me *tEventBus) Unsub(e string, id string) {    me.rwmutex.Lock()    defer me.rwmutex.Unlock()    handlers, ok := me.items[e]    if ok {        for i, it := range handlers {            if it.ID == id {                lastI := len(handlers) - 1                if i != lastI {                    handlers[i], handlers[lastI] = handlers[lastI], handlers[i]                }                me.items[e] = handlers[:lastI]            }        }    }}var GlobalEventBus = newEventBus()

tTodoEventSerializer.go

事件序列化到JSON的实现

package todo_apptype tTodoEventSerializer struct {}func newEventSeiralizer() iTodoEventSerializer {    it := new(tTodoEventSerializer)    return it}func (me *tTodoEventSerializer) Serialize(e *TodoEvent) *tJSONData {    it := new(tJSONData)    err := it.Set(TodoEventClass, e)    if err != nil {        return nil    }    return it}var gDefaultEventSerializer = newEventSeiralizer()

tTodoWriter.go

事件写入器的实现, 监听write指令, 并长久化到json存储

package todo_appimport (    "fmt"    "sync/atomic")type tTodoWriter struct {    id string}func newTodoWriter() iTodoWriter {    it := new(tTodoWriter)    it.init()    return it}func (me *tTodoWriter) init() {    me.id = fmt.Sprintf("tTodoWriter.%d", atomic.AddInt32(&gWriterCounter, 1))    GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleWriteTodoCmd)}func (me *tTodoWriter) handleWriteTodoCmd(e string, args interface{}) {    switch e {    case EventWriteTodoCmd:        if it, ok := args.(*TodoEvent); ok {            me.HandleTodoEvent(it)        }        break    }}func (me *tTodoWriter) HandleTodoEvent(e *TodoEvent) {    j := gDefaultEventSerializer.Serialize(e)    if j != nil {        MockJSONStore.Append(j)    }}var gWriterCounter int32 = 0

tMockJSONStore.go

虚构的JSON文件读写实现

package todo_appimport (    "fmt"    "learning/gooop/saga/mqs/logger"    "strings"    "sync")type tMockJSONStore struct {    rwmutex *sync.RWMutex    once    sync.Once    items   []*tJSONData}func newMockJSONStore() iJSONStore {    it := new(tMockJSONStore)    it.init()    return it}func (me *tMockJSONStore) init() {    me.rwmutex = new(sync.RWMutex)    me.items = []*tJSONData{}}func (me *tMockJSONStore) Load() {    me.once.Do(func() {        me.rwmutex.RLock()        defer me.rwmutex.RUnlock()        for _, it := range me.items {            switch it.Tag {            case TodoEventClass:                v := new(TodoEvent)                e := it.Get(v)                if e == nil {                    GlobalEventBus.Pub(EventLoadTodoCmd, e)                }                break            }        }    })}func (me *tMockJSONStore) Append(it *tJSONData) {    me.rwmutex.Lock()    defer me.rwmutex.Unlock()    me.items = append(me.items, it)    lines := []string{}    for _,it := range me.items {        lines = append(lines, fmt.Sprintf("%s", string(it.Content)))    }    logger.Logf("tMockJSONStore.items: %s", strings.Join(lines, ", "))}var MockJSONStore = newMockJSONStore()

tTodoReader.go

待办事宜读取器, 监听write和load指令, 并计算todo列表的以后状态

package todo_appimport (    "fmt"    "learning/gooop/saga/mqs/logger"    "strings"    "sync"    "sync/atomic")type tTodoReader struct {    id string    rwmutex *sync.RWMutex    items []*TodoDTO}func newTodoReader() iTodoReader {    it := new(tTodoReader)    it.init()    return it}func (me *tTodoReader) init() {    id := fmt.Sprintf("tTodoReader.%d", atomic.AddInt32(&gReaderCounter, 1))    me.id = id    me.rwmutex = new(sync.RWMutex)    GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleEvent)    GlobalEventBus.Sub(EventLoadTodoCmd, me.id, me.handleEvent)    GlobalEventBus.Sub(EventReadTodoCmd, me.id, me.handleEvent)}func (me *tTodoReader) handleEvent(e string, args interface{}) {    switch e {    case EventWriteTodoCmd:        fallthrough    case EventLoadTodoCmd:        if v,ok := args.(*TodoEvent);ok {            me.HandleTodoEvent(v)        }        break    case EventReadTodoCmd:        me.handleReadTodoList()    }}func (me *tTodoReader) handleReadTodoList() {    GlobalEventBus.Pub(EventReadTodoRet, me.All())}func (me *tTodoReader) All() []*TodoDTO {    me.rwmutex.RLock()    defer me.rwmutex.RUnlock()    lst := make([]*TodoDTO, len(me.items))    for i,it := range me.items {        lst[i] = it    }    return lst}func (me *tTodoReader) HandleTodoEvent(e *TodoEvent) {    me.rwmutex.Lock()    defer me.rwmutex.Unlock()    switch e.Tag {    case OPCreated:        me.items = append(me.items, e.Data.Clone())        break    case OPUpdated:        for i,it := range me.items {            if it.NO == e.Data.NO {                me.items[i] = e.Data.Clone()                break            }        }        break    case OPDeleted:        for i,it := range me.items {            if it.NO == e.Data.NO {                lastI := len(me.items) - 1                if i == lastI {                    me.items[i] = nil                } else {                    me.items[i], me.items[lastI] = me.items[lastI], nil                }                me.items = me.items[:lastI]                break            }        }        break    }    lines := []string{}    for _,it := range me.items {        lines = append(lines, fmt.Sprintf("%v", it))    }    logger.Logf("tTodoReader.items: [%s]", strings.Join(lines, ", "))}var gReaderCounter int32 = 1

tMockTodoService.go

待办事宜服务的实现, 提供todo项的CRUD

package todo_apptype tMockTodoService struct {    items []*TodoDTO    writer iTodoWriter    reader iTodoReader}func newMockTodoService() ITodoService {    it := new(tMockTodoService)    it.init()    return it}func (me *tMockTodoService) init() {    me.writer = newTodoWriter()    me.reader = newTodoReader()    GlobalEventBus.Sub(EventReadTodoRet, "tMockTodoService", me.handleReadTodoRet)}func (me *tMockTodoService) handleReadTodoRet(e string, args interface{}) {    switch e {    case EventReadTodoRet:        if it,ok := args.([]*TodoDTO);ok {            me.items = it        }        break    }}func (me *tMockTodoService) Create(it *TodoDTO) {    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPCreated, it.Clone() })}func (me *tMockTodoService) Update(it *TodoDTO) {    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPUpdated, it.Clone() })}func (me *tMockTodoService) Delete(it *TodoDTO) {    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPDeleted, it.Clone() })}func (me *tMockTodoService) GetAll() []*TodoDTO {    me.items = nil    GlobalEventBus.Pub(EventReadTodoCmd, nil)    lst := me.items    me.items = nil    return lst}var MockTodoService = newMockTodoService()

(ES-CQRS end)