序言
咱们通过一个系列文章跟大家具体展现一个 go-zero 微服务示例,整个系列分十篇文章,目录构造如下:
- 环境搭建
- 服务拆分
- 用户服务
- 产品服务
- 订单服务
- 领取服务
- RPC 服务 Auth 验证
- 服务监控
- 链路追踪
- 分布式事务(本文)
冀望通过本系列带你在本机利用 Docker 环境利用 go-zero 疾速开发一个商城零碎,让你疾速上手微服务。
残缺示例代码:https://github.com/nivin-studio/go-zero-mall
首先,咱们来看一下整体的服务拆分图:
10.1 DTM
介绍
DTM 是一款 golang
开发的分布式事务管理器,解决了跨数据库、跨服务、跨语言栈更新数据的一致性问题。
绝大多数的订单零碎的事务都会跨服务,因而都有更新数据一致性的需要,都能够通过 DTM 大幅简化架构,造成一个优雅的解决方案。
而且 DTM 曾经深度单干,原生的反对go-zero中的分布式事务,上面就来具体的解说如何用 DTM 来帮忙咱们的订单零碎解决一致性问题
10.2 go-zero
应用 DTM
首先咱们回顾下 第五章 订单服务 中 order rpc
服务中 Create
接口解决逻辑。办法里判断了用户和产品的合法性,以及产品库存是否短缺,最初通过 OrderModel
创立了一个新的订单,以及调用 product rpc
服务 Update
的接口更新了产品的库存。
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) { // 查问用户是否存在 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{ Id: in.Uid, }) if err != nil { return nil, err } // 查问产品是否存在 productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{ Id: in.Pid, }) if err != nil { return nil, err } // 判断产品库存是否短缺 if productRes.Stock <= 0 { return nil, status.Error(500, "产品库存有余") } newOrder := model.Order{ Uid: in.Uid, Pid: in.Pid, Amount: in.Amount, Status: 0, } res, err := l.svcCtx.OrderModel.Insert(&newOrder) if err != nil { return nil, status.Error(500, err.Error()) } newOrder.Id, err = res.LastInsertId() if err != nil { return nil, status.Error(500, err.Error()) } _, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{ Id: productRes.Id, Name: productRes.Name, Desc: productRes.Desc, Stock: productRes.Stock - 1, Amount: productRes.Amount, Status: productRes.Status, }) if err != nil { return nil, err } return &order.CreateResponse{ Id: newOrder.Id, }, nil}
之前咱们说过,这里解决逻辑存在数据一致性问题,有可能订单创立胜利了,然而在更新产品库存的时候可能会产生失败,这时候就会存在订单创立胜利,产品库存没有缩小的状况。
因为这里的产品库存更新是跨服务操作的,也没有方法应用本地事务来解决,所以咱们须要应用分布式事务来解决它。这里咱们须要借助 DTM
的 SAGA
协定来实现订单创立和产品库存更新的跨服务分布式事务操作。
大家能够先移步到 DTM
的文档先了接下 SAGA事务模式。
10.2.1 增加 DTM
服务配置
参见 第一章 环境搭建,批改 dtm->config.yml
配置文件。咱们只有批改 MicroService
中的 Target
,EndPoint
配置即可,将 dtm
注册到 etcd
中。
# ......# 微服务MicroService: Driver: 'dtm-driver-gozero' # 要解决注册/发现的驱动程序的名称 Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址 EndPoint: 'dtm:36790'# ......
10.2.2 增加 dtm_barrier
数据表
微服务是一个分布式系统,因而可能产生各种异样,例如网络抖动导致反复申请,这类的异样会让业务解决异样简单。而 DTM
中,独创了 子事务屏障 技术,应用该技术,可能十分便捷的解决异样问题,极大的升高了分布式事务的应用门槛。
应用 DTM
提供的子事务屏障技术则须要在业务数据库中创立子事务屏障相干的表,建表语句如下:
create database if not exists dtm_barrier/*!40100 DEFAULT CHARACTER SET utf8mb4 */;drop table if exists dtm_barrier.barrier;create table if not exists dtm_barrier.barrier( id bigint(22) PRIMARY KEY AUTO_INCREMENT, trans_type varchar(45) default '', gid varchar(128) default '', branch_id varchar(128) default '', op varchar(45) default '', barrier_id varchar(45) default '', reason varchar(45) default '' comment 'the branch type who insert this record', create_time datetime DEFAULT now(), update_time datetime DEFAULT now(), key(create_time), key(update_time), UNIQUE key(gid, branch_id, op, barrier_id));
留神:库名和表名请勿批改,如果您自定义了表名,请在应用前调用 dtmcli.SetBarrierTableName
。
10.2.3 批改 OrderModel
和 ProductModel
在每一个子事务中,很多操作逻辑,须要应用到本地事务,所以咱们增加一些 model
办法兼容 DTM
的子事务屏障
$ vim mall/service/order/model/ordermodel.go
package model......type ( OrderModel interface { TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) TxUpdate(tx *sql.Tx, data *Order) error FindOneByUid(uid int64) (*Order, error) })......func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) { query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet) ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status) return ret, err}func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error { productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id) _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder) return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id) }, productIdKey) return err}func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) { var resp Order query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table) err := m.QueryRowNoCache(&resp, query, uid) switch err { case nil: return &resp, nil case sqlc.ErrNotFound: return nil, ErrNotFound default: return nil, err }}
$ vim mall/service/product/model/productmodel.go
package model......type ( ProductModel interface { TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) })......func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) { productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id) return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table) return tx.Exec(query, delta, delta, id) }, productIdKey)}
10.2.4 批改 product rpc
服务
增加
DecrStock
,DecrStockRevert
接口办法咱们须要为
product rpc
服务增加DecrStock
、DecrStockRevert
两个接口办法,别离用于产品库存更新 和 产品库存更新的弥补。
$ vim mall/service/product/rpc/product.proto
syntax = "proto3";package productclient;option go_package = "product";......// 减产品库存message DecrStockRequest { int64 id = 1; int64 num = 2;}message DecrStockResponse {}// 减产品库存service Product { ...... rpc DecrStock(DecrStockRequest) returns(DecrStockResponse); rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);}
提醒:批改后应用 goctl 工具从新生成下代码。
实现
DecrStock
接口办法在这里只有库存有余时,咱们不须要再重试,间接回滚。
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logicimport ( "context" "database/sql" "mall/service/product/rpc/internal/svc" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmcli" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/codes" "google.golang.org/grpc/status")type DecrStockLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger}func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic { return &DecrStockLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), }}func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 err = barrier.CallWithDB(db, func(tx *sql.Tx) error { // 更新产品库存 result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1) if err != nil { return err } affected, err := result.RowsAffected() // 库存有余,返回子事务失败 if err == nil && affected == 0 { return dtmcli.ErrFailure } return err }) // 这种状况是库存有余,不再重试,走回滚 if err == dtmcli.ErrFailure { return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) } if err != nil { return nil, err } return &product.DecrStockResponse{}, nil}
实现
DecrStockRevert
接口办法在
DecrStock
接口办法中,产品库存是减去指定的数量,在这里咱们把它给加回来。这样产品库存就回到在DecrStock
接口办法减去之前的数量。
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logicimport ( "context" "database/sql" "mall/service/product/rpc/internal/svc" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmcli" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/status")type DecrStockRevertLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger}func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic { return &DecrStockRevertLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), }}func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 err = barrier.CallWithDB(db, func(tx *sql.Tx) error { // 更新产品库存 _, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1) return err }) if err != nil { return nil, err } return &product.DecrStockResponse{}, nil}
10.2.5 批改 order rpc
服务
增加
CreateRevert
接口办法order rpc
服务中曾经有Create
接口办法、咱们须要创立它的弥补接口办法CreateRevert
。
$ vim mall/service/order/rpc/order.proto
syntax = "proto3";package orderclient;option go_package = "order";......service Order { rpc Create(CreateRequest) returns(CreateResponse); rpc CreateRevert(CreateRequest) returns(CreateResponse); ......}
提醒:批改后应用 goctl 工具从新生成下代码。
批改
Create
接口办法原来
Create
接口办法中产品库存判断和更新操作,咱们曾经在product rpc
DecrStock
接口办法中实现了,所以咱们这里只有创立订单一个操作即可。
$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logicimport ( "context" "database/sql" "fmt" "mall/service/order/model" "mall/service/order/rpc/internal/svc" "mall/service/order/rpc/order" "mall/service/user/rpc/user" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/status")type CreateLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger}func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic { return &CreateLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), }}func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error { // 查问用户是否存在 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{ Id: in.Uid, }) if err != nil { return fmt.Errorf("用户不存在") } newOrder := model.Order{ Uid: in.Uid, Pid: in.Pid, Amount: in.Amount, Status: 0, } // 创立订单 _, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder) if err != nil { return fmt.Errorf("订单创立失败") } return nil }); err != nil { return nil, status.Error(500, err.Error()) } return &order.CreateResponse{}, nil}
实现
CreateRevert
接口办法在这个接口中咱们查问用户刚刚创立的订单,把订单的状态改为
9(有效状态)
。
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logicimport ( "context" "database/sql" "fmt" "mall/service/order/rpc/internal/svc" "mall/service/order/rpc/order" "mall/service/user/rpc/user" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stores/sqlx" "google.golang.org/grpc/status")type CreateRevertLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger}func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic { return &CreateRevertLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), }}func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) { // 获取 RawDB db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB() if err != nil { return nil, status.Error(500, err.Error()) } // 获取子事务屏障对象 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx) if err != nil { return nil, status.Error(500, err.Error()) } // 开启子事务屏障 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error { // 查问用户是否存在 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{ Id: in.Uid, }) if err != nil { return fmt.Errorf("用户不存在") } // 查问用户最新创立的订单 resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid) if err != nil { return fmt.Errorf("订单不存在") } // 批改订单状态9,标识订单已生效,并更新订单 resOrder.Status = 9 err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder) if err != nil { return fmt.Errorf("订单更新失败") } return nil }); err != nil { return nil, status.Error(500, err.Error()) } return &order.CreateResponse{}, nil}
10.2.6 批改 order api
服务
咱们把 order rpc
服务 Create
、CreateRevert
接口办法,product rpc
服务 DecrStock
、DecrStockRevert
接口办法,提到 order api
服务中做成一个以 SAGA事务模式
的分布式事务操作。
增加
pproduct rpc
依赖配置$ vim mall/service/order/api/etc/order.yaml
Name: OrderHost: 0.0.0.0Port: 8002......OrderRpc: Etcd: Hosts: - etcd:2379 Key: order.rpcProductRpc: Etcd: Hosts: - etcd:2379 Key: product.rpc
增加
pproduct rpc
服务配置的实例化$ vim mall/service/order/api/internal/config/config.go
package configimport ( "github.com/tal-tech/go-zero/rest" "github.com/tal-tech/go-zero/zrpc")type Config struct { rest.RestConf Auth struct { AccessSecret string AccessExpire int64 } OrderRpc zrpc.RpcClientConf ProductRpc zrpc.RpcClientConf}
注册服务上下文
pproduct rpc
的依赖$ vim mall/service/order/api/internal/svc/servicecontext.go
package svcimport ( "mall/service/order/api/internal/config" "mall/service/order/rpc/orderclient" "mall/service/product/rpc/productclient" "github.com/tal-tech/go-zero/zrpc")type ServiceContext struct { Config config.Config OrderRpc orderclient.Order ProductRpc productclient.Product}func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, OrderRpc: orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)), ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)), }}
增加导入
gozero
的dtm
驱动$ vim mall/service/order/api/order.go
package mainimport ( ...... _ "github.com/dtm-labs/driver-gozero" // 增加导入 `gozero` 的 `dtm` 驱动)var configFile = flag.String("f", "etc/order.yaml", "the config file")func main() { ......}
批改
order api
Create
接口办法$ vim mall/service/order/api/internal/logic/createlogic.go
package logicimport ( "context" "mall/service/order/api/internal/svc" "mall/service/order/api/internal/types" "mall/service/order/rpc/order" "mall/service/product/rpc/product" "github.com/dtm-labs/dtmgrpc" "github.com/tal-tech/go-zero/core/logx" "google.golang.org/grpc/status")type CreateLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext}func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic { return CreateLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, }}func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) { // 获取 OrderRpc BuildTarget orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget() if err != nil { return nil, status.Error(100, "订单创立异样") } // 获取 ProductRpc BuildTarget productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget() if err != nil { return nil, status.Error(100, "订单创立异样") } // dtm 服务的 etcd 注册地址 var dtmServer = "etcd://etcd:2379/dtmservice" // 创立一个gid gid := dtmgrpc.MustGenGid(dtmServer) // 创立一个saga协定的事务 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid). Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{ Uid: req.Uid, Pid: req.Pid, Amount: req.Amount, Status: 0, }). Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{ Id: req.Pid, Num: 1, }) // 事务提交 err = saga.Submit() if err != nil { return nil, status.Error(500, err.Error()) } return &types.CreateResponse{}, nil}
提醒:SagaGrpc.Add
办法第一个参数action
是微服务grpc
拜访的办法门路,这个办法门路须要别离去以下文件中寻找。mall/service/order/rpc/order/order.pb.go
mall/service/product/rpc/product/product.pb.go
按关键字Invoke
搜寻即可找到。
10.3 测试 go-zero
+ DTM
10.3.1 测试分布式事务失常流程
- 应用
postman
调用/api/product/create
接口,创立一个产品,库存stock
为1
。
- 应用
postman
调用/api/order/create
接口,创立一个订单,产品IDpid
为1
。
- 咱们能够看出,产品的库存从原来的
1
曾经变成了0
。
- 咱们再看下子事务屏障表
barrier
里的数据,咱们能够看出两个服务的操作均曾经实现。
10.3.2 测试分布式事务失败流程1
- 接着下面测试后果,此时的产品ID为
1
的库存曾经是0
了, 应用postman
调用/api/order/create
接口,再创立一个订单。
- 咱们看下订单数据表里有一条ID为
2
产品ID为1
的数据,它的订单数据状态为9
。
- 咱们再看下子事务屏障表
barrier
里的数据,咱们能够看出(gid = fqYS8CbYbK8GkL8SCuTRUF)
第一个服务(branch_id = 01)
子事务屏障操作是失常,第二个服务(branch_id = 02)
子事务屏障操作失败,要求弥补。于是两个服务都产生了弥补的操作记录。
这个分布式事务的操作流程
- 首先
DTM
服务会调order rpc
Create
接口进行创立订单解决。 - 创立订单实现后
DTM
服务再调product rpc
DecrStock
接口,这个接口的里通过pid
更新产品库存,因产品库存有余,抛出事务失败。 DTM
服务发动弥补机制,调order rpc
CreateRevert
接口进行订单的弥补解决。DTM
服务发动弥补机制,调product rpc
DecrStockRevert
接口进行产品库存更新的弥补解决。然而因为在product rpc
DecrStock
接口的子事务屏障内,业务解决并未胜利。所以在DecrStockRevert
接口里不会执行子事务屏障内的业务逻辑。
- 首先
10.3.3 测试分布式事务失败流程2
- 咱们在数据库中手动将产品ID为
1
库存批改为100,而后在product rpc
DecrStock
接口办法中子事务屏障外,人为的制作异样失败。
- 应用
postman
调用/api/order/create
接口,再创立一个订单,产品IDpid
为1
。
- 咱们别离来看下订单数据表和产品数据表,订单数据表ID为
3
的订单,它的订单数据状态为9
。产品数据表ID为1
的产品,它的库存还是100
且数据更新工夫也产生了变动。
- 咱们再看下子事务屏障表
barrier
里的数据,咱们能够看出(gid = ZbjYHv2jNra7RMwyWjB5Lc)
第一个服务(branch_id = 01)
子事务屏障操作是失常,第二个服务(branch_id = 02)
子事务屏障操作也是失常。因为在product rpc
DecrStock
接口办法中子事务屏障外,咱们人为的制作异样失败,所以两个服务产生了弥补的操作记录。
大家能够比照下 测试分布式事务失败流程1 与 测试分布式事务失败流程2 不同之处,是不是能发现和领会到 DTM
的这个子事务屏障技术的弱小之处。
子事务屏障会自动识别正向操作是否已执行,失败流程1未执行业务操作,所以弥补时,也不会执行弥补的业务操作;失败流程2执行了业务操作,所以弥补时,也会执行弥补的业务操作。
我的项目地址
https://github.com/zeromicro/go-zero
欢送应用 go-zero
并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并点击 交换群 获取社区群二维码。