手撸golang GO与微服务 Saga模式之8 集成测试

缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的补偿“Ci”,当Ti出现问题时Ci用于处理Ti执行带来的问题。可以通过下面的两个公式理解Saga模式: $T = T_1 T_2 \cdots T_n$ (所有子事务依次成功提交); $T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1$ (Tj失败时, 逆序执行已提交子事务的补偿动作)。Saga模式的核心理念是避免使用长时间持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序依次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特性。摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  • 事务消息队列服务的功能性要求

    • 消息不会丢失: 消息的持久化
    • 消息的唯一性: 要求每个消息有全局ID和子事务ID
    • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

子目标(Day 8)

  • 创建虚拟的库存服务

    • 启动时, 注册到MQ
    • 接收到订单创建的消息时, 扣减库存
    • 扣库成功时, 经MQ通知订单服务扣库成功
    • 扣库失败时, 经MQ通知订单服务扣库失败

设计

  • IStockService: 模拟的库存服务接口
  • tStockService: 虚拟库存服务, 实现IStockService接口
  • NotifySaleOrderCreated: 用于监听订单创建消息的http回调处理器

单元测试

order_test.go

  1. 初始化10个产品库存
  2. 订单服务, 创建订单1, 尝试扣减1个库存, 预期成功
  3. 订单服务, 创建订单2, 尝试扣减10个库存, 预期失败
  4. 校验订单1的最终状态为出库成功
  5. 校验订单2的最终状态为出库失败
package saga

import (
	"sync"
	"testing"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/cmd"
	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/order"
	"learning/gooop/saga/stock"
)

// gRunOnce guards the MQ server so it is booted at most once per test binary.
var gRunOnce sync.Once

// fnBootMQS starts the MQ server in a background goroutine (first call only)
// and then sleeps for one second to give it time to come up.
func fnBootMQS() {
	gRunOnce.Do(func() {
		// boot mqs
		go cmd.BootMQS()

		// wait for mqs up
		time.Sleep(1 * time.Second)
	})
}

// fnAssertTrue aborts the test with msg when b is false.
func fnAssertTrue(t *testing.T, b bool, msg string) {
	// Attribute the failure to the calling line instead of this helper.
	t.Helper()
	if !b {
		t.Fatal(msg)
	}
}

// Test_SagaSaleOrder drives the order -> MQ -> stock -> MQ -> order round trip:
// with 10 items in stock, order 1 (quantity 1) must end as outbound-done and
// order 2 (quantity 10) must end as outbound-failed.
func Test_SagaSaleOrder(t *testing.T) {
	// prepare mqs
	fnClearDB(t)
	fnBootMQS()

	// 1 create prod stock
	prodID := "test-prod-1"
	err := stock.MockStockService.AddStock(prodID, 10)
	if err != nil {
		t.Fatal(err)
	}

	// create order 1
	o1 := &order.SaleOrder{
		OrderID:    "test-order-1",
		ProductID:  prodID,
		CustomerID: "test-customer-1",
		Quantity:   1,
		Price:      100,
		Amount:     100,
		CreateTime: time.Now().UnixNano(),
		StatusFlag: order.StatusNotDelivered,
	}
	err = order.MockSaleOrderService.Create(o1)
	if err != nil {
		t.Fatal(err)
	}

	// create order 2
	time.Sleep(10 * time.Millisecond)
	o2 := &order.SaleOrder{
		OrderID:    "test-order-2",
		ProductID:  prodID,
		CustomerID: "test-customer-2",
		Quantity:   10,
		Price:      100,
		Amount:     1000,
		CreateTime: time.Now().UnixNano(),
		StatusFlag: order.StatusNotDelivered,
	}
	err = order.MockSaleOrderService.Create(o2)
	if err != nil {
		t.Fatal(err)
	}

	// Give the asynchronous deliveries time to finish before inspecting results.
	time.Sleep(1 * time.Second)
	logger.Logf("============================================")

	// Each of these log lines must have been emitted exactly once.
	expectedLogs := []string{
		"tSaleOrderService.beginSubscribeMQ, done",
		"tSaleOrderService.publishMQ, done, order=test-order-1",
		"tSaleOrderService.publishMQ, done, order=test-order-2",
		"stock.NotifySaleOrderCreated, order=test-order-1",
		"stock.NotifySaleOrderCreated, order=test-order-2",
	}
	for _, log := range expectedLogs {
		fnAssertTrue(t, logger.Count(log) == 1, "expecting log: "+log)
	}

	// Fail cleanly, rather than panic on a nil dereference, if an order is missing.
	o1 = order.MockSaleOrderService.Get(o1.OrderID)
	fnAssertTrue(t, o1 != nil, "expecting o1 to exist")
	fnAssertTrue(t, o1.StatusFlag == order.StatusStockOutboundDone, "expecting o1 done")

	o2 = order.MockSaleOrderService.Get(o2.OrderID)
	fnAssertTrue(t, o2 != nil, "expecting o2 to exist")
	fnAssertTrue(t, o2.StatusFlag == order.StatusStockOutboundFailed, "expecting o2 failed")

	logger.Logf("test passed")
}

