共计 4149 个字符,预计需要花费 11 分钟才能阅读完成。
缘起
最近浏览 <<Go 微服务实战 >> (刘金亮, 2021.1)
本系列笔记拟采纳 golang 练习之
Saga 模式
- saga 模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过弥补动作进行撤销的
- 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga 由一系列的子事务“Ti”组成,每个 Ti 都有对应的弥补“Ci”,当 Ti 呈现问题时 Ci 用于解决 Ti 执行带来的问题。能够通过上面的两个公式了解 Saga 模式。T = T1 T2 … Tn
T = TCT
Saga 模式的核心理念是防止应用长期持有锁(如 14.2.2 节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga 模式满足 ACD(原子性、一致性、持久性)特色。摘自 <<Go 微服务实战 >> 刘金亮, 2021.1
指标
- 为实现 saga 模式的分布式事务, 先撸一个 pub/sub 事务音讯队列服务
-
事务音讯队列服务的功能性要求
- 音讯不会失落: 音讯的长久化
- 音讯的唯一性: 要求每个音讯有全局 ID 和子事务 ID
- 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试
子目标 (Day 2)
- 音讯的设计
- 音讯的长久化
- 投递队列的长久化
- 投递状态的长久化
设计
- TxMsg: 事务音讯模型
- database: 增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表
- publish: 音讯公布 api
- routers: 增加 /publish 路由
TxMsg.go
事务音讯模型
package models
// 事务音讯体
type TxMsg struct {
// 全局事务 ID
GlobalID string
// 子事务 ID
SubID string
// 发送者 ID
SenderID string
// 工夫戳, 应用 time.Now().UnixNano()
CreateTime int64
// 主题, 即音讯类型
Topic string
// 音讯内容, 个别是 json
Content string
}
database.go
增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表
package database
import "github.com/jmoiron/sqlx"
import _ "github.com/mattn/go-sqlite3"
type DBFunc func(db *sqlx.DB) error
type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
func init() {
// must prepare tables
err := DB(initDB)
if err != nil {panic(err)
}
}
func initDB(db *sqlx.DB) error {
// 订阅者 / 消费者: subscriber
_, e := db.Exec(`create table if not exists subscriber(
id integer primary key autoincrement,
client_id varchar(50) unique not null,
topic varchar(100) not null,
notify_url varchar(500) not null,
expire_time integer
)`)
if e != nil {return e}
// 事务音讯: tx_msg
_, e = db.Exec(`create table if not exists tx_msg (
id integer primary key autoincrement,
global_id string varchar(50) not null,
sub_id string varchar(50) unique not null,
sender_id varchar(50) not null,
create_time integer not null,
topic varchar(100) not null,
content nvarchar(2048) not null
)`)
if e != nil {return e}
// 投递队列: delivery_queue
_, e = db.Exec(`create table if not exists delivery_queue (
id integer primary key autoincrement,
msg_id integer not null,
sub_id integer not null,
status_flag integer not null,
failed_count integer not null
)`)
if e != nil {return e}
// 胜利投递队列: success_queue
_, e = db.Exec(`create table if not exists success_queue (
id integer primary key autoincrement,
msg_id integer not null,
sub_id integer not null
)`)
if e != nil {return e}
// 投递失败队列: failed_queue
_, e = db.Exec(`create table if not exists failed_queue (
id integer primary key autoincrement,
msg_id integer not null,
sub_id integer not null
)`)
if e != nil {return e}
return nil
}
func open() (*sqlx.DB, error) {return sqlx.Open("sqlite3", "./mqs.db")
}
func DB(action DBFunc) error {db,err := open()
if err != nil {return err}
defer func() { _ = db.Close() }()
return action(db)
}
func TX(action TXFunc) error {return DB(func(db *sqlx.DB) error {tx, err := db.Beginx()
if err != nil {return err}
err = action(db, tx)
if err == nil {return tx.Commit()
} else {return tx.Rollback()
}
})
}
publish.go
音讯公布 api
package handlers
import (
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/jmoiron/sqlx"
"learning/gooop/saga/mqs/database"
"learning/gooop/saga/mqs/logger"
"learning/gooop/saga/mqs/models"
"net/http"
"time"
)
func Publish(c *gin.Context) {
// parse request
msg := &models.TxMsg{}
if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
c.AbortWithStatusJSON(
http.StatusInternalServerError,
gin.H{"ok": false, "error": err.Error()})
return
}
// fixme: validate msg
// save to db
ids := []int64{0}
err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {id, e := saveTxMsg(db, tx, msg)
if e != nil {return e}
ids[0] = id
return nil
})
if ids[0] > 0 {newMsgId := ids[0]
logger.Logf("publish new msg: %d", newMsgId)
// todo: 新增音讯, 开始投送
}
// reply
if err != nil {c.JSON(http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()})
} else {c.JSON(http.StatusOK, gin.H { "ok": true})
}
}
func saveTxMsg(db *sqlx.DB, tx *sqlx.Tx, msg *models.TxMsg) (int64,error) {
// insert tx_msg
r, e := db.Exec(`replace into tx_msg(global_id, sub_id, sender_id, create_time, topic, content) values(?,?,?,?,?,?)`,
msg.GlobalID, msg.SubID, msg.SenderID, msg.CreateTime, msg.Topic, msg.Content,
)
if e != nil {return 0,e}
// get last insert id
id,e := r.LastInsertId()
if e != nil {return 0,e}
if id > 0 {
// copy to delivery queue
now := time.Now().UnixNano()
r, e = db.Exec(`
insert into delivery_queue(msg_id, sub_id, status_flag, failed_count)
select
?, s.id, 0, 0
from
sub_info s
where
s.expire_time>?
`, id, now)
if e != nil {return 0,e}
}
return id, nil
}
routers.go
增加 /publish 路由
package routers
import (
"github.com/gin-gonic/gin"
"learning/gooop/saga/mqs/handlers"
)
func RegisterRouters() *gin.Engine {r := gin.Default()
r.Use(gin.Logger())
r.GET("/ping", handlers.Ping)
r.POST("/subscribe", handlers.Subscribe)
r.POST("/publish", handlers.Publish)
return r
}
(未完待续)
正文完