缘起

最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过弥补动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的弥补“Ci”,当Ti呈现问题时Ci用于解决Ti执行带来的问题。能够通过上面的两个公式了解Saga模式。T = T1 T2 … TnT = TCTSaga模式的核心理念是防止应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特色。摘自 <<Go微服务实战>> 刘金亮, 2021.1

指标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务音讯队列服务
  • 事务音讯队列服务的功能性要求

    • 音讯不会失落: 音讯的长久化
    • 音讯的唯一性: 要求每个音讯有全局ID和子事务ID
    • 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试

子目标(Day 5)

  • 重构和欠缺音讯投递机制

    • iMsgHeap: 应用待投递音讯堆缓存音讯. 总是优先投递创立工夫最小的音讯
    • iMsgSource: 定义消息来源接口. 有两种消息来源, 1-数据库;2-eventbus
    • iMsgHistoryRing: 应用ring buffer记录近期已投递胜利的音讯, 避免反复投递
    • tConcurrentMsgHeap: 最小CreateTime优先的音讯堆, 实现iMsgHeap接口, 并且是线程平安的.
    • tDBMsgSource: 从数据库拉取待投递音讯, 实现iMsgSource接口
    • tLiveMsgSource: 监听eventbus即时推送的投递音讯, 实现iMsgSource接口
    • tMsgHistoryRing: 历史音讯的固定大小环形队列, 实现iMsgHistoryRing接口, 缓存近期已投递胜利的音讯
    • tDeliveryWorker:

      • 初始化时, 优先从数据库加载待投递音讯
      • 应用iMsgHeap缓存待投递音讯, 并确保有序
      • 应用iMsgSource接口, 别离从db和eventbus接管投递音讯
      • 应用iMsgHistoryRing, 缓存已投递胜利的音讯, 避免反复投递

iMsgHeap.go

应用待投递音讯堆缓存音讯. 总是优先投递创立工夫最小的音讯

package deliveryimport "learning/gooop/saga/mqs/models"type iMsgHeap interface {    Size() int    IsEmpty() bool    IsNotEmpty() bool    Push(msg *models.QueuedMsg)    Peek() *models.QueuedMsg    Pop() *models.QueuedMsg}

iMsgSource.go

定义消息来源接口. 有两种消息来源, 1-数据库;2-eventbus

package deliveryimport "learning/gooop/saga/mqs/models"type iMsgSource interface {    MsgChan() <- chan *models.QueuedMsg}type tSourceExpireFunc func() bool

iMsgHistoryRing.go

应用ring buffer记录近期已投递胜利的音讯, 避免反复投递

package deliveryimport "learning/gooop/saga/mqs/models"type iMsgHistoryRing interface {    Push(msg *models.QueuedMsg)    Has(id int) bool}

tConcurrentMsgHeap.go

最小CreateTime优先的音讯堆, 实现iMsgHeap接口, 并且是线程平安的.

