缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务都可以通过补偿动作进行撤销
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的补偿“Ci”,当Ti出现问题时Ci用于处理Ti执行带来的问题。可以通过下面的两个公式理解Saga模式。$T = T_1 T_2 \dots T_n$;$T = T_1 T_2 \dots T_j C_j \dots C_2 C_1$。Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序依次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特性。摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  • 事务消息队列服务的功能性要求

    • 消息不会丢失: 消息的持久化
    • 消息的唯一性: 要求每个消息有全局ID和子事务ID
    • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

子目标(Day 3)

  • 有序,可靠地投递消息

    • 有序: 每个订阅者绑定独立的投递worker, 按时间戳投递
    • 可靠: 投递状态的持久化

设计

  • QueuedMsg: 把投递队列的消息设计得胖一点, 以减少join的使用
  • IEventBus: 设计一个内部消息总线, 以减少逻辑耦合
  • tDeliveryService: 投递服务, 管理若干个投递worker
  • tWorkerInfo: 投递worker的初始化参数
  • tDeliveryWorker: 投递worker(未实现)

QueuedMsg.go

把待投递消息设计得胖一点, 以减少join的使用

package modelstype QueuedMsg struct {    ID int    ClientID string    NotifyURL string    MsgID int    GlobalID string    SubID string    SenderID string    CreateTime int64    Topic string    Content string    StatusFlag int    FailedCount int}

IEventBus.go

设计一个内部消息总线, 以减少逻辑耦合

package eventbus

import "sync"

// EventHandleFunc is a callback invoked with the event name and the payload
// that was passed to Pub.
type EventHandleFunc func(e string, args interface{})

// IEventBus is a minimal in-process publish/subscribe bus used to decouple
// components from each other.
type IEventBus interface {
	// Pub delivers args to every handler subscribed to event e.
	Pub(e string, args interface{})
	// Sub registers handler for event e.
	Sub(e string, handler EventHandleFunc)
}

// tEventBus is the default IEventBus implementation: a map from event name to
// its handlers, guarded by a read/write mutex.
type tEventBus struct {
	rwmutex *sync.RWMutex
	items   map[string][]EventHandleFunc
}

// newEventBus returns an initialized, empty bus.
func newEventBus() IEventBus {
	it := new(tEventBus)
	it.init()
	return it
}

// init allocates the mutex and the handler table.
func (me *tEventBus) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = make(map[string][]EventHandleFunc)
}

// Pub invokes every handler subscribed to e, each in its own goroutine, so
// the call does not wait for handlers to finish and gives no ordering
// guarantee between them. Events without subscribers are dropped.
func (me *tEventBus) Pub(e string, args interface{}) {
	me.rwmutex.RLock()
	defer me.rwmutex.RUnlock()

	// Ranging over a missing key yields a nil slice, i.e. no iterations.
	for _, it := range me.items[e] {
		go it(e, args)
	}
}

// Sub appends handler to the handlers of event e. A nil handler is ignored:
// registering it would panic later inside the goroutine started by Pub and
// take the whole process down.
func (me *tEventBus) Sub(e string, handler EventHandleFunc) {
	if handler == nil {
		return
	}

	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	// append on the nil slice of an unknown event allocates a new one.
	me.items[e] = append(me.items[e], handler)
}

// GlobalEventBus is the process-wide bus instance shared by all packages.
var GlobalEventBus = newEventBus()

tDeliveryService.go

投递服务, 管理若干个投递worker

package delivery

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/eventbus"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
)

// tDeliveryService manages the delivery workers, one per subscriber, keyed by
// the subscriber's client ID.
type tDeliveryService struct {
	rwmutex *sync.RWMutex
	workers map[string]*tDeliveryWorker
}

// newDeliveryService returns an initialized service with no workers.
func newDeliveryService() *tDeliveryService {
	it := new(tDeliveryService)
	it.init()
	return it
}

// init allocates the mutex and the worker table. Without this every method
// would dereference a nil mutex or write to a nil map and panic.
func (me *tDeliveryService) init() {
	me.rwmutex = new(sync.RWMutex)
	me.workers = make(map[string]*tDeliveryWorker)
}

// handleBootEvent is the "boot" event handler: it starts the background loop
// that creates workers for stored subscribers and the loop that removes
// expired workers. Neither loop ever terminates.
func (me *tDeliveryService) handleBootEvent(e string, args interface{}) {
	go me.beginCreatingWorkers()
	go me.beginCleanExpiredWorkers()
}

