乐趣区

关于golang:手撸golang-GO与微服务-Saga模式之7

缘起

最近浏览 <<Go 微服务实战 >> (刘金亮, 2021.1)
本系列笔记拟采纳 golang 练习之

Saga 模式

  • saga 模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过弥补动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga 由一系列的子事务“Ti”组成,每个 Ti 都有对应的弥补“Ci”,当 Ti 呈现问题时 Ci 用于解决 Ti 执行带来的问题。能够通过上面的两个公式了解 Saga 模式。T = T1 T2 … Tn
T = TCT

Saga 模式的核心理念是防止应用长期持有锁(如 14.2.2 节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga 模式满足 ACD(原子性、一致性、持久性)特色。摘自 <<Go 微服务实战 >> 刘金亮, 2021.1

指标

  • 为实现 saga 模式的分布式事务, 先撸一个 pub/sub 事务音讯队列服务
  • 事务音讯队列服务的功能性要求

    • 音讯不会失落: 音讯的长久化
    • 音讯的唯一性: 要求每个音讯有全局 ID 和子事务 ID
    • 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试

子目标(Day 7)

  • MQS 已根本可用, 当初实现一个模仿的订单微服务, 并与 MQ 联动

    • 长事务: 订单创立后, 联动库存服务, 扣减库存
    • 弥补动作

      • 如果扣库胜利, 更新订单状态为已出库(理论零碎中, 可能还波及物流发货等简单流程)
      • 否则(库存有余), 更新订单状态为出库失败(理论零碎中, 可能还波及退款和告诉客户等简单流程)
  • 流程

    • 创立订单后, 向 MQ 公布 [销售订单. 创立] 音讯
    • 订阅 MQ 的 [销售订单. 出库. 胜利], [销售订单. 出库. 失败] 音讯
    • 接管到 MQ 的出库音讯后, 更新订单状态

设计

  • ISaleOrderService: 订单服务接口
  • SaleOrder: 销售订单实体
  • tSaleOrderService: 模仿订单服务, 实现 ISaleOrderService 接口
  • NotifyStockOutbound: 接管库存服务的扣库后果音讯

ISaleOrderService.go

订单服务接口

package order

// ISaleOrderService to manage sale order creation and modification
type ISaleOrderService interface {
    // get order info
    Get(orderID string) *SaleOrder

    // create new order
    Create(it *SaleOrder) error

    // update order status
    Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)
}

SaleOrder.go

销售订单实体

package order

type SaleOrder struct {
    OrderID string
    CustomerID string
    ProductID string
    Quantity int
    Price float64
    Amount float64
    CreateTime int64
    StatusFlag int32
}

const StatusNotDelivered int32 = 0
const StatusStockOutboundDone int32 = 1
const StatusStockOutboundFailed int32 = 2
const StatusMQServiceFailed int32 = 3

tSaleOrderService.go

模仿订单服务, 实现 ISaleOrderService 接口

package order

import (
    "bytes"
    "encoding/json"
    "errors"
    "io/ioutil"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
    "sync"
    "sync/atomic"
    "time"
)

type tSaleOrderService struct {
    rwmutex *sync.RWMutex
    orders map[string]*SaleOrder
    bMQReady bool
    publishQueue chan *SaleOrder
}

func newSaleOrderService() ISaleOrderService {it := new(tSaleOrderService)
    it.init()
    return it
}

func (me *tSaleOrderService) init() {me.rwmutex = new(sync.RWMutex)
    me.orders = make(map[string]*SaleOrder)
    me.bMQReady = false
    me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)

    go me.beginSubscribeMQ()
    go me.beginPublishMQ()}

func (me *tSaleOrderService) beginSubscribeMQ() {expireDuration := int64(1 * time.Hour)
    subscribeDuration := 20 * time.Minute
    pauseDuration := 3*time.Second
    lastSubscribeTime := int64(0)

    for {now := time.Now().UnixNano()
        if now - lastSubscribeTime >= int64(subscribeDuration) {
            expireTime := now + expireDuration
            err := fnSubscribeMQ(expireTime)

            if err != nil {
                me.bMQReady = false
                logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)

            } else {
                lastSubscribeTime = now
                me.bMQReady = true
                logger.Logf("tSaleOrderService.beginSubscribeMQ, done")
            }
        }
        time.Sleep(pauseDuration)
    }
}

