缘起
最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过弥补动作进行撤销的
- 事务动作和弥补动作都是幂等的, 容许反复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的弥补“Ci”,当Ti呈现问题时Ci用于解决Ti执行带来的问题。能够通过上面的两个公式了解Saga模式。T = T1 T2 … TnT = TCTSaga模式的核心理念是防止应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特色。摘自 <<Go微服务实战>> 刘金亮, 2021.1
指标
- 为实现saga模式的分布式事务, 先模仿实现一个简略的pub/sub音讯服务
- 应用sqlx + sqlite3长久化音讯, 应用gin作为http框架, 应用toml配置文件
代码构造
$ tree sagasaga└── mqs ├── cmd │ └── boot.go ├── config │ └── config.go ├── database │ └── database.go ├── handlers │ ├── ping.go │ └── subscribe.go ├── logger │ └── logger.go └── routers └── routers.go
设计
- config: 读取并解析toml配置文件
- database: 封装sqlx + sqlite3
- handlers/ping: 预留http服务保活探针接口
- handlers/subscribe: 注册一个音讯订阅者. 音讯订阅者蕴含订阅者ID, 主题和回调地址(以便推送音讯)
- routers: 注册gin路由
config.go
读取并解析toml配置文件
package configimport "github.com/BurntSushi/toml"type tomlConfig struct { MQS tServiceConfig}type tServiceConfig struct { Port int LogDir string}var gServiceConfig = &tServiceConfig{}func init() { cfg := &tomlConfig{} if _,err := toml.DecodeFile("./mqs.toml", cfg);err != nil { panic(err) } *gServiceConfig = cfg.MQS}func GetPort() int { return gServiceConfig.Port}func GetLogDir() string { return gServiceConfig.LogDir}
database.go
封装sqlx + sqlite3
package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() { // must prepare tables err := DB(func(db *sqlx.DB) error { // table: sub_info _,e := db.Exec(` create table if not exists sub_info( id integer primary key autoincrement, client_id varchar(50) unique not null, topic varchar(100) not null, callback_url varchar(500) not null )`) return e }) if err != nil { panic(err) }}func open() (*sqlx.DB, error) { return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error { db,err := open() if err != nil { return err } defer func() { _ = db.Close() }() return action(db)}func TX(action TXFunc) error { return DB(func(db *sqlx.DB) error { tx, err := db.Beginx() if err != nil { return err } err = action(db, tx) if err == nil { return tx.Commit() } else { return tx.Rollback() } })}
ping.go
预留http服务保活探针接口
package handlersimport ( "github.com/gin-gonic/gin" "net/http" "time")func Ping(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "ok": true, "time": time.Now().Format(time.RFC3339)})}
subscribe.go
注册一个音讯订阅者. 音讯订阅者蕴含订阅者ID, 主题和回调地址(以便推送音讯)
package handlersimport ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/jmoiron/sqlx" "learning/gooop/saga/mqs/database" "net/http")type tSubMsg struct { ClientID string Topic string CallbackUrl string}func Subscribe(c *gin.Context) { msg := &tSubMsg{} if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil { c.AbortWithStatusJSON( http.StatusInternalServerError, gin.H{"error": err.Error()}) return } err := database.DB(func(db *sqlx.DB) error { _,e := db.Exec( "replace into sub_info(client_id, topic, callback_url) values(?, ?, ?)", msg.ClientID, msg.Topic, msg.CallbackUrl) return e }) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) } else { c.JSON(http.StatusOK, gin.H { "ok": true }) }}
routers.go
注册gin路由
package routersimport ( "github.com/gin-gonic/gin" "learning/gooop/saga/mqs/handlers")func RegisterRouters() *gin.Engine { r := gin.Default() r.Use(gin.Logger()) r.GET("/ping", handlers.Ping) r.POST("/subscribe", handlers.Subscribe) return r}
(未完待续)