手撸golang GO与微服务 ES-CQRS模式之1

缘起

最近浏览 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
git地址: https://gitee.com/ioly/learning.gooop

ES-CQRS模式

ES(Event Sourcing)事件溯源非常好理解,指的是将每次的事件都记录下来,而不是去记录对象的状态。比如新建、修改等都会作为事件记录下来,当需要最新的状态时,通过事件的叠加来计算最新的状态。按照事件溯源的模式进行架构设计,就是事件驱动架构(Event Driven Architecture, EDA)。命令查询职责分离(CQRS)最早来自Bertrand Meyer写的Object-Oriented Software Construction一书,指的是命令查询分离(Command Query Separation,CQS)。其基本思想是任何一个对象的方法都可以分为以下两大类:▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。

目标(Day 1)

  • 依据ES-CQRS模式, 设计"TODO - 待办事宜"程序

设计

  • TodoDTO: 待办事宜数值对象
  • TodoCreatedEvent: 创建todo事件
  • TodoUpdatedEvent: 修改todo事件
  • TodoRemovedEvent: 删除todo事件
  • IEventBus: 事件总线接口
  • iTodoEventSerializer: 事件序列化到JSON数据的接口
  • iTodoReader: todo读取接口
  • iTodoWriter: todo写入接口
  • iJSONStore: json文件读写接口
  • tEventBus: 事件总线的实现
  • tTodoEventSerializer: 事件序列化到JSON的实现
  • tTodoWriter: 事件写入器的实现
  • tMockJSONStore: 虚拟的JSON文件读写实现
  • tTodoReader: 未实现

TodoDTO.go

待办事宜数值对象

package todo_apptype TodoDTO struct {    NO int    Title string    Content string}

TodoCreatedEvent.go

todo事项创建事件

package todo_apptype TodoCreatedEvent struct {    Data *TodoDTO}

TodoUpdatedEvent.go

todo事项修改事件

package todo_apptype TodoUpdatedEvent struct {    Data *TodoDTO}

TodoRemovedEvent.go

todo事项删除事件

package todo_apptype TodoRemovedEvent struct {    NO int}

IEventBus.go

事件总线接口

package todo_apptype EventHandleFunc func(e string, args interface{})type EventHandler struct {    ID string    Handler EventHandleFunc}type IEventBus interface {    Pub(e string, args interface{})    Sub(e string, id string, handleFunc EventHandleFunc)    Unsub(e string, id string)}const EventTodoCreated = "todo.created"const EventTodoUpdated = "todo.updated"const EventTodoRemoved = "todo.removed"

iTodoEventSerializer.go

事件序列化到JSON数据的接口

package todo_apptype iTodoEventSerializer interface {    SerializeCreatedEvent(it *TodoCreatedEvent) *tJSONData    SerializeUpdatedEvent(it *TodoUpdatedEvent) *tJSONData    SerializeRemovedEvent(it *TodoRemovedEvent) *tJSONData}

iTodoReader.go

todo读取接口

package todo_apptype iTodoReader interface {    All() []*TodoDTO}

iTodoWriter.go

todo写入接口

package todo_apptype iTodoWriter interface {    HandleCreated(e *TodoCreatedEvent)    HandleUpdated(e *TodoUpdatedEvent)    HandleRemoved(e *TodoRemovedEvent)}

iJSONStore.go

json文件读写接口

package todo_apptype iJSONStore interface {    Load()    Append(it *tJSONData)}

tEventBus.go

事件总线的实现

package todo_app

import (
	"learning/gooop/saga/mqs/logger"
	"sync"
)

// tEventBus is the IEventBus implementation. It keeps, per event name, the
// list of subscribed handlers, guarded by a read/write mutex.
type tEventBus struct {
	rwmutex *sync.RWMutex
	items   map[string][]*EventHandler
}

// newEventHandler builds the EventHandler that pairs id with handleFunc.
func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
	return &EventHandler{ID: id, Handler: handleFunc}
}

// newEventBus creates an initialized, empty event bus.
func newEventBus() IEventBus {
	it := new(tEventBus)
	it.init()
	return it
}

// init allocates the mutex and the handler table.
func (me *tEventBus) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = make(map[string][]*EventHandler)
}

// Pub publishes event e with payload args. Every handler subscribed to e is
// logged and then invoked in its own goroutine, so Pub does not wait for the
// handlers to finish. Publishing an event with no subscribers is a no-op.
func (me *tEventBus) Pub(e string, args interface{}) {
	me.rwmutex.RLock()
	defer me.rwmutex.RUnlock()

	// Ranging over the nil slice of an unknown event simply does nothing.
	for _, it := range me.items[e] {
		logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
		go it.Handler(e, args)
	}
}

// Sub registers handleFunc for event e under the subscriber ID id.
// IDs are not checked for uniqueness; subscribing twice with the same ID
// registers two handlers.
func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	// append on the nil slice of a new event allocates it.
	me.items[e] = append(me.items[e], newEventHandler(id, handleFunc))
}

