乐趣区

关于golang:微服务从代码到k8s部署应有尽有系列九事务精讲

咱们用一个系列来解说从需要到上线、从代码到 k8s 部署、从日志到监控等各个方面的微服务残缺实际。

整个我的项目应用了 go-zero 开发的微服务,根本蕴含了 go-zero 以及相干 go-zero 作者开发的一些中间件,所用到的技术栈根本是 go-zero 项目组的自研组件,根本是 go-zero 全家桶了。

实战我的项目地址:https://github.com/Mikaelemmm…

对于分布式事务

因为本我的项目服务划分绝对独立一些,所以目前没有应用到分布式事务,不过 go-zero 联合 dtm 应用分布式事务的最佳实际,我有整顿 demo,这里就介绍一下 go-zero 联合 dtm 的应用,我的项目地址 go-zero 联合 dtm 最佳实际仓库地址:https://github.com/Mikaelemmm…

【注】上面说的不是 go-zero-looklook 我的项目,是这个我的项目 https://github.com/Mikaelemmm…

一、注意事项

  • go-zero 1.2.4 版本以上,这个肯定要留神
  • dtm 你用最新的就行了

二、clone dtm

git clone https://github.com/yedf/dtm.git

三、配置文件

1、找到我的项目跟文件夹下的 conf.sample.yml

2、cp conf.sample.yml conf.yml

3、应用 etcd,把配置中上面这段正文关上(如果没用 etcd 就更简略了,这个都省了,间接链接到 dtm server 地址就能够了)

MicroService:
  Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover
  Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
  EndPoint: 'localhost:36790'

解释一下:

MicroService 这个不要动,这个代表要对把 dtm 注册到那个微服务服务集群外面去,使微服务集群外部服务能够通过 grpc 间接跟 dtm 交互

Driver:’dtm-driver-gozero’,应用 go-zero 的注册服务发现驱动,反对 go-zero

Target: ‘etcd://localhost:2379/dtmservice’ 将以后 dtm 的 server 间接注册到微服务所在的 etcd 集群中,如果 go-zero 作为微服务应用的话,就能够间接通过 etcd 拿到 dtm 的 server grpc 链接,间接就能够跟 dtm server 交互了

EndPoint: ‘localhost:36790’,代表的是 dtm 的 server 的连贯地址 + 端口,集群中的微服务能够间接通过 etcd 取得此地址跟 dtm 交互了,

如果你本人去改了 dtm 源码 grpc 端口,记得这里要改下端口

四、启动 dtm server

在 dtm 我的项目根目录下

go run app/main.go dev

五、应用 go-zero 的 grpc 对接 dtm

这是一个疾速下单扣商品库存的例子

1、order-api

order-api 是 http 服务入口创立订单

service order {
   @doc "创立订单"
   @handler create
   post /order/quickCreate (QuickCreateReq) returns (QuickCreateResp)
}

接下来看 logic

func (l *CreateLogic) Create(req types.QuickCreateReq,r *http.Request) (*types.QuickCreateResp, error) {orderRpcBusiServer, err := l.svcCtx.Config.OrderRpcConf.BuildTarget()
    if err != nil{return nil,fmt.Errorf("下单异样超时")
    }
    stockRpcBusiServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()
    if err != nil{return nil,fmt.Errorf("下单异样超时")
    }

    createOrderReq:= &order.CreateReq{UserId: req.UserId,GoodsId: req.GoodsId,Num: req.Num}
    deductReq:= &stock.DecuctReq{GoodsId: req.GoodsId,Num: req.Num}

    // 这里只举了 saga 例子,tcc 等其余例子根本没啥区别具体能够看 dtm 官网

    gid := dtmgrpc.MustGenGid(dtmServer)
    saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
        Add(orderRpcBusiServer+"/pb.order/create", orderRpcBusiServer+"/pb.order/createRollback", createOrderReq).
        Add(stockRpcBusiServer+"/pb.stock/deduct", stockRpcBusiServer+"/pb.stock/deductRollback", deductReq)

    err = saga.Submit()
    dtmimp.FatalIfError(err)
    if err != nil{return nil,fmt.Errorf("submit data to  dtm-server err  : %+v \n",err)
    }

    return &types.QuickCreateResp{}, nil}

进入到下单逻辑时,别离获取 order 订单、stock 库存服务的 rpc 在 etcd 中的地址,应用 BuildTarget()这个办法