// beginCreatingWorkers polls the subscriber table every 5 seconds and makes
// sure each unexpired subscriber has a worker. Errors are logged and the
// loop continues.
func (me *tDeliveryService) beginCreatingWorkers() {
	for {
		if e := me.loadSubscribers(); e != nil {
			logger.Logf("tDeliveryService.beginCreatingWorkers, error = %s", e.Error())
		}
		time.Sleep(5 * time.Second)
	}
}

// loadSubscribers reads all unexpired subscribers and calls createWorker for
// each of them. It returns the first query, scan or iteration error.
func (me *tDeliveryService) loadSubscribers() error {
	return database.DB(func(db *sqlx.DB) error {
		now := time.Now().UnixNano()
		rows, err := db.Queryx("select client_id, notify_url, expire_time from subscriber where expire_time>?", now)
		if err != nil {
			return err
		}
		// Release the connection on every exit path, including scan errors.
		defer rows.Close()

		for rows.Next() {
			it := new(tWorkerInfo)
			if err = rows.StructScan(it); err != nil {
				return err
			}
			me.createWorker(it)
		}
		// Next returning false can also mean the iteration failed.
		return rows.Err()
	})
}

// beginCleanExpiredWorkers removes expired workers every 30 seconds.
func (me *tDeliveryService) beginCleanExpiredWorkers() {
	for range time.Tick(30 * time.Second) {
		me.clean()
	}
}

// clean drops every expired worker from the table. The check and the delete
// happen under one write lock so a worker whose expiry is extended
// concurrently by createWorker cannot be removed based on a stale check.
func (me *tDeliveryService) clean() {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	for k, v := range me.workers {
		if v.isExpired() {
			delete(me.workers, k)
		}
	}
}

// handleNewSubscriber creates (or refreshes) the worker for a newly
// registered subscriber. args must be a non-nil *models.SubMsg; anything
// else is ignored.
func (me *tDeliveryService) handleNewSubscriber(args interface{}) {
	it, ok := args.(*models.SubMsg)
	if !ok || it == nil {
		return
	}

	me.createWorker(&tWorkerInfo{
		ClientID:   it.ClientID,
		NotifyURL:  it.NotifyUrl,
		ExpireTime: it.ExpireTime,
	})
}

// maxInt64 returns the larger of a and b.
func maxInt64(a, b int64) int64 {
	if a >= b {
		return a
	}
	return b
}

// createWorker ensures a running worker exists for info.ClientID.
//
// If a live worker already exists only its expiry is pushed forward (never
// backward). If the existing worker is already expired its loop has exited or
// is about to, so it is replaced by a fresh worker instead of being revived
// in the table without a running goroutine. The whole operation runs under
// the write lock so two concurrent calls cannot both start a worker.
func (me *tDeliveryService) createWorker(info *tWorkerInfo) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	if w, ok := me.workers[info.ClientID]; ok && !w.isExpired() {
		// The worker goroutine reads ExpireTime concurrently, so publish the
		// new value atomically.
		extended := maxInt64(atomic.LoadInt64(&w.info.ExpireTime), info.ExpireTime)
		atomic.StoreInt64(&w.info.ExpireTime, extended)
		return
	}

	me.workers[info.ClientID] = newDeliveryWorker(info)
}

// gDeliveryService is the package-wide delivery service instance.
var gDeliveryService = newDeliveryService()

// init wires the service to the global event bus.
func init() {
	eventbus.GlobalEventBus.Sub("boot", gDeliveryService.handleBootEvent)
	// handleNewSubscriber takes only the payload, so adapt it to the handler
	// signature. Routing this event to handleBootEvent would start another
	// pair of endless loops per subscriber and never create the worker.
	eventbus.GlobalEventBus.Sub("subscriber.new", func(e string, args interface{}) {
		gDeliveryService.handleNewSubscriber(args)
	})
}

tWorkerInfo.go

投递worker的初始化参数

package deliverytype tWorkerInfo struct {    ClientID string    NotifyURL string    ExpireTime int64}

tDeliveryWorker.go

投递worker, 未实现

package delivery

import (
	"bytes"
	"encoding/json"
	"errors"
	"io/ioutil"
	"net/http"
	"sync/atomic"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
)

// tDeliveryWorker delivers the queued messages of a single subscriber, one
// at a time and oldest first, until the subscription expires.
type tDeliveryWorker struct {
	info *tWorkerInfo
}

