手撸 golang GO 与微服务 ES-CQRS 模式之 1
缘起
最近浏览 [Go 微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用 golang 练习之
git 地址: https://gitee.com/ioly/learning.gooop
ES-CQRS 模式
ES(Event Sourcing) 事件溯源十分好理解,指的是将每次的事件都记录下来,而不是去记录对象的状态。比如新建、修改等都会作为事件记录下来,当需要最新的状态时,通过事件的叠加来计算最新的状态。按照事件溯源的模式进行架构设计,就是事件驱动架构(Event-Driven Architecture, EDA)。命令查询职责分离(CQRS)最早来自 Bertrand Meyer 写的
Object-Oriented Software Construction 一书,指的是命令查询分离(Command Query Separation,CQS)。其基本思想是任何一个对象的方法都可以分为以下两大类:
▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。CQRS 的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。
目标 (Day 1)
- 根据 ES-CQRS 模式, 设计 “TODO – 待办事宜” 程序
设计
- TodoDTO: 待办事宜数值对象
- TodoCreatedEvent: 创建 todo 事件
- TodoUpdatedEvent: 修改 todo 事件
- TodoRemovedEvent: 删除 todo 事件
- IEventBus: 事件总线接口
- iTodoEventSerializer: 事件序列化到 JSON 数据的接口
- iTodoReader: todo 读取接口
- iTodoWriter: todo 写入接口
- iJSONStore: json 文件读写接口
- tEventBus: 事件总线的实现
- tTodoEventSerializer: 事件序列化到 JSON 的实现
- tTodoWriter: 事件写入器的实现
- tMockJSONStore: 虚拟的 JSON 文件读写实现
- tTodoReader: 未实现
TodoDTO.go
待办事宜数值对象
package todo_app
// TodoDTO is the data object describing a single to-do item.
type TodoDTO struct {
// NO is the item's number (presumably its identifier; TodoRemovedEvent refers to an item by NO).
NO int
// Title is the item's title.
Title string
// Content is the item's body text.
Content string
}
TodoCreatedEvent.go
todo 事项创建事件
package todo_app
// TodoCreatedEvent is the payload of a "to-do created" event; Data points to the to-do item it carries.
type TodoCreatedEvent struct {Data *TodoDTO}
TodoUpdatedEvent.go
todo 事项修改事件
package todo_app
// TodoUpdatedEvent is the payload of a "to-do updated" event; Data points to the to-do item it carries.
type TodoUpdatedEvent struct {Data *TodoDTO}
TodoRemovedEvent.go
todo 事项删除事件
package todo_app
// TodoRemovedEvent is the payload of a "to-do removed" event; NO is the number of the removed item.
type TodoRemovedEvent struct {NO int}
IEventBus.go
事件总线接口
package todo_app
// EventHandleFunc is the callback signature for event subscribers: e is the
// event name and args is the payload passed to Pub.
type EventHandleFunc func(e string, args interface{})
// EventHandler is one subscription record: a subscriber id plus its callback.
type EventHandler struct {
// ID identifies the subscriber; Unsub matches on it.
ID string
// Handler is the callback invoked when the event is published.
Handler EventHandleFunc
}
// IEventBus is the publish/subscribe contract of the event bus.
// Its first method, Pub, publishes event e with payload args.
type IEventBus interface {Pub(e string, args interface{})
// Sub registers handleFunc under the subscriber id for event e.
Sub(e string, id string, handleFunc EventHandleFunc)
// Unsub removes the subscription registered under id for event e.
Unsub(e string, id string)
}
// Event names used as topics on the event bus.
const (
	// EventTodoCreated is the topic for newly created to-do items.
	EventTodoCreated = "todo.created"
	// EventTodoUpdated is the topic for modified to-do items.
	EventTodoUpdated = "todo.updated"
	// EventTodoRemoved is the topic for deleted to-do items.
	EventTodoRemoved = "todo.removed"
)
iTodoEventSerializer.go
事件序列化到 JSON 数据的接口
package todo_app
// iTodoEventSerializer converts to-do events into *tJSONData records.
// Its first method, SerializeCreatedEvent, serializes a created event.
type iTodoEventSerializer interface {SerializeCreatedEvent(it *TodoCreatedEvent) *tJSONData
// SerializeUpdatedEvent serializes an updated event.
SerializeUpdatedEvent(it *TodoUpdatedEvent) *tJSONData
// SerializeRemovedEvent serializes a removed event.
SerializeRemovedEvent(it *TodoRemovedEvent) *tJSONData
}
iTodoReader.go
todo 读取接口
package todo_app
// iTodoReader is the read side of the to-do application; All returns a slice of to-do items.
type iTodoReader interface {All() []*TodoDTO}
iTodoWriter.go
todo 写入接口
package todo_app
// iTodoWriter is the write side of the to-do application: one handler per event type.
// Its first method, HandleCreated, processes a created event.
type iTodoWriter interface {HandleCreated(e *TodoCreatedEvent)
// HandleUpdated processes an updated event.
HandleUpdated(e *TodoUpdatedEvent)
// HandleRemoved processes a removed event.
HandleRemoved(e *TodoRemovedEvent)
}
iJSONStore.go
json 文件读写接口
package todo_app
// iJSONStore is the storage contract for serialized event records.
// Its first method, Load, reads the stored records (it takes no arguments and returns nothing).
type iJSONStore interface {Load()
// Append adds one serialized record to the store.
Append(it *tJSONData)
}
tEventBus.go
事件总线的实现
package todo_app
import (
"learning/gooop/saga/mqs/logger"
"sync"
)
// tEventBus is the in-memory implementation of IEventBus.
type tEventBus struct {
// rwmutex guards items.
rwmutex *sync.RWMutex
// items maps an event name to the handlers subscribed to it.
items map[string][]*EventHandler}
// newEventHandler builds the subscription record that pairs a subscriber id
// with its callback.
func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
	handler := new(EventHandler)
	handler.ID = id
	handler.Handler = handleFunc
	return handler
}
// newEventBus creates an initialized event bus and returns it as an IEventBus.
func newEventBus() IEventBus {
	bus := &tEventBus{}
	bus.init()
	return bus
}
// init allocates the subscription table and the lock that guards it.
func (me *tEventBus) init() {
	me.items = map[string][]*EventHandler{}
	me.rwmutex = &sync.RWMutex{}
}
// Pub publishes event e with payload args. Every handler subscribed to e is
// logged and then invoked in its own goroutine, so Pub does not wait for the
// handlers to finish.
func (me *tEventBus) Pub(e string, args interface{}) {
	me.rwmutex.RLock()
	defer me.rwmutex.RUnlock()

	// Ranging over the nil slice of an unknown event is a no-op.
	for _, sub := range me.items[e] {
		logger.Logf("eventbus.Pub, event=%s, handler=%s", e, sub.ID)
		go sub.Handler(e, args)
	}
}
// Sub registers handleFunc under the subscriber id for event e.
func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	// append on the nil slice of a not-yet-seen event creates the list.
	me.items[e] = append(me.items[e], newEventHandler(id, handleFunc))
}
// Unsub removes every handler registered under id for event e.
//
// The handlers that remain keep their original relative order. Unknown
// events and unknown ids are ignored.
func (me *tEventBus) Unsub(e string, id string) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	handlers, ok := me.items[e]
	if !ok {
		return
	}

	// Filter in place: kept shares the backing array with handlers and never
	// runs ahead of the read position, so no element is overwritten before
	// it has been examined.
	kept := handlers[:0]
	for _, h := range handlers {
		if h.ID != id {
			kept = append(kept, h)
		}
	}

	// Clear the now-unused tail so removed handlers can be garbage collected.
	for i := len(kept); i < len(handlers); i++ {
		handlers[i] = nil
	}

	me.items[e] = kept
}
// GlobalEventBus is the package-level event bus instance, created by newEventBus.
var GlobalEventBus = newEventBus()
tTodoEventSerializer.go
事件序列化到 JSON 的实现
package todo_app
// tTodoEventSerializer is the stateless implementation of iTodoEventSerializer.
type tTodoEventSerializer struct {
}
// newEventSeiralizer returns a new event serializer as an iTodoEventSerializer.
func newEventSeiralizer() iTodoEventSerializer {
	return &tTodoEventSerializer{}
}
// serializeWithTag stores v in a new tJSONData record under the given tag.
// It returns nil when tJSONData.Set reports an error.
func (me *tTodoEventSerializer) serializeWithTag(tag int, v interface{}) *tJSONData {
	it := new(tJSONData)
	// Use the caller's tag; a hard-coded TagCreated here would label updated
	// and removed events as created ones.
	if err := it.Set(tag, v); err != nil {
		return nil
	}
	return it
}
// SerializeCreatedEvent serializes a created event under TagCreated.
// The result is whatever serializeWithTag returns for that tag and event.
func (me *tTodoEventSerializer) SerializeCreatedEvent(e *TodoCreatedEvent) *tJSONData {
	record := me.serializeWithTag(TagCreated, e)
	return record
}
// SerializeUpdatedEvent serializes an updated event under TagUpdated.
// The result is whatever serializeWithTag returns for that tag and event.
func (me *tTodoEventSerializer) SerializeUpdatedEvent(e *TodoUpdatedEvent) *tJSONData {
	record := me.serializeWithTag(TagUpdated, e)
	return record
}
// SerializeRemovedEvent serializes a removed event under TagRemoved.
// The result is whatever serializeWithTag returns for that tag and event.
func (me *tTodoEventSerializer) SerializeRemovedEvent(e *TodoRemovedEvent) *tJSONData {
	record := me.serializeWithTag(TagRemoved, e)
	return record
}
// Record tags that mark which kind of event a serialized record holds.
const (
	// TagCreated marks a serialized TodoCreatedEvent.
	TagCreated = 1
	// TagUpdated marks a serialized TodoUpdatedEvent.
	TagUpdated = 2
	// TagRemoved marks a serialized TodoRemovedEvent.
	TagRemoved = 3
)
// gDefaultEventSerializer is the package-level serializer instance, created by newEventSeiralizer.
var gDefaultEventSerializer = newEventSeiralizer()
tTodoWriter.go
事件写入器的实现
package todo_app
// tTodoWriter is the stateless implementation of iTodoWriter.
type tTodoWriter struct {
}
// newTodoWriter creates a writer, runs its init, and returns it as an iTodoWriter.
func newTodoWriter() iTodoWriter {
	writer := &tTodoWriter{}
	writer.init()
	return writer
}
// init subscribes the writer to every event that handleEvent dispatches:
// created, updated and removed. Subscribing to the created event alone would
// leave the other two branches of handleEvent unreachable.
func (me *tTodoWriter) init() {
	events := []string{EventTodoCreated, EventTodoUpdated, EventTodoRemoved}
	for _, e := range events {
		// NOTE(review): the subscriber id is left empty, as before; a
		// non-empty id would be needed to Unsub this writer selectively.
		GlobalEventBus.Sub(e, "", me.handleEvent)
	}
}
// handleEvent is the bus callback of the writer. It dispatches on the event
// name and forwards the payload to the matching Handle* method. A payload
// whose type does not match the event name is ignored, as is an unknown
// event name.
func (me *tTodoWriter) handleEvent(e string, args interface{}) {
	switch e {
	case EventTodoCreated:
		created, ok := args.(*TodoCreatedEvent)
		if !ok {
			return
		}
		me.HandleCreated(created)

	case EventTodoUpdated:
		updated, ok := args.(*TodoUpdatedEvent)
		if !ok {
			return
		}
		me.HandleUpdated(updated)

	case EventTodoRemoved:
		removed, ok := args.(*TodoRemovedEvent)
		if !ok {
			return
		}
		me.HandleRemoved(removed)
	}
}
// HandleCreated serializes a created event and appends the record to
// MockJSONStore. Nothing is stored when serialization yields nil.
func (me *tTodoWriter) HandleCreated(e *TodoCreatedEvent) {
	record := gDefaultEventSerializer.SerializeCreatedEvent(e)
	if record == nil {
		return
	}
	MockJSONStore.Append(record)
}
// HandleUpdated serializes an updated event and appends the record to
// MockJSONStore. Nothing is stored when serialization yields nil.
func (me *tTodoWriter) HandleUpdated(e *TodoUpdatedEvent) {
	record := gDefaultEventSerializer.SerializeUpdatedEvent(e)
	if record == nil {
		return
	}
	MockJSONStore.Append(record)
}
// HandleRemoved serializes a removed event and appends the record to
// MockJSONStore. Nothing is stored when serialization yields nil.
func (me *tTodoWriter) HandleRemoved(e *TodoRemovedEvent) {
	record := gDefaultEventSerializer.SerializeRemovedEvent(e)
	if record == nil {
		return
	}
	MockJSONStore.Append(record)
}
tMockJSONStore.go
虚拟的 JSON 文件读写实现
package todo_app
import "sync"
// tMockJSONStore is an in-memory implementation of iJSONStore.
type tMockJSONStore struct {
// rwmutex guards items.
rwmutex *sync.RWMutex
// once makes the body of Load run at most one time.
once sync.Once
// items holds the appended records in insertion order.
items []*tJSONData}
// newMockJSONStore creates an initialized in-memory store and returns it as
// an iJSONStore.
func newMockJSONStore() iJSONStore {
	store := &tMockJSONStore{}
	store.init()
	return store
}
// init allocates the lock and starts the store with an empty, non-nil
// record list.
func (me *tMockJSONStore) init() {
	me.items = make([]*tJSONData, 0)
	me.rwmutex = &sync.RWMutex{}
}
// Load replays every stored record onto GlobalEventBus. Each record is
// decoded into the event type selected by its tag and published under the
// matching event name. Records that fail to decode, and records with an
// unknown tag, are skipped. The replay runs at most once per store; later
// calls do nothing.
//
// NOTE(review): replayed events reach every subscriber of the bus. A
// subscriber that appends to this same store would write the replayed
// records back again; confirm whether that is intended.
func (me *tMockJSONStore) Load() {
	me.once.Do(func() {
		me.rwmutex.RLock()
		defer me.rwmutex.RUnlock()

		for _, it := range me.items {
			switch it.Tag {
			case TagCreated:
				v := new(TodoCreatedEvent)
				// Publish the decoded event v, not the nil error from Get.
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoCreated, v)
				}

			case TagUpdated:
				v := new(TodoUpdatedEvent)
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoUpdated, v)
				}

			case TagRemoved:
				v := new(TodoRemovedEvent)
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoRemoved, v)
				}
			}
		}
	})
}
// Append adds one record to the end of the in-memory list while holding the
// write lock.
func (me *tMockJSONStore) Append(it *tJSONData) {
	me.rwmutex.Lock()
	me.items = append(me.items, it)
	me.rwmutex.Unlock()
}
// MockJSONStore is the package-level in-memory store instance, created by newMockJSONStore.
var MockJSONStore = newMockJSONStore()
(未完待续)