而后创立 order、stock 对应的申请参数

申请 dtm 获取全局事务 id,基于此全局事务 id 开启 grpc 的 saga 分布式事务,将创立订单、扣减库存申请放入事务中,这里应用 grpc 模式申请,每个业务要有一个正向申请、一个回滚申请、以及申请参数,当执行其中任何一个业务正向申请出错了会主动调用事务中所有业务回滚申请达到回滚成果。

2、order-srv

order-srv 是订单的 rpc 服务,与 dtm-gozero-order 数据库中 order 表交互

// service
service order {rpc create(CreateReq)returns(CreateResp);
   rpc createRollback(CreateReq)returns(CreateResp);
}

2.1 Create

当 order-api 提交事务默认申请的是 create 办法,咱们看看 logic

func (l *CreateLogic) Create(in *pb.CreateReq) (*pb.CreateResp, error) {fmt.Printf("创立订单 in : %+v \n", in)

   // barrier 避免空弥补、空悬挂等具体看 dtm 官网即可,别忘记加 barrier 表在以后库中,因为判断弥补与要执行的 sql 一起本地事务
   barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
   db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()
   if err != nil {
      // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
      return nil, status.Error(codes.Internal, err.Error())
   }
   if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {order := new(model.Order)
      order.GoodsId = in.GoodsId
      order.Num = in.Num
      order.UserId = in.UserId

      _, err = l.svcCtx.OrderModel.Insert(tx, order)
      if err != nil {return fmt.Errorf("创立订单失败 err : %v , order:%+v \n", err, order)
      }

      return nil
   }); err != nil {
      // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
      return nil, status.Error(codes.Internal, err.Error())
   }

   return &pb.CreateResp{}, nil}

能够看到,一进入办法外部咱们就应用了 dtm 的子事务屏障技术,至于为什么应用子事务屏障是因为可能会呈现反复申请或者空申请造成的脏数据等,在这里 dtm 主动给咱们做了幂等解决不须要咱们本人在额定做了,同时保障他外部的幂等解决跟咱们本人执行的事务要在一个事务中,所以要应用一个会话的 db 链接,这时候咱们就要先获取

db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()

而后基于此 db 连贯 dtm 在外部通过 sql 执行做幂等解决,同时咱们基于此 db 连贯开启事务,这样就能保障 dtm 外部的子事务屏障在执行 sql 操作与咱们本人业务执行的 sql 操作在一个事务中。

dtm 在应用 grpc 调用咱们业务的时候,咱们的 grpc 服务在返回给 dtm server 谬误时候,dtm 会依据咱们返回给它的 grpc 错误码判断是要执行回滚操作还是始终重试:

  • codes.Internal:dtm server 不会调用回滚,会始终重试,每次重试 dtm 的数据库中都会加一次重试次数,本人能够监控这个重试次数报警,人工解决
  • codes.Aborted : dtm server 会调用所有回滚申请,执行回滚操作

如果 dtm 在调用 grpc 返回谬误是 nil 时候,就认为调用胜利了

2.2 CreateRollback

当咱们调用订单的创立订单或者库存扣减时候返回给 dtm server 的 codes.Aborted 时候,dtm server 会调用所有回滚操作,CreateRollback 就是对应订单下单的回滚操作,代码如下

func (l *CreateRollbackLogic) CreateRollback(in *pb.CreateReq) (*pb.CreateResp, error) {fmt.Printf("订单回滚  , in: %+v \n", in)

    order, err := l.svcCtx.OrderModel.FindLastOneByUserIdGoodsId(in.UserId, in.GoodsId)
    if err != nil && err != model.ErrNotFound {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, status.Error(codes.Internal, err.Error())
    }

    if order != nil {barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
        db, err := l.svcCtx.OrderModel.SqlDB()
        if err != nil {
            // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
            return nil, status.Error(codes.Internal, err.Error())
        }
        if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {

            order.RowState = -1
            if err := l.svcCtx.OrderModel.Update(tx, order); err != nil {return fmt.Errorf("回滚订单失败  err : %v , userId:%d , goodsId:%d", err, in.UserId, in.GoodsId)
            }

            return nil
        }); err != nil {logx.Errorf("err : %v \n", err)

            // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
            return nil, status.Error(codes.Internal, err.Error())
        }

    }
    return &pb.CreateResp{}, nil}

其实就是如果之前下单胜利了,将之前下胜利的单给勾销掉就是对应下单的回滚操作

