缘起

最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过弥补动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的弥补“Ci”,当Ti呈现问题时Ci用于解决Ti执行带来的问题。能够通过上面的两个公式了解Saga模式。T = T1 T2 … TnT = TCTSaga模式的核心理念是防止应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特色。摘自 <<Go微服务实战>> 刘金亮, 2021.1

指标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务音讯队列服务
  • 事务音讯队列服务的功能性要求

    • 音讯不会失落: 音讯的长久化
    • 音讯的唯一性: 要求每个音讯有全局ID和子事务ID
    • 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试

子目标(Day 7)

  • MQS已根本可用, 当初实现一个模仿的订单微服务, 并与MQ联动

    • 长事务: 订单创立后, 联动库存服务, 扣减库存
    • 弥补动作

      • 如果扣库胜利, 更新订单状态为已出库(理论零碎中, 可能还波及物流发货等简单流程)
      • 否则(库存有余), 更新订单状态为出库失败(理论零碎中, 可能还波及退款和告诉客户等简单流程)
  • 流程

    • 创立订单后, 向MQ公布[销售订单.创立]音讯
    • 订阅MQ的[销售订单.出库.胜利], [销售订单.出库.失败]音讯
    • 接管到MQ的出库音讯后, 更新订单状态

设计

  • ISaleOrderService: 订单服务接口
  • SaleOrder: 销售订单实体
  • tSaleOrderService: 模仿订单服务, 实现ISaleOrderService接口
  • NotifyStockOutbound: 接管库存服务的扣库后果音讯

ISaleOrderService.go

订单服务接口

package order// ISaleOrderService to manage sale order creation and modificationtype ISaleOrderService interface {    // get order info    Get(orderID string) *SaleOrder    // create new order    Create(it *SaleOrder) error    // update order status    Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)}

SaleOrder.go

销售订单实体

package ordertype SaleOrder struct {    OrderID string    CustomerID string    ProductID string    Quantity int    Price float64    Amount float64    CreateTime int64    StatusFlag int32}const StatusNotDelivered int32 = 0const StatusStockOutboundDone int32 = 1const StatusStockOutboundFailed int32 = 2const StatusMQServiceFailed int32 = 3

tSaleOrderService.go

模仿订单服务, 实现ISaleOrderService接口

package orderimport (    "bytes"    "encoding/json"    "errors"    "io/ioutil"    "learning/gooop/saga/mqs/logger"    "learning/gooop/saga/mqs/models"    "net/http"    "sync"    "sync/atomic"    "time")type tSaleOrderService struct {    rwmutex *sync.RWMutex    orders map[string]*SaleOrder    bMQReady bool    publishQueue chan *SaleOrder}func newSaleOrderService() ISaleOrderService {    it := new(tSaleOrderService)    it.init()    return it}func (me *tSaleOrderService) init() {    me.rwmutex = new(sync.RWMutex)    me.orders = make(map[string]*SaleOrder)    me.bMQReady = false    me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)    go me.beginSubscribeMQ()    go me.beginPublishMQ()}func (me *tSaleOrderService) beginSubscribeMQ() {    expireDuration := int64(1 * time.Hour)    subscribeDuration := 20 * time.Minute    pauseDuration := 3*time.Second    lastSubscribeTime := int64(0)    for {        now := time.Now().UnixNano()        if now - lastSubscribeTime >= int64(subscribeDuration) {            expireTime := now + expireDuration            err := fnSubscribeMQ(expireTime)            if err != nil {                me.bMQReady = false                logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)            } else {                lastSubscribeTime = now                me.bMQReady = true                logger.Logf("tSaleOrderService.beginSubscribeMQ, done")            }        }        time.Sleep(pauseDuration)    }}func fnSubscribeMQ(expireTime int64) error {    msg := &models.SubscribeMsg{        ClientID: gMQClientID,        Topic: gMQSubscribeTopic,        NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,        ExpireTime: expireTime,    }    url := gMQServerURL + "/subscribe"    return fnPost(msg, url)}func fnPost(msg interface{}, url string) error {    body,_ := json.Marshal(msg)    rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))    if err != nil {        return err    }    defer rsp.Body.Close()    j, err := ioutil.ReadAll(rsp.Body)    if err != nil {        return err    }    ok := &models.OkMsg{}    err = json.Unmarshal(j, ok)    if err != nil {        return err    }    if !ok.OK {        return gMQReplyFalse    }    return nil}func (me *tSaleOrderService) beginPublishMQ() {    for {        select {        case msg := <- me.publishQueue :            me.publishMQ(msg)            break        }    }}func (me *tSaleOrderService) Get(orderID string) *SaleOrder {    me.rwmutex.RLock()    defer me.rwmutex.RUnlock()    it,ok := me.orders[orderID]    if ok {        return it    } else {        return nil    }}func (me *tSaleOrderService) Create(it *SaleOrder) error {    me.rwmutex.Lock()    defer me.rwmutex.Unlock()    if len(me.publishQueue) >= gMQMaxQueuedMsg {        return gMQNotAvailableError    }    me.orders[it.OrderID] = it    me.publishQueue <- it    return nil}func (me *tSaleOrderService) publishMQ(it *SaleOrder) {    url := gMQServerURL + "/publish"    j,_ := json.Marshal(it)    msg := &models.TxMsg{        GlobalID: it.OrderID,        SubID: it.OrderID,        SenderID: gMQClientID,        Topic: gMQPublishTopic,        CreateTime: it.CreateTime,        Content: string(j),    }    for i := 0;i < gMQMaxPublishRetry;i++ {        err := fnPost(msg, url)        if err != nil {            logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)            time.Sleep(gMQPublishInterval)        } else {            logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)            return        }    }    // publish failed    logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)    _,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)}func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {    me.rwmutex.RLock()    defer me.rwmutex.RUnlock()    it, ok := me.orders[orderID]    if !ok {        return gNotFoundError, nil    }    if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {        return gStatusChangedError, it    }    it.StatusFlag = newStatusFlag    return nil, it}var gMQReplyFalse = errors.New("mq reply false")var gMQNotAvailableError = errors.New("mq not ready")var gNotFoundError = errors.New("order not found")var gStatusChangedError = errors.New("status changed")var gMQMaxPublishRetry = 3var gMQPublishInterval = 1*time.Secondvar gMQSubscribeTopic = "sale-order.stock.outbound"var gMQPublishTopic = "sale-order.created"var gMQClientID = "sale-order-service"var gMQServerURL = "http://localhost:333"var gMQMaxQueuedMsg = 1024var SaleOrderService = newSaleOrderService()

NotifyStockOutbound.go

接管库存服务的扣库后果音讯

package orderimport (    "encoding/json"    "github.com/gin-gonic/gin"    "io/ioutil"    "learning/gooop/saga/mqs/logger"    "learning/gooop/saga/mqs/models"    "net/http")func NotifyStockOutbound(c *gin.Context) {    body := c.Request.Body    defer body.Close()    j, e := ioutil.ReadAll(body)    if e != nil {        logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")        c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})        return    }    msg := &models.TxMsg{}    e = json.Unmarshal(j, msg)    if e != nil {        logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")        c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})        return    }    orderID := msg.GlobalID    succeeded := msg.Content == "1"    logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)    var newStatusFlag int32    if succeeded {        newStatusFlag = StatusStockOutboundDone    } else {        newStatusFlag = StatusStockOutboundFailed    }    err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)    if err != nil {        logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)    }}var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"

(未完待续)