缘起
最近阅读《Go 微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用 golang 练习之
Saga 模式
- saga 模式将分布式长事务切分为一系列独立短事务
- 每个短事务都可以通过补偿动作进行撤销
- 事务动作和补偿动作都是幂等的, 允许重复执行而不会产生副作用
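Saga 由一系列子事务 $T_i$ 组成, 每个 $T_i$ 都有对应的补偿动作 $C_i$; 当 $T_i$ 出现问题时, $C_i$ 用于撤销 $T_i$ 执行带来的影响。可以通过下面两个公式理解 Saga 模式 (前者为全部成功, 后者为执行到 $T_j$ 失败后按逆序补偿):
$T = T_1 T_2 \ldots T_n$
$T = T_1 T_2 \ldots T_j C_j \ldots C_2 C_1$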
Saga 模式的核心理念是避免使用长期持有锁的长事务(如 14.2.2 节介绍的两阶段提交), 而是将事务切分为一组按顺序依次提交的短事务。Saga 模式满足 ACD(原子性、一致性、持久性)特性。摘自《Go 微服务实战》刘金亮, 2021.1
目标
- 为实现 saga 模式的分布式事务, 先撸一个 pub/sub 事务消息队列服务
-
事务消息队列服务的功能性要求
- 消息不会丢失: 消息的持久化
- 消息的唯一性: 要求每个消息有全局 ID 和子事务 ID
- 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
子目标 (Day 6)
-
终于可以进行基本功能测试了
- 订阅接口测试: /subscribe
- 发布接口测试: /publish
- 通知接口测试: /notify
-
重构
- database.go: 重构 DDL 语句, 以兼容 sqlx 的 StructScan 字段映射
- 所有 Queryx 返回的 rows, 必须加上 defer rows.Close()
- 增加大量的过程诊断日志
- tLiveMsgSource: fix handleMsgPublished 未接收数据就 return 的 bug
单元测试
mqs_test.go: 依次清空数据库, 测试订阅接口、发布接口和通知接口, 并检查过程日志和数据库记录的变化。
package saga
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/jmoiron/sqlx"
"io/ioutil"
"learning/gooop/saga/mqs/cmd"
"learning/gooop/saga/mqs/database"
"learning/gooop/saga/mqs/logger"
"learning/gooop/saga/mqs/models"
"net/http"
"sync"
"testing"
"time"
)
// gRunOnce guarantees the MQS server is booted at most once per test binary,
// no matter how many tests call fnBootMQS.
var gRunOnce sync.Once

// fnBootMQS starts the MQS (cmd.BootMQS) in a background goroutine on the
// first call, then blocks for one second so the server can start listening.
// Subsequent calls return immediately.
func fnBootMQS() {
	startAndWait := func() {
		go cmd.BootMQS()
		// NOTE(review): a fixed delay, not a readiness probe; assumes one
		// second is enough for the server and its services to come up.
		time.Sleep(time.Second)
	}
	gRunOnce.Do(startAndWait)
}
// fnAssertTrue stops the current test with msg when b is false.
//
// t.Helper marks this function as a test helper, so a failure is reported
// at the caller's assertion line instead of inside this function.
func fnAssertTrue(t *testing.T, b bool, msg string) {
	t.Helper()
	if !b {
		t.Fatal(msg)
	}
}
// Test_MQS is the end-to-end test of the message-queue service. It wipes the
// database, boots the MQS once, then runs the subscribe step followed by the
// publish/notify step. Order matters: the publish step asserts on delivery
// to "test-client", the subscriber registered by the subscribe step.
func Test_MQS(t *testing.T) {
	fnClearDB(t)
	fnBootMQS()

	steps := []struct {
		name string
		run  func(*testing.T)
	}{
		{name: "fnTestSubscribe", run: fnTestSubscribe},
		{name: "fnTestPublishAndNotify", run: fnTestPublishAndNotify},
	}
	for _, step := range steps {
		step.run(t)
		t.Log("passed " + step.name)
	}
}
// fnTestPublishAndNotify publishes a test TxMsg on "test-topic" via /publish,
// then verifies (through logger.Count and the success_queue table) that the
// message was handed to the live source, delivered to the subscriber's
// /notify URL, and recorded as a successful delivery for "test-client".
//
// It relies on fnTestSubscribe having registered "test-client" first.
func fnTestPublishAndNotify(t *testing.T) {
	t.Log("testing fnTestPublishAndNotify")
	msg := &models.TxMsg{
		GlobalID: "test-global-id",
		SubID:    "test-sub-id",
		SenderID: "test-sender-id",
		Topic:    "test-topic",
		Content:  "test-content",
	}
	fnPost(t, msg, "http://localhost:3333/publish")

	// check log
	fnAssertTrue(t, logger.Count("handlers.Publish, msg=test-global-id") == 1, "expecting log: handlers.Publish, msg=test-global-id")

	// Delivery to /notify happens asynchronously after /publish returns.
	// Rather than a fixed 100ms sleep (flaky on a slow or loaded machine),
	// poll until the worker has logged success and the success_queue row
	// exists, giving up after a bounded deadline. The assertions below then
	// report exactly which step is missing.
	const successSQL = "select count(1) from success_queue where ClientID='test-client'"
	delivered := func() bool {
		return logger.Count("tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id") >= 1 &&
			fnDBCount(t, successSQL) >= 1
	}
	for deadline := time.Now().Add(3 * time.Second); !delivered() && time.Now().Before(deadline); {
		time.Sleep(20 * time.Millisecond)
	}

	// check notify
	fnAssertTrue(t, logger.Count("tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id") == 1, "expecting log: tLiveMsgSource.handleMsgPublished")
	fnAssertTrue(t, logger.Count("tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id") == 1, "expecting log: tDeliveryWorker.afterDeliverySuccess")
	fnAssertTrue(t, logger.Count("handlers.Notify, msg=") == 1, "expecting log: handlers.Notify, msg=")

	// check success queue
	fnAssertTrue(t, fnDBCount(t, successSQL) == 1, "expecting db.success_queue.count == 1")
}
// fnClearDB empties every MQS table used by the tests, in a fixed order,
// so each run starts from a known-empty database. Any SQL error fails the
// test (via fnDBExec).
func fnClearDB(t *testing.T) {
	tables := []string{"subscriber", "tx_msg", "delivery_queue", "success_queue"}
	for _, table := range tables {
		fnDBExec(t, "delete from "+table)
	}
}
// fnDBCount runs a single-value query (typically "select count(1) ...") with
// the given bind args and returns the integer in the first column of the
// first row. The test fails if the query errors, the row iteration errors,
// or no row is returned.
func fnDBCount(t *testing.T, sql string, args ...interface{}) int {
	t.Helper()
	var n int
	err := database.DB(func(db *sqlx.DB) error {
		rows, e := db.Queryx(sql, args...)
		if e != nil {
			return e
		}
		defer rows.Close()
		if !rows.Next() {
			// Next returns false both when there are no rows and when
			// iteration failed; surface the real error instead of masking
			// it as "empty rows".
			if e = rows.Err(); e != nil {
				return e
			}
			return errors.New("empty rows")
		}
		return rows.Scan(&n)
	})
	if err != nil {
		t.Fatalf("fnDBCount(%q): %v", sql, err)
	}
	return n
}
// fnDBExec executes a non-query statement with the given bind args and
// returns the number of rows it affected. Any error from Exec or from
// RowsAffected fails the test.
func fnDBExec(t *testing.T, sql string, args ...interface{}) int {
	var affected int64
	err := database.DB(func(db *sqlx.DB) error {
		res, execErr := db.Exec(sql, args...)
		if execErr != nil {
			return execErr
		}
		var countErr error
		affected, countErr = res.RowsAffected()
		return countErr
	})
	if err != nil {
		t.Fatal(err)
	}
	return int(affected)
}
// fnTestSubscribe registers "test-client" for "test-topic" via /subscribe,
// with the MQS's own /notify endpoint as the callback and a 30-second
// expiry. It then checks that the handler logged the registration event
// exactly once and that exactly one matching subscriber row exists.
func fnTestSubscribe(t *testing.T) {
	t.Log("testing fnTestSubscribe")

	// Start from an empty subscriber table so the row count below is exact.
	fnDBExec(t, "delete from subscriber")

	expireAt := time.Now().Add(30 * time.Second).UnixNano()
	sub := &models.SubscribeMsg{
		ClientID:   "test-client",
		Topic:      "test-topic",
		NotifyUrl:  "http://localhost:3333/notify",
		ExpireTime: expireAt,
	}
	fnPost(t, sub, "http://localhost:3333/subscribe")

	registered := logger.Count("handlers.Subscribe, event=subscriber.registered")
	fnAssertTrue(t, registered == 1, "expecting event=subscriber.registered")

	rows := fnDBCount(t, "select count(1) as n from subscriber where ClientID=? and topic=?", sub.ClientID, sub.Topic)
	fnAssertTrue(t, rows == 1, "expecting subscriber.count == 1")
}
// fnPost JSON-encodes msg, POSTs it to url, and fails the test unless the
// reply decodes into a models.OkMsg whose OK field is true.
func fnPost(t *testing.T, msg interface{}, url string) {
	t.Helper()
	body, err := json.Marshal(msg)
	if err != nil {
		t.Fatalf("encoding request for %s: %v", url, err)
	}

	// A bounded timeout makes a hung server fail the test instead of
	// blocking it forever (the default http client has no timeout).
	client := &http.Client{Timeout: 5 * time.Second}
	rsp, err := client.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
	if err != nil {
		t.Fatal(err)
	}
	defer rsp.Body.Close()

	j, err := ioutil.ReadAll(rsp.Body)
	if err != nil {
		t.Fatal(err)
	}
	ok := &models.OkMsg{}
	if err := json.Unmarshal(j, ok); err != nil {
		t.Fatalf("decoding reply from %s (status %d, body %q): %v", url, rsp.StatusCode, j, err)
	}
	fnAssertTrue(t, ok.OK, fmt.Sprintf("expecting replying ok from %s", url))
}
测试输出
根据 assert 失败提示, 逐步排查诊断日志, 问题主要是 sqlx 字段映射错误, 以及未及时调用 rows.Close() 导致的错误。
$ go test -v mqs_test.go
=== RUN Test_MQS
eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
- using env: export GIN_MODE=release
- using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] GET /ping --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)
[GIN-debug] POST /subscribe --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)
[GIN-debug] POST /publish --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)
[GIN-debug] POST /notify --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)
[GIN-debug] Listening and serving HTTP on :3333
tDeliveryService.beginCreatingWorkers
tDeliveryService.beginCleanExpiredWorkers
mqs_test.go:139: testing fnTestSubscribe
handlers.Subscribe, event=subscriber.registered, msg=&{test-client test-topic http://localhost:3333/notify 1615868155394998875}
eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021/03/16 - 12:15:25 | 200 | 8.118873ms | ::1 | POST "/subscribe"
[GIN] 2021/03/16 - 12:15:25 | 200 | 8.214968ms | ::1 | POST "/subscribe"
tDeliveryWorker.afterInitialLoad, clientID=test-client, rows=0
database.DB, err=empty rows
mqs_test.go:45: passed fnTestSubscribe
mqs_test.go:54: testing fnTestPublishAndNotify
handlers.Publish, msg=test-global-id/test-sub-id/test-topic, msgId=15
[GIN] 2021/03/16 - 12:15:25 | 200 | 15.200109ms | ::1 | POST "/publish"
[GIN] 2021/03/16 - 12:15:25 | 200 | 15.216578ms | ::1 | POST "/publish"
handlers.Publish, pubLiveMsg 15
handlers.Publish, pubLiveMsg, msgId=15, rows=1
handlers.Publish, event=msg.published, clientID=test-client, msg=test-global-id/test-sub-id/http://localhost:3333/notify
eventbus.Pub, event=msg.published, handler=tLiveMsgSource.test-client
tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id/test-sub-id/test-topic
handlers.Notify, msg=&{test-global-id test-sub-id 0 test-topic test-content}
[GIN] 2021/03/16 - 12:15:25 | 200 | 48.38µs | ::1 | POST "/notify"
[GIN] 2021/03/16 - 12:15:25 | 200 | 110.659µs | ::1 | POST "/notify"
tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id/test-sub-id
mqs_test.go:49: passed fnTestPublishAndNotify
--- PASS: Test_MQS (1.18s)
PASS
ok command-line-arguments 1.187s
(未完待续)