乐趣区

关于golang:学习笔记带你十天轻松搞定-Go-微服务系列大结局十-分布式事务

1、学习课程

带你十天轻松搞定 Go 微服务系列(十)- 分布式事务

明天是学习 go 微服务的最初一天,明天是学习分布式事务

2、go-zero 应用 DTM

2.1 增加 DTM 服务配置

参见 第一章 环境搭建,批改 dtm->config.yml 配置文件。咱们只有批改 MicroService 中的 Target,EndPoint 配置即可,将 dtm 注册到 etcd 中。

# ......

# 微服务
MicroService:
  Driver: 'dtm-driver-gozero'           # 要解决注册 / 发现的驱动程序的名称
  Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址
  EndPoint: 'dtm:36790'

# ......

2.2 增加 dtm_barrier 数据表

应用 DTM 提供的子事务屏障技术则须要在业务数据库中创立子事务屏障相干的表,建表语句如下:

create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default ''comment'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
);

留神:库名和表名请勿批改,如果您自定义了表名,请在应用前调用 dtmcli.SetBarrierTableName。

2.3 批改 OrderModel 和 ProductModel

在每一个子事务中,很多操作逻辑,须要应用到本地事务,所以咱们增加一些 model 办法兼容 DTM 的子事务屏障

$ vim mall/service/order/model/ordermodel.go
package model

......

type (
 OrderModel interface {TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)
  TxUpdate(tx *sql.Tx, data *Order) error
    FindOneByUid(uid int64) (*Order, error)
 }
)

......

func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet)
 ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status)

 return ret, err
}

func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)
 _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)
  return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id)
 }, productIdKey)
 return err
}

func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {
 var resp Order

 query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)
 err := m.QueryRowNoCache(&resp, query, uid)

 switch err {
 case nil:
  return &resp, nil
 case sqlc.ErrNotFound:
  return nil, ErrNotFound
 default:
  return nil, err
 }
}

$ vim mall/service/product/model/productmodel.go

package model

......

type (
 ProductModel interface {TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)
 }
)

......

func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)
 return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)
  return tx.Exec(query, delta, delta, id)
 }, productIdKey)
}

2.4 批改 product rpc 服务

增加 DecrStock, DecrStockRevert 接口办法

咱们须要为 product rpc 服务增加 DecrStock、DecrStockRevert 两个接口办法,别离用于 产品库存更新 产品库存更新的弥补

$ vim mall/service/product/rpc/product.proto
syntax = "proto3";

package productclient;

option go_package = "product";

......

// 减产品库存
message DecrStockRequest {
    int64 id = 1;
    int64 num = 2;
}
message DecrStockResponse {
}
// 减产品库存

service Product {
    ......
    rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);
    rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);
}

提醒:批改后应用 goctl 工具从新生成下代码。 要在 golang 容器下运行,可参考:Linux 下部署 go-zero,运行 goctl model 运行模板生成命令报错解决办法

$ cd mall/service/product
$ goctl rpc proto -src ./rpc/product.proto -dir ./rpc

实现 DecrStock 接口办法

在这里只有库存有余时,咱们不须要再重试,间接回滚。

$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logic

import (
 "context"
 "database/sql"

 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/codes"
 "google.golang.org/grpc/status"
)

type DecrStockLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {
 return &DecrStockLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新产品库存
  result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1)
  if err != nil {return err}

  affected, err := result.RowsAffected()
  // 库存有余,返回子事务失败
  if err == nil && affected == 0 {return dtmcli.ErrFailure}

  return err
 })

 // 这种状况是库存有余,不再重试,走回滚
 if err == dtmcli.ErrFailure {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
 }

 if err != nil {return nil, err}

 return &product.DecrStockResponse{}, nil}

实现 DecrStockRevert 接口办法

在 DecrStock 接口办法中,产品库存是减去指定的数量,在这里咱们把它给加回来。这样产品库存就回到在 DecrStock 接口办法减去之前的数量。

$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logic

