缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的补偿“Ci”,当Ti出现问题时Ci用于处理Ti执行带来的问题。可以通过下面的两个公式理解Saga模式。正常执行时: $T = T_1 T_2 \dots T_n$;第 $j$ 步失败并逆序补偿时: $T = T_1 T_2 \dots T_j C_j \dots C_2 C_1$。Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序依次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特性。摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  • 事务消息队列服务的功能性要求

    • 消息不会丢失: 消息的持久化
    • 消息的唯一性: 要求每个消息有全局ID和子事务ID
    • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

子目标(Day 4)

  • 完善投递worker

    • 未处理消息: 标记, 并尝试投递
    • 处理中消息: 判断是否超时, 并重试投递
    • 投递成功: 移动到成功投递表
    • 投递失败: 重置标记, 下轮重试
  • 数据库表相应的细节调整

    • delivery_queue: 去掉failed_count, 增加update_time时间戳
    • success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
    • failed_queue: 因为不允许失败, 因此删除失败投递表

tDeliveryWorker.go

  • 完善投递worker

    • 未处理消息: 标记, 并尝试投递
    • 处理中消息: 判断是否超时, 并重试投递
    • 投递成功: 移动到成功投递表
    • 投递失败: 重置标记, 下轮重试
package delivery

import (
	"bytes"
	"encoding/json"
	"errors"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
)

var (
	// gEmptyRowsErr is returned by peek's query callback when the queue has no row for this client.
	gEmptyRowsErr = errors.New("empty rows")

	// gOneRowsErr is returned when an update/delete/insert did not affect exactly one row.
	gOneRowsErr = errors.New("expecting one row affected")

	// gDeliveryTimeoutNanos is how long (in nanoseconds) a message may stay in the
	// "delivering" state before it is reset for another attempt.
	gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))

	// gWorkerIdleInterval is how long the main loop sleeps when there is nothing to do.
	gWorkerIdleInterval = time.Second

	// gDeliveryHTTPClient is shared by all workers. It carries an explicit timeout so a
	// subscriber that never answers cannot block a worker goroutine forever.
	gDeliveryHTTPClient = &http.Client{Timeout: time.Duration(gDeliveryTimeoutNanos)}
)

// tDeliveryWorker delivers the queued messages of one subscriber (described by info).
type tDeliveryWorker struct {
	info *tWorkerInfo
}

// newDeliveryWorker creates a worker for info and starts its main loop in a new goroutine.
func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
	it := new(tDeliveryWorker)
	it.info = info
	go it.beginMainLoop()
	return it
}

// beginMainLoop repeatedly peeks the oldest queued message and dispatches on its
// status flag until the worker expires.
func (me *tDeliveryWorker) beginMainLoop() {
	for !me.isExpired() {
		ok, msg := me.peek()
		if !ok {
			time.Sleep(gWorkerIdleInterval)
			continue
		}

		switch msg.StatusFlag {
		case 0:
			// message not yet handled
			me.handleUndeliveredMsg(msg)

		case 1:
			// message currently being delivered
			if !me.isDeliveryTimeout(msg) {
				// Still in flight: wait instead of re-querying the database in a tight loop.
				time.Sleep(gWorkerIdleInterval)
				continue
			}
			me.handleDeliveringMsg(msg)

		default:
			// Unknown status: nothing to do, but do not spin on it either.
			time.Sleep(gWorkerIdleInterval)
		}
	}
}

// isExpired reports whether the current time has reached info.ExpireTime (unix nanoseconds).
func (me *tDeliveryWorker) isExpired() bool {
	return time.Now().UnixNano() >= me.info.ExpireTime
}

// isDeliveryTimeout reports whether msg has been in its current state for at least
// gDeliveryTimeoutNanos, measured from msg.UpdateTime.
func (me *tDeliveryWorker) isDeliveryTimeout(msg *models.QueuedMsg) bool {
	return msg.UpdateTime+gDeliveryTimeoutNanos <= time.Now().UnixNano()
}

// peek fetches the oldest record of this client from the delivery queue.
// It returns (true, msg) on success and (false, nil) when the queue is empty or the
// query failed; failures other than "empty" are logged.
func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
	msg := &models.QueuedMsg{}
	e := database.DB(func(db *sqlx.DB) error {
		rows, err := db.Queryx(
			"select * from delivery_queue where client_id=? order by create_time asc limit 1",
			me.info.ClientID,
		)
		if err != nil {
			return err
		}
		// Release the cursor before the connection is closed by database.DB.
		defer func() { _ = rows.Close() }()

		if !rows.Next() {
			// Next returns false both on "no rows" and on iteration errors.
			if err := rows.Err(); err != nil {
				return err
			}
			return gEmptyRowsErr
		}
		return rows.StructScan(msg)
	})

	if e != nil {
		if !errors.Is(e, gEmptyRowsErr) {
			logger.Logf("tDeliveryWorker.peek, failed, id=%v, err=%s", me.info.ClientID, e.Error())
		}
		return false, nil
	}
	return true, msg
}

