缘起
最近浏览<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过弥补动作进行撤销的
- 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,每个Ti都有对应的弥补“Ci”,当Ti呈现问题时Ci用于解决Ti执行带来的问题。能够通过上面的两个公式了解Saga模式。T = T1 T2 … TnT = TCTSaga模式的核心理念是防止应用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga模式满足ACD(原子性、一致性、持久性)特色。摘自 <<Go微服务实战>> 刘金亮, 2021.1
指标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务音讯队列服务
事务音讯队列服务的功能性要求
- 音讯不会失落: 音讯的长久化
- 音讯的唯一性: 要求每个音讯有全局ID和子事务ID
- 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试
子目标(Day 7)
MQS已根本可用, 当初实现一个模仿的订单微服务, 并与MQ联动
- 长事务: 订单创立后, 联动库存服务, 扣减库存
弥补动作
- 如果扣库胜利, 更新订单状态为已出库(理论零碎中, 可能还波及物流发货等简单流程)
- 否则(库存有余), 更新订单状态为出库失败(理论零碎中, 可能还波及退款和告诉客户等简单流程)
流程
- 创立订单后, 向MQ公布[销售订单.创立]音讯
- 订阅MQ的[销售订单.出库.胜利], [销售订单.出库.失败]音讯
- 接管到MQ的出库音讯后, 更新订单状态
设计
- ISaleOrderService: 订单服务接口
- SaleOrder: 销售订单实体
- tSaleOrderService: 模仿订单服务, 实现ISaleOrderService接口
- NotifyStockOutbound: 接管库存服务的扣库后果音讯
ISaleOrderService.go
订单服务接口
package order// ISaleOrderService to manage sale order creation and modificationtype ISaleOrderService interface { // get order info Get(orderID string) *SaleOrder // create new order Create(it *SaleOrder) error // update order status Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)}
SaleOrder.go
销售订单实体
package ordertype SaleOrder struct { OrderID string CustomerID string ProductID string Quantity int Price float64 Amount float64 CreateTime int64 StatusFlag int32}const StatusNotDelivered int32 = 0const StatusStockOutboundDone int32 = 1const StatusStockOutboundFailed int32 = 2const StatusMQServiceFailed int32 = 3
tSaleOrderService.go
模仿订单服务, 实现ISaleOrderService接口
package orderimport ( "bytes" "encoding/json" "errors" "io/ioutil" "learning/gooop/saga/mqs/logger" "learning/gooop/saga/mqs/models" "net/http" "sync" "sync/atomic" "time")type tSaleOrderService struct { rwmutex *sync.RWMutex orders map[string]*SaleOrder bMQReady bool publishQueue chan *SaleOrder}func newSaleOrderService() ISaleOrderService { it := new(tSaleOrderService) it.init() return it}func (me *tSaleOrderService) init() { me.rwmutex = new(sync.RWMutex) me.orders = make(map[string]*SaleOrder) me.bMQReady = false me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg) go me.beginSubscribeMQ() go me.beginPublishMQ()}func (me *tSaleOrderService) beginSubscribeMQ() { expireDuration := int64(1 * time.Hour) subscribeDuration := 20 * time.Minute pauseDuration := 3*time.Second lastSubscribeTime := int64(0) for { now := time.Now().UnixNano() if now - lastSubscribeTime >= int64(subscribeDuration) { expireTime := now + expireDuration err := fnSubscribeMQ(expireTime) if err != nil { me.bMQReady = false logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err) } else { lastSubscribeTime = now me.bMQReady = true logger.Logf("tSaleOrderService.beginSubscribeMQ, done") } } time.Sleep(pauseDuration) }}func fnSubscribeMQ(expireTime int64) error { msg := &models.SubscribeMsg{ ClientID: gMQClientID, Topic: gMQSubscribeTopic, NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound, ExpireTime: expireTime, } url := gMQServerURL + "/subscribe" return fnPost(msg, url)}func fnPost(msg interface{}, url string) error { body,_ := json.Marshal(msg) rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body)) if err != nil { return err } defer rsp.Body.Close() j, err := ioutil.ReadAll(rsp.Body) if err != nil { return err } ok := &models.OkMsg{} err = json.Unmarshal(j, ok) if err != nil { return err } if !ok.OK { return gMQReplyFalse } return nil}func (me *tSaleOrderService) beginPublishMQ() { for { select { case msg := <- me.publishQueue : me.publishMQ(msg) break } }}func (me *tSaleOrderService) Get(orderID string) *SaleOrder { me.rwmutex.RLock() defer me.rwmutex.RUnlock() it,ok := me.orders[orderID] if ok { return it } else { return nil }}func (me *tSaleOrderService) Create(it *SaleOrder) error { me.rwmutex.Lock() defer me.rwmutex.Unlock() if len(me.publishQueue) >= gMQMaxQueuedMsg { return gMQNotAvailableError } me.orders[it.OrderID] = it me.publishQueue <- it return nil}func (me *tSaleOrderService) publishMQ(it *SaleOrder) { url := gMQServerURL + "/publish" j,_ := json.Marshal(it) msg := &models.TxMsg{ GlobalID: it.OrderID, SubID: it.OrderID, SenderID: gMQClientID, Topic: gMQPublishTopic, CreateTime: it.CreateTime, Content: string(j), } for i := 0;i < gMQMaxPublishRetry;i++ { err := fnPost(msg, url) if err != nil { logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it) time.Sleep(gMQPublishInterval) } else { logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it) return } } // publish failed logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it) _,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)}func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) { me.rwmutex.RLock() defer me.rwmutex.RUnlock() it, ok := me.orders[orderID] if !ok { return gNotFoundError, nil } if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) { return gStatusChangedError, it } it.StatusFlag = newStatusFlag return nil, it}var gMQReplyFalse = errors.New("mq reply false")var gMQNotAvailableError = errors.New("mq not ready")var gNotFoundError = errors.New("order not found")var gStatusChangedError = errors.New("status changed")var gMQMaxPublishRetry = 3var gMQPublishInterval = 1*time.Secondvar gMQSubscribeTopic = "sale-order.stock.outbound"var gMQPublishTopic = "sale-order.created"var gMQClientID = "sale-order-service"var gMQServerURL = "http://localhost:333"var gMQMaxQueuedMsg = 1024var SaleOrderService = newSaleOrderService()
NotifyStockOutbound.go
接管库存服务的扣库后果音讯
package orderimport ( "encoding/json" "github.com/gin-gonic/gin" "io/ioutil" "learning/gooop/saga/mqs/logger" "learning/gooop/saga/mqs/models" "net/http")func NotifyStockOutbound(c *gin.Context) { body := c.Request.Body defer body.Close() j, e := ioutil.ReadAll(body) if e != nil { logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll") c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()}) return } msg := &models.TxMsg{} e = json.Unmarshal(j, msg) if e != nil { logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal") c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()}) return } orderID := msg.GlobalID succeeded := msg.Content == "1" logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded) var newStatusFlag int32 if succeeded { newStatusFlag = StatusStockOutboundDone } else { newStatusFlag = StatusStockOutboundFailed } err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag) if err != nil { logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order) }}var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"
(未完待续)