// fnClearDB empties the MQ tables so every run starts from a clean state.
func fnClearDB(t *testing.T) {
	t.Helper()
	fnDBExec(t, "delete from subscriber")
	fnDBExec(t, "delete from tx_msg")
	fnDBExec(t, "delete from delivery_queue")
	fnDBExec(t, "delete from success_queue")
}

// fnDBExec executes sql with args through database.DB and returns the number
// of affected rows; any error aborts the test.
func fnDBExec(t *testing.T, sql string, args ...interface{}) int {
	t.Helper()

	var affected int64
	err := database.DB(func(db *sqlx.DB) error {
		r, e := db.Exec(sql, args...)
		if e != nil {
			return e
		}

		affected, e = r.RowsAffected()
		return e
	})
	if err != nil {
		t.Fatal(err)
	}
	return int(affected)
}

测试输出

$ go test -v order_test.go === RUN   Test_SagaSaleOrder23:55:54.292132442 eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production. - using env:   export GIN_MODE=release - using code:  gin.SetMode(gin.ReleaseMode)[GIN-debug] GET    /ping                     --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)[GIN-debug] POST   /subscribe                --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)[GIN-debug] POST   /publish                  --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)[GIN-debug] POST   /notify                   --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)[GIN-debug] POST   /notify/sale-order.stock.outbound --> learning/gooop/saga/order.NotifyStockOutbound (4 handlers)[GIN-debug] POST   /notify/sale-order.created --> learning/gooop/saga/stock.NotifySaleOrderCreated (4 handlers)[GIN-debug] Listening and serving HTTP on :333323:55:54.292287032 tDeliveryService.beginCleanExpiredWorkers23:55:54.292345845 tDeliveryService.beginCreatingWorkers23:55:54.356542981 handlers.Subscribe, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}23:55:54.356524325 handlers.Subscribe, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}23:55:54.365256441 handlers.Subscribe, event=subscriber.registered, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}23:55:54.365271105 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered[GIN] 2021/03/18 - 23:55:54 | 200 |    8.865173ms |             ::1 | POST     "/subscribe"[GIN] 2021/03/18 - 23:55:54 | 200 |    8.882138ms |             ::1 | 
POST     "/subscribe"23:55:54.365488163 tSaleOrderService.beginSubscribeMQ, done23:55:54.365861542 database.DB, err=empty rows23:55:54.366239244 tDeliveryWorker.afterInitialLoad, clientID=sale-order-service, rows=023:55:54.373588493 handlers.Subscribe, event=subscriber.registered, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}23:55:54.373605972 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered[GIN] 2021/03/18 - 23:55:54 | 200 |   17.189632ms |             ::1 | POST     "/subscribe"[GIN] 2021/03/18 - 23:55:54 | 200 |   17.205549ms |             ::1 | POST     "/subscribe"23:55:54.373843032 tStockService.beginSubscribeMQ, done23:55:54.3743926 database.DB, err=empty rows23:55:54.374499757 tDeliveryWorker.afterInitialLoad, clientID=stock-service, rows=023:55:55.292336699 tStockService.AddStock, done, prodId=test-prod-1, stock=0, delta=0, after=1023:55:55.323746568 handlers.Publish, msg=test-order-1/test-order-1/sale-order.created, msgId=112[GIN] 2021/03/18 - 23:55:55 | 200 |   31.112478ms |             ::1 | POST     "/publish"[GIN] 2021/03/18 - 23:55:55 | 200 |   31.125855ms |             ::1 | POST     "/publish"23:55:55.323811205 handlers.Publish, pubLiveMsg 11223:55:55.323910377 tSaleOrderService.publishMQ, done, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}23:55:55.324227736 handlers.Publish, pubLiveMsg, msgId=112, rows=123:55:55.324273573 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-1/test-order-1/http://localhost:3333/notify/sale-order.created23:55:55.32428051 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service23:55:55.324285512 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service23:55:55.324292286 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-1/test-order-1/sale-order.created23:55:55.324346678 
tDeliveryWorker.beginPollAndDeliver, msg from live=&{98 stock-service http://localhost:3333/notify/sale-order.created 112 test-order-1 test-order-1 sale-order-service 1616082955292352151 sale-order.created {"OrderID":"test-order-1","CustomerID":"test-customer-1","ProductID":"test-prod-1","Quantity":1,"Price":100,"Amount":100,"CreateTime":1616082955292352151,"StatusFlag":0} 0 0}23:55:55.33925766 handlers.Publish, msg=test-order-2/test-order-2/sale-order.created, msgId=113[GIN] 2021/03/18 - 23:55:55 | 200 |   15.264561ms |             ::1 | POST     "/publish"[GIN] 2021/03/18 - 23:55:55 | 200 |   15.280884ms |             ::1 | POST     "/publish"23:55:55.339353768 handlers.Publish, pubLiveMsg 11323:55:55.339446893 tSaleOrderService.publishMQ, done, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}23:55:55.339909493 handlers.Publish, pubLiveMsg, msgId=113, rows=123:55:55.339919874 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-2/test-order-2/http://localhost:3333/notify/sale-order.created23:55:55.339925049 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service23:55:55.339929964 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service23:55:55.339935935 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-2/test-order-2/sale-order.created23:55:55.350117186 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-1/test-order-123:55:55.35041833 stock.NotifySaleOrderCreated, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}23:55:55.350429178 tStockService.AddStock, done, prodId=test-prod-1, stock=10, delta=-1, after=9[GIN] 2021/03/18 - 23:55:55 | 200 |      88.872µs |             ::1 | POST     "/notify/sale-order.created"[GIN] 2021/03/18 - 23:55:55 | 200 |     133.617µs |             ::1 | POST     "/notify/sale-order.created"23:55:55.350592351 tDeliveryWorker.deliver, OK, 
id=stock-service, msg=test-order-1/test-order-123:55:55.367336707 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-1/test-order-123:55:55.36738322 tDeliveryWorker.beginPollAndDeliver, msg from live=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 0 0}23:55:55.367530495 database.DB, err=empty rows23:55:55.374978535 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-2/test-order-223:55:55.375201115 stock.NotifySaleOrderCreated, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}23:55:55.375211216 tStockService.AddStock, failed, prodId=test-prod-1, stock=9, delta=-1023:55:55.375219558 tStockService.HandleSaleOrderCreated, err=insufficient stock, order=&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}[GIN] 2021/03/18 - 23:55:55 | 200 |      102.52µs |             ::1 | POST     "/notify/sale-order.created"[GIN] 2021/03/18 - 23:55:55 | 200 |     116.933µs |             ::1 | POST     "/notify/sale-order.created"23:55:55.375354895 tDeliveryWorker.deliver, OK, id=stock-service, msg=test-order-2/test-order-223:55:55.389901711 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-2/test-order-223:55:55.38993077 tDeliveryWorker.beginPollAndDeliver, msg from db=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 1 1616082955367401386}23:55:55.420121681 handlers.Publish, 
msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound, msgId=114[GIN] 2021/03/18 - 23:55:55 | 200 |   69.507171ms |             ::1 | POST     "/publish"[GIN] 2021/03/18 - 23:55:55 | 200 |   69.520805ms |             ::1 | POST     "/publish"23:55:55.420220719 handlers.Publish, pubLiveMsg 11423:55:55.420321792 tStockService.publishMQ, done, msg=&{test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1}23:55:55.42071623 handlers.Publish, pubLiveMsg, msgId=114, rows=123:55:55.420731889 handlers.Publish, event=msg.published, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/http://localhost:3333/notify/sale-order.stock.outbound23:55:55.420741935 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service23:55:55.420746401 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service23:55:55.420755367 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound23:55:55.42079505 tDeliveryWorker.beginPollAndDeliver, msg from live=&{100 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 114 test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1 0 0}23:55:55.435844021 handlers.Publish, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound, msgId=115[GIN] 2021/03/18 - 23:55:55 | 200 |   15.407267ms |             ::1 | POST     "/publish"[GIN] 2021/03/18 - 23:55:55 | 200 |   15.420327ms |             ::1 | POST     "/publish"23:55:55.4359058 handlers.Publish, pubLiveMsg 11523:55:55.436026025 tStockService.publishMQ, done, msg=&{test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0}23:55:55.436398324 handlers.Publish, pubLiveMsg, msgId=115, rows=123:55:55.436409937 handlers.Publish, event=msg.published, clientID=sale-order-service, 
msg=test-order-2/test-order-2.outbound/http://localhost:3333/notify/sale-order.stock.outbound23:55:55.43642793 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service23:55:55.436433697 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service23:55:55.43644379 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound23:55:55.446599314 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-1/test-order-1.outbound23:55:55.446809726 order.NotifyStockOutbound, orderID=test-order-1, succeeded=true[GIN] 2021/03/18 - 23:55:55 | 200 |      61.898µs |             ::1 | POST     "/notify/sale-order.stock.outbound"[GIN] 2021/03/18 - 23:55:55 | 200 |      81.911µs |             ::1 | POST     "/notify/sale-order.stock.outbound"23:55:55.446951354 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-1/test-order-1.outbound23:55:55.462584405 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, msg=test-order-1/test-order-1.outbound23:55:55.462615131 tDeliveryWorker.beginPollAndDeliver, msg from live=&{101 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 115 test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0 0 0}23:55:55.469999185 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-2/test-order-2.outbound23:55:55.470163043 order.NotifyStockOutbound, orderID=test-order-2, succeeded=false[GIN] 2021/03/18 - 23:55:55 | 200 |       85.14µs |             ::1 | POST     "/notify/sale-order.stock.outbound"[GIN] 2021/03/18 - 23:55:55 | 200 |     105.638µs |             ::1 | POST     "/notify/sale-order.stock.outbound"23:55:55.470369408 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-2/test-order-2.outbound23:55:55.486229145 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, 
msg=test-order-2/test-order-2.outbound23:55:56.302885199 ============================================23:55:56.303470422 test passed--- PASS: Test_SagaSaleOrder (2.05s)PASSok      command-line-arguments  2.057s

