缘起
最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务都可以通过补偿动作进行撤销
- 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成，每个Ti都有对应的补偿“Ci”，当Ti出现问题时，Ci用于处理Ti执行带来的问题。可以通过下面的两个公式理解Saga模式：全部成功时 $T = T_1 T_2 \cdots T_n$；第 j 步失败时 $T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1$（即按逆序执行已完成步骤的补偿）。Saga模式的核心理念是避免应用长期持有锁（如14.2.2节介绍的两阶段提交）的长事务，而应该将事务切分为一组按序依次提交的短事务，Saga模式满足ACD（原子性、一致性、持久性）特性。摘自 <<Go微服务实战>> 刘金亮, 2021.1
目标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
事务消息队列服务的功能性要求
- 消息不会丢失: 消息的持久化
- 消息的唯一性: 要求每个消息有全局ID和子事务ID
- 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
子目标(Day 3)
有序、可靠地投递消息
- 有序: 每个订阅者绑定独立的投递worker, 按时间戳投递
- 可靠: 投递状态的持久化
设计
- QueuedMsg: 把投递队列的消息设计得胖一点, 以减少join的使用
- IEventBus: 设计一个内部消息总线, 以减少逻辑耦合
- tDeliveryService: 投递服务, 管理若干个投递worker
- tWorkerInfo: 投递worker的初始化参数
- tDeliveryWorker: 投递worker(尚未完全实现)
QueuedMsg.go
把待投递音讯设计得胖一点, 以缩小join的应用
package modelstype QueuedMsg struct { ID int ClientID string NotifyURL string MsgID int GlobalID string SubID string SenderID string CreateTime int64 Topic string Content string StatusFlag int FailedCount int}
IEventBus.go
设计一个内部消息总线, 以减少逻辑耦合
package eventbusimport "sync"type EventHandleFunc func(e string, args interface{})type IEventBus interface { Pub(e string, args interface{}) Sub(e string, handler EventHandleFunc)}type tEventBus struct { rwmutex *sync.RWMutex items map[string][]EventHandleFunc}func newEventBus() IEventBus { it := new(tEventBus) it.init() return it}func (me *tEventBus) init() { me.rwmutex = new(sync.RWMutex) me.items = make(map[string][]EventHandleFunc)}func (me *tEventBus) Pub(e string, args interface{}) { me.rwmutex.RLock() defer me.rwmutex.RUnlock() handlers,ok := me.items[e] if ok { for _,it := range handlers { go it(e, args) } }}func (me *tEventBus) Sub(e string, handler EventHandleFunc) { me.rwmutex.Lock() defer me.rwmutex.Unlock() handlers,ok := me.items[e] if ok { me.items[e] = append(handlers, handler) } else { me.items[e] = []EventHandleFunc{handler } }}var GlobalEventBus = newEventBus()
tDeliveryService.go
投递服务, 管理若干个投递worker
package deliveryimport ( "github.com/jmoiron/sqlx" "learning/gooop/saga/mqs/database" "learning/gooop/saga/mqs/eventbus" "learning/gooop/saga/mqs/logger" "learning/gooop/saga/mqs/models" "sync" "time")type tDeliveryService struct { rwmutex *sync.RWMutex workers map[string]*tDeliveryWorker}func newDeliveryService() *tDeliveryService { it := new(tDeliveryService) it.init() return it}func (me *tDeliveryService) init() {}func (me *tDeliveryService) handleBootEvent(e string, args interface{}) { go me.beginCreatingWorkers() go me.beginCleanExpiredWorkers()}func (me *tDeliveryService) beginCreatingWorkers() { for { e := database.DB(func(db *sqlx.DB) error { now := time.Now().UnixNano() rows, err := db.Queryx("select client_id, notify_url, expire_time from subscriber where expire_time>?", now) if err != nil { return err } for rows.Next() { it := new(tWorkerInfo) err = rows.StructScan(it) if err != nil { return err } me.createWorker(it) } return nil }) if e != nil { logger.Logf("tDeliveryService.beginCreatingWorkers, error = %s", e.Error()) } time.Sleep(time.Duration(5) * time.Second) }}func (me *tDeliveryService) beginCleanExpiredWorkers() { for range time.Tick(time.Duration(30) * time.Second) { me.clean() }}func (me *tDeliveryService) clean() { me.rwmutex.RLock() var keys []string for k,v := range me.workers { if v.isExpired() { keys = append(keys, k) } } me.rwmutex.RUnlock() if len(keys) == 0 { return } me.rwmutex.Lock() defer me.rwmutex.Unlock() for _,k := range keys { delete(me.workers, k) }}func (me *tDeliveryService) handleNewSubscriber(args interface{}) { it, ok := args.(*models.SubMsg) if !ok { return } me.createWorker(&tWorkerInfo{ it.ClientID, it.NotifyUrl, it.ExpireTime, })}func maxInt64(a, b int64) int64 { if a >= b { return a } return b}func (me *tDeliveryService) createWorker(info *tWorkerInfo) { me.rwmutex.RLock() w,ok := me.workers[info.ClientID] me.rwmutex.RUnlock() if ok { w.info.ExpireTime = maxInt64(w.info.ExpireTime, info.ExpireTime) return } 
me.rwmutex.Lock() defer me.rwmutex.Unlock() me.workers[info.ClientID] = newDeliveryWorker(info)}var gDeliveryService = newDeliveryService()func init() { eventbus.GlobalEventBus.Sub("boot", gDeliveryService.handleBootEvent) eventbus.GlobalEventBus.Sub("subscriber.new", gDeliveryService.handleBootEvent)}
tWorkerInfo.go
投递worker的初始化参数
package deliverytype tWorkerInfo struct { ClientID string NotifyURL string ExpireTime int64}
tDeliveryWorker.go
投递worker, 尚未完全实现
package deliveryimport ( "bytes" "encoding/json" "errors" "github.com/jmoiron/sqlx" "io/ioutil" "learning/gooop/saga/mqs/database" "learning/gooop/saga/mqs/logger" "learning/gooop/saga/mqs/models" "net/http" "time")type tDeliveryWorker struct { info *tWorkerInfo}func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker { it := new(tDeliveryWorker) it.info = info go it.beginMainLoop() return it}func (me *tDeliveryWorker) beginMainLoop() { for !me.isExpired() { ok, msg := me.peek() if ok { switch msg.StatusFlag { case 0: // 未解决的音讯 me.handleNewMsg(msg) break case 1: // 解决中的音讯 me.handleDeliveringMsg(msg) break } } else { time.Sleep(time.Duration(1) * time.Second) } }}func (me *tDeliveryWorker) isExpired() bool { return time.Now().UnixNano() >= me.info.ExpireTime}var gEmptyRowsErr = errors.New("empty rows")func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) { msg := &models.QueuedMsg{} e := database.DB(func(db *sqlx.DB) error { rows, err := db.Queryx("select * from delivery_queue where client_id=? 
order by create_time asc limit 1", me.info.ClientID) if err != nil { return err } if rows.Next() { err = rows.StructScan(msg) if err != nil { return err } return nil } else { return gEmptyRowsErr } }) if e != nil { return false, nil } else { return true, msg }}func (me *tDeliveryWorker) handleNewMsg(msg *models.QueuedMsg) bool { err := database.DB(func(db *sqlx.DB) error { r,e := db.Exec("update delivery_queue set status_flag=1 where id=?", msg.ID) if e != nil { return e } rows, e := r.RowsAffected() if e != nil { return e } if rows != 1 { return gEmptyRowsErr } return nil }) if err != nil { logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error()) return false } if me.deliver(msg) { // todo: move msg to success queue } else { // todo: msg.failed_count++ } return false}func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool { t := &models.TxMsg{ GlobalID: msg.GlobalID, SubID: msg.SubID, Topic: msg.Topic, CreateTime: msg.CreateTime, Content: msg.Content, } j,e := json.Marshal(t) if e != nil { return false } r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j)) if e != nil { return false } defer r.Body.Close() rep, e := ioutil.ReadAll(r.Body) if e != nil { return false } m := &models.OkMsg{} e = json.Unmarshal(rep, m) if e != nil { return false } return m.OK}func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) bool { // todo: check if delivery timeout return false}
(未完待续)