缘起

最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过弥补动作进行撤销的
  • 事务动作和弥补动作都是幂等的, 容许反复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的弥补“Ci”,当Ti呈现问题时Ci用于解决Ti执行带来的问题。能够通过上面的两个公式了解Saga模式。T = T1 T2 … TnT = TCTSaga模式的核心理念是防止应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特色。摘自 <<Go微服务实战>> 刘金亮, 2021.1

指标

  • 为实现saga模式的分布式事务, 先模仿实现一个简略的pub/sub音讯服务
  • 应用sqlx + sqlite3长久化音讯, 应用gin作为http框架, 应用toml配置文件

代码构造

$ tree sagasaga└── mqs    ├── cmd    │   └── boot.go    ├── config    │   └── config.go    ├── database    │   └── database.go    ├── handlers    │   ├── ping.go    │   └── subscribe.go    ├── logger    │   └── logger.go    └── routers        └── routers.go

设计

  • config: 读取并解析toml配置文件
  • database: 封装sqlx + sqlite3
  • handlers/ping: 预留http服务保活探针接口
  • handlers/subscribe: 注册一个音讯订阅者. 音讯订阅者蕴含订阅者ID, 主题和回调地址(以便推送音讯)
  • routers: 注册gin路由

config.go

读取并解析toml配置文件

package configimport  "github.com/BurntSushi/toml"type tomlConfig struct {    MQS tServiceConfig}type tServiceConfig struct {    Port int    LogDir string}var gServiceConfig = &tServiceConfig{}func init() {    cfg := &tomlConfig{}    if _,err := toml.DecodeFile("./mqs.toml", cfg);err != nil {        panic(err)    }    *gServiceConfig = cfg.MQS}func GetPort() int {    return gServiceConfig.Port}func GetLogDir() string {    return gServiceConfig.LogDir}

database.go

封装sqlx + sqlite3

package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() {    // must prepare tables    err := DB(func(db *sqlx.DB) error {        // table: sub_info        _,e := db.Exec(`        create table if not exists sub_info(            id integer primary key autoincrement,            client_id varchar(50) unique not null,            topic varchar(100) not null,            callback_url varchar(500) not null        )`)        return e    })    if err != nil {        panic(err)    }}func open() (*sqlx.DB, error) {    return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error {    db,err := open()    if err != nil {        return err    }    defer func() { _ = db.Close() }()    return action(db)}func TX(action TXFunc) error {    return DB(func(db *sqlx.DB) error {        tx, err := db.Beginx()        if err != nil {            return err        }        err = action(db, tx)        if err == nil {            return tx.Commit()        } else {            return tx.Rollback()        }    })}

ping.go

预留http服务保活探针接口

package handlersimport (    "github.com/gin-gonic/gin"    "net/http"    "time")func Ping(c *gin.Context) {    c.JSON(http.StatusOK, gin.H{ "ok": true, "time": time.Now().Format(time.RFC3339)})}

subscribe.go

注册一个音讯订阅者. 音讯订阅者蕴含订阅者ID, 主题和回调地址(以便推送音讯)

package handlersimport (    "github.com/gin-gonic/gin"    "github.com/gin-gonic/gin/binding"    "github.com/jmoiron/sqlx"    "learning/gooop/saga/mqs/database"    "net/http")type tSubMsg struct {    ClientID string    Topic string    CallbackUrl string}func Subscribe(c *gin.Context) {    msg := &tSubMsg{}    if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {        c.AbortWithStatusJSON(            http.StatusInternalServerError,            gin.H{"error": err.Error()})        return    }        err := database.DB(func(db *sqlx.DB) error {        _,e := db.Exec(            "replace into sub_info(client_id, topic, callback_url) values(?, ?, ?)",            msg.ClientID,            msg.Topic,            msg.CallbackUrl)        return e    })    if err != nil {        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})    } else {        c.JSON(http.StatusOK, gin.H { "ok": true })    }}

routers.go

注册gin路由

package routersimport (    "github.com/gin-gonic/gin"    "learning/gooop/saga/mqs/handlers")func RegisterRouters() *gin.Engine {    r := gin.Default()    r.Use(gin.Logger())    r.GET("/ping", handlers.Ping)    r.POST("/subscribe", handlers.Subscribe)    return r}

(未完待续)