3、stock-srv

3.1 Deduct

扣减库存,这里跟 order 的 Create 一样了,是下单事务内的正向操作,扣减库存,代码如下

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {fmt.Printf("扣库存 start....")

    stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
    if err != nil && err != model.ErrNotFound {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, status.Error(codes.Internal, err.Error())
    }
    if stock == nil || stock.Num < in.Num {
        //【回滚】库存有余确定须要 dtm 间接回滚,间接返回 codes.Aborted, dtmcli.ResultFailure 才能够回滚
        return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
    }

    // barrier 避免空弥补、空悬挂等具体看 dtm 官网即可,别忘记加 barrier 表在以后库中,因为判断弥补与要执行的 sql 一起本地事务
    barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
    db, err := l.svcCtx.StockModel.SqlDB()
    if err != nil {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, status.Error(codes.Internal, err.Error())
    }
    if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)
        if err != nil{
            // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
            return status.Error(codes.Internal, err.Error())
        }
        affected, err := sqlResult.RowsAffected()
        if err != nil{
            // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
            return status.Error(codes.Internal, err.Error())
        }

        // 如果是影响行数为 0,间接就通知 dtm 失败不须要重试了
        if affected <= 0 {return  status.Error(codes.Aborted,  dtmcli.ResultFailure)
        }

        //!!开启测试!!测试订单回滚更改状态为生效,并且以后库扣失败不须要回滚
        //return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in)

        return nil
    }); err != nil {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil,err
    }

    return &pb.DeductResp{}, nil}

这里值得注意的是当只有库存有余、或者在扣库存影响行数为 0(未胜利)才须要通知 dtm server 要回滚,其余状况下其实都是网络抖动、硬件异样导致,应该让 dtm server 始终重试,当然本人要加个最大重试次数的监控报警,如果达到最大次数还未胜利能实现主动发短信、打电话人工染指了。

3.2 DeductRollback

这里是对应扣库存的回滚操作

func (l *DeductRollbackLogic) DeductRollback(in *pb.DecuctReq) (*pb.DeductResp, error) {fmt.Printf("库存回滚 in : %+v \n", in)

    barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
    db, err := l.svcCtx.StockModel.SqlDB()
    if err != nil {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, status.Error(codes.Internal, err.Error())
    }
    if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {if err := l.svcCtx.StockModel.AddStock(tx, in.GoodsId, in.Num); err != nil {return fmt.Errorf("回滚库存失败 err : %v ,goodsId:%d , num :%d", err, in.GoodsId, in.Num)
        }
        return nil
    }); err != nil {logx.Errorf("err : %v \n", err)
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, status.Error(codes.Internal, err.Error())
    }

    return &pb.DeductResp{}, nil}

六、子事务屏障

这个词是 dtm 作者定义的,其实子事务屏障代码不多,看 barrier.CallWithDB 这个办法即可。

// CallWithDB the same as Call, but with *sql.DB
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {tx, err := db.Begin()   
  if err != nil {return err}   
  return bb.Call(tx, busiCall)
}

因为这个办法他在外部开启本地事务,它外部是在此事务执行了 sql 操作,所以在咱们执行本人的业务时候必须跟它用同一个事务,那就要基于同一个 db 连贯开事务了,so~ 你晓得为什么咱们要提前获取 db 连贯了吧,目标就是要让它外部执行的 sql 操作跟咱们的 sql 操作在一个事务下。至于它外部为什么执行本人的 sql 操作,接下来咱们剖析。

咱们看 bb.Call 这个办法

// Call 子事务屏障,具体介绍见 https://zhuanlan.zhihu.com/p/388444465
// tx: 本地数据库的事务对象,容许子事务屏障进行事务操作
// busiCall: 业务函数,仅在必要时被调用
func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
 bb.BarrierID = bb.BarrierID + 1
 bid := fmt.Sprintf("%02d", bb.BarrierID)
 defer func() {// Logf("barrier call error is %v", rerr)
  if x := recover(); x != nil {tx.Rollback()
   panic(x)
  } else if rerr != nil {tx.Rollback()
  } else {tx.Commit()
  }
 }()
 ti := bb
 originType := map[string]string{
  BranchCancel:     BranchTry,
  BranchCompensate: BranchAction,
 }[ti.Op]

 originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
 currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
 dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
 if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补
  currentAffected == 0 { // 这个是反复申请或者悬挂
  return
 }
 rerr = busiCall(tx)
 return
}