package deliveryimport (    "learning/gooop/saga/mqs/models"    "sync")type tConcurrentMsgHeap struct {    items []*models.QueuedMsg    size int    mutex *sync.Mutex}func newMsgHeap() iMsgHeap {    it := new(tConcurrentMsgHeap)    it.init()    return it}func (me *tConcurrentMsgHeap) init() {    me.items = make([]*models.QueuedMsg, 0)    me.size = 0    me.mutex = new(sync.Mutex)}func (me *tConcurrentMsgHeap) Size() int {    return me.size}func (me *tConcurrentMsgHeap) IsEmpty() bool {    return me.size <= 0}func (me *tConcurrentMsgHeap) IsNotEmpty() bool {    return !me.IsEmpty()}func (me *tConcurrentMsgHeap) has(msgID int) bool {    for _,it := range me.items {        if it.MsgID == msgID {            return true        }    }    return false}func (me *tConcurrentMsgHeap) Push(msg *models.QueuedMsg) {    me.mutex.Lock()    defer me.mutex.Unlock()    if me.has(msg.MsgID) {        return    }    me.ensureSize(me.size + 1)    me.items[me.size] = msg    me.size++    me.shiftUp(me.size - 1)}func (me *tConcurrentMsgHeap) ensureSize(size int) {    for ;len(me.items) < size; {        me.items = append(me.items, nil)    }}func (me *tConcurrentMsgHeap) parentOf(i int) int {    return (i - 1) / 2}func (me *tConcurrentMsgHeap) leftChildOf(i int) int {    return i*2 + 1}func (me *tConcurrentMsgHeap) rightChildOf(i int) int {    return me.leftChildOf(i) + 1}func (me *tConcurrentMsgHeap) last() (i int, v *models.QueuedMsg) {    if me.IsEmpty() {        return -1, nil    }    i = me.size - 1    v = me.items[i]    return i,v}func (me *tConcurrentMsgHeap) shiftUp(i int) {    if i <= 0 {        return    }    v := me.items[i]    pi := me.parentOf(i)    pv := me.items[pi]    if me.less(v, pv) {        me.items[pi], me.items[i] = v, pv        me.shiftUp(pi)    }}func (me *tConcurrentMsgHeap) less(a, b *models.QueuedMsg) bool {    return a.CreateTime < b.CreateTime}func (me *tConcurrentMsgHeap) Pop() *models.QueuedMsg {    me.mutex.Lock()    defer me.mutex.Unlock()    if me.IsEmpty() {        return nil    }    top := me.items[0]    li, lv := me.last()    me.items[0] = nil    me.size--    if me.IsEmpty() {        return top    }    me.items[0] = lv    me.items[li] = nil    me.shiftDown(0)    return top}func (me *tConcurrentMsgHeap) Peek() *models.QueuedMsg {    me.mutex.Lock()    defer me.mutex.Unlock()    if me.IsEmpty() {        return nil    }    return me.items[0]}func (me *tConcurrentMsgHeap) shiftDown(i int) {    pv := me.items[i]    ok, ci, cv := me.minChildOf(i)    if ok && me.less(cv, pv) {        me.items[i], me.items[ci] = cv, pv        me.shiftDown(ci)    }}func (me *tConcurrentMsgHeap) minChildOf(p int) (ok bool, i int, v *models.QueuedMsg) {    li := me.leftChildOf(p)    if li >= me.size {        return false, 0, nil    }    lv := me.items[li]    ri := me.rightChildOf(p)    if ri >= me.size {        return true, li, lv    }    rv := me.items[ri]    if me.less(lv, rv) {        return true, li, lv    } else {        return true, ri, rv    }}

tDBMsgSource.go

从数据库拉取待投递音讯, 实现iMsgSource接口