import (
 "context"
 "database/sql"

 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type DecrStockRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {
 return &DecrStockRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新产品库存
  _, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)
  return err
 })

 if err != nil {return nil, err}

 return &product.DecrStockResponse{}, nil}

2.5 批改 order rpc 服务

2.5.1 增加 CreateRevert 接口办法

order rpc 服务中曾经有 Create 接口办法、咱们须要创立它的弥补接口办法 CreateRevert。

$ vim mall/service/order/rpc/order.proto
syntax = "proto3";

package orderclient;

option go_package = "order";

......

service Order {rpc Create(CreateRequest) returns(CreateResponse);
    rpc CreateRevert(CreateRequest) returns(CreateResponse);
    ......
}

提醒:批改后应用 goctl 工具从新生成下代码。 也要在 golang 容器下运行。

$ cd mall/service/order
$ goctl rpc proto -src ./rpc/order.proto -dir ./rpc

2.5.2 批改 Create 接口办法

原来 Create 接口办法中产品库存判断和更新操作,咱们曾经在 product rpcDecrStock 接口办法中实现了,所以咱们这里只有创立订单一个操作即可。

$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logic

import (
 "context"
 "database/sql"
 "fmt"

 "mall/service/order/model"
 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type CreateLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {
 return &CreateLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查问用户是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{Id: in.Uid,})
  if err != nil {return fmt.Errorf("用户不存在")
  }

  newOrder := model.Order{
   Uid:    in.Uid,
   Pid:    in.Pid,
   Amount: in.Amount,
   Status: 0,
  }
  // 创立订单
  _, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder)
  if err != nil {return fmt.Errorf("订单创立失败")
  }

  return nil
 }); err != nil {return nil, status.Error(500, err.Error())
 }

 return &order.CreateResponse{}, nil}

2.5.3 实现 CreateRevert 接口办法

在这个接口中咱们查问用户刚刚创立的订单,把订单的状态改为 9(有效状态)。

$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logic

import (
 "context"
 "database/sql"
 "fmt"

 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type CreateRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {
 return &CreateRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查问用户是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{Id: in.Uid,})
  if err != nil {return fmt.Errorf("用户不存在")
  }
  // 查问用户最新创立的订单
  resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)
  if err != nil {return fmt.Errorf("订单不存在")
  }
  // 批改订单状态 9,标识订单已生效,并更新订单
  resOrder.Status = 9
  err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder)
  if err != nil {return fmt.Errorf("订单更新失败")
  }

  return nil
 }); err != nil {return nil, status.Error(500, err.Error())
 }

 return &order.CreateResponse{}, nil}

2.6 批改 order api 服务

咱们把 order rpc 服务 Create、CreateRevert 接口办法,product rpc 服务 DecrStock、DecrStockRevert 接口办法,提到 order api 服务中做成一个以 SAGA 事务模式 的分布式事务操作。

2.6.1 增加 pproduct rpc 依赖配置

$ vim mall/service/order/api/etc/order.yaml
Name: Order
Host: 0.0.0.0
Port: 8002

......

OrderRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: order.rpc

ProductRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: product.rpc

2.6.2 增加 pproduct rpc 服务配置的实例化

$ vim mall/service/order/api/internal/config/config.go
package config

import (
 "github.com/zeromicro/go-zero/rest"
 "github.com/zeromicro/go-zero/zrpc"
)

type Config struct {
 rest.RestConf

 Auth struct {
  AccessSecret string
  AccessExpire int64
 }

 OrderRpc   zrpc.RpcClientConf
 ProductRpc zrpc.RpcClientConf
}

2.6.3 注册服务上下文 pproduct rpc 的依赖

$ vim mall/service/order/api/internal/svc/servicecontext.go
package svc

import (
 "mall/service/order/api/internal/config"
 "mall/service/order/rpc/orderclient"
 "mall/service/product/rpc/productclient"

 "github.com/zeromicro/go-zero/zrpc"
)

type ServiceContext struct {
 Config config.Config

 OrderRpc   orderclient.Order
 ProductRpc productclient.Product
}

