1、学习课程

带你十天轻松搞定 Go 微服务系列(十)- 分布式事务

明天是学习 go 微服务的最初一天,明天是学习分布式事务

2、go-zero 应用 DTM

2.1 增加 DTM 服务配置

参见 第一章 环境搭建,批改 dtm->config.yml 配置文件。咱们只有批改 MicroService 中的 Target,EndPoint 配置即可,将 dtm 注册到 etcd 中。

# ......# 微服务MicroService:  Driver: 'dtm-driver-gozero'           # 要解决注册/发现的驱动程序的名称  Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址  EndPoint: 'dtm:36790'# ......

2.2 增加 dtm_barrier 数据表

应用 DTM 提供的子事务屏障技术则须要在业务数据库中创立子事务屏障相干的表,建表语句如下:

create database if not exists dtm_barrier/*!40100 DEFAULT CHARACTER SET utf8mb4 */;drop table if exists dtm_barrier.barrier;create table if not exists dtm_barrier.barrier(  id bigint(22) PRIMARY KEY AUTO_INCREMENT,  trans_type varchar(45) default '',  gid varchar(128) default '',  branch_id varchar(128) default '',  op varchar(45) default '',  barrier_id varchar(45) default '',  reason varchar(45) default '' comment 'the branch type who insert this record',  create_time datetime DEFAULT now(),  update_time datetime DEFAULT now(),  key(create_time),  key(update_time),  UNIQUE key(gid, branch_id, op, barrier_id));

留神:库名和表名请勿批改,如果您自定义了表名,请在应用前调用 dtmcli.SetBarrierTableName。

2.3 批改 OrderModel 和 ProductModel

在每一个子事务中,很多操作逻辑,须要应用到本地事务,所以咱们增加一些 model 办法兼容 DTM 的子事务屏障

$ vim mall/service/order/model/ordermodel.go
package model......type ( OrderModel interface {  TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)  TxUpdate(tx *sql.Tx, data *Order) error    FindOneByUid(uid int64) (*Order, error) })......func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) { query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet) ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status) return ret, err}func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error { productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id) _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {  query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)  return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id) }, productIdKey) return err}func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) { var resp Order query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table) err := m.QueryRowNoCache(&resp, query, uid) switch err { case nil:  return &resp, nil case sqlc.ErrNotFound:  return nil, ErrNotFound default:  return nil, err }}$ vim mall/service/product/model/productmodel.gopackage model......type ( ProductModel interface {  TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) })......func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) { productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id) return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {  query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)  return tx.Exec(query, delta, delta, id) }, productIdKey)}

2.4 批改 product rpc 服务

增加 DecrStock, DecrStockRevert 接口办法

咱们须要为 product rpc 服务增加 DecrStock、DecrStockRevert 两个接口办法,别离用于 产品库存更新产品库存更新的弥补

$ vim mall/service/product/rpc/product.proto
syntax = "proto3";package productclient;option go_package = "product";......// 减产品库存message DecrStockRequest {    int64 id = 1;    int64 num = 2;}message DecrStockResponse {}// 减产品库存service Product {    ......    rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);    rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);}

提醒:批改后应用 goctl 工具从新生成下代码。 要在 golang 容器下运行,可参考:Linux下部署go-zero,运行goctl model运行模板生成命令报错解决办法

$ cd mall/service/product$ goctl rpc proto -src ./rpc/product.proto -dir ./rpc

实现 DecrStock 接口办法

在这里只有库存有余时,咱们不须要再重试,间接回滚。

