缘起
最近浏览 <<Go 微服务实战 >> (刘金亮, 2021.1)
本系列笔记拟采纳 golang 练习之
Saga 模式
- saga 模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过弥补动作进行撤销的
- 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga 由一系列的子事务“Ti”组成,每个 Ti 都有对应的弥补“Ci”,当 Ti 呈现问题时 Ci 用于解决 Ti 执行带来的问题。能够通过上面的两个公式了解 Saga 模式。T = T1 T2 … Tn
T = TCT
Saga 模式的核心理念是防止应用长期持有锁(如 14.2.2 节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga 模式满足 ACD(原子性、一致性、持久性)特色。摘自 <<Go 微服务实战 >> 刘金亮, 2021.1
指标
- 为实现 saga 模式的分布式事务, 先撸一个 pub/sub 事务音讯队列服务
-
事务音讯队列服务的功能性要求
- 音讯不会失落: 音讯的长久化
- 音讯的唯一性: 要求每个音讯有全局 ID 和子事务 ID
- 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试
子目标(Day 7)
-
MQS 已根本可用, 当初实现一个模仿的订单微服务, 并与 MQ 联动
- 长事务: 订单创立后, 联动库存服务, 扣减库存
-
弥补动作
- 如果扣库胜利, 更新订单状态为已出库(理论零碎中, 可能还波及物流发货等简单流程)
- 否则(库存有余), 更新订单状态为出库失败(理论零碎中, 可能还波及退款和告诉客户等简单流程)
-
流程
- 创立订单后, 向 MQ 公布 [销售订单. 创立] 音讯
- 订阅 MQ 的 [销售订单. 出库. 胜利], [销售订单. 出库. 失败] 音讯
- 接管到 MQ 的出库音讯后, 更新订单状态
设计
- ISaleOrderService: 订单服务接口
- SaleOrder: 销售订单实体
- tSaleOrderService: 模仿订单服务, 实现 ISaleOrderService 接口
- NotifyStockOutbound: 接管库存服务的扣库后果音讯
ISaleOrderService.go
订单服务接口
package order
// ISaleOrderService to manage sale order creation and modification
type ISaleOrderService interface {
// get order info
Get(orderID string) *SaleOrder
// create new order
Create(it *SaleOrder) error
// update order status
Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)
}
SaleOrder.go
销售订单实体
package order
type SaleOrder struct {
OrderID string
CustomerID string
ProductID string
Quantity int
Price float64
Amount float64
CreateTime int64
StatusFlag int32
}
const StatusNotDelivered int32 = 0
const StatusStockOutboundDone int32 = 1
const StatusStockOutboundFailed int32 = 2
const StatusMQServiceFailed int32 = 3
tSaleOrderService.go
模仿订单服务, 实现 ISaleOrderService 接口
package order
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"learning/gooop/saga/mqs/logger"
"learning/gooop/saga/mqs/models"
"net/http"
"sync"
"sync/atomic"
"time"
)
type tSaleOrderService struct {
rwmutex *sync.RWMutex
orders map[string]*SaleOrder
bMQReady bool
publishQueue chan *SaleOrder
}
func newSaleOrderService() ISaleOrderService {it := new(tSaleOrderService)
it.init()
return it
}
func (me *tSaleOrderService) init() {me.rwmutex = new(sync.RWMutex)
me.orders = make(map[string]*SaleOrder)
me.bMQReady = false
me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)
go me.beginSubscribeMQ()
go me.beginPublishMQ()}
func (me *tSaleOrderService) beginSubscribeMQ() {expireDuration := int64(1 * time.Hour)
subscribeDuration := 20 * time.Minute
pauseDuration := 3*time.Second
lastSubscribeTime := int64(0)
for {now := time.Now().UnixNano()
if now - lastSubscribeTime >= int64(subscribeDuration) {
expireTime := now + expireDuration
err := fnSubscribeMQ(expireTime)
if err != nil {
me.bMQReady = false
logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)
} else {
lastSubscribeTime = now
me.bMQReady = true
logger.Logf("tSaleOrderService.beginSubscribeMQ, done")
}
}
time.Sleep(pauseDuration)
}
}
func fnSubscribeMQ(expireTime int64) error {
msg := &models.SubscribeMsg{
ClientID: gMQClientID,
Topic: gMQSubscribeTopic,
NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,
ExpireTime: expireTime,
}
url := gMQServerURL + "/subscribe"
return fnPost(msg, url)
}
func fnPost(msg interface{}, url string) error {body,_ := json.Marshal(msg)
rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
if err != nil {return err}
defer rsp.Body.Close()
j, err := ioutil.ReadAll(rsp.Body)
if err != nil {return err}
ok := &models.OkMsg{}
err = json.Unmarshal(j, ok)
if err != nil {return err}
if !ok.OK {return gMQReplyFalse}
return nil
}
func (me *tSaleOrderService) beginPublishMQ() {
for {
select {
case msg := <- me.publishQueue :
me.publishMQ(msg)
break
}
}
}
func (me *tSaleOrderService) Get(orderID string) *SaleOrder {me.rwmutex.RLock()
defer me.rwmutex.RUnlock()
it,ok := me.orders[orderID]
if ok {return it} else {return nil}
}
func (me *tSaleOrderService) Create(it *SaleOrder) error {me.rwmutex.Lock()
defer me.rwmutex.Unlock()
if len(me.publishQueue) >= gMQMaxQueuedMsg {return gMQNotAvailableError}
me.orders[it.OrderID] = it
me.publishQueue <- it
return nil
}
func (me *tSaleOrderService) publishMQ(it *SaleOrder) {
url := gMQServerURL + "/publish"
j,_ := json.Marshal(it)
msg := &models.TxMsg{
GlobalID: it.OrderID,
SubID: it.OrderID,
SenderID: gMQClientID,
Topic: gMQPublishTopic,
CreateTime: it.CreateTime,
Content: string(j),
}
for i := 0;i < gMQMaxPublishRetry;i++ {err := fnPost(msg, url)
if err != nil {logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)
time.Sleep(gMQPublishInterval)
} else {logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)
return
}
}
// publish failed
logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)
_,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)
}
func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {me.rwmutex.RLock()
defer me.rwmutex.RUnlock()
it, ok := me.orders[orderID]
if !ok {return gNotFoundError, nil}
if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {return gStatusChangedError, it}
it.StatusFlag = newStatusFlag
return nil, it
}
var gMQReplyFalse = errors.New("mq reply false")
var gMQNotAvailableError = errors.New("mq not ready")
var gNotFoundError = errors.New("order not found")
var gStatusChangedError = errors.New("status changed")
var gMQMaxPublishRetry = 3
var gMQPublishInterval = 1*time.Second
var gMQSubscribeTopic = "sale-order.stock.outbound"
var gMQPublishTopic = "sale-order.created"
var gMQClientID = "sale-order-service"
var gMQServerURL = "http://localhost:333"
var gMQMaxQueuedMsg = 1024
var SaleOrderService = newSaleOrderService()
NotifyStockOutbound.go
接管库存服务的扣库后果音讯
package order
import (
"encoding/json"
"github.com/gin-gonic/gin"
"io/ioutil"
"learning/gooop/saga/mqs/logger"
"learning/gooop/saga/mqs/models"
"net/http"
)
func NotifyStockOutbound(c *gin.Context) {
body := c.Request.Body
defer body.Close()
j, e := ioutil.ReadAll(body)
if e != nil {logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")
c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
return
}
msg := &models.TxMsg{}
e = json.Unmarshal(j, msg)
if e != nil {logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")
c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
return
}
orderID := msg.GlobalID
succeeded := msg.Content == "1"
logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)
var newStatusFlag int32
if succeeded {newStatusFlag = StatusStockOutboundDone} else {newStatusFlag = StatusStockOutboundFailed}
err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)
if err != nil {logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)
}
}
var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"
(未完待续)