外围其实就是如下几行代码

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)    
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)    
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)    
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补        
  currentAffected == 0 { // 这个是反复申请或者悬挂        
  return    
}
rerr = busiCall(tx)
func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {
  if op == "" {return 0, nil}    

  sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")    

  return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}

每一个业务逻辑,dtm server 在失常胜利申请时候,ti.Op 默认失常执行的操作是 action,所以失常第一次申请都是 ti.Op 值都是 action,那 originType 就是“”

    originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)

那么下面这个 sql 就不会执行因为 ti.Op == “” 在 insertBarrier 中间接 return 了

    currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

那第二个 sql 的 ti.Op 是 action,所以子事务屏障表 barrier 就会插入一条数据

同理在执行库存也会插入一条

1、整个事务都胜利的子事务屏障

那在一次下订单失常胜利申请下,因为 ti.Op 都是 action,所以 originType 都是 ”” , 所以不论是下单的 barrier 还是扣库存的 barrier,在执行他们 2 次 barrier insert 时候,originAffected 都会疏忽,因为 originType==“”会间接被 return 不插入数据,这样看来 不论是下单还是扣库存,barrier 的第二条插入数据失效,所以 barrier 数据表中就会有 2 条下单数据,一条是订单的一条是扣库存的

gid : dtm 全局事务 id

branch_id : 每个全局事务 id 下的每个业务 id

op : 操作,如果是失常胜利申请就是 action

barrier_id : 同一个业务下开多个会递增

这 4 个字段在表中是联结惟一索引,在 insertBarrier 时候,dtm 判断如果存在就疏忽不插入

2、如果订单胜利库存有余回滚子事务屏障

咱们库存只有 10 个,咱们下单 20 个

1)当订单下胜利,因为订单下单时候并不知道后续库存状况(即便在下单时候先去查库存那也会有查问时候足够,扣除时候有余状况),

所以下单胜利 barrier 表中依照之前梳理的逻辑就会在 barrier 表中产生一条正确数据执行数据

2)接着执行扣库存操作

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {fmt.Printf("扣库存 start....")

    stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
    if err != nil && err != model.ErrNotFound {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, status.Error(codes.Internal, err.Error())
    }
    if stock == nil || stock.Num < in.Num {
        //【回滚】库存有余确定须要 dtm 间接回滚,间接返回 codes.Aborted, dtmcli.ResultFailure 才能够回滚
        return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
    }
  
  .......
}

在执行扣库存业务逻辑之前,因为咱们查问库存发现库存有余,所以间接 return codes.Aborted 了,不会走到子事务屏障 barrier 这里,所以 barrier 表中不会插入数据,而是通知 dtm 要回滚

3)调用 order 回滚操作

订单回滚的时候会开启 barrier,这时候又会执行 barrier 代码(如下),因为回滚代码的 ti.Op 是 compensate,orginType 就是 action

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)

if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补        
  currentAffected == 0 { // 这个是反复申请或者悬挂        
  return    
}
rerr = busiCall(tx)

因为咱们之前下订单胜利了,barrier 表里有一条下单胜利时候的记录 action,所以 originAffected==0,所以只会插入一条以后回滚记录持续调用 busiCall(tx) 执行后续咱们本人写的回滚操作

到此,咱们应该只有两条数据,一条订单胜利创立记录,一条订单回滚记录

4)库存回滚 DeductRollback

订单回滚胜利后,会再持续调用库存回滚 DeductRollback,库存回滚代码如下

这就是子事务屏障主动帮咱们判断的,也就是那两条外围插入语句帮咱们判断的,以至于咱们业务不会呈现脏数据

库存这里回滚分两种状况

  • 没扣胜利回滚
  • 扣胜利回滚
没扣胜利回滚(咱们以后举例场景是这个)

首先调用库存回滚时候 ti.Op 是 compensate,orginType 就是 action,会执行上面 2 条 insert

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)

if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补
  currentAffected == 0 { // 这个是反复申请或者悬挂
  return}rerr = busiCall(tx)
}

这里联合判断如果是回滚、勾销操作,originAffected > 0 以后插入胜利了,之前对应的正向扣库存操作没有插入胜利,阐明之前库存没扣胜利,间接 return 就不须要执行后续弥补了。所以此时会在 barrier 表中插入 2 条数据间接 return,就不会执行咱们后续弥补操作了

到此咱们 barrier 表中有 4 条数据了