// handleUndeliveredMsg marks an unhandled message as "delivering" and then tries to deliver it.
// The update is guarded by status_flag and update_time so that it only succeeds if the row
// is unchanged since it was read; otherwise the attempt is logged and abandoned.
func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
	err := database.DB(func(db *sqlx.DB) error {
		now := time.Now().UnixNano()
		r, e := db.Exec(
			"update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
			now,
			msg.ID,
			msg.UpdateTime,
		)
		if e != nil {
			return e
		}

		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}

		// Keep the in-memory copy in sync; later statements match on update_time.
		msg.UpdateTime = now
		return nil
	})
	if err != nil {
		logger.Logf("tDeliveryWorker.handleUndeliveredMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
		return
	}

	if me.deliver(msg) {
		me.afterDeliverySuccess(msg)
	} else {
		me.afterDeliveryFailed(msg)
	}
}

// deliver posts msg as JSON to info.NotifyURL and reports whether the subscriber
// acknowledged it, i.e. the response body decodes into models.OkMsg with OK set.
// Every failure is logged and reported as false.
func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
	t := &models.TxMsg{
		GlobalID:   msg.GlobalID,
		SubID:      msg.SubID,
		Topic:      msg.Topic,
		CreateTime: msg.CreateTime,
		Content:    msg.Content,
	}
	j, e := json.Marshal(t)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}

	// Use the shared client (with timeout) instead of http.Post, which never times out.
	r, e := gDeliveryHTTPClient.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}
	defer r.Body.Close()

	rep, e := ioutil.ReadAll(r.Body)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}

	m := &models.OkMsg{}
	if e = json.Unmarshal(rep, m); e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}

	if !m.OK {
		logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}
	return true
}

// handleDeliveringMsg resets a message that has been in the "delivering" state for
// longer than the delivery timeout; messages still within the timeout are left alone.
func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
	if !me.isDeliveryTimeout(msg) {
		return
	}

	// delivery timeout
	me.afterDeliveryTimeout(msg)
}

// afterDeliverySuccess moves a delivered message from delivery_queue to success_queue.
// Both statements run on the transaction handle so they commit or roll back together.
func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
	err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
		r, e := tx.Exec(
			"delete from delivery_queue where id=? and update_time=? and status_flag=1",
			msg.ID,
			msg.UpdateTime,
		)
		if e != nil {
			return e
		}

		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}

		r, e = tx.Exec(
			"insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
			msg.ID,
			msg.ClientID,
			time.Now().UnixNano(),
		)
		if e != nil {
			return e
		}

		rows, e = r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}

		return nil
	})

	if err != nil {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
	} else {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	}
}

// afterDeliveryFailed only logs the failure. The row keeps status_flag=1, so the main
// loop resets it through the timeout path once gDeliveryTimeoutNanos has elapsed.
func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
	logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
}

// afterDeliveryTimeout resets status_flag to 0 so the message is picked up again
// as undelivered. The update only applies if the row still matches msg.UpdateTime.
func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
	err := database.DB(func(db *sqlx.DB) error {
		r, e := db.Exec(
			"update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
			msg.ID,
			msg.UpdateTime,
		)
		if e != nil {
			return e
		}

		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}

		return nil
	})

	if err != nil {
		logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
	} else {
		logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	}
}

database.go

  • 数据库表相应的细节调整

    • delivery_queue: 去掉failed_count, 增加update_time时间戳
    • success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
    • failed_queue: 因为不允许失败, 因此删除失败投递表
package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() {    // must prepare tables    err := DB(initDB)    if err != nil {        panic(err)    }}func initDB(db *sqlx.DB) error {    // 订阅者/消费者: subscriber    _, e := db.Exec(`create table if not exists subscriber(    id integer primary key autoincrement,    client_id varchar(50) unique not null,    topic varchar(100) not null,    notify_url varchar(500) not null,    expire_time integer)`)    if e != nil {        return e    }    // 事务音讯: tx_msg    _, e = db.Exec(`create table if not exists tx_msg (    id integer primary key autoincrement,    global_id string varchar(50) not null,    sub_id string varchar(50) unique not null,    sender_id varchar(50) not null,    create_time integer not null,    topic varchar(100) not null,    content nvarchar(2048) not null)`)    if e != nil {        return e    }    // 投递队列: delivery_queue    _, e = db.Exec(`create table if not exists delivery_queue (    id integer primary key autoincrement,        client_id varchar(50) not null,    notify_url varchar(500) not null,        msg_id integer not null,    global_id string varchar(50) not null,    sub_id string varchar(50) unique not null,    sender_id varchar(50) not null,    create_time integer not null,    topic varchar(100) not null,    content nvarchar(2048) not null,        status_flag integer not null,    update_time integer not null)`)    if e != nil {        return e    }    // 胜利投递队列: success_queue    _, e = db.Exec(`create table if not exists success_queue (    id integer primary key autoincrement,    msg_id integer not null,    client_id varchar(50) not null,    create_time integer not null)`)    if e != nil {        return e    }//    // 投递失败队列: failed_queue//    _, e = db.Exec(`create table if not exists failed_queue (//    id integer primary key autoincrement,//    msg_id integer not null,//    
client_id varchar(50) not null,//    create_time integer not null//)`)//    if e != nil {//        return e//    }    return nil}func open() (*sqlx.DB, error) {    return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error {    db,err := open()    if err != nil {        return err    }    defer func() { _ = db.Close() }()    return action(db)}func TX(action TXFunc) error {    return DB(func(db *sqlx.DB) error {        tx, err := db.Beginx()        if err != nil {            return err        }        err = action(db, tx)        if err == nil {            return tx.Commit()        } else {            return tx.Rollback()        }    })}

(未完待续)