缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务都可以通过补偿动作进行撤销
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时, Ci用于处理Ti执行带来的问题。可以通过下面的两个公式理解Saga模式: 正常执行时 $T = T_1 T_2 \dots T_n$; 当某个子事务 $T_j$ 失败需要回滚时 $T = T_1 T_2 \dots T_j C_j \dots C_2 C_1$。Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务。Saga模式满足ACD(原子性、一致性、持久性)特性。摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  • 事务消息队列服务的功能性要求

    • 消息不会丢失: 消息的持久化
    • 消息的唯一性: 要求每个消息有全局ID和子事务ID
    • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

子目标(Day 6)

  • 终于可以进行基本功能测试了

    • 订阅接口测试: /subscribe
    • 发布接口测试: /publish
    • 通知接口测试: /notify
  • 重构

    • database.go: 重构DDL语句, 以兼容sqlx的StructScan字段映射
    • 所有Queryx返回的rows, 必须加上defer rows.Close()
    • 增加大量的过程诊断日志
    • tLiveMsgSource: fix handleMsgPublished未接收数据就return的bug

单元测试

mqs_test.go, 依次清空数据库, 测试订阅接口, 发布接口和通知接口, 并检查过程诊断日志和数据库记录的变化.

// Package saga contains an end-to-end test for the pub/sub transactional
// message queue service (MQS): it clears the database, boots the service,
// and exercises the /subscribe, /publish and /notify HTTP endpoints while
// checking the diagnostic log and the database tables.
package saga

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"testing"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/cmd"
	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
)

// gRunOnce ensures fnBootMQS starts the service at most once per test binary.
var gRunOnce sync.Once

// fnBootMQS launches the MQS service in a background goroutine (once) and
// then sleeps for one second to give it time to come up.
func fnBootMQS() {
	gRunOnce.Do(func() {
		// boot mqs
		go cmd.BootMQS()

		// wait for mqs up
		time.Sleep(1 * time.Second)
	})
}

// fnAssertTrue aborts the test with msg when b is false.
func fnAssertTrue(t *testing.T, b bool, msg string) {
	// Report the failure at the caller's line, not inside this helper.
	t.Helper()
	if !b {
		t.Fatal(msg)
	}
}

// Test_MQS runs the whole scenario in order: clear the database, boot the
// service, then test subscribing followed by publishing and notification.
func Test_MQS(t *testing.T) {
	// prepare mqs
	fnClearDB(t)
	fnBootMQS()

	// subscribe
	fnTestSubscribe(t)
	t.Log("passed fnTestSubscribe")

	// publish and notify
	fnTestPublishAndNotify(t)
	t.Log("passed fnTestPublishAndNotify")
}

// fnTestPublishAndNotify posts one message to /publish and then verifies,
// through the diagnostic log and the success_queue table, that it was
// delivered to the subscriber registered by fnTestSubscribe.
func fnTestPublishAndNotify(t *testing.T) {
	t.Log("testing fnTestPublishAndNotify")

	msg := &models.TxMsg{
		GlobalID: "test-global-id",
		SubID:    "test-sub-id",
		SenderID: "test-sender-id",
		Topic:    "test-topic",
		Content:  "test-content",
	}
	fnPost(t, msg, "http://localhost:3333/publish")

	// check log
	fnAssertTrue(t, logger.Count("handlers.Publish, msg=test-global-id") == 1, "expecting log: handlers.Publish, msg=test-global-id")

	// check notify: pause briefly before inspecting the delivery log entries
	time.Sleep(100 * time.Millisecond)
	fnAssertTrue(t, logger.Count("tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id") == 1, "expecting log: tLiveMsgSource.handleMsgPublished")
	fnAssertTrue(t, logger.Count("tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id") == 1, "expecting log: tDeliveryWorker.afterDeliverySuccess")
	fnAssertTrue(t, logger.Count("handlers.Notify, msg=") == 1, "expecting log: handlers.Notify, msg=")

	// check success queue
	fnAssertTrue(t, fnDBCount(t, "select count(1) from success_queue where ClientID='test-client'") == 1, "expecting db.success_queue.count == 1")
}

// fnClearDB deletes all rows from the four tables used by the test.
func fnClearDB(t *testing.T) {
	t.Helper()
	fnDBExec(t, "delete from subscriber")
	fnDBExec(t, "delete from tx_msg")
	fnDBExec(t, "delete from delivery_queue")
	fnDBExec(t, "delete from success_queue")
}

// fnDBCount runs a query that yields a single integer (e.g. count(1)) and
// returns that value. Any database error, or an empty result set, fails
// the test.
func fnDBCount(t *testing.T, sql string, args ...interface{}) int {
	t.Helper()

	var count int
	err := database.DB(func(db *sqlx.DB) error {
		rows, err := db.Queryx(sql, args...)
		if err != nil {
			return err
		}
		// Always release the rows once the callback returns.
		defer rows.Close()

		if !rows.Next() {
			// Next returns false both on "no rows" and on iteration error;
			// surface the real error when there is one.
			if err := rows.Err(); err != nil {
				return err
			}
			return errors.New("empty rows")
		}
		return rows.Scan(&count)
	})
	if err != nil {
		t.Fatal(err)
	}
	return count
}

