缘起
最近阅读《Go 微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用 golang 练习之
Saga 模式
- saga 模式将分布式长事务切分为一系列独立短事务
- 每个短事务都可以通过补偿动作进行撤销
- 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
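Saga 由一系列的子事务 $T_i$ 组成, 每个 $T_i$ 都有对应的补偿 $C_i$, 当 $T_i$ 出现问题时, $C_i$ 用于处理 $T_i$ 执行带来的问题。可以通过下面的两个公式理解 Saga 模式:
$$T = T_1 T_2 \cdots T_n$$
$$T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1$$
第一个公式表示所有子事务都执行成功; 第二个公式表示在 $T_{j+1}$ 失败时, 按相反顺序依次执行已完成子事务的补偿 $C_j, \dots, C_1$。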
Saga 模式的核心理念是避免使用长期持有锁的长事务(如 14.2.2 节介绍的两阶段提交), 而是将事务切分为一组按序依次提交的短事务。Saga 模式满足 ACD(原子性、一致性、持久性)特性。摘自《Go 微服务实战》刘金亮, 2021.1
目标
- 为实现 saga 模式的分布式事务, 先撸一个 pub/sub 事务消息队列服务
-
事务消息队列服务的功能性要求
- 消息不会丢失: 消息持久化
- 消息的唯一性: 要求每个消息有全局 ID 和子事务 ID
- 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
子目标 (Day 4)
-
完善投递 worker
- 未处理消息: 标记, 并尝试投递
- 处理中消息: 判断是否超时, 并重试投递
- 投递成功: 移动到成功投递表
- 投递失败: 重置标记, 下轮重试
-
数据库表相应的细节调整
- delivery_queue: 去掉 failed_count, 增加 update_time 时间戳
- success_queue: 去掉 sub_id, 改为 client_id, 并增加 create_time 时间戳
- failed_queue: 因为不允许失败, 所以删除失败投递表
tDeliveryWorker.go
-
完善投递 worker
- 未处理消息: 标记, 并尝试投递
- 处理中消息: 判断是否超时, 并重试投递
- 投递成功: 移动到成功投递表
- 投递失败: 重置标记, 下轮重试
package delivery
import (
"bytes"
"encoding/json"
"errors"
"github.com/jmoiron/sqlx"
"io/ioutil"
"learning/gooop/saga/mqs/database"
"learning/gooop/saga/mqs/logger"
"learning/gooop/saga/mqs/models"
"net/http"
"time"
)
// tDeliveryWorker delivers queued messages for a single subscriber, whose
// client id, notify URL and expiry time are carried by info.
type tDeliveryWorker struct {
	info *tWorkerInfo
}
// newDeliveryWorker creates a worker bound to info and immediately starts
// its polling loop in a new goroutine; the loop ends once the worker expires.
func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
	worker := &tDeliveryWorker{info: info}
	go worker.beginMainLoop()
	return worker
}
// beginMainLoop polls the delivery queue for this worker's client until the
// worker expires. The earliest queued message is dispatched on status_flag:
// 0 (not yet handled) is claimed and delivered, and 1 (delivery in progress)
// is checked for timeout. When the queue is empty, or the head message is
// still in flight or has an unknown status, the loop pauses for one second
// before polling again instead of re-querying the database in a tight loop.
func (me *tDeliveryWorker) beginMainLoop() {
	const idleInterval = time.Second
	for !me.isExpired() {
		ok, msg := me.peek()
		if !ok {
			// Nothing to deliver (or the query failed): wait before polling again.
			time.Sleep(idleInterval)
			continue
		}
		switch msg.StatusFlag {
		case 0:
			// Unhandled message: claim it and try to deliver it.
			me.handleUndeliveredMsg(msg)
		case 1:
			// Message being delivered: it is reset for retry only after it times out.
			me.handleDeliveringMsg(msg)
			// peek keeps returning this same head row until the timeout elapses
			// (e.g. after every failed delivery), so pause instead of spinning.
			time.Sleep(idleInterval)
		default:
			// Unknown status: nothing in this loop will change it; avoid spinning.
			time.Sleep(idleInterval)
		}
	}
}
// isExpired reports whether the current time, in Unix nanoseconds, has
// reached the worker's ExpireTime.
func (me *tDeliveryWorker) isExpired() bool {
	now := time.Now().UnixNano()
	return now >= me.info.ExpireTime
}
// peek fetches the oldest (by create_time) delivery_queue row for this
// worker's client.
//
// It returns (true, msg) when a row was read, and (false, nil) when the queue
// is empty or the query failed. Failures other than an empty queue are logged
// rather than silently treated as "nothing to do".
func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
	msg := &models.QueuedMsg{}
	e := database.DB(func(db *sqlx.DB) error {
		rows, err := db.Queryx(
			"select * from delivery_queue where client_id=? order by create_time asc limit 1",
			me.info.ClientID,
		)
		if err != nil {
			return err
		}
		// Always release the result set (and its connection), on every path.
		defer rows.Close()

		if !rows.Next() {
			// Next returns false both at the end of the rows and on error.
			if err := rows.Err(); err != nil {
				return err
			}
			return gEmptyRowsErr
		}
		return rows.StructScan(msg)
	})
	if e != nil {
		if !errors.Is(e, gEmptyRowsErr) {
			logger.Logf("tDeliveryWorker.peek, failed, id=%v, err=%s", me.info.ClientID, e.Error())
		}
		return false, nil
	}
	return true, msg
}
// handleUndeliveredMsg claims an unhandled message and then attempts to
// deliver it.
//
// The claim is an optimistic update (status_flag 0 -> 1, update_time -> now)
// guarded by the status_flag and update_time that were read. If the row
// changed since it was peeked, the claim fails, is logged, and nothing is
// delivered. On success msg.UpdateTime is advanced to the stored value,
// because afterDeliverySuccess matches the row on update_time.
func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
	err := database.DB(func(db *sqlx.DB) error {
		now := time.Now().UnixNano()
		r, e := db.Exec(
			"update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
			now,
			msg.ID,
			msg.UpdateTime,
		)
		if e != nil {
			return e
		}
		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}
		msg.UpdateTime = now
		return nil
	})
	if err != nil {
		// Log under this function's real name (previously "handleNewMsg").
		logger.Logf("tDeliveryWorker.handleUndeliveredMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
		return
	}

	if me.deliver(msg) {
		me.afterDeliverySuccess(msg)
	} else {
		me.afterDeliveryFailed(msg)
	}
}
// gDeliveryClient is the HTTP client used by deliver. Its Timeout bounds one
// whole delivery attempt, including reading the response body. It is set to
// the same 10 seconds as gDeliveryTimeoutNanos.
var gDeliveryClient = &http.Client{Timeout: 10 * time.Second}

// deliver POSTs msg to the subscriber's NotifyURL as a JSON-encoded
// models.TxMsg. It reports whether the subscriber acknowledged the message
// with a JSON models.OkMsg whose OK field is true. Every failure is logged,
// with the underlying error where there is one, and reported as false.
func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
	t := &models.TxMsg{
		GlobalID:   msg.GlobalID,
		SubID:      msg.SubID,
		Topic:      msg.Topic,
		CreateTime: msg.CreateTime,
		Content:    msg.Content,
	}
	j, e := json.Marshal(t)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}

	// http.Post would use http.DefaultClient, which never times out. Delivery
	// runs synchronously on the worker's polling goroutine, the same goroutine
	// that resets timed-out messages. A hung subscriber would therefore stall
	// this worker permanently.
	r, e := gDeliveryClient.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}
	defer r.Body.Close()

	rep, e := ioutil.ReadAll(r.Body)
	if e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}

	m := &models.OkMsg{}
	if e = json.Unmarshal(rep, m); e != nil {
		logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
		return false
	}
	if !m.OK {
		logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
		return false
	}
	return true
}
// handleDeliveringMsg inspects a message that is already being delivered.
// It does nothing until gDeliveryTimeoutNanos have passed since the
// message's update_time. After that the message is handed to
// afterDeliveryTimeout, which resets it for another attempt.
func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
	deadline := msg.UpdateTime + gDeliveryTimeoutNanos
	if deadline <= time.Now().UnixNano() {
		me.afterDeliveryTimeout(msg)
	}
}
// afterDeliverySuccess moves a delivered message out of delivery_queue and
// records it in success_queue.
//
// Both statements run inside one transaction, so a row is never deleted
// without its success record, and vice versa. The delete is guarded by id,
// update_time and status_flag=1, so it only removes the row this worker
// claimed. If no single row matches, or the insert does not affect exactly
// one row, the action returns an error and the transaction is rolled back.
// The outcome is logged.
func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
	err := database.TX(func(_ *sqlx.DB, tx *sqlx.Tx) error {
		// Statements must be issued through tx. Issuing them through db would
		// run them outside the transaction and make the rollback a no-op.
		r, e := tx.Exec(
			"delete from delivery_queue where id=? and update_time=? and status_flag=1",
			msg.ID,
			msg.UpdateTime,
		)
		if e != nil {
			return e
		}
		rows, e := r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}

		r, e = tx.Exec("insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
			msg.ID,
			msg.ClientID,
			time.Now().UnixNano(),
		)
		if e != nil {
			return e
		}
		rows, e = r.RowsAffected()
		if e != nil {
			return e
		}
		if rows != 1 {
			return gOneRowsErr
		}
		return nil
	})
	if err != nil {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
	} else {
		logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
	}
}
// afterDeliveryFailed only logs the failed attempt. It leaves the queue row
// untouched, so the message keeps the in-progress flag set by
// handleUndeliveredMsg. It is retried once handleDeliveringMsg sees it time out.
func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
	info := me.info
	logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", info.ClientID, msg.GlobalID, msg.SubID)
}
// afterDeliveryTimeout returns a timed-out message to the unhandled state
// (status_flag 1 -> 0), so the main loop will claim and deliver it again.
// The reset only applies while the row still has the status_flag and
// update_time that were read; otherwise gOneRowsErr is reported. Either
// outcome is logged.
func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
	reset := func(db *sqlx.DB) error {
		result, err := db.Exec(
			"update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
			msg.ID,
			msg.UpdateTime,
		)
		if err != nil {
			return err
		}
		affected, err := result.RowsAffected()
		switch {
		case err != nil:
			return err
		case affected != 1:
			return gOneRowsErr
		}
		return nil
	}

	if err := database.DB(reset); err != nil {
		logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
		return
	}
	logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
}
var (
	// gEmptyRowsErr reports that a query returned no rows.
	gEmptyRowsErr = errors.New("empty rows")

	// gOneRowsErr reports that a write affected a number of rows other than one.
	gOneRowsErr = errors.New("expecting one row affected")

	// gDeliveryTimeoutNanos is how long, in nanoseconds, a message may stay in
	// the delivering state before it is reset for retry: 10 seconds.
	gDeliveryTimeoutNanos = int64(10 * time.Second)
)
database.go
-
数据库表相应的细节调整
- delivery_queue: 去掉 failed_count, 增加 update_time 时间戳
- success_queue: 去掉 sub_id, 改为 client_id, 并增加 create_time 时间戳
- failed_queue: 因为不允许失败, 所以删除失败投递表
package database
import "github.com/jmoiron/sqlx"
import _ "github.com/mattn/go-sqlite3"
type (
	// DBFunc is a unit of work that DB runs against an open *sqlx.DB.
	DBFunc func(db *sqlx.DB) error

	// TXFunc is a unit of work that TX runs; it receives the *sqlx.DB and the
	// *sqlx.Tx that TX began on it.
	TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
)
// init makes sure every table exists as soon as the package is loaded. The
// service cannot run without its schema, so any failure panics.
func init() {
	if err := DB(initDB); err != nil {
		panic(err)
	}
}
// initDB creates, when missing, the tables used by the message queue
// service: subscriber, tx_msg, delivery_queue and success_queue. The
// statements run in that order and the first error aborts the rest.
//
// NOTE(review): delivery_queue declares sub_id unique, yet each row carries a
// client_id (one row per subscriber). If several subscribers share a topic,
// a second row for the same sub_id would be rejected. Confirm against the
// code that fills this table.
func initDB(db *sqlx.DB) error {
	statements := []string{
		// subscriber / consumer
		`create table if not exists subscriber(
id integer primary key autoincrement,
client_id varchar(50) unique not null,
topic varchar(100) not null,
notify_url varchar(500) not null,
expire_time integer
)`,
		// transactional messages
		`create table if not exists tx_msg (
id integer primary key autoincrement,
global_id string varchar(50) not null,
sub_id string varchar(50) unique not null,
sender_id varchar(50) not null,
create_time integer not null,
topic varchar(100) not null,
content nvarchar(2048) not null
)`,
		// delivery queue: one pending delivery per row
		`create table if not exists delivery_queue (
id integer primary key autoincrement,
client_id varchar(50) not null,
notify_url varchar(500) not null,
msg_id integer not null,
global_id string varchar(50) not null,
sub_id string varchar(50) unique not null,
sender_id varchar(50) not null,
create_time integer not null,
topic varchar(100) not null,
content nvarchar(2048) not null,
status_flag integer not null,
update_time integer not null
)`,
		// successfully delivered messages
		`create table if not exists success_queue (
id integer primary key autoincrement,
msg_id integer not null,
client_id varchar(50) not null,
create_time integer not null
)`,
	}

	for _, stmt := range statements {
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}

	// A failed_queue table used to be created here. It was dropped because
	// delivery is retried until it succeeds, so no failure table is kept.
	return nil
}
// open returns a handle to the SQLite file ./mqs.db (relative to the working
// directory) via the "sqlite3" driver registered by the go-sqlite3 blank import.
func open() (*sqlx.DB, error) {
	const driverName, dataSource = "sqlite3", "./mqs.db"
	return sqlx.Open(driverName, dataSource)
}
// DB opens a fresh database handle, runs action with it and closes the handle
// afterwards. It returns the error from opening the database, or else
// whatever action returns.
func DB(action DBFunc) error {
	handle, openErr := open()
	if openErr != nil {
		return openErr
	}
	// The Close error is deliberately discarded so it never masks action's result.
	defer func() { _ = handle.Close() }()
	return action(handle)
}
// TX runs action inside a transaction on a freshly opened database handle
// (see DB). The transaction is committed when action returns nil and rolled
// back otherwise.
//
// action receives both the *sqlx.DB and the *sqlx.Tx. Only statements issued
// through tx take part in the transaction.
//
// When action fails, its error is returned to the caller even if the rollback
// succeeds. Returning the rollback's result instead would report nil and hide
// the failure.
func TX(action TXFunc) error {
	return DB(func(db *sqlx.DB) error {
		tx, err := db.Beginx()
		if err != nil {
			return err
		}
		if err = action(db, tx); err != nil {
			// The action's error is what callers need to see. A rollback
			// failure on top of it leaves nothing further for them to undo.
			_ = tx.Rollback()
			return err
		}
		return tx.Commit()
	})
}
(未完待续)