func fnSubscribeMQ(expireTime int64) error {
    msg := &models.SubscribeMsg{
        ClientID: gMQClientID,
        Topic: gMQSubscribeTopic,
        NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,
        ExpireTime: expireTime,
    }
    url := gMQServerURL + "/subscribe"
    return fnPost(msg, url)
}


func fnPost(msg interface{}, url string) error {body,_ := json.Marshal(msg)
    rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
    if err != nil {return err}

    defer rsp.Body.Close()
    j, err := ioutil.ReadAll(rsp.Body)
    if err != nil {return err}
    ok := &models.OkMsg{}
    err = json.Unmarshal(j, ok)
    if err != nil {return err}

    if !ok.OK {return gMQReplyFalse}

    return nil
}


func (me *tSaleOrderService) beginPublishMQ() {
    for {
        select {
        case msg := <- me.publishQueue :
            me.publishMQ(msg)
            break
        }
    }
}


func (me *tSaleOrderService) Get(orderID string) *SaleOrder {me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    it,ok := me.orders[orderID]
    if ok {return it} else {return nil}
}

func (me *tSaleOrderService) Create(it *SaleOrder) error {me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    if len(me.publishQueue) >= gMQMaxQueuedMsg {return gMQNotAvailableError}
    me.orders[it.OrderID] = it
    me.publishQueue <- it

    return nil
}


func (me *tSaleOrderService) publishMQ(it *SaleOrder) {
    url := gMQServerURL + "/publish"

    j,_ := json.Marshal(it)
    msg := &models.TxMsg{
        GlobalID: it.OrderID,
        SubID: it.OrderID,
        SenderID: gMQClientID,
        Topic: gMQPublishTopic,
        CreateTime: it.CreateTime,
        Content: string(j),
    }

    for i := 0;i < gMQMaxPublishRetry;i++ {err := fnPost(msg, url)
        if err != nil {logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)
            time.Sleep(gMQPublishInterval)

        } else {logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)
            return
        }
    }

    // publish failed
    logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)
    _,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)
}


func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    it, ok := me.orders[orderID]
    if !ok {return gNotFoundError, nil}

    if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {return gStatusChangedError, it}

    it.StatusFlag = newStatusFlag
    return nil, it
}

var gMQReplyFalse = errors.New("mq reply false")
var gMQNotAvailableError = errors.New("mq not ready")
var gNotFoundError = errors.New("order not found")
var gStatusChangedError = errors.New("status changed")
var gMQMaxPublishRetry = 3
var gMQPublishInterval = 1*time.Second
var gMQSubscribeTopic = "sale-order.stock.outbound"
var gMQPublishTopic = "sale-order.created"
var gMQClientID = "sale-order-service"
var gMQServerURL = "http://localhost:333"
var gMQMaxQueuedMsg = 1024

var SaleOrderService = newSaleOrderService()

NotifyStockOutbound.go

接管库存服务的扣库后果音讯

package order

import (
    "encoding/json"
    "github.com/gin-gonic/gin"
    "io/ioutil"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
)

func NotifyStockOutbound(c *gin.Context) {
    body := c.Request.Body
    defer body.Close()

    j, e := ioutil.ReadAll(body)
    if e != nil {logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")
        c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
        return
    }

    msg := &models.TxMsg{}
    e = json.Unmarshal(j, msg)
    if e != nil {logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")
        c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
        return
    }

    orderID := msg.GlobalID
    succeeded := msg.Content == "1"
    logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)

    var newStatusFlag int32
    if succeeded {newStatusFlag = StatusStockOutboundDone} else {newStatusFlag = StatusStockOutboundFailed}

    err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)
    if err != nil {logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)
    }
}

var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"

(未完待续)

退出移动版