IStockService.go

模拟的库存服务接口

package stock;import "learning/gooop/saga/order"type IStockService interface {    GetStock(prodId string) int    AddStock(prodId string, delta int) error    HandleSaleOrderCreated(it *order.SaleOrder) error}

tStockService.go

虚拟库存服务, 实现IStockService接口

package stock

import (
	"bytes"
	"encoding/json"
	"errors"
	"io/ioutil"
	"net/http"
	"sync"
	"time"

	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
	"learning/gooop/saga/order"
)

// tStockService is an in-memory mock stock service implementing IStockService.
// It subscribes to the "sale-order.created" topic and publishes the outcome of
// every stock deduction to the "sale-order.stock.outbound" topic.
type tStockService struct {
	// rwmutex guards stock.
	rwmutex *sync.RWMutex
	// stock maps a product ID to its current quantity.
	stock map[string]int
	// bMQReady records whether the latest subscribe attempt succeeded.
	// NOTE(review): written by the subscription goroutine without
	// synchronization; it is not read anywhere in this file.
	bMQReady bool
	// publishQueue buffers outbound messages for the publishing goroutine.
	publishQueue chan *models.TxMsg
}

// newStockService creates an initialized stock service.
func newStockService() IStockService {
	it := new(tStockService)
	it.init()
	return it
}