// newDeliveryWorker creates a worker for info and starts its delivery loop in
// a new goroutine.
func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
	it := new(tDeliveryWorker)
	it.info = info
	go it.beginMainLoop()
	return it
}

// beginMainLoop repeatedly takes the oldest queued message and dispatches it
// by status until the worker expires. Whenever no progress was made (empty
// queue, database error, failed or still pending delivery) it backs off for
// one second instead of re-querying the database in a tight loop.
func (me *tDeliveryWorker) beginMainLoop() {
	for !me.isExpired() {
		progressed := false
		if ok, msg := me.peek(); ok {
			switch msg.StatusFlag {
			case 0:
				// message not handled yet
				progressed = me.handleNewMsg(msg)
			case 1:
				// message whose delivery is in progress
				progressed = me.handleDeliveringMsg(msg)
			}
		}
		if !progressed {
			time.Sleep(time.Second)
		}
	}
}

// isExpired reports whether the current time (Unix nanoseconds) has reached
// the subscription's expire time. The value is loaded atomically because the
// delivery service may extend it from another goroutine.
func (me *tDeliveryWorker) isExpired() bool {
	return time.Now().UnixNano() >= atomic.LoadInt64(&me.info.ExpireTime)
}

// gEmptyRowsErr signals that a query matched or affected no rows.
var gEmptyRowsErr = errors.New("empty rows")

// gDeliveryClient is shared by all workers. The timeout keeps an unresponsive
// subscriber from blocking its worker forever (the default client has none).
var gDeliveryClient = &http.Client{Timeout: 10 * time.Second}

// peek loads the oldest queued message of this worker's client.
// It returns (true, msg) when a message was read and (false, nil) when the
// queue is empty or the query failed; real failures are logged.
func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
	msg := &models.QueuedMsg{}
	e := database.DB(func(db *sqlx.DB) error {
		rows, err := db.Queryx("select * from delivery_queue where client_id=? order by create_time asc limit 1", me.info.ClientID)
		if err != nil {
			return err
		}
		// Always release the result set; returning after a successful scan
		// without closing it leaks the underlying connection.
		defer rows.Close()

		if !rows.Next() {
			if err = rows.Err(); err != nil {
				return err
			}
			return gEmptyRowsErr
		}
		return rows.StructScan(msg)
	})
	if e != nil {
		// An empty queue is the normal idle case; anything else is worth a log line.
		if !errors.Is(e, gEmptyRowsErr) {
			logger.Logf("tDeliveryWorker.peek, id=%v, err=%s", me.info.ClientID, e.Error())
		}
		return false, nil
	}
	return true, msg
}

// handleNewMsg marks msg as "delivering" (status_flag=1) and then tries to
// deliver it. It returns true only if the subscriber acknowledged the
// message; a failed status update is logged and reported as false.
func (me *tDeliveryWorker) handleNewMsg(msg *models.QueuedMsg) bool {
	err := database.DB(func(db *sqlx.DB) error {
		r, e := db.Exec("update delivery_queue set status_flag=1 where id=?", msg.ID)
		if e != nil {
			return e
		}

		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gEmptyRowsErr
		}
		return nil
	})
	if err != nil {
		logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
		return false
	}

	delivered := me.deliver(msg)
	// todo: on success, move msg to success queue
	// todo: on failure, msg.failed_count++
	return delivered
}

// deliver posts msg as JSON to the subscriber's notify URL and reports
// whether the subscriber answered with an OK acknowledgement. Any encoding,
// transport or decoding error counts as a failed delivery.
func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
	t := &models.TxMsg{
		GlobalID:   msg.GlobalID,
		SubID:      msg.SubID,
		Topic:      msg.Topic,
		CreateTime: msg.CreateTime,
		Content:    msg.Content,
	}
	j, e := json.Marshal(t)
	if e != nil {
		return false
	}

	r, e := gDeliveryClient.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
	if e != nil {
		return false
	}
	defer r.Body.Close()

	rep, e := ioutil.ReadAll(r.Body)
	if e != nil {
		return false
	}

	m := &models.OkMsg{}
	if e = json.Unmarshal(rep, m); e != nil {
		return false
	}
	return m.OK
}

// handleDeliveringMsg handles a message whose delivery is already in
// progress. Not implemented yet; it always returns false.
func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) bool {
	// todo: check if delivery timeout
	return false
}

(未完待续)