缘起
最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务都可以通过补偿动作进行撤销
- 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的补偿“Ci”,当Ti出现问题时,Ci用于撤销Ti执行带来的影响。可以通过下面的两个公式理解Saga模式:

$$T = T_1 T_2 \cdots T_n$$ (所有子事务均执行成功)

$$T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1$$ (第 j 个子事务失败时, 按逆序执行已执行子事务的补偿)

Saga模式的核心理念是避免应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序依次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特性。

摘自《Go微服务实战》刘金亮, 2021.1
目标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
事务消息队列服务的功能性要求
- 消息不会丢失: 消息的持久化
- 消息的唯一性: 要求每个消息有全局ID和子事务ID
- 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
子目标(Day 4)
完善投递worker
- 未处理消息: 标记, 并尝试投递
- 处理中消息: 判断是否超时, 并重试投递
- 投递成功: 移动到成功投递表
- 投递失败: 重置标记, 下轮重试
数据库表相应的细节调整
- delivery_queue: 去掉failed_count, 增加update_time时间戳
- success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
- failed_queue: 因为不允许失败, 因此删除失败投递表
tDeliveryWorker.go
完善投递worker
- 未处理消息: 标记, 并尝试投递
- 处理中消息: 判断是否超时, 并重试投递
- 投递成功: 移动到成功投递表
- 投递失败: 重置标记, 下轮重试
package deliveryimport ( "bytes" "encoding/json" "errors" "github.com/jmoiron/sqlx" "io/ioutil" "learning/gooop/saga/mqs/database" "learning/gooop/saga/mqs/logger" "learning/gooop/saga/mqs/models" "net/http" "time")type tDeliveryWorker struct { info *tWorkerInfo}func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker { it := new(tDeliveryWorker) it.info = info go it.beginMainLoop() return it}func (me *tDeliveryWorker) beginMainLoop() { for !me.isExpired() { ok, msg := me.peek() if ok { switch msg.StatusFlag { case 0: // 未解决的音讯 me.handleUndeliveredMsg(msg) break case 1: // 解决中的音讯 me.handleDeliveringMsg(msg) break } } else { time.Sleep(time.Duration(1) * time.Second) } }}func (me *tDeliveryWorker) isExpired() bool { return time.Now().UnixNano() >= me.info.ExpireTime}// peek: 从待投递队列中获取最早的一条记录func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) { msg := &models.QueuedMsg{} e := database.DB(func(db *sqlx.DB) error { rows, err := db.Queryx( "select * from delivery_queue where client_id=? order by create_time asc limit 1", me.info.ClientID, ) if err != nil { return err } if rows.Next() { err = rows.StructScan(msg) if err != nil { return err } return nil } else { return gEmptyRowsErr } }) if e != nil { return false, nil } else { return true, msg }}// handleUndeliveredMsg: if msg unhandled, then try to deliver itfunc (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) { err := database.DB(func(db *sqlx.DB) error { now := time.Now().UnixNano() r,e := db.Exec( "update delivery_queue set status_flag=1, update_time=? where id=? 
and status_flag=0 and update_time=?", now, msg.ID, msg.UpdateTime, ) if e != nil { return e } rows, e := r.RowsAffected() if e != nil { return e } if rows != 1 { return gOneRowsErr } msg.UpdateTime = now return nil }) if err != nil { logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error()) return } if me.deliver(msg) { me.afterDeliverySuccess(msg) } else { me.afterDeliveryFailed(msg) }}// deliver: use http.Post function to delivery msgfunc (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool { t := &models.TxMsg{ GlobalID: msg.GlobalID, SubID: msg.SubID, Topic: msg.Topic, CreateTime: msg.CreateTime, Content: msg.Content, } j,e := json.Marshal(t) if e != nil { logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID) return false } r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j)) if e != nil { logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID) return false } defer r.Body.Close() rep, e := ioutil.ReadAll(r.Body) if e != nil { logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID) return false } m := &models.OkMsg{} e = json.Unmarshal(rep, m) if e != nil { logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID) return false } if m.OK { return true } else { logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID) return false }}// handleDeliveringMsg: if delivery timeout, then retry deliveryfunc (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) { now := time.Now().UnixNano() if msg.UpdateTime + gDeliveryTimeoutNanos > now { return } // delivery timeout me.afterDeliveryTimeout(msg)}// afterDeliverySuccess: if done, move msg to success 
queuefunc (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) { err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error { r,e := db.Exec( "delete from delivery_queue where id=? and update_time=? and status_flag=1", msg.ID, msg.UpdateTime, ) if e != nil { return e } rows, e := r.RowsAffected() if e != nil { return e } if rows != 1 { return gOneRowsErr } r, e = db.Exec( "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)", msg.ID, msg.ClientID, time.Now().UnixNano(), ) if e != nil { return e } rows, e = r.RowsAffected() if e != nil { return e } if rows != 1 { return gOneRowsErr } return nil }) if err != nil { logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error()) } else { logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID) }}// afterDeliveryFailed: if failed, do nothing but just log itfunc (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) { logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)}// afterDeliveryTimeout: if timeout, then reset status and retryfunc (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) { err := database.DB(func(db *sqlx.DB) error { r,e := db.Exec( "update delivery_queue set status_flag=0 where id=? 
and status_flag=1 and update_time=?", msg.ID, msg.UpdateTime, ) if e != nil { return e } rows,e := r.RowsAffected() if e != nil { return e } if rows != 1 { return gOneRowsErr } return nil }) if err != nil { logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error()) } else { logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID) }}var gEmptyRowsErr = errors.New("empty rows")var gOneRowsErr = errors.New("expecting one row affected")var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))
database.go
数据库表相应的细节调整
- delivery_queue: 去掉failed_count, 增加update_time时间戳
- success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
- failed_queue: 因为不允许失败, 因此删除失败投递表
package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() { // must prepare tables err := DB(initDB) if err != nil { panic(err) }}func initDB(db *sqlx.DB) error { // 订阅者/消费者: subscriber _, e := db.Exec(`create table if not exists subscriber( id integer primary key autoincrement, client_id varchar(50) unique not null, topic varchar(100) not null, notify_url varchar(500) not null, expire_time integer)`) if e != nil { return e } // 事务音讯: tx_msg _, e = db.Exec(`create table if not exists tx_msg ( id integer primary key autoincrement, global_id string varchar(50) not null, sub_id string varchar(50) unique not null, sender_id varchar(50) not null, create_time integer not null, topic varchar(100) not null, content nvarchar(2048) not null)`) if e != nil { return e } // 投递队列: delivery_queue _, e = db.Exec(`create table if not exists delivery_queue ( id integer primary key autoincrement, client_id varchar(50) not null, notify_url varchar(500) not null, msg_id integer not null, global_id string varchar(50) not null, sub_id string varchar(50) unique not null, sender_id varchar(50) not null, create_time integer not null, topic varchar(100) not null, content nvarchar(2048) not null, status_flag integer not null, update_time integer not null)`) if e != nil { return e } // 胜利投递队列: success_queue _, e = db.Exec(`create table if not exists success_queue ( id integer primary key autoincrement, msg_id integer not null, client_id varchar(50) not null, create_time integer not null)`) if e != nil { return e }// // 投递失败队列: failed_queue// _, e = db.Exec(`create table if not exists failed_queue (// id integer primary key autoincrement,// msg_id integer not null,// client_id varchar(50) not null,// create_time integer not null//)`)// if e != nil {// return e// } return nil}func open() (*sqlx.DB, error) { return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error 
{ db,err := open() if err != nil { return err } defer func() { _ = db.Close() }() return action(db)}func TX(action TXFunc) error { return DB(func(db *sqlx.DB) error { tx, err := db.Beginx() if err != nil { return err } err = action(db, tx) if err == nil { return tx.Commit() } else { return tx.Rollback() } })}
(未完待续)