// Unsub removes every handler registered for event e under the subscriber
// ID id. The relative order of the remaining handlers is preserved.
// Unknown events and unknown IDs are ignored.
func (me *tEventBus) Unsub(e string, id string) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	handlers, ok := me.items[e]
	if !ok {
		return
	}

	// Filter in place. The previous swap-with-last removal kept iterating
	// over a stale slice header, so with duplicate IDs a matching handler
	// could be swapped back into the kept range and survive.
	kept := handlers[:0]
	for _, it := range handlers {
		if it.ID != id {
			kept = append(kept, it)
		}
	}
	// Clear the vacated tail so removed handlers can be garbage collected.
	for i := len(kept); i < len(handlers); i++ {
		handlers[i] = nil
	}
	me.items[e] = kept
}

// GlobalEventBus is the process-wide event bus instance.
var GlobalEventBus = newEventBus()

tTodoEventSerializer.go

事件序列化到JSON的实现

package todo_apptype tTodoEventSerializer struct {}func newEventSeiralizer() iTodoEventSerializer {    it := new(tTodoEventSerializer)    return it}func (me *tTodoEventSerializer) serializeWithTag(tag int, v interface{}) *tJSONData {    it := new(tJSONData)    err := it.Set(TagCreated, v)    if err != nil {        return nil    }    return it}func (me *tTodoEventSerializer) SerializeCreatedEvent(e *TodoCreatedEvent) *tJSONData {    return me.serializeWithTag(TagCreated, e)}func (me *tTodoEventSerializer) SerializeUpdatedEvent(e *TodoUpdatedEvent) *tJSONData {    return me.serializeWithTag(TagUpdated, e)}func (me *tTodoEventSerializer) SerializeRemovedEvent(e *TodoRemovedEvent) *tJSONData {    return me.serializeWithTag(TagRemoved, e)}const TagCreated = 1const TagUpdated = 2const TagRemoved = 3var gDefaultEventSerializer = newEventSeiralizer()

tTodoWriter.go

事件写入器的实现

package todo_apptype tTodoWriter struct {}func newTodoWriter() iTodoWriter {    it := new(tTodoWriter)    it.init()    return it}func (me *tTodoWriter) init() {    GlobalEventBus.Sub("todo.created", "", me.handleEvent)}func (me *tTodoWriter) handleEvent(e string, args interface{}) {    switch e {    case EventTodoCreated:        if it,ok := args.(*TodoCreatedEvent);ok {            me.HandleCreated(it)        }        break    case EventTodoUpdated:        if it,ok := args.(*TodoUpdatedEvent);ok {            me.HandleUpdated(it)        }        break    case EventTodoRemoved:        if it,ok := args.(*TodoRemovedEvent);ok {            me.HandleRemoved(it)        }        break    }}func (me *tTodoWriter) HandleCreated(e *TodoCreatedEvent) {    j := gDefaultEventSerializer.SerializeCreatedEvent(e)    if j != nil {        MockJSONStore.Append(j)    }}func (me *tTodoWriter) HandleUpdated(e *TodoUpdatedEvent) {    j := gDefaultEventSerializer.SerializeUpdatedEvent(e)    if j != nil {        MockJSONStore.Append(j)    }}func (me *tTodoWriter) HandleRemoved(e *TodoRemovedEvent) {    j := gDefaultEventSerializer.SerializeRemovedEvent(e)    if j != nil {        MockJSONStore.Append(j)    }}

tMockJSONStore.go

虚拟的JSON文件读写实现

package todo_app

import "sync"

// tMockJSONStore is an in-memory stand-in for a JSON file store: records
// are kept in a slice instead of being written to disk.
type tMockJSONStore struct {
	rwmutex *sync.RWMutex
	once    sync.Once
	items   []*tJSONData
}

// newMockJSONStore creates an initialized, empty store.
func newMockJSONStore() iJSONStore {
	it := new(tMockJSONStore)
	it.init()
	return it
}

// init allocates the mutex and the empty record list.
func (me *tMockJSONStore) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = []*tJSONData{}
}

// Load replays the stored records onto GlobalEventBus: each record is
// decoded according to its Tag and published under the matching event name.
// The replay runs at most once per store; later calls do nothing.
// Records that fail to decode, or carry an unknown tag, are skipped.
//
// NOTE(review): subscribers that append to this store (tTodoWriter does)
// will persist the replayed events again — confirm this is intended before
// relying on Load.
func (me *tMockJSONStore) Load() {
	me.once.Do(func() {
		me.rwmutex.RLock()
		defer me.rwmutex.RUnlock()

		for _, it := range me.items {
			switch it.Tag {
			case TagCreated:
				v := new(TodoCreatedEvent)
				// Publish the decoded event v. The nil error returned by
				// Get was previously published instead of the payload.
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoCreated, v)
				}
			case TagUpdated:
				v := new(TodoUpdatedEvent)
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoUpdated, v)
				}
			case TagRemoved:
				v := new(TodoRemovedEvent)
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoRemoved, v)
				}
			}
		}
	})
}

// Append adds record it to the end of the store.
func (me *tMockJSONStore) Append(it *tJSONData) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	me.items = append(me.items, it)
}

// MockJSONStore is the package-wide store instance.
var MockJSONStore = newMockJSONStore()

(未完待续)