// fnDBExec executes a statement and returns the number of affected rows.
// Any database error fails the test.
func fnDBExec(t *testing.T, sql string, args ...interface{}) int {
	t.Helper()

	var affected int64
	err := database.DB(func(db *sqlx.DB) error {
		result, err := db.Exec(sql, args...)
		if err != nil {
			return err
		}
		affected, err = result.RowsAffected()
		return err
	})
	if err != nil {
		t.Fatal(err)
	}
	return int(affected)
}

// fnTestSubscribe registers "test-client" on "test-topic" through /subscribe
// and verifies the registration in both the diagnostic log and the
// subscriber table.
func fnTestSubscribe(t *testing.T) {
	t.Log("testing fnTestSubscribe")

	// clear subscriber
	fnDBExec(t, "delete from subscriber")

	msg := &models.SubscribeMsg{
		ClientID:  "test-client",
		Topic:     "test-topic",
		NotifyUrl: "http://localhost:3333/notify",
		// Expiry is expressed in Unix nanoseconds, 30 seconds from now.
		ExpireTime: time.Now().UnixNano() + int64(30*time.Second),
	}
	fnPost(t, msg, "http://localhost:3333/subscribe")

	// check log
	fnAssertTrue(t, logger.Count("handlers.Subscribe, event=subscriber.registered") == 1, "expecting event=subscriber.registered")

	// check db
	count := fnDBCount(t, "select count(1) as n from subscriber where ClientID=? and topic=?", msg.ClientID, msg.Topic)
	fnAssertTrue(t, count == 1, "expecting subscriber.count == 1")
}

// fnPost JSON-encodes msg, POSTs it to url, and asserts that the reply
// decodes into a models.OkMsg whose OK field is true. Any encoding,
// transport, or decoding error fails the test.
func fnPost(t *testing.T, msg interface{}, url string) {
	t.Helper()

	body, err := json.Marshal(msg)
	if err != nil {
		t.Fatalf("marshalling request for %s: %v", url, err)
	}

	rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
	if err != nil {
		t.Fatal(err)
	}
	defer rsp.Body.Close()

	raw, err := ioutil.ReadAll(rsp.Body)
	if err != nil {
		t.Fatal(err)
	}

	ok := &models.OkMsg{}
	if err := json.Unmarshal(raw, ok); err != nil {
		// Include status and payload so a non-JSON error page is easy to diagnose.
		t.Fatalf("decoding reply from %s (status %s, body %q): %v", url, rsp.Status, raw, err)
	}
	fnAssertTrue(t, ok.OK, fmt.Sprintf("expecting replying ok from %s", url))
}

测试输出

根据assert失败提示, 逐步排查诊断日志, 主要是sqlx字段映射错误, 和未及时调用rows.Close()的错误.

$ go test -v mqs_test.go === RUN   Test_MQSeventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production. - using env:   export GIN_MODE=release - using code:  gin.SetMode(gin.ReleaseMode)[GIN-debug] GET    /ping                     --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)[GIN-debug] POST   /subscribe                --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)[GIN-debug] POST   /publish                  --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)[GIN-debug] POST   /notify                   --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)[GIN-debug] Listening and serving HTTP on :3333tDeliveryService.beginCreatingWorkerstDeliveryService.beginCleanExpiredWorkers    mqs_test.go:139: testing fnTestSubscribehandlers.Subscribe, event=subscriber.registered, msg=&{test-client test-topic http://localhost:3333/notify 1615868155394998875}eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered[GIN] 2021/03/16 - 12:15:25 | 200 |    8.118873ms |             ::1 | POST     "/subscribe"[GIN] 2021/03/16 - 12:15:25 | 200 |    8.214968ms |             ::1 | POST     "/subscribe"tDeliveryWorker.afterInitialLoad, clientID=test-client, rows=0database.DB, err=empty rows    mqs_test.go:45: passed fnTestSubscribe    mqs_test.go:54: testing fnTestPublishAndNotifyhandlers.Publish, msg=test-global-id/test-sub-id/test-topic, msgId=15[GIN] 2021/03/16 - 12:15:25 | 200 |   15.200109ms |             ::1 | POST     "/publish"[GIN] 2021/03/16 - 12:15:25 | 200 |   15.216578ms |             ::1 | POST     "/publish"handlers.Publish, pubLiveMsg 15handlers.Publish, pubLiveMsg, msgId=15, rows=1handlers.Publish, event=msg.published, clientID=test-client, 
msg=test-global-id/test-sub-id/http://localhost:3333/notifyeventbus.Pub, event=msg.published, handler=tLiveMsgSource.test-clienttLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id/test-sub-id/test-topichandlers.Notify, msg=&{test-global-id test-sub-id  0 test-topic test-content}[GIN] 2021/03/16 - 12:15:25 | 200 |       48.38µs |             ::1 | POST     "/notify"[GIN] 2021/03/16 - 12:15:25 | 200 |     110.659µs |             ::1 | POST     "/notify"tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id/test-sub-id    mqs_test.go:49: passed fnTestPublishAndNotify--- PASS: Test_MQS (1.18s)PASSok      command-line-arguments  1.187s

(未完待续)