扣胜利回滚(这个状况本人能够尝试模仿此场景)

如果咱们上一步扣库存胜利,在执行此弥补的时候 ti.Op 是 compensate,orginType 就是 action,继续执行 2 个 insert 语句

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)

if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补
  currentAffected == 0 { // 这个是反复申请或者悬挂
  return}rerr = busiCall(tx)
}

这里联合判断如果是回滚、勾销操作,originAffected == 0 以后插入疏忽了没插入进去,阐明之前正向扣库存插入胜利了,这里只插入第二个 sql 语句记录即可,而后在执行后续咱们弥补的业务操作。

所以,整体剖析下来外围语句就是 2 条 insert,它帮咱们解决了反复回滚数据、数据幂等状况,只能说 dtm 作者想法真的很好,用了起码的代码帮咱们解决了一个很麻烦的问题

七、go-zero 对接中注意事项

1、dtm 的回滚弥补

在应用 dtm 的 grpc 时候,当咱们应用 saga、tcc 等如果第一步尝试或者执行失败了,是心愿它能执行前面的 rollback 的,在 grpc 中的服务如果产生谬误了,必须返回:status.Error(codes.Aborted, dtmcli.ResultFailure),返回其余谬误,不会执行你的 rollback 操作,dtm 会始终重试,如下:

stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
if err != nil && err != model.ErrNotFound {
  // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
  return nil, status.Error(codes.Internal, err.Error())
}
if stock == nil || stock.Num < in.Num {
  //【回滚】库存有余确定须要 dtm 间接回滚,间接返回 codes.Aborted, dtmcli.ResultFailure 才能够回滚
  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
}

2、barrier 的空弥补、悬挂等

之前筹备工作中,咱们创立了 dtm_barrier 库以及执行了 barrier.mysql.sql,这个其实就是为咱们的业务服务做了一个查看,避免空弥补,具体能够看 barrier.Call 中源码,没几行代码能够看懂的。

如果咱们线上应用的话,你的每个与 db 交互的服务只有用到了 barrier,这个服务应用到的 mysql 账号,要给他调配 barrier 库的权限,这个不要遗记了

3、barrier 在 rpc 中本地事务

在 rpc 的业务中,如果应用了 barrier 的话,那么在 model 中与 db 交互时候必须要用事务,并且肯定要跟 barrier 用同一个事务

logic

barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
    db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()
    if err != nil {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, status.Error(codes.Internal, err.Error())
    }
    if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)
        if err != nil{
            // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
            return status.Error(codes.Internal, err.Error())
        }
        affected, err := sqlResult.RowsAffected()
        if err != nil{
            // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
            return status.Error(codes.Internal, err.Error())
        }

        // 如果是影响行数为 0,间接就通知 dtm 失败不须要重试了
        if affected <= 0 {return  status.Error(codes.Aborted,  dtmcli.ResultFailure)
        }

        //!!开启测试!!:测试订单回滚更改状态为生效,并且以后库扣失败不须要回滚
        // return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in)

        return nil
    }); err != nil {
        // !!! 个别数据库不会谬误不须要 dtm 回滚,就让他始终重试,这时候就不要返回 codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!
        return nil, err
    }

model

func (m *defaultStockModel) DecuctStock(tx *sql.Tx,goodsId , num int64) (sql.Result,error) {query := fmt.Sprintf("update %s set `num` = `num` - ? where `goods_id` = ? and num >= ?", m.table)
    return tx.Exec(query,num, goodsId,num)
}

func (m *defaultStockModel) AddStock(tx *sql.Tx,goodsId , num int64) error {query := fmt.Sprintf("update %s set `num` = `num` + ? where `goods_id` = ?", m.table)
    _, err :=tx.Exec(query, num, goodsId)
    return err
}

七、应用 go-zero 的 http 对接

这个根本没啥难度,grpc 会了这个很简略,鉴于 go 在微服务中去应用 http 场景不多,这里就不具体去做了,我之前一个版本写过一个简略的,然而没这个欠缺,有趣味能够去看下,不过那个 barrier 是本人基于 go-zero 的 sqlx,将 dtm 的官网的做了批改,当初不须要了。

我的项目地址:https://github.com/Mikaelemmmm/dtmbarrier-go-zero

我的项目地址

https://github.com/zeromicro/go-zero

欢送应用 go-zerostar 反对咱们!

微信交换群

关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。

退出移动版