$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logicimport ( "context" "database/sql" "mall/service/product/rpc/internal/svc" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmcli" "github.com/dtm-labs/dtmgrpc" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" "google.golang.org/grpc/codes" "google.golang.org/grpc/status")type DecrStockLogic struct { ctx    context.Context svcCtx *svc.ServiceContext logx.Logger}func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic { return &DecrStockLogic{  ctx:    ctx,  svcCtx: svcCtx,  Logger: logx.WithContext(ctx), }}func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil {  return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil {  return nil, status.Error(500, err.Error()) } // 开启子事务屏障 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {  // 更新产品库存  result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1)  if err != nil {   return err  }  affected, err := result.RowsAffected()  // 库存有余,返回子事务失败  if err == nil && affected == 0 {   return dtmcli.ErrFailure  }  return err }) // 这种状况是库存有余,不再重试,走回滚 if err == dtmcli.ErrFailure {  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) } if err != nil {  return nil, err } return &product.DecrStockResponse{}, nil}

实现 DecrStockRevert 接口办法

在 DecrStock 接口办法中,产品库存是减去指定的数量,在这里咱们把它给加回来。这样产品库存就回到在 DecrStock 接口办法减去之前的数量。

$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logicimport ( "context" "database/sql" "mall/service/product/rpc/internal/svc" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmcli" "github.com/dtm-labs/dtmgrpc" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" "google.golang.org/grpc/status")type DecrStockRevertLogic struct { ctx    context.Context svcCtx *svc.ServiceContext logx.Logger}func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic { return &DecrStockRevertLogic{  ctx:    ctx,  svcCtx: svcCtx,  Logger: logx.WithContext(ctx), }}func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil {  return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil {  return nil, status.Error(500, err.Error()) } // 开启子事务屏障 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {  // 更新产品库存  _, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)  return err }) if err != nil {  return nil, err } return &product.DecrStockResponse{}, nil}

2.5 批改 order rpc 服务

2.5.1 增加 CreateRevert 接口办法

order rpc 服务中曾经有 Create 接口办法、咱们须要创立它的弥补接口办法 CreateRevert。

$ vim mall/service/order/rpc/order.proto
syntax = "proto3";package orderclient;option go_package = "order";......service Order {    rpc Create(CreateRequest) returns(CreateResponse);    rpc CreateRevert(CreateRequest) returns(CreateResponse);    ......}

提醒:批改后应用 goctl 工具从新生成下代码。 也要在 golang 容器下运行。

$ cd mall/service/order$ goctl rpc proto -src ./rpc/order.proto -dir ./rpc

2.5.2 批改 Create 接口办法

原来 Create 接口办法中产品库存判断和更新操作,咱们曾经在 product rpcDecrStock 接口办法中实现了,所以咱们这里只有创立订单一个操作即可。

$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logicimport ( "context" "database/sql" "fmt" "mall/service/order/model" "mall/service/order/rpc/internal/svc" "mall/service/order/rpc/order" "mall/service/user/rpc/user" "github.com/dtm-labs/dtmgrpc" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" "google.golang.org/grpc/status")type CreateLogic struct { ctx    context.Context svcCtx *svc.ServiceContext logx.Logger}func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic { return &CreateLogic{  ctx:    ctx,  svcCtx: svcCtx,  Logger: logx.WithContext(ctx), }}func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil {  return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil {  return nil, status.Error(500, err.Error()) } // 开启子事务屏障 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {  // 查问用户是否存在  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{   Id: in.Uid,  })  if err != nil {   return fmt.Errorf("用户不存在")  }  newOrder := model.Order{   Uid:    in.Uid,   Pid:    in.Pid,   Amount: in.Amount,   Status: 0,  }  // 创立订单  _, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder)  if err != nil {   return fmt.Errorf("订单创立失败")  }  return nil }); err != nil {  return nil, status.Error(500, err.Error()) } return &order.CreateResponse{}, nil}

2.5.3 实现 CreateRevert 接口办法

在这个接口中咱们查问用户刚刚创立的订单,把订单的状态改为 9(有效状态)。

$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logicimport ( "context" "database/sql" "fmt" "mall/service/order/rpc/internal/svc" "mall/service/order/rpc/order" "mall/service/user/rpc/user" "github.com/dtm-labs/dtmgrpc" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" "google.golang.org/grpc/status")type CreateRevertLogic struct { ctx    context.Context svcCtx *svc.ServiceContext logx.Logger}func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic { return &CreateRevertLogic{  ctx:    ctx,  svcCtx: svcCtx,  Logger: logx.WithContext(ctx), }}func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil {  return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil {  return nil, status.Error(500, err.Error()) } // 开启子事务屏障 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {  // 查问用户是否存在  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{   Id: in.Uid,  })  if err != nil {   return fmt.Errorf("用户不存在")  }  // 查问用户最新创立的订单  resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)  if err != nil {   return fmt.Errorf("订单不存在")  }  // 批改订单状态9,标识订单已生效,并更新订单  resOrder.Status = 9  err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder)  if err != nil {   return fmt.Errorf("订单更新失败")  }  return nil }); err != nil {  return nil, status.Error(500, err.Error()) } return &order.CreateResponse{}, nil}

2.6 批改 order api 服务

咱们把 order rpc 服务 Create、CreateRevert 接口办法,product rpc 服务 DecrStock、DecrStockRevert 接口办法,提到 order api 服务中做成一个以 SAGA事务模式 的分布式事务操作。

2.6.1 增加 pproduct rpc 依赖配置

$ vim mall/service/order/api/etc/order.yaml
Name: OrderHost: 0.0.0.0Port: 8002......OrderRpc:  Etcd:    Hosts:    - etcd:2379    Key: order.rpcProductRpc:  Etcd:    Hosts:    - etcd:2379    Key: product.rpc

2.6.2 增加 pproduct rpc 服务配置的实例化

$ vim mall/service/order/api/internal/config/config.go
package configimport ( "github.com/zeromicro/go-zero/rest" "github.com/zeromicro/go-zero/zrpc")type Config struct { rest.RestConf Auth struct {  AccessSecret string  AccessExpire int64 } OrderRpc   zrpc.RpcClientConf ProductRpc zrpc.RpcClientConf}

2.6.3 注册服务上下文 pproduct rpc 的依赖

$ vim mall/service/order/api/internal/svc/servicecontext.go
package svcimport ( "mall/service/order/api/internal/config" "mall/service/order/rpc/orderclient" "mall/service/product/rpc/productclient" "github.com/zeromicro/go-zero/zrpc")type ServiceContext struct { Config config.Config OrderRpc   orderclient.Order ProductRpc productclient.Product}func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{  Config:     c,  OrderRpc:   orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)),  ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)), }}

