缘起

最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过弥补动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的弥补“Ci”,当Ti呈现问题时Ci用于解决Ti执行带来的问题。能够通过上面的两个公式了解Saga模式。T = T1 T2 … TnT = TCTSaga模式的核心理念是防止应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特色。摘自 <<Go微服务实战>> 刘金亮, 2021.1

指标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务音讯队列服务
  • 事务音讯队列服务的功能性要求

    • 音讯不会失落: 音讯的长久化
    • 音讯的唯一性: 要求每个音讯有全局ID和子事务ID
    • 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试

子目标(Day 2)

  • 音讯的设计
  • 音讯的长久化
  • 投递队列的长久化
  • 投递状态的长久化

设计

  • TxMsg: 事务音讯模型
  • database: 增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表
  • publish: 音讯公布api
  • routers: 增加/publish路由

TxMsg.go

事务音讯模型

package models// 事务音讯体type TxMsg struct {    // 全局事务ID    GlobalID string    // 子事务ID    SubID string    // 发送者ID    SenderID string    // 工夫戳, 应用time.Now().UnixNano()    CreateTime int64    // 主题, 即音讯类型    Topic string    // 音讯内容, 个别是json    Content string}

database.go

增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表

package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() {    // must prepare tables    err := DB(initDB)    if err != nil {        panic(err)    }}func initDB(db *sqlx.DB) error {    // 订阅者/消费者: subscriber    _, e := db.Exec(`create table if not exists subscriber(    id integer primary key autoincrement,    client_id varchar(50) unique not null,    topic varchar(100) not null,    notify_url varchar(500) not null,    expire_time integer)`)    if e != nil {        return e    }    // 事务音讯: tx_msg    _, e = db.Exec(`create table if not exists tx_msg (    id integer primary key autoincrement,    global_id string varchar(50) not null,    sub_id string varchar(50) unique not null,    sender_id varchar(50) not null,    create_time integer not null,    topic varchar(100) not null,    content nvarchar(2048) not null)`)    if e != nil {        return e    }    // 投递队列: delivery_queue    _, e = db.Exec(`create table if not exists delivery_queue (    id integer primary key autoincrement,    msg_id integer not null,    sub_id integer not null,    status_flag integer not null,    failed_count integer not null)`)    if e != nil {        return e    }    // 胜利投递队列: success_queue    _, e = db.Exec(`create table if not exists success_queue (    id integer primary key autoincrement,    msg_id integer not null,    sub_id integer not null)`)    if e != nil {        return e    }    // 投递失败队列: failed_queue    _, e = db.Exec(`create table if not exists failed_queue (    id integer primary key autoincrement,    msg_id integer not null,    sub_id integer not null)`)    if e != nil {        return e    }    return nil}func open() (*sqlx.DB, error) {    return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error {    db,err := open()    if err != nil {        return err    }    defer func() { _ = db.Close() }()    return action(db)}func TX(action TXFunc) error {    return DB(func(db *sqlx.DB) error {        tx, err := db.Beginx()        if err != nil {            return err        }        err = action(db, tx)        if err == nil {            return tx.Commit()        } else {            return tx.Rollback()        }    })}

publish.go

音讯公布api

package handlersimport (    "github.com/gin-gonic/gin"    "github.com/gin-gonic/gin/binding"    "github.com/jmoiron/sqlx"    "learning/gooop/saga/mqs/database"    "learning/gooop/saga/mqs/logger"    "learning/gooop/saga/mqs/models"    "net/http"    "time")func Publish(c *gin.Context) {    // parse request    msg := &models.TxMsg{}    if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {        c.AbortWithStatusJSON(            http.StatusInternalServerError,            gin.H{ "ok": false, "error": err.Error()})        return    }    // fixme: validate msg    // save to db    ids := []int64{0}    err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {        id, e := saveTxMsg(db, tx, msg)        if e != nil {            return e        }        ids[0] = id        return nil    })    if ids[0] > 0 {        newMsgId := ids[0]        logger.Logf("publish new msg: %d", newMsgId)        // todo: 新增音讯, 开始投送    }    // reply    if err != nil {        c.JSON(http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()})    } else {        c.JSON(http.StatusOK, gin.H { "ok": true })    }}func saveTxMsg(db *sqlx.DB, tx *sqlx.Tx, msg *models.TxMsg) (int64,error) {    // insert tx_msg    r, e := db.Exec(        `replace into tx_msg(global_id, sub_id, sender_id, create_time, topic, content) values(?,?,?,?,?,?)`,        msg.GlobalID, msg.SubID, msg.SenderID, msg.CreateTime, msg.Topic, msg.Content,    )    if e != nil {        return 0,e    }    // get last insert id    id,e := r.LastInsertId()    if e != nil {        return 0,e    }    if id > 0 {        // copy to delivery queue        now := time.Now().UnixNano()        r, e = db.Exec(`insert into delivery_queue(msg_id, sub_id, status_flag, failed_count) select    ?, s.id, 0, 0 from    sub_info swhere    s.expire_time>?`, id, now)        if e != nil {            return 0,e        }    }    return id, nil}

routers.go

增加/publish路由

package routersimport (    "github.com/gin-gonic/gin"    "learning/gooop/saga/mqs/handlers")func RegisterRouters() *gin.Engine {    r := gin.Default()    r.Use(gin.Logger())    r.GET("/ping", handlers.Ping)    r.POST("/subscribe", handlers.Subscribe)    r.POST("/publish", handlers.Publish)    return r}

(未完待续)