package deliveryimport (    "github.com/jmoiron/sqlx"    "learning/gooop/saga/mqs/database"    "learning/gooop/saga/mqs/models"    "time")type tDBMsgSource struct {    clientID string    expireFunc tSourceExpireFunc    msgChan chan *models.QueuedMsg}func newDBMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {    it := new(tDBMsgSource)    it.init(clientID, expireFunc)    return it}func (me *tDBMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {    me.clientID = clientID    me.expireFunc = expireFunc    me.msgChan = make(chan *models.QueuedMsg, 1)    go me.beginPollDB()}func (me *tDBMsgSource) MsgChan() <- chan *models.QueuedMsg {    return me.msgChan}func (me *tDBMsgSource) beginPollDB() {    interval := time.Duration(1) * time.Second    for !me.expireFunc() {        if len(me.msgChan) <= 0 {            ok, msg := me.poll()            if ok {                me.msgChan <- msg                continue            }        }        // poll failed, or chan full        time.Sleep(interval)    }    close(me.msgChan)}func (me *tDBMsgSource) poll() (bool, *models.QueuedMsg) {    msg := &models.QueuedMsg{}    e := database.DB(func(db *sqlx.DB) error {        rows, err := db.Queryx(            "select * from delivery_queue where client_id=? order by create_time asc limit 1",            me.clientID,        )        if err != nil {            return err        }        if rows.Next() {            err = rows.StructScan(msg)            if err != nil {                return err            }            return nil        } else {            return gEmptyRowsErr        }    })    if e != nil {        return false, nil    } else {        return true, msg    }}

tLiveMsgSource.go

监听eventbus即时推送的投递音讯, 实现iMsgSource接口

package deliveryimport (    "fmt"    "learning/gooop/saga/mqs/eventbus"    "learning/gooop/saga/mqs/logger"    "learning/gooop/saga/mqs/models"    "learning/gooop/saga/mqs/models/events"    "time")type tLiveMsgSource struct {    clientID string    expireFunc tSourceExpireFunc    msgChan chan *models.QueuedMsg}func newLiveMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {    it := new(tLiveMsgSource)    it.init(clientID, expireFunc)    return it}func (me *tLiveMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {    me.clientID = clientID    me.expireFunc = expireFunc    me.msgChan = make(chan *models.QueuedMsg, 1)    eventbus.GlobalEventBus.Sub(events.MsgPublishedEvent,        me.id(),        me.handleMsgPublished)    go me.beginWatchExpire()}func (me *tLiveMsgSource) id() string {    return fmt.Sprintf("tLiveMsgSource.%s", me.clientID)}func (me *tLiveMsgSource) beginWatchExpire() {    for range time.Tick(1 * time.Second) {        if me.expireFunc() {            me.afterExpired()            return        }    }}func (me *tLiveMsgSource) afterExpired() {    eventbus.GlobalEventBus.Unsub(events.MsgPublishedEvent, me.id())    close(me.msgChan)}func (me *tLiveMsgSource) handleMsgPublished(_ string, args interface{}) {    msg, ok := args.(*models.QueuedMsg)    if !ok {        return    }    if msg.ClientID != me.clientID {        return    }    if len(me.msgChan) >= 0 {        return    }    logger.Logf(        "tLiveMsgSource.handleMsgPublished, clientID=%s, msg=%s/%s/%s",        me.clientID, msg.GlobalID, msg.SubID, msg.Topic )    me.msgChan <- msg}func (me *tLiveMsgSource) MsgChan() <- chan *models.QueuedMsg {    return me.msgChan}

tMsgHistoryRing.go

历史音讯的固定大小环形队列, 实现iMsgHistoryRing接口, 缓存近期已投递胜利的音讯

package deliveryimport "learning/gooop/saga/mqs/models"type tMsgHistoryRing struct {    items []*models.QueuedMsg    capacity int    index int}func newMsgHistoryRing(capacity int) iMsgHistoryRing {    it := new(tMsgHistoryRing)    it.init(capacity)    return it}func (me *tMsgHistoryRing) init(capacity int) {    me.items = make([]*models.QueuedMsg, capacity)    me.capacity = capacity    me.index = 0}func (me *tMsgHistoryRing) Has(id int) bool {    for _,it := range me.items {        if it != nil && it.ID == id {            return true        }    }    return false}func (me *tMsgHistoryRing) Push(msg *models.QueuedMsg) {    me.items[me.index] = msg    me.index++    if me.index >= me.capacity {        me.index = 0    }}

tDeliveryWorker.go

  • 初始化时, 优先从数据库加载待投递音讯
  • 应用iMsgHeap缓存待投递音讯, 并确保有序
  • 应用iMsgSource接口, 别离从db和eventbus接管投递音讯
  • 应用iMsgHistoryRing, 缓存已投递胜利的音讯, 避免反复投递
package deliveryimport (    "bytes"    "encoding/json"    "errors"    "github.com/jmoiron/sqlx"    "io/ioutil"    "learning/gooop/saga/mqs/database"    "learning/gooop/saga/mqs/logger"    "learning/gooop/saga/mqs/models"    "net/http"    "time")type tDeliveryWorker struct {    info        *tWorkerInfo    successRing iMsgHistoryRing    dbSource iMsgSource    liveSource iMsgSource    msgHeap iMsgHeap}func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {    it := new(tDeliveryWorker)    it.init(info)    return it}// init: do initialization, and start initial loadfunc (me *tDeliveryWorker) init(info *tWorkerInfo) {    me.info = info    me.successRing = newMsgHistoryRing(64)    me.dbSource = newDBMsgSource(info.ClientID, me.isExpired)    me.liveSource = newLiveMsgSource(info.ClientID, me.isExpired)    me.msgHeap = newMsgHeap()    go me.beginInitialLoadFromDB()}// beginInitialLoadFromDB: initially, load queued msg from databasefunc (me *tDeliveryWorker) beginInitialLoadFromDB() {    buf := [][]*models.QueuedMsg{ nil }    for !me.isExpired() {        err := database.DB(func(db *sqlx.DB) error {            e, rows := me.loadFromDB(db)            if e != nil {                return e            }            buf[0] = rows            return nil        })        if err != nil {            logger.Logf("tDeliveryWorker.initialLoadFromDB, clientID=%s, err=%s", me.info.ClientID, err.Error())            time.Sleep(3 * time.Second)        } else {            me.afterInitialLoad(buf[0])        }    }}// loadFromDB: load queued msg from databasefunc (me *tDeliveryWorker) loadFromDB(db *sqlx.DB) (error, []*models.QueuedMsg) {    rows, err := db.Queryx(        "select * from delivery_queue where client_id=? order by create_time asc limit ?",        me.info.ClientID,        gInitialLoadRows,    )    if err != nil {        return err, nil    }    msgList := []*models.QueuedMsg{}    for rows.Next() {        msg := &models.QueuedMsg{}        err = rows.StructScan(msg)        if err != nil {            return err, nil        }        msgList = append(msgList, msg)    }    return nil, msgList}// afterInitialLoad: after initial load done, push msgs into heap, and start delivery loopfunc (me *tDeliveryWorker) afterInitialLoad(msgList []*models.QueuedMsg) {    logger.Logf("tDeliveryWorker.afterInitialLoad, clientID=%s, rows=%d", me.info.ClientID, len(msgList))    for _,it := range msgList {        me.msgHeap.Push(it)    }    go me.beginPollAndDeliver()}// beginPollAndDeliver: poll msg from heap, and then deliver itfunc (me *tDeliveryWorker) beginPollAndDeliver() {    for !me.isExpired() {        select {        case msg := <- me.dbSource.MsgChan():            me.msgHeap.Push(msg)            break        case msg := <- me.liveSource.MsgChan():            me.msgHeap.Push(msg)            break        }        if me.msgHeap.IsEmpty() {            continue        }        msg := me.msgHeap.Pop()        if msg == nil {            continue        }        switch msg.StatusFlag {        case 0:            // 未解决的音讯            me.handleUndeliveredMsg(msg)            break        case 1:            // 解决中的音讯            me.handleDeliveringMsg(msg)            break        }    }}// isExpired: is me expired?func (me *tDeliveryWorker) isExpired() bool {    return time.Now().UnixNano() >= me.info.ExpireTime}// handleUndeliveredMsg: if msg unhandled, then try to deliver itfunc (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {    err := database.DB(func(db *sqlx.DB) error {        now := time.Now().UnixNano()        r,e := db.Exec(            "update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",            now,            msg.ID,            msg.UpdateTime,        )        if e != nil {            return e        }        rows, e := r.RowsAffected()        if e != nil {            return e        }        if rows != 1 {            return gOneRowsErr        }        msg.StatusFlag = 1        msg.UpdateTime = now        return nil    })    if err != nil {        logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())        return    }    if me.deliver(msg) {        me.afterDeliverySuccess(msg)    } else {        me.afterDeliveryFailed(msg)    }}// deliver: use http.Post function to delivery msgfunc (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {    if me.successRing.Has(msg.ID) {        return true    }    t := &models.TxMsg{        GlobalID: msg.GlobalID,        SubID: msg.SubID,        Topic: msg.Topic,        CreateTime: msg.CreateTime,        Content: msg.Content,    }    j,e := json.Marshal(t)    if e != nil {        logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)        return false    }    r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))    if e != nil {        logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)        return false    }    defer r.Body.Close()    rep, e := ioutil.ReadAll(r.Body)    if e != nil {        logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)        return false    }    m := &models.OkMsg{}    e = json.Unmarshal(rep, m)    if e != nil {        logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)        return false    }    if m.OK {        return true    } else {        logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)        return false    }}// handleDeliveringMsg: if delivery timeout, then retry deliveryfunc (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {    now := time.Now().UnixNano()    if msg.UpdateTime + gDeliveryTimeoutNanos > now {        return    }    // delivery timeout    me.afterDeliveryTimeout(msg)}// afterDeliverySuccess: if done, move msg to success queuefunc (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {    if me.successRing.Has(msg.ID) {        return    }    me.successRing.Push(msg)    err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {        r,e := db.Exec(    "delete from delivery_queue where id=? and update_time=? and status_flag=1",            msg.ID,            msg.UpdateTime,        )        if e != nil {            return e        }        rows, e := r.RowsAffected()        if e != nil {            return e        }        if rows != 1 {            return gOneRowsErr        }        r, e = db.Exec(            "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",            msg.ID,            msg.ClientID,            time.Now().UnixNano(),        )        if e != nil {            return e        }        rows, e = r.RowsAffected()        if e != nil {            return e        }        if rows != 1 {            return gOneRowsErr        }        return nil    })    if err != nil {        logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())    } else {        logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)    }}// afterDeliveryFailed: if failed, do nothing but just log itfunc (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {    logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)}// afterDeliveryTimeout: if timeout, then reset status and retryfunc (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {    err := database.DB(func(db *sqlx.DB) error {        r,e := db.Exec(            "update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",            msg.ID,            msg.UpdateTime,        )        if e != nil {            return e        }        rows,e := r.RowsAffected()        if e != nil {            return e        }        if rows != 1 {            return gOneRowsErr        }        return nil    })    if err != nil {        logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())    } else {        logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)    }}var gEmptyRowsErr = errors.New("empty rows")var gOneRowsErr = errors.New("expecting one row affected")var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))var gInitialLoadRows = 100

(未完待续)