2.6.4 增加导入 gozero 的 dtm 驱动

$ vim mall/service/order/api/order.go
package mainimport ( ...... _ "github.com/dtm-labs/driver-gozero" // 增加导入 `gozero` 的 `dtm` 驱动)var configFile = flag.String("f", "etc/order.yaml", "the config file")func main() { ......}

2.6.5 批改 order apiCreate 接口办法

$ vim mall/service/order/api/internal/logic/createlogic.go
package logicimport ( "context" "mall/service/order/api/internal/svc" "mall/service/order/api/internal/types" "mall/service/order/rpc/order" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmgrpc" "github.com/zeromicro/go-zero/core/logx" "google.golang.org/grpc/status")type CreateLogic struct { logx.Logger ctx    context.Context svcCtx *svc.ServiceContext}func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic { return CreateLogic{  Logger: logx.WithContext(ctx),  ctx:    ctx,  svcCtx: svcCtx, }}func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) { // 获取 OrderRpc BuildTarget orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget() if err != nil {  return nil, status.Error(100, "订单创立异样") } // 获取 ProductRpc BuildTarget productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget() if err != nil {  return nil, status.Error(100, "订单创立异样") } // dtm 服务的 etcd 注册地址 var dtmServer = "etcd://etcd:2379/dtmservice" // 创立一个gid gid := dtmgrpc.MustGenGid(dtmServer) // 创立一个saga协定的事务 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).  Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{   Uid:    req.Uid,   Pid:    req.Pid,   Amount: req.Amount,   Status: 0,  }).  Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{   Id:  req.Pid,   Num: 1,  }) // 事务提交 err = saga.Submit() if err != nil {  return nil, status.Error(500, err.Error()) } return &types.CreateResponse{}, nil}
  • 提醒:SagaGrpc.Add 办法第一个参数 action 是微服务 grpc 拜访的办法门路,这个办法门路须要别离去以下文件中寻找。
mall/service/order/rpc/order/order.pb.gomall/service/product/rpc/product/product.pb.go

按关键字 Invoke 搜寻即可找到。

3 测试 go-zero + DTM

3.1 测试分布式事务失常流程

应用 postman 调用 /api/product/create 接口,创立一个产品,库存 stock 为 1。

可是报错 401,no token present in request,这是因为创立商品的时候没有主动带登录的时候的 token,没有权限创立

解决形式能够参考:新版Postman设置所有申请都主动带token登陆权限验证(Postman 版本9.10.0 )

应用 postman 调用 /api/order/create 接口,创立一个订单,记得和下面一样设置申请主动带token登陆权限验证,产品ID pid 为 1。

咱们能够看出,产品的库存从原来的 1 曾经变成了 0。

咱们再看下子事务屏障表 barrier 里的数据,咱们能够看出两个服务的操作均曾经实现。

4、系列

学习笔记:带你十天轻松搞定 Go 微服务系列(一)- 环境搭建
学习笔记:带你十天轻松搞定 Go 微服务系列(二)- 服务拆分
学习笔记:带你十天轻松搞定 Go 微服务系列(三)- 用户服务
学习笔记:带你十天轻松搞定 Go 微服务系列(四)- 产品服务
学习笔记:带你十天轻松搞定 Go 微服务系列(五)- 订单服务
学习笔记:带你十天轻松搞定 Go 微服务系列(六)- 领取服务
学习笔记:带你十天轻松搞定 Go 微服务系列(七)- RPC 服务 Auth 验证
学习笔记:带你十天轻松搞定 Go 微服务系列(八)- 服务监控
学习笔记:带你十天轻松搞定 Go 微服务系列(九)- 链路追踪
学习笔记:带你十天轻松搞定 Go 微服务系列大结局(十)- 分布式事务