// init allocates the internal state and, after a 100ms delay, starts the
// background subscribe and publish loops.
func (me *tStockService) init() {
	me.rwmutex = new(sync.RWMutex)
	me.stock = make(map[string]int)
	me.bMQReady = false
	me.publishQueue = make(chan *models.TxMsg, gMQMaxQueuedMsg)

	go func() {
		time.Sleep(100 * time.Millisecond)
		go me.beginSubscribeMQ()
		go me.beginPublishMQ()
	}()
}

// GetStock returns the current stock of prodId, or 0 for an unknown product.
func (me *tStockService) GetStock(prodId string) int {
	me.rwmutex.RLock()
	defer me.rwmutex.RUnlock()

	// A missing key yields the zero value, which is the desired answer.
	return me.stock[prodId]
}

// AddStock adjusts the stock of prodId by delta (negative to deduct).
// It returns gInsufficientStockError, leaving the stock unchanged, when the
// result would be negative. An unknown product is treated as having stock 0.
func (me *tStockService) AddStock(prodId string, delta int) error {
	// The map is modified below, so the exclusive lock is required;
	// a read lock would let concurrent callers race on the map.
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	current := me.stock[prodId] // 0 when the product is unknown
	after := current + delta
	if after < 0 {
		logger.Logf("tStockService.AddStock, failed, prodId=%s, stock=%d, delta=%d", prodId, current, delta)
		return gInsufficientStockError
	}

	logger.Logf("tStockService.AddStock, done, prodId=%s, stock=%d, delta=%d, after=%d", prodId, current, delta, after)
	me.stock[prodId] = after
	return nil
}

