乐趣区

关于golang:手撸golang-GO与微服务-Saga模式之1

缘起

最近浏览 <<Go 微服务实战 >> (刘金亮, 2021.1)
本系列笔记拟采纳 golang 练习之

Saga 模式

  • saga 模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过弥补动作进行撤销的
  • 事务动作和弥补动作都是幂等的, 容许反复执行而不会有副作用
Saga 由一系列的子事务“Ti”组成,每个 Ti 都有对应的弥补“Ci”,当 Ti 呈现问题时 Ci 用于解决 Ti 执行带来的问题。能够通过上面的两个公式了解 Saga 模式。T = T1 T2 … Tn
T = TCT

Saga 模式的核心理念是防止应用长期持有锁(如 14.2.2 节介绍的两阶段提交)的长事务,而应该将事务切分为一组按序顺次提交的短事务,Saga 模式满足 ACD(原子性、一致性、持久性)特色。摘自 <<Go 微服务实战 >> 刘金亮, 2021.1

指标

  • 为实现 saga 模式的分布式事务, 先模仿实现一个简略的 pub/sub 音讯服务
  • 应用 sqlx + sqlite3 长久化音讯, 应用 gin 作为 http 框架, 应用 toml 配置文件

代码构造

$ tree saga
saga
└── mqs
    ├── cmd
    │   └── boot.go
    ├── config
    │   └── config.go
    ├── database
    │   └── database.go
    ├── handlers
    │   ├── ping.go
    │   └── subscribe.go
    ├── logger
    │   └── logger.go
    └── routers
        └── routers.go

设计

  • config: 读取并解析 toml 配置文件
  • database: 封装 sqlx + sqlite3
  • handlers/ping: 预留 http 服务保活探针接口
  • handlers/subscribe: 注册一个音讯订阅者. 音讯订阅者蕴含订阅者 ID, 主题和回调地址 (以便推送音讯)
  • routers: 注册 gin 路由

config.go

读取并解析 toml 配置文件

package config

import  "github.com/BurntSushi/toml"

type tomlConfig struct {MQS tServiceConfig}

type tServiceConfig struct {
    Port int
    LogDir string
}

var gServiceConfig = &tServiceConfig{}

func init() {cfg := &tomlConfig{}
    if _,err := toml.DecodeFile("./mqs.toml", cfg);err != nil {panic(err)
    }
    *gServiceConfig = cfg.MQS
}

func GetPort() int {return gServiceConfig.Port}

func GetLogDir() string {return gServiceConfig.LogDir}

database.go

封装 sqlx + sqlite3

package database

import "github.com/jmoiron/sqlx"
import _ "github.com/mattn/go-sqlite3"

type DBFunc func(db *sqlx.DB) error
type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error

func init() {
    // must prepare tables
    err := DB(func(db *sqlx.DB) error {
        // table: sub_info
        _,e := db.Exec(`
        create table if not exists sub_info(
            id integer primary key autoincrement,
            client_id varchar(50) unique not null,
            topic varchar(100) not null,
            callback_url varchar(500) not null
        )`)
        return e
    })
    if err != nil {panic(err)
    }
}

func open() (*sqlx.DB, error) {return sqlx.Open("sqlite3", "./mqs.db")
}

func DB(action DBFunc) error {db,err := open()
    if err != nil {return err}
    defer func() { _ = db.Close() }()

    return action(db)
}

func TX(action TXFunc) error {return DB(func(db *sqlx.DB) error {tx, err := db.Beginx()
        if err != nil {return err}

        err = action(db, tx)
        if err == nil {return tx.Commit()
        } else {return tx.Rollback()
        }
    })
}

ping.go

预留 http 服务保活探针接口

package handlers

import (
    "github.com/gin-gonic/gin"
    "net/http"
    "time"
)

func Ping(c *gin.Context) {c.JSON(http.StatusOK, gin.H{ "ok": true, "time": time.Now().Format(time.RFC3339)})
}

subscribe.go

注册一个音讯订阅者. 音讯订阅者蕴含订阅者 ID, 主题和回调地址 (以便推送音讯)

package handlers

import (
    "github.com/gin-gonic/gin"
    "github.com/gin-gonic/gin/binding"
    "github.com/jmoiron/sqlx"
    "learning/gooop/saga/mqs/database"
    "net/http"
)

type tSubMsg struct {
    ClientID string
    Topic string
    CallbackUrl string
}

func Subscribe(c *gin.Context) {msg := &tSubMsg{}
    if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
        c.AbortWithStatusJSON(
            http.StatusInternalServerError,
            gin.H{"error": err.Error()})
        return
    }
    
    err := database.DB(func(db *sqlx.DB) error {
        _,e := db.Exec("replace into sub_info(client_id, topic, callback_url) values(?, ?, ?)",
            msg.ClientID,
            msg.Topic,
            msg.CallbackUrl)
        return e
    })

    if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
    } else {c.JSON(http.StatusOK, gin.H { "ok": true})
    }
}

routers.go

注册 gin 路由

package routers

import (
    "github.com/gin-gonic/gin"
    "learning/gooop/saga/mqs/handlers"
)

func RegisterRouters() *gin.Engine {r := gin.Default()
    r.Use(gin.Logger())
    r.GET("/ping", handlers.Ping)
    r.POST("/subscribe", handlers.Subscribe)
    return r
}

(未完待续)

退出移动版