Background
I have recently been reading <<Go微服务实战>> (刘金亮, 2021.1).
This series of notes practices the book's ideas through hands-on golang exercises.
The Saga pattern
- The Saga pattern splits a long distributed transaction into a series of independent short transactions.
- Each short transaction can be undone by a compensating action.
- Both the transaction action and its compensating action are idempotent, so they can be executed repeatedly without side effects.
A Saga consists of a sequence of sub-transactions Ti, and each Ti has a corresponding compensation Ci; when Ti goes wrong, Ci is used to undo the effects of Ti's execution. The Saga pattern can be understood through the following two formulas (the success path, and the path where sub-transaction Tj fails and the completed steps are compensated in reverse):

T = T1 T2 … Tn
T = T1 T2 … Tj Cj … C2 C1

The core idea of the Saga pattern is to avoid long transactions that hold locks for a long time (such as the two-phase commit introduced in section 14.2.2); instead, the transaction is split into a group of short transactions committed sequentially in order. The Saga pattern satisfies the ACD (atomicity, consistency, durability) properties. -- from <<Go微服务实战>>, 刘金亮, 2021.1
Goal
- To implement Saga-style distributed transactions, first build a pub/sub transactional message queue service
Functional requirements for the transactional message queue service
- No message loss: messages are persisted
- Message uniqueness: every message carries a global ID and a sub-transaction ID
- Guaranteed delivery: the delivery queue and delivery status are persisted, with retries on failure
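The uniqueness requirement can be illustrated with a tiny sketch: treating the pair (global ID, sub-transaction ID) as a composite key makes redelivered messages detectable. The `msgKey` and `seenSet` names here are hypothetical, not part of the service's models.

```go
package main

import "fmt"

// msgKey is a hypothetical composite key: the global transaction ID
// plus the sub-transaction ID, matching the uniqueness requirement.
type msgKey struct {
	GlobalID string
	SubID    string
}

// seenSet tracks delivered keys so a redelivered message can be detected.
type seenSet map[msgKey]bool

// FirstDelivery reports true only the first time a key is seen.
func (s seenSet) FirstDelivery(k msgKey) bool {
	if s[k] {
		return false
	}
	s[k] = true
	return true
}

func main() {
	seen := seenSet{}
	k := msgKey{GlobalID: "g1", SubID: "s1"}
	fmt.Println(seen.FirstDelivery(k), seen.FirstDelivery(k)) // prints "true false"
}
```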
Sub-goal (Day 5)
Refactor and improve the message delivery mechanism
- iMsgHeap: caches pending messages in a heap; the message with the smallest creation time is always delivered first
- iMsgSource: defines the message source interface; there are two sources: 1) the database; 2) the eventbus
- iMsgHistoryRing: records recently delivered messages in a ring buffer to prevent duplicate delivery
- tConcurrentMsgHeap: a thread-safe min-heap ordered by CreateTime, implementing the iMsgHeap interface
- tDBMsgSource: pulls pending messages from the database, implementing the iMsgSource interface
- tLiveMsgSource: listens for delivery messages pushed in real time by the eventbus, implementing the iMsgSource interface
- tMsgHistoryRing: a fixed-size ring of historical messages, implementing the iMsgHistoryRing interface; caches recently delivered messages
tDeliveryWorker:
- on initialization, first loads pending messages from the database
- uses iMsgHeap to cache pending messages and keep them ordered
- uses the iMsgSource interface to receive messages from both the database and the eventbus
- uses iMsgHistoryRing to cache successfully delivered messages and prevent duplicate delivery
iMsgHeap.go
Caches pending messages in a heap; the message with the smallest creation time is always delivered first.
```go
package delivery

import "learning/gooop/saga/mqs/models"

type iMsgHeap interface {
	Size() int
	IsEmpty() bool
	IsNotEmpty() bool
	Push(msg *models.QueuedMsg)
	Peek() *models.QueuedMsg
	Pop() *models.QueuedMsg
}
```
iMsgSource.go
Defines the message source interface. There are two sources: 1) the database; 2) the eventbus.
```go
package delivery

import "learning/gooop/saga/mqs/models"

type iMsgSource interface {
	MsgChan() <-chan *models.QueuedMsg
}

// tSourceExpireFunc reports whether the source's owning worker has expired
type tSourceExpireFunc func() bool
```
iMsgHistoryRing.go
Records recently delivered messages in a ring buffer to prevent duplicate delivery.
```go
package delivery

import "learning/gooop/saga/mqs/models"

type iMsgHistoryRing interface {
	Push(msg *models.QueuedMsg)
	Has(id int) bool
}
```
tConcurrentMsgHeap.go
A thread-safe min-heap ordered by CreateTime, implementing the iMsgHeap interface.
```go
package delivery

import (
	"learning/gooop/saga/mqs/models"
	"sync"
)

type tConcurrentMsgHeap struct {
	items []*models.QueuedMsg
	size  int
	mutex *sync.Mutex
}

func newMsgHeap() iMsgHeap {
	it := new(tConcurrentMsgHeap)
	it.init()
	return it
}

func (me *tConcurrentMsgHeap) init() {
	me.items = make([]*models.QueuedMsg, 0)
	me.size = 0
	me.mutex = new(sync.Mutex)
}

func (me *tConcurrentMsgHeap) Size() int {
	return me.size
}

func (me *tConcurrentMsgHeap) IsEmpty() bool {
	return me.size <= 0
}

func (me *tConcurrentMsgHeap) IsNotEmpty() bool {
	return !me.IsEmpty()
}

// has reports whether a message with the given MsgID is already queued;
// slots beyond size may be nil, so they are skipped
func (me *tConcurrentMsgHeap) has(msgID int) bool {
	for _, it := range me.items {
		if it != nil && it.MsgID == msgID {
			return true
		}
	}
	return false
}

func (me *tConcurrentMsgHeap) Push(msg *models.QueuedMsg) {
	me.mutex.Lock()
	defer me.mutex.Unlock()

	if me.has(msg.MsgID) {
		return
	}

	me.ensureSize(me.size + 1)
	me.items[me.size] = msg
	me.size++
	me.shiftUp(me.size - 1)
}

func (me *tConcurrentMsgHeap) ensureSize(size int) {
	for len(me.items) < size {
		me.items = append(me.items, nil)
	}
}

func (me *tConcurrentMsgHeap) parentOf(i int) int {
	return (i - 1) / 2
}

func (me *tConcurrentMsgHeap) leftChildOf(i int) int {
	return i*2 + 1
}

func (me *tConcurrentMsgHeap) rightChildOf(i int) int {
	return me.leftChildOf(i) + 1
}

func (me *tConcurrentMsgHeap) last() (i int, v *models.QueuedMsg) {
	if me.IsEmpty() {
		return -1, nil
	}
	i = me.size - 1
	v = me.items[i]
	return i, v
}

func (me *tConcurrentMsgHeap) shiftUp(i int) {
	if i <= 0 {
		return
	}
	v := me.items[i]
	pi := me.parentOf(i)
	pv := me.items[pi]
	if me.less(v, pv) {
		me.items[pi], me.items[i] = v, pv
		me.shiftUp(pi)
	}
}

// less orders messages by CreateTime, smallest first
func (me *tConcurrentMsgHeap) less(a, b *models.QueuedMsg) bool {
	return a.CreateTime < b.CreateTime
}

func (me *tConcurrentMsgHeap) Pop() *models.QueuedMsg {
	me.mutex.Lock()
	defer me.mutex.Unlock()

	if me.IsEmpty() {
		return nil
	}

	top := me.items[0]
	li, lv := me.last()
	me.items[0] = nil
	me.size--
	if me.IsEmpty() {
		return top
	}

	// move the last element to the root, then sift it down
	me.items[0] = lv
	me.items[li] = nil
	me.shiftDown(0)
	return top
}

func (me *tConcurrentMsgHeap) Peek() *models.QueuedMsg {
	me.mutex.Lock()
	defer me.mutex.Unlock()

	if me.IsEmpty() {
		return nil
	}
	return me.items[0]
}

func (me *tConcurrentMsgHeap) shiftDown(i int) {
	pv := me.items[i]
	ok, ci, cv := me.minChildOf(i)
	if ok && me.less(cv, pv) {
		me.items[i], me.items[ci] = cv, pv
		me.shiftDown(ci)
	}
}

func (me *tConcurrentMsgHeap) minChildOf(p int) (ok bool, i int, v *models.QueuedMsg) {
	li := me.leftChildOf(p)
	if li >= me.size {
		return false, 0, nil
	}
	lv := me.items[li]

	ri := me.rightChildOf(p)
	if ri >= me.size {
		return true, li, lv
	}
	rv := me.items[ri]

	if me.less(lv, rv) {
		return true, li, lv
	}
	return true, ri, rv
}
```
tDBMsgSource.go
Pulls pending messages from the database, implementing the iMsgSource interface.
```go
package delivery

import (
	"github.com/jmoiron/sqlx"
	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/models"
	"time"
)

type tDBMsgSource struct {
	clientID   string
	expireFunc tSourceExpireFunc
	msgChan    chan *models.QueuedMsg
}

func newDBMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
	it := new(tDBMsgSource)
	it.init(clientID, expireFunc)
	return it
}

func (me *tDBMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
	me.clientID = clientID
	me.expireFunc = expireFunc
	me.msgChan = make(chan *models.QueuedMsg, 1)
	go me.beginPollDB()
}

func (me *tDBMsgSource) MsgChan() <-chan *models.QueuedMsg {
	return me.msgChan
}

// beginPollDB polls the database once per second until the worker expires
func (me *tDBMsgSource) beginPollDB() {
	interval := time.Duration(1) * time.Second
	for !me.expireFunc() {
		if len(me.msgChan) <= 0 {
			ok, msg := me.poll()
			if ok {
				me.msgChan <- msg
				continue
			}
		}
		// poll failed, or chan full
		time.Sleep(interval)
	}
	close(me.msgChan)
}

// poll fetches the oldest pending message for this client
func (me *tDBMsgSource) poll() (bool, *models.QueuedMsg) {
	msg := &models.QueuedMsg{}
	e := database.DB(func(db *sqlx.DB) error {
		rows, err := db.Queryx(
			"select * from delivery_queue where client_id=? order by create_time asc limit 1",
			me.clientID,
		)
		if err != nil {
			return err
		}
		defer rows.Close()

		if rows.Next() {
			return rows.StructScan(msg)
		}
		return gEmptyRowsErr
	})
	if e != nil {
		return false, nil
	}
	return true, msg
}
```
tLiveMsgSource.go
Listens for delivery messages pushed in real time by the eventbus, implementing the iMsgSource interface.
```go
package delivery

import (
	"fmt"
	"learning/gooop/saga/mqs/eventbus"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
	"learning/gooop/saga/mqs/models/events"
	"time"
)

type tLiveMsgSource struct {
	clientID   string
	expireFunc tSourceExpireFunc
	msgChan    chan *models.QueuedMsg
}

func newLiveMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
	it := new(tLiveMsgSource)
	it.init(clientID, expireFunc)
	return it
}

func (me *tLiveMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
	me.clientID = clientID
	me.expireFunc = expireFunc
	me.msgChan = make(chan *models.QueuedMsg, 1)
	eventbus.GlobalEventBus.Sub(events.MsgPublishedEvent, me.id(), me.handleMsgPublished)
	go me.beginWatchExpire()
}

func (me *tLiveMsgSource) id() string {
	return fmt.Sprintf("tLiveMsgSource.%s", me.clientID)
}

// beginWatchExpire checks once per second whether the worker has expired
func (me *tLiveMsgSource) beginWatchExpire() {
	for range time.Tick(1 * time.Second) {
		if me.expireFunc() {
			me.afterExpired()
			return
		}
	}
}

func (me *tLiveMsgSource) afterExpired() {
	eventbus.GlobalEventBus.Unsub(events.MsgPublishedEvent, me.id())
	close(me.msgChan)
}

func (me *tLiveMsgSource) handleMsgPublished(_ string, args interface{}) {
	msg, ok := args.(*models.QueuedMsg)
	if !ok {
		return
	}
	if msg.ClientID != me.clientID {
		return
	}
	if len(me.msgChan) >= cap(me.msgChan) {
		// buffer full: drop the event; the db source will pick the msg up later
		return
	}

	logger.Logf(
		"tLiveMsgSource.handleMsgPublished, clientID=%s, msg=%s/%s/%s",
		me.clientID, msg.GlobalID, msg.SubID, msg.Topic,
	)
	me.msgChan <- msg
}

func (me *tLiveMsgSource) MsgChan() <-chan *models.QueuedMsg {
	return me.msgChan
}
```
tMsgHistoryRing.go
A fixed-size ring of historical messages, implementing the iMsgHistoryRing interface; caches recently delivered messages.
```go
package delivery

import "learning/gooop/saga/mqs/models"

type tMsgHistoryRing struct {
	items    []*models.QueuedMsg
	capacity int
	index    int
}

func newMsgHistoryRing(capacity int) iMsgHistoryRing {
	it := new(tMsgHistoryRing)
	it.init(capacity)
	return it
}

func (me *tMsgHistoryRing) init(capacity int) {
	me.items = make([]*models.QueuedMsg, capacity)
	me.capacity = capacity
	me.index = 0
}

func (me *tMsgHistoryRing) Has(id int) bool {
	for _, it := range me.items {
		if it != nil && it.ID == id {
			return true
		}
	}
	return false
}

func (me *tMsgHistoryRing) Push(msg *models.QueuedMsg) {
	me.items[me.index] = msg
	me.index++
	if me.index >= me.capacity {
		me.index = 0
	}
}
```
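The ring's eviction behavior is easy to check standalone: once full, each push overwrites the oldest slot, so only the most recent `capacity` IDs remain visible to `Has`. The sketch below simplifies the ring to plain ints; `ring` and `newRing` are illustrative names.

```go
package main

import "fmt"

// ring is a fixed-size history of recently seen IDs; when full, the
// oldest entry is overwritten, mirroring tMsgHistoryRing.
type ring struct {
	ids   []int
	index int
}

func newRing(capacity int) *ring {
	return &ring{ids: make([]int, 0, capacity)}
}

// Push appends until the ring is full, then overwrites the oldest slot.
func (r *ring) Push(id int) {
	if len(r.ids) < cap(r.ids) {
		r.ids = append(r.ids, id)
		return
	}
	r.ids[r.index] = id
	r.index = (r.index + 1) % cap(r.ids)
}

// Has reports whether an id is still inside the history window.
func (r *ring) Has(id int) bool {
	for _, v := range r.ids {
		if v == id {
			return true
		}
	}
	return false
}

func main() {
	r := newRing(2)
	r.Push(1)
	r.Push(2)
	r.Push(3) // evicts 1
	fmt.Println(r.Has(1), r.Has(2), r.Has(3)) // prints "false true true"
}
```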
tDeliveryWorker.go
- on initialization, first loads pending messages from the database
- uses iMsgHeap to cache pending messages and keep them ordered
- uses the iMsgSource interface to receive messages from both the database and the eventbus
- uses iMsgHistoryRing to cache successfully delivered messages and prevent duplicate delivery
```go
package delivery

import (
	"bytes"
	"encoding/json"
	"errors"
	"github.com/jmoiron/sqlx"
	"io/ioutil"
	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
	"net/http"
	"time"
)

type tDeliveryWorker struct {
	info        *tWorkerInfo
	successRing iMsgHistoryRing
	dbSource    iMsgSource
	liveSource  iMsgSource
	msgHeap     iMsgHeap
}

func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
	it := new(tDeliveryWorker)
	it.init(info)
	return it
}

// init: do initialization, and start initial load
func (me *tDeliveryWorker) init(info *tWorkerInfo) {
	me.info = info
	me.successRing = newMsgHistoryRing(64)
	me.dbSource = newDBMsgSource(info.ClientID, me.isExpired)
	me.liveSource = newLiveMsgSource(info.ClientID, me.isExpired)
	me.msgHeap = newMsgHeap()
	go me.beginInitialLoadFromDB()
}

// beginInitialLoadFromDB: initially, load queued msg from database,
// retrying every 3 seconds until it succeeds or the worker expires
func (me *tDeliveryWorker) beginInitialLoadFromDB() {
	buf := [][]*models.QueuedMsg{nil}
	for !me.isExpired() {
		err := database.DB(func(db *sqlx.DB) error {
			e, rows := me.loadFromDB(db)
			if e != nil {
				return e
			}
			buf[0] = rows
			return nil
		})
		if err != nil {
			logger.Logf("tDeliveryWorker.beginInitialLoadFromDB, clientID=%s, err=%s", me.info.ClientID, err.Error())
			time.Sleep(3 * time.Second)
		} else {
			me.afterInitialLoad(buf[0])
			return
		}
	}
}

// loadFromDB: load queued msg from database
func (me *tDeliveryWorker) loadFromDB(db *sqlx.DB) (error, []*models.QueuedMsg) {
	rows, err := db.Queryx(
		"select * from delivery_queue where client_id=? order by create_time asc limit ?",
		me.info.ClientID, gInitialLoadRows,
	)
	if err != nil {
		return err, nil
	}
	defer rows.Close()

	msgList := []*models.QueuedMsg{}
	for rows.Next() {
		msg := &models.QueuedMsg{}
		err = rows.StructScan(msg)
		if err != nil {
			return err, nil
		}
		msgList = append(msgList, msg)
	}
	return nil, msgList
}

// afterInitialLoad: after initial load done, push msgs into heap, and start delivery loop
func (me *tDeliveryWorker) afterInitialLoad(msgList []*models.QueuedMsg) {
	logger.Logf("tDeliveryWorker.afterInitialLoad, clientID=%s, rows=%d", me.info.ClientID, len(msgList))
	for _, it := range msgList {
		me.msgHeap.Push(it)
	}
	go me.beginPollAndDeliver()
}

// beginPollAndDeliver: poll msg from heap, and then deliver it
func (me *tDeliveryWorker) beginPollAndDeliver() {
	for !me.isExpired() {
		select {
		case msg := <-me.dbSource.MsgChan():
			if msg != nil {
				me.msgHeap.Push(msg)
			}
		case msg := <-me.liveSource.MsgChan():
			if msg != nil {
				me.msgHeap.Push(msg)
			}
		}

		if me.msgHeap.IsEmpty() {
			continue
		}
		msg := me.msgHeap.Pop()
		if msg == nil {
			continue
		}

		switch msg.StatusFlag {
		case 0:
			// undelivered msg
			me.handleUndeliveredMsg(msg)
		case 1:
			// delivering msg
			me.handleDeliveringMsg(msg)
		}
	}
}

// isExpired: is me expired?
func (me *tDeliveryWorker) isExpired() bool {
	return time.Now().UnixNano() >= me.info.ExpireTime
}

// handleUndeliveredMsg: if msg unhandled, then try to deliver it
func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
	// optimistic lock: mark the msg as delivering only if nobody else touched it
	err := database.DB(func(db *sqlx.DB) error {
		now := time.Now().UnixNano()
		r, e := db.Exec(
			"update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
			now, msg.ID, msg.UpdateTime,
		)
		if e != nil {
			return e
		}
		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}
		msg.StatusFlag = 1
		msg.UpdateTime = now
		return nil
	})
	if err != nil {
		logger.Logf("tDeliveryWorker.handleUndeliveredMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
		return
	}

	if me.deliver(msg) {
		me.afterDeliverySuccess(msg)
	} else {
		me.afterDeliveryFailed(msg)
	}
}

// deliver: use http.Post to deliver the msg to the subscriber
func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
	if me.successRing.Has(msg.ID) {
		return true
	}

	t := &models.TxMsg{
		GlobalID:   msg.GlobalID,
		SubID:      msg.SubID,
		Topic:      msg.Topic,
		CreateTime: msg.CreateTime,
		Content:    msg.Content,
	}
	j, e := json.Marshal(t)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}

	r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}
	defer r.Body.Close()

	rep, e := ioutil.ReadAll(r.Body)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}

	m := &models.OkMsg{}
	e = json.Unmarshal(rep, m)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}

	if m.OK {
		return true
	}
	logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	return false
}

// handleDeliveringMsg: if delivery timeout, then retry delivery
func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
	now := time.Now().UnixNano()
	if msg.UpdateTime+gDeliveryTimeoutNanos > now {
		return
	}
	// delivery timeout
	me.afterDeliveryTimeout(msg)
}

// afterDeliverySuccess: if done, move msg to success queue
func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
	if me.successRing.Has(msg.ID) {
		return
	}
	me.successRing.Push(msg)

	err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
		r, e := tx.Exec(
			"delete from delivery_queue where id=? and update_time=? and status_flag=1",
			msg.ID, msg.UpdateTime,
		)
		if e != nil {
			return e
		}
		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}

		r, e = tx.Exec(
			"insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
			msg.ID, msg.ClientID, time.Now().UnixNano(),
		)
		if e != nil {
			return e
		}
		rows, e = r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}
		return nil
	})
	if err != nil {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
	} else {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	}
}

// afterDeliveryFailed: if failed, do nothing but just log it
func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
	logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
}

// afterDeliveryTimeout: if timeout, then reset status and retry
func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
	err := database.DB(func(db *sqlx.DB) error {
		r, e := db.Exec(
			"update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
			msg.ID, msg.UpdateTime,
		)
		if e != nil {
			return e
		}
		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}
		return nil
	})
	if err != nil {
		logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
	} else {
		logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	}
}

var gEmptyRowsErr = errors.New("empty rows")
var gOneRowsErr = errors.New("expecting one row affected")
var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))
var gInitialLoadRows = 100
```
(to be continued)