// beginSubscribeMQ runs forever, (re)subscribing to the MQ. A successful
// subscription is renewed every 20 minutes with a one hour expiry; a failed
// attempt is retried on the next 3 second tick.
func (me *tStockService) beginSubscribeMQ() {
	const (
		expireDuration    = 1 * time.Hour
		subscribeDuration = 20 * time.Minute
		pauseDuration     = 3 * time.Second
	)

	lastSubscribeTime := int64(0)
	for {
		now := time.Now().UnixNano()
		if now-lastSubscribeTime >= int64(subscribeDuration) {
			err := fnSubscribeMQ(now + int64(expireDuration))
			if err != nil {
				me.bMQReady = false
				logger.Logf("tStockService.beginSubscribeMQ, failed, err=%v", err)
			} else {
				lastSubscribeTime = now
				me.bMQReady = true
				logger.Logf("tStockService.beginSubscribeMQ, done")
			}
		}
		time.Sleep(pauseDuration)
	}
}

// fnSubscribeMQ registers this service for gMQSubscribeTopic; expireTime is
// the subscription expiry as a Unix timestamp in nanoseconds.
func fnSubscribeMQ(expireTime int64) error {
	msg := &models.SubscribeMsg{
		ClientID:   gMQClientID,
		Topic:      gMQSubscribeTopic,
		NotifyUrl:  gMQServerURL + PathOfNotifySaleOrderCreated,
		ExpireTime: expireTime,
	}
	url := gMQServerURL + "/subscribe"
	return fnPost(msg, url)
}

// fnPost sends msg as JSON to url and decodes the reply as models.OkMsg.
// It returns gMQReplyFalse when the reply's OK field is false.
func fnPost(msg interface{}, url string) error {
	body, err := json.Marshal(msg)
	if err != nil {
		// Never post a payload that failed to encode.
		return err
	}

	rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer rsp.Body.Close()

	j, err := ioutil.ReadAll(rsp.Body)
	if err != nil {
		return err
	}

	ok := &models.OkMsg{}
	if err = json.Unmarshal(j, ok); err != nil {
		return err
	}
	if !ok.OK {
		return gMQReplyFalse
	}
	return nil
}

// beginPublishMQ runs forever, publishing queued messages one at a time.
func (me *tStockService) beginPublishMQ() {
	for {
		me.publishMQ(<-me.publishQueue)
	}
}

// publishMQ posts msg to the MQ, retrying up to gMQMaxPublishRetry times with
// gMQPublishInterval between attempts. After the last failure the message is
// only logged.
func (me *tStockService) publishMQ(msg *models.TxMsg) {
	url := gMQServerURL + "/publish"

	for i := 0; i < gMQMaxPublishRetry; i++ {
		err := fnPost(msg, url)
		if err == nil {
			logger.Logf("tStockService.publishMQ, done, msg=%v", msg)
			return
		}

		logger.Logf("tStockService.publishMQ, failed, err=%v, msg=%v", err, msg)
		time.Sleep(gMQPublishInterval)
	}

	// publish failed
	logger.Logf("tStockService.publishMQ, failed max retries, msg=%v", msg)
}