func NewServiceContext(c config.Config) *ServiceContext {
 return &ServiceContext{
  Config:     c,
  OrderRpc:   orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)),
  ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)),
 }
}

2.6.4 增加导入 gozero 的 dtm 驱动

$ vim mall/service/order/api/order.go
package main

import (
 ......

 _ "github.com/dtm-labs/driver-gozero" // 增加导入 `gozero` 的 `dtm` 驱动
)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {......}

2.6.5 批改 order apiCreate 接口办法

$ vim mall/service/order/api/internal/logic/createlogic.go
package logic

import (
 "context"

 "mall/service/order/api/internal/svc"
 "mall/service/order/api/internal/types"
 "mall/service/order/rpc/order"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "google.golang.org/grpc/status"
)

type CreateLogic struct {
 logx.Logger
 ctx    context.Context
 svcCtx *svc.ServiceContext
}

func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {
 return CreateLogic{Logger: logx.WithContext(ctx),
  ctx:    ctx,
  svcCtx: svcCtx,
 }
}

func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {
 // 获取 OrderRpc BuildTarget
 orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget()
 if err != nil {return nil, status.Error(100, "订单创立异样")
 }

 // 获取 ProductRpc BuildTarget
 productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget()
 if err != nil {return nil, status.Error(100, "订单创立异样")
 }

 // dtm 服务的 etcd 注册地址
 var dtmServer = "etcd://etcd:2379/dtmservice"
 // 创立一个 gid
 gid := dtmgrpc.MustGenGid(dtmServer)
 // 创立一个 saga 协定的事务
 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
  Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{
   Uid:    req.Uid,
   Pid:    req.Pid,
   Amount: req.Amount,
   Status: 0,
  }).
  Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{
   Id:  req.Pid,
   Num: 1,
  })

 // 事务提交
 err = saga.Submit()
 if err != nil {return nil, status.Error(500, err.Error())
 }

 return &types.CreateResponse{}, nil}
  • 提醒:SagaGrpc.Add 办法第一个参数 action 是微服务 grpc 拜访的办法门路,这个办法门路须要别离去以下文件中寻找。
mall/service/order/rpc/order/order.pb.go
mall/service/product/rpc/product/product.pb.go

按关键字 Invoke 搜寻即可找到。

3 测试 go-zero + DTM

3.1 测试分布式事务失常流程

应用 postman 调用 /api/product/create 接口,创立一个产品,库存 stock 为 1。

可是报错 401,no token present in request,这是因为创立商品的时候没有主动带登录的时候的 token,没有权限创立

解决形式能够参考:新版 Postman 设置所有申请都主动带 token 登陆权限验证(Postman 版本 9.10.0)

应用 postman 调用 /api/order/create 接口,创立一个订单,记得和下面一样设置申请主动带 token 登陆权限验证,产品 ID pid 为 1。

咱们能够看出,产品的库存从原来的 1 曾经变成了 0。

咱们再看下子事务屏障表 barrier 里的数据,咱们能够看出两个服务的操作均曾经实现。

4、系列

学习笔记:带你十天轻松搞定 Go 微服务系列(一)- 环境搭建
学习笔记:带你十天轻松搞定 Go 微服务系列(二)- 服务拆分
学习笔记:带你十天轻松搞定 Go 微服务系列(三)- 用户服务
学习笔记:带你十天轻松搞定 Go 微服务系列(四)- 产品服务
学习笔记:带你十天轻松搞定 Go 微服务系列(五)- 订单服务
学习笔记:带你十天轻松搞定 Go 微服务系列(六)- 领取服务
学习笔记:带你十天轻松搞定 Go 微服务系列(七)- RPC 服务 Auth 验证
学习笔记:带你十天轻松搞定 Go 微服务系列(八)- 服务监控
学习笔记:带你十天轻松搞定 Go 微服务系列(九)- 链路追踪
学习笔记:带你十天轻松搞定 Go 微服务系列大结局(十)- 分布式事务

退出移动版