缘起
最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过弥补动作进行撤销的
- 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的弥补“Ci”,当Ti呈现问题时Ci用于解决Ti执行带来的问题。能够通过上面的两个公式了解Saga模式。T = T1 T2 … TnT = TCTSaga模式的核心理念是防止应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特色。摘自 <<Go微服务实战>> 刘金亮, 2021.1
指标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务音讯队列服务
事务音讯队列服务的功能性要求
- 音讯不会失落: 音讯的长久化
- 音讯的唯一性: 要求每个音讯有全局ID和子事务ID
- 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试
子目标(Day 2)
- 音讯的设计
- 音讯的长久化
- 投递队列的长久化
- 投递状态的长久化
设计
- TxMsg: 事务音讯模型
- database: 增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表
- publish: 音讯公布api
- routers: 增加/publish路由
TxMsg.go
事务音讯模型
package models// 事务音讯体type TxMsg struct { // 全局事务ID GlobalID string // 子事务ID SubID string // 发送者ID SenderID string // 工夫戳, 应用time.Now().UnixNano() CreateTime int64 // 主题, 即音讯类型 Topic string // 音讯内容, 个别是json Content string}
database.go
增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表
package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() { // must prepare tables err := DB(initDB) if err != nil { panic(err) }}func initDB(db *sqlx.DB) error { // 订阅者/消费者: subscriber _, e := db.Exec(`create table if not exists subscriber( id integer primary key autoincrement, client_id varchar(50) unique not null, topic varchar(100) not null, notify_url varchar(500) not null, expire_time integer)`) if e != nil { return e } // 事务音讯: tx_msg _, e = db.Exec(`create table if not exists tx_msg ( id integer primary key autoincrement, global_id string varchar(50) not null, sub_id string varchar(50) unique not null, sender_id varchar(50) not null, create_time integer not null, topic varchar(100) not null, content nvarchar(2048) not null)`) if e != nil { return e } // 投递队列: delivery_queue _, e = db.Exec(`create table if not exists delivery_queue ( id integer primary key autoincrement, msg_id integer not null, sub_id integer not null, status_flag integer not null, failed_count integer not null)`) if e != nil { return e } // 胜利投递队列: success_queue _, e = db.Exec(`create table if not exists success_queue ( id integer primary key autoincrement, msg_id integer not null, sub_id integer not null)`) if e != nil { return e } // 投递失败队列: failed_queue _, e = db.Exec(`create table if not exists failed_queue ( id integer primary key autoincrement, msg_id integer not null, sub_id integer not null)`) if e != nil { return e } return nil}func open() (*sqlx.DB, error) { return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error { db,err := open() if err != nil { return err } defer func() { _ = db.Close() }() return action(db)}func TX(action TXFunc) error { return DB(func(db *sqlx.DB) error { tx, err := db.Beginx() if err != nil { return err } err = action(db, tx) if err == nil { return tx.Commit() } else { return tx.Rollback() } })}
publish.go
音讯公布api
package handlersimport ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/jmoiron/sqlx" "learning/gooop/saga/mqs/database" "learning/gooop/saga/mqs/logger" "learning/gooop/saga/mqs/models" "net/http" "time")func Publish(c *gin.Context) { // parse request msg := &models.TxMsg{} if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil { c.AbortWithStatusJSON( http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()}) return } // fixme: validate msg // save to db ids := []int64{0} err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error { id, e := saveTxMsg(db, tx, msg) if e != nil { return e } ids[0] = id return nil }) if ids[0] > 0 { newMsgId := ids[0] logger.Logf("publish new msg: %d", newMsgId) // todo: 新增音讯, 开始投送 } // reply if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()}) } else { c.JSON(http.StatusOK, gin.H { "ok": true }) }}func saveTxMsg(db *sqlx.DB, tx *sqlx.Tx, msg *models.TxMsg) (int64,error) { // insert tx_msg r, e := db.Exec( `replace into tx_msg(global_id, sub_id, sender_id, create_time, topic, content) values(?,?,?,?,?,?)`, msg.GlobalID, msg.SubID, msg.SenderID, msg.CreateTime, msg.Topic, msg.Content, ) if e != nil { return 0,e } // get last insert id id,e := r.LastInsertId() if e != nil { return 0,e } if id > 0 { // copy to delivery queue now := time.Now().UnixNano() r, e = db.Exec(`insert into delivery_queue(msg_id, sub_id, status_flag, failed_count) select ?, s.id, 0, 0 from sub_info swhere s.expire_time>?`, id, now) if e != nil { return 0,e } } return id, nil}
routers.go
增加/publish路由
package routersimport ( "github.com/gin-gonic/gin" "learning/gooop/saga/mqs/handlers")func RegisterRouters() *gin.Engine { r := gin.Default() r.Use(gin.Logger()) r.GET("/ping", handlers.Ping) r.POST("/subscribe", handlers.Subscribe) r.POST("/publish", handlers.Publish) return r}
(未完待续)