// HandleSaleOrderCreated deducts the ordered quantity from stock and queues an
// outbound message whose Content is "1" on success and "0" on failure.
// It returns the stock error (if any), or gMQBlocked when the publish queue is
// full and the message could not be queued.
func (me *tStockService) HandleSaleOrderCreated(it *order.SaleOrder) error {
	msg := &models.TxMsg{}
	msg.GlobalID = it.OrderID
	msg.SubID = it.OrderID + ".outbound"
	msg.SenderID = gMQClientID
	msg.Topic = gMQPublishTopic

	err := me.AddStock(it.ProductID, -it.Quantity)
	msg.CreateTime = time.Now().UnixNano()
	if err != nil {
		logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", err.Error(), it)
		msg.Content = "0"
	} else {
		msg.Content = "1"
	}

	// Non-blocking send: checking len() first and sending afterwards is racy
	// and could block this caller if the queue filled up in between.
	select {
	case me.publishQueue <- msg:
		return err
	default:
		logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", gMQBlocked.Error(), it)
		return gMQBlocked
	}
}

// Errors reported by the stock service.
var (
	gInsufficientStockError = errors.New("insufficient stock")
	gMQReplyFalse           = errors.New("mq reply false")
	gMQBlocked              = errors.New("mq blocked")
)

// MQ client settings.
var (
	gMQMaxPublishRetry = 10
	gMQPublishInterval = 1 * time.Second
	gMQSubscribeTopic  = "sale-order.created"
	gMQPublishTopic    = "sale-order.stock.outbound"
	gMQClientID        = "stock-service"
	gMQServerURL       = "http://localhost:3333"
	gMQMaxQueuedMsg    = 1024
)

// MockStockService is the package-wide stock service instance.
var MockStockService = newStockService()

NotifySaleOrderCreated.go

用于监听订单创建消息的http回调处理器

package stock

import (
	"encoding/json"
	"io/ioutil"
	"net/http"

	"github.com/gin-gonic/gin"

	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
	"learning/gooop/saga/order"
)

// NotifySaleOrderCreated is the HTTP callback for sale-order-created messages.
// It decodes the request body as a models.TxMsg, decodes that message's
// Content as an order.SaleOrder, and passes the order to MockStockService.
// Any read or decode failure is answered with 400 and {"ok": false};
// otherwise the reply is 200 and {"ok": true}.
func NotifySaleOrderCreated(c *gin.Context) {
	reqBody := c.Request.Body
	defer reqBody.Close()

	// badRequest writes the common 400 reply carrying the error text.
	badRequest := func(cause error) {
		c.JSON(http.StatusBadRequest, gin.H{"ok": false, "error": cause.Error()})
	}

	raw, err := ioutil.ReadAll(reqBody)
	if err != nil {
		logger.Logf("stock.NotifySaleOrderCreated, failed ioutil.ReadAll")
		badRequest(err)
		return
	}

	txMsg := new(models.TxMsg)
	if err = json.Unmarshal(raw, txMsg); err != nil {
		logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal msg")
		badRequest(err)
		return
	}

	saleOrder := new(order.SaleOrder)
	if err = json.Unmarshal([]byte(txMsg.Content), saleOrder); err != nil {
		logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal order")
		badRequest(err)
		return
	}

	logger.Logf("stock.NotifySaleOrderCreated, order=%s/%v", saleOrder.OrderID, saleOrder)

	// notify stock service
	// The returned error is deliberately dropped: the reply below is
	// {"ok": true} whatever the outcome of the stock handling.
	_ = MockStockService.HandleSaleOrderCreated(saleOrder)

	c.JSON(http.StatusOK, gin.H{"ok": true})
}

// PathOfNotifySaleOrderCreated is the route path of NotifySaleOrderCreated.
var PathOfNotifySaleOrderCreated = "/notify/sale-order.created"

(未完待续)