关于golang:手撸golang-GO与微服务-Saga模式之2

43次阅读

共计 4149 个字符,预计需要花费 11 分钟才能阅读完成。

缘起

最近浏览 <<Go 微服务实战 >> (刘金亮, 2021.1)
本系列笔记拟采纳 golang 练习之

Saga 模式

  • saga 模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过弥补动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 容许反复执行而不会有副作用
Saga 由一系列的子事务“Ti”组成,每个 Ti 都有对应的弥补“Ci”,当 Ti 呈现问题时 Ci 用于解决 Ti 执行带来的问题。能够通过上面的两个公式了解 Saga 模式。T = T1 T2 … Tn
T = TCT

Saga 模式的核心理念是防止应用长期持有锁(如 14.2.2 节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga 模式满足 ACD(原子性、一致性、持久性)特色。摘自 <<Go 微服务实战 >> 刘金亮, 2021.1

指标

  • 为实现 saga 模式的分布式事务, 先撸一个 pub/sub 事务音讯队列服务
  • 事务音讯队列服务的功能性要求

    • 音讯不会失落: 音讯的长久化
    • 音讯的唯一性: 要求每个音讯有全局 ID 和子事务 ID
    • 确保投递胜利: 投递队列长久化, 投递状态长久化, 失败重试

子目标 (Day 2)

  • 音讯的设计
  • 音讯的长久化
  • 投递队列的长久化
  • 投递状态的长久化

设计

  • TxMsg: 事务音讯模型
  • database: 增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表
  • publish: 音讯公布 api
  • routers: 增加 /publish 路由

TxMsg.go

事务音讯模型

package models

// 事务音讯体
type TxMsg struct {
    // 全局事务 ID
    GlobalID string

    // 子事务 ID
    SubID string

    // 发送者 ID
    SenderID string

    // 工夫戳, 应用 time.Now().UnixNano()
    CreateTime int64

    // 主题, 即音讯类型
    Topic string

    // 音讯内容, 个别是 json
    Content string
}

database.go

增加事务音讯表, 音讯投递表, 胜利投递表, 失败投递表

package database

import "github.com/jmoiron/sqlx"
import _ "github.com/mattn/go-sqlite3"

type DBFunc func(db *sqlx.DB) error
type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error

func init() {
    // must prepare tables
    err := DB(initDB)
    if err != nil {panic(err)
    }
}

func initDB(db *sqlx.DB) error {
    // 订阅者 / 消费者: subscriber
    _, e := db.Exec(`create table if not exists subscriber(
    id integer primary key autoincrement,
    client_id varchar(50) unique not null,
    topic varchar(100) not null,
    notify_url varchar(500) not null,
    expire_time integer
)`)
    if e != nil {return e}

    // 事务音讯: tx_msg
    _, e = db.Exec(`create table if not exists tx_msg (
    id integer primary key autoincrement,
    global_id string varchar(50) not null,
    sub_id string varchar(50) unique not null,
    sender_id varchar(50) not null,
    create_time integer not null,
    topic varchar(100) not null,
    content nvarchar(2048) not null
)`)
    if e != nil {return e}

    // 投递队列: delivery_queue
    _, e = db.Exec(`create table if not exists delivery_queue (
    id integer primary key autoincrement,
    msg_id integer not null,
    sub_id integer not null,
    status_flag integer not null,
    failed_count integer not null
)`)
    if e != nil {return e}

    // 胜利投递队列: success_queue
    _, e = db.Exec(`create table if not exists success_queue (
    id integer primary key autoincrement,
    msg_id integer not null,
    sub_id integer not null
)`)
    if e != nil {return e}

    // 投递失败队列: failed_queue
    _, e = db.Exec(`create table if not exists failed_queue (
    id integer primary key autoincrement,
    msg_id integer not null,
    sub_id integer not null
)`)
    if e != nil {return e}

    return nil
}

func open() (*sqlx.DB, error) {return sqlx.Open("sqlite3", "./mqs.db")
}

func DB(action DBFunc) error {db,err := open()
    if err != nil {return err}
    defer func() { _ = db.Close() }()

    return action(db)
}

func TX(action TXFunc) error {return DB(func(db *sqlx.DB) error {tx, err := db.Beginx()
        if err != nil {return err}

        err = action(db, tx)
        if err == nil {return tx.Commit()
        } else {return tx.Rollback()
        }
    })
}

publish.go

音讯公布 api

package handlers

import (
    "github.com/gin-gonic/gin"
    "github.com/gin-gonic/gin/binding"
    "github.com/jmoiron/sqlx"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
    "time"
)

func Publish(c *gin.Context) {
    // parse request
    msg := &models.TxMsg{}
    if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
        c.AbortWithStatusJSON(
            http.StatusInternalServerError,
            gin.H{"ok": false, "error": err.Error()})
        return
    }

    // fixme: validate msg

    // save to db
    ids := []int64{0}
    err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {id, e := saveTxMsg(db, tx, msg)
        if e != nil {return e}

        ids[0] = id
        return nil
    })

    if ids[0] > 0 {newMsgId := ids[0]
        logger.Logf("publish new msg: %d", newMsgId)

        // todo: 新增音讯, 开始投送
    }

    // reply
    if err != nil {c.JSON(http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()})
    } else {c.JSON(http.StatusOK, gin.H { "ok": true})
    }
}


func saveTxMsg(db *sqlx.DB, tx *sqlx.Tx, msg *models.TxMsg) (int64,error) {
    // insert tx_msg
    r, e := db.Exec(`replace into tx_msg(global_id, sub_id, sender_id, create_time, topic, content) values(?,?,?,?,?,?)`,
        msg.GlobalID, msg.SubID, msg.SenderID, msg.CreateTime, msg.Topic, msg.Content,
    )
    if e != nil {return 0,e}

    // get last insert id
    id,e := r.LastInsertId()
    if e != nil {return 0,e}

    if id > 0 {
        // copy to delivery queue
        now := time.Now().UnixNano()
        r, e = db.Exec(`
insert into delivery_queue(msg_id, sub_id, status_flag, failed_count) 
select
    ?, s.id, 0, 0 
from
    sub_info s
where
    s.expire_time>?
`, id, now)
        if e != nil {return 0,e}
    }

    return id, nil
}

routers.go

增加 /publish 路由

package routers

import (
    "github.com/gin-gonic/gin"
    "learning/gooop/saga/mqs/handlers"
)

func RegisterRouters() *gin.Engine {r := gin.Default()
    r.Use(gin.Logger())

    r.GET("/ping", handlers.Ping)
    r.POST("/subscribe", handlers.Subscribe)
    r.POST("/publish", handlers.Publish)

    return r
}

(未完待续)

正文完
 0