咱们用一个系列来解说从需要到上线、从代码到k8s部署、从日志到监控等各个方面的微服务残缺实际。

整个我的项目应用了go-zero开发的微服务,根本蕴含了go-zero以及相干go-zero作者开发的一些中间件,所用到的技术栈根本是go-zero项目组的自研组件,根本是go-zero全家桶了。

实战我的项目地址:https://github.com/Mikaelemmm...

对于分布式事务

因为本我的项目服务划分绝对独立一些,所以目前没有应用到分布式事务,不过go-zero联合dtm应用分布式事务的最佳实际,我有整顿demo,这里就介绍一下go-zero联合dtm的应用,我的项目地址go-zero联合dtm最佳实际仓库地址 : https://github.com/Mikaelemmm...

【注】上面说的不是go-zero-looklook我的项目,是这个我的项目 https://github.com/Mikaelemmm...

一、注意事项

  • go-zero 1.2.4版本以上,这个肯定要留神
  • dtm 你用最新的就行了

二、clone dtm

git clone https://github.com/yedf/dtm.git

三、配置文件

1、找到我的项目跟文件夹下的conf.sample.yml

2、cp conf.sample.yml conf.yml

3、应用etcd , 把配置中上面这段正文关上 (如果没用etcd就更简略了 ,这个都省了,间接链接到dtm server地址就能够了)

MicroService:  Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover  Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url  EndPoint: 'localhost:36790'

解释一下:

MicroService 这个不要动,这个代表要对把dtm注册到那个微服务服务集群外面去,使微服务集群外部服务能够通过grpc间接跟dtm交互

Driver :'dtm-driver-gozero' , 应用go-zero的注册服务发现驱动,反对go-zero

Target: 'etcd://localhost:2379/dtmservice' 将以后dtm的server间接注册到微服务所在的etcd集群中,如果go-zero作为微服务应用的话,就能够间接通过etcd拿到dtm的server grpc链接,间接就能够跟dtm server交互了

EndPoint: 'localhost:36790' , 代表的是dtm的server的连贯地址+端口 , 集群中的微服务能够间接通过etcd取得此地址跟dtm交互了,

如果你本人去改了dtm源码grpc端口,记得这里要改下端口

四、启动dtm server

在dtm我的项目根目录下

go run app/main.go dev

五、应用go-zero的grpc对接dtm

这是一个疾速下单扣商品库存的例子

1、order-api

order-api是http服务入口创立订单

service order {   @doc "创立订单"   @handler create   post /order/quickCreate (QuickCreateReq) returns (QuickCreateResp)}

接下来看logic

func (l *CreateLogic) Create(req types.QuickCreateReq,r *http.Request) (*types.QuickCreateResp, error) {    orderRpcBusiServer, err := l.svcCtx.Config.OrderRpcConf.BuildTarget()    if err != nil{        return nil,fmt.Errorf("下单异样超时")    }    stockRpcBusiServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()    if err != nil{        return nil,fmt.Errorf("下单异样超时")    }    createOrderReq:= &order.CreateReq{UserId: req.UserId,GoodsId: req.GoodsId,Num: req.Num}    deductReq:= &stock.DecuctReq{GoodsId: req.GoodsId,Num: req.Num}    // 这里只举了saga例子,tcc等其余例子根本没啥区别具体能够看dtm官网    gid := dtmgrpc.MustGenGid(dtmServer)    saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).        Add(orderRpcBusiServer+"/pb.order/create", orderRpcBusiServer+"/pb.order/createRollback", createOrderReq).        Add(stockRpcBusiServer+"/pb.stock/deduct", stockRpcBusiServer+"/pb.stock/deductRollback", deductReq)    err = saga.Submit()    dtmimp.FatalIfError(err)    if err != nil{        return nil,fmt.Errorf("submit data to  dtm-server err  : %+v \n",err)    }    return &types.QuickCreateResp{}, nil}

进入到下单逻辑时,别离获取order订单、stock库存服务的rpc在etcd中的地址,应用BuildTarget()这个办法

而后创立order、stock对应的申请参数

申请dtm获取全局事务id , 基于此全局事务id开启grpc的saga分布式事务 ,将创立订单、扣减库存申请放入事务中,这里应用grpc模式申请,每个业务要有一个正向申请、一个回滚申请、以及申请参数,当执行其中任何一个业务正向申请出错了会主动调用事务中所有业务回滚申请达到回滚成果。

2、order-srv

order-srv是订单的rpc服务,与dtm-gozero-order数据库中order表交互

// serviceservice order {   rpc create(CreateReq)returns(CreateResp);   rpc createRollback(CreateReq)returns(CreateResp);}

2.1 Create

当order-api提交事务默认申请的是create办法,咱们看看logic

func (l *CreateLogic) Create(in *pb.CreateReq) (*pb.CreateResp, error) {   fmt.Printf("创立订单 in : %+v \n", in)   // barrier避免空弥补、空悬挂等具体看dtm官网即可,别忘记加barrier表在以后库中,因为判断弥补与要执行的sql一起本地事务   barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)   db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()   if err != nil {      // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!      return nil, status.Error(codes.Internal, err.Error())   }   if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {      order := new(model.Order)      order.GoodsId = in.GoodsId      order.Num = in.Num      order.UserId = in.UserId      _, err = l.svcCtx.OrderModel.Insert(tx, order)      if err != nil {         return fmt.Errorf("创立订单失败 err : %v , order:%+v \n", err, order)      }      return nil   }); err != nil {      // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!      return nil, status.Error(codes.Internal, err.Error())   }   return &pb.CreateResp{}, nil}

能够看到,一进入办法外部咱们就应用了dtm的子事务屏障技术,至于为什么应用子事务屏障是因为可能会呈现反复申请或者空申请造成的脏数据等,在这里dtm主动给咱们做了幂等解决不须要咱们本人在额定做了,同时保障他外部的幂等解决跟咱们本人执行的事务要在一个事务中,所以要应用一个会话的db链接,这时候咱们就要先获取

db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()

而后基于此db连贯dtm在外部通过sql执行做幂等解决,同时咱们基于此db连贯开启事务,这样就能保障dtm外部的子事务屏障在执行sql操作与咱们本人业务执行的sql操作在一个事务中。

dtm在应用grpc调用咱们业务的时候,咱们的grpc服务在返回给dtm server谬误时候,dtm会依据咱们返回给它的grpc错误码判断是要执行回滚操作还是始终重试:

  • codes.Internal : dtm server不会调用回滚,会始终重试,每次重试dtm的数据库中都会加一次重试次数,本人能够监控这个重试次数报警,人工解决
  • codes.Aborted : dtm server会调用所有回滚申请,执行回滚操作

如果dtm在调用grpc返回谬误是nil时候,就认为调用胜利了

2.2 CreateRollback

当咱们调用订单的创立订单或者库存扣减时候返回给dtm server 的codes.Aborted时候,dtm server会调用所有回滚操作,CreateRollback就是对应订单下单的回滚操作,代码如下

func (l *CreateRollbackLogic) CreateRollback(in *pb.CreateReq) (*pb.CreateResp, error) {    fmt.Printf("订单回滚  , in: %+v \n", in)    order, err := l.svcCtx.OrderModel.FindLastOneByUserIdGoodsId(in.UserId, in.GoodsId)    if err != nil && err != model.ErrNotFound {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, status.Error(codes.Internal, err.Error())    }    if order != nil {        barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)        db, err := l.svcCtx.OrderModel.SqlDB()        if err != nil {            // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!            return nil, status.Error(codes.Internal, err.Error())        }        if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {            order.RowState = -1            if err := l.svcCtx.OrderModel.Update(tx, order); err != nil {                return fmt.Errorf("回滚订单失败  err : %v , userId:%d , goodsId:%d", err, in.UserId, in.GoodsId)            }            return nil        }); err != nil {            logx.Errorf("err : %v \n", err)            // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!            return nil, status.Error(codes.Internal, err.Error())        }    }    return &pb.CreateResp{}, nil}

其实就是如果之前下单胜利了,将之前下胜利的单给勾销掉就是对应下单的回滚操作

3、stock-srv

3.1 Deduct

扣减库存,这里跟order的Create一样了,是下单事务内的正向操作,扣减库存,代码如下

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {    fmt.Printf("扣库存start....")    stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)    if err != nil && err != model.ErrNotFound {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, status.Error(codes.Internal, err.Error())    }    if stock == nil || stock.Num < in.Num {        // 【回滚】库存有余确定须要dtm间接回滚,间接返回 codes.Aborted, dtmcli.ResultFailure 才能够回滚        return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)    }    // barrier避免空弥补、空悬挂等具体看dtm官网即可,别忘记加barrier表在以后库中,因为判断弥补与要执行的sql一起本地事务    barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)    db, err := l.svcCtx.StockModel.SqlDB()    if err != nil {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, status.Error(codes.Internal, err.Error())    }    if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {        sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)        if err != nil{            // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!            return status.Error(codes.Internal, err.Error())        }        affected, err := sqlResult.RowsAffected()        if err != nil{            // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!            return status.Error(codes.Internal, err.Error())        }        // 如果是影响行数为0,间接就通知dtm失败不须要重试了        if affected <= 0 {            return  status.Error(codes.Aborted,  dtmcli.ResultFailure)        }        // !!开启测试!! 测试订单回滚更改状态为生效,并且以后库扣失败不须要回滚        //return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in)        return nil    }); err != nil {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil,err    }    return &pb.DeductResp{}, nil}

这里值得注意的是当只有库存有余、或者在扣库存影响行数为0(未胜利)才须要通知dtm server要回滚,其余状况下其实都是网络抖动、硬件异样导致,应该让dtm server始终重试,当然本人要加个最大重试次数的监控报警,如果达到最大次数还未胜利能实现主动发短信、打电话人工染指了。

3.2 DeductRollback

这里是对应扣库存的回滚操作

func (l *DeductRollbackLogic) DeductRollback(in *pb.DecuctReq) (*pb.DeductResp, error) {    fmt.Printf("库存回滚 in : %+v \n", in)    barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)    db, err := l.svcCtx.StockModel.SqlDB()    if err != nil {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, status.Error(codes.Internal, err.Error())    }    if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {        if err := l.svcCtx.StockModel.AddStock(tx, in.GoodsId, in.Num); err != nil {            return fmt.Errorf("回滚库存失败 err : %v ,goodsId:%d , num :%d", err, in.GoodsId, in.Num)        }        return nil    }); err != nil {        logx.Errorf("err : %v \n", err)        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, status.Error(codes.Internal, err.Error())    }    return &pb.DeductResp{}, nil}

六、子事务屏障

这个词是dtm作者定义的,其实子事务屏障代码不多,看barrier.CallWithDB这个办法即可。

// CallWithDB the same as Call, but with *sql.DBfunc (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {     tx, err := db.Begin()     if err != nil {          return err     }     return bb.Call(tx, busiCall)}

因为这个办法他在外部开启本地事务,它外部是在此事务执行了sql操作,所以在咱们执行本人的业务时候必须跟它用同一个事务,那就要基于同一个db连贯开事务了,so~ 你晓得为什么咱们要提前获取db连贯了吧,目标就是要让它外部执行的sql操作跟咱们的sql操作在一个事务下。至于它外部为什么执行本人的sql操作,接下来咱们剖析。

咱们看bb.Call这个办法

// Call 子事务屏障,具体介绍见 https://zhuanlan.zhihu.com/p/388444465// tx: 本地数据库的事务对象,容许子事务屏障进行事务操作// busiCall: 业务函数,仅在必要时被调用func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) { bb.BarrierID = bb.BarrierID + 1 bid := fmt.Sprintf("%02d", bb.BarrierID) defer func() {  // Logf("barrier call error is %v", rerr)  if x := recover(); x != nil {   tx.Rollback()   panic(x)  } else if rerr != nil {   tx.Rollback()  } else {   tx.Commit()  } }() ti := bb originType := map[string]string{  BranchCancel:     BranchTry,  BranchCompensate: BranchAction, }[ti.Op] originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补  currentAffected == 0 { // 这个是反复申请或者悬挂  return } rerr = busiCall(tx) return}

外围其实就是如下几行代码

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)    currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)    dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)    if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补          currentAffected == 0 { // 这个是反复申请或者悬挂          return    }rerr = busiCall(tx)
func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {  if op == "" {            return 0, nil        }      sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")      return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)}

每一个业务逻辑,dtm server在失常胜利申请时候, ti.Op 默认失常执行的操作是action,所以失常第一次申请都是ti.Op值都是action,那originType就是“”

    originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)

那么下面这个sql就不会执行因为ti.Op == "" 在insertBarrier中间接return了

    currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

那第二个sql的ti.Op是 action, 所以子事务屏障表barrier就会插入一条数据

同理在执行库存也会插入一条

1、整个事务都胜利的子事务屏障

那在一次下订单失常胜利申请下,因为 ti.Op都是action,所以originType都是"" , 所以不论是下单的barrier 还是扣库存的barrier,在执行他们2次barrier insert时候,originAffected都会疏忽,因为originType==“” 会间接被return不插入数据,这样看来 不论是下单还是扣库存,barrier的第二条插入数据失效,所以barrier数据表中就会有2条下单数据,一条是订单的一条是扣库存的

gid : dtm全局事务id

branch_id : 每个全局事务id下的每个业务id

op : 操作,如果是失常胜利申请就是action

barrier_id : 同一个业务下开多个会递增

这4个字段在表中是联结惟一索引,在insertBarrier时候,dtm判断如果存在就疏忽不插入

2、如果订单胜利库存有余回滚子事务屏障

咱们库存只有10个 ,咱们下单20个

1)当订单下胜利,因为订单下单时候并不知道后续库存状况(即便在下单时候先去查库存那也会有查问时候足够,扣除时候有余状况),

所以下单胜利barrier表中依照之前梳理的逻辑就会在barrier表中产生一条正确数据执行数据

2)接着执行扣库存操作

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {    fmt.Printf("扣库存start....")    stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)    if err != nil && err != model.ErrNotFound {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, status.Error(codes.Internal, err.Error())    }    if stock == nil || stock.Num < in.Num {        //【回滚】库存有余确定须要dtm间接回滚,间接返回 codes.Aborted, dtmcli.ResultFailure 才能够回滚        return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)    }    .......}

在执行扣库存业务逻辑之前,因为咱们查问库存发现库存有余,所以间接return codes.Aborted 了,不会走到子事务屏障barrier这里,所以barrier表中不会插入数据,而是通知dtm要回滚

3)调用order回滚操作

订单回滚的时候会开启barrier,这时候又会执行barrier代码(如下),因为回滚代码的ti.Op是compensate ,orginType就是action

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补          currentAffected == 0 { // 这个是反复申请或者悬挂          return    }rerr = busiCall(tx)

因为咱们之前下订单胜利了,barrier表里有一条下单胜利时候的记录action,所以originAffected==0 ,所以只会插入一条以后回滚记录持续调用 busiCall(tx) 执行后续咱们本人写的回滚操作

到此,咱们应该只有两条数据,一条订单胜利创立记录,一条订单回滚记录

4)库存回滚DeductRollback

订单回滚胜利后,会再持续调用库存回滚DeductRollback,库存回滚代码如下

这就是子事务屏障主动帮咱们判断的,也就是那两条外围插入语句帮咱们判断的,以至于咱们业务不会呈现脏数据

库存这里回滚分两种状况

  • 没扣胜利回滚
  • 扣胜利回滚
没扣胜利回滚(咱们以后举例场景是这个 )

首先调用库存回滚时候ti.Op是compensate ,orginType就是action ,会执行上面2条insert

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补  currentAffected == 0 { // 这个是反复申请或者悬挂  return}rerr = busiCall(tx)}

这里联合判断如果是回滚、勾销操作,originAffected > 0 以后插入胜利了,之前对应的正向扣库存操作没有插入胜利,阐明之前库存没扣胜利,间接return就不须要执行后续弥补了。所以此时会在barrier表中插入2条数据间接return,就不会执行咱们后续弥补操作了

到此咱们barrier表中有4条数据了

扣胜利回滚(这个状况本人能够尝试模仿此场景)

如果咱们上一步扣库存胜利,在执行此弥补的时候ti.Op是compensate ,orginType就是action ,继续执行2个insert语句

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空弥补  currentAffected == 0 { // 这个是反复申请或者悬挂  return}rerr = busiCall(tx)}

这里联合判断如果是回滚、勾销操作,originAffected == 0 以后插入疏忽了没插入进去,阐明之前正向扣库存插入胜利了,这里只插入第二个sql语句记录即可,而后在执行后续咱们弥补的业务操作。

所以,整体剖析下来外围语句就是2条insert,它帮咱们解决了反复回滚数据、数据幂等状况,只能说dtm作者想法真的很好,用了起码的代码帮咱们解决了一个很麻烦的问题

七、go-zero对接中注意事项

1、dtm的回滚弥补

在应用dtm的grpc时候,当咱们应用saga、tcc等如果第一步尝试或者执行失败了,是心愿它能执行前面的rollback的,在grpc中的服务如果产生谬误了,必须返回 : status.Error(codes.Aborted, dtmcli.ResultFailure) , 返回其余谬误,不会执行你的rollback操作,dtm会始终重试,如下:

stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)if err != nil && err != model.ErrNotFound {  // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!  return nil, status.Error(codes.Internal, err.Error())}if stock == nil || stock.Num < in.Num {  //【回滚】库存有余确定须要dtm间接回滚,间接返回 codes.Aborted, dtmcli.ResultFailure 才能够回滚  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}

2、barrier的空弥补、悬挂等

之前筹备工作中,咱们创立了dtm_barrier库以及执行了barrier.mysql.sql,这个其实就是为咱们的业务服务做了一个查看,避免空弥补,具体能够看barrier.Call中源码,没几行代码能够看懂的。

如果咱们线上应用的话,你的每个与db交互的服务只有用到了barrier,这个服务应用到的mysql账号,要给他调配barrier库的权限,这个不要遗记了

3、barrier在rpc中本地事务

在rpc的业务中,如果应用了barrier的话,那么在model中与db交互时候必须要用事务,并且肯定要跟barrier用同一个事务

logic

barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)    db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()    if err != nil {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, status.Error(codes.Internal, err.Error())    }    if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {        sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)        if err != nil{            // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!            return status.Error(codes.Internal, err.Error())        }        affected, err := sqlResult.RowsAffected()        if err != nil{            // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!            return status.Error(codes.Internal, err.Error())        }        // 如果是影响行数为0,间接就通知dtm失败不须要重试了        if affected <= 0 {            return  status.Error(codes.Aborted,  dtmcli.ResultFailure)        }        // !!开启测试!! : 测试订单回滚更改状态为生效,并且以后库扣失败不须要回滚        // return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in)        return nil    }); err != nil {        // !!!个别数据库不会谬误不须要dtm回滚,就让他始终重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就能够了,具体本人把控!!!        return nil, err    }

model

func (m *defaultStockModel) DecuctStock(tx *sql.Tx,goodsId , num int64) (sql.Result,error) {    query := fmt.Sprintf("update %s set `num` = `num` - ? where `goods_id` = ? and num >= ?", m.table)    return tx.Exec(query,num, goodsId,num)}func (m *defaultStockModel) AddStock(tx *sql.Tx,goodsId , num int64) error {    query := fmt.Sprintf("update %s set `num` = `num` + ? where `goods_id` = ?", m.table)    _, err :=tx.Exec(query, num, goodsId)    return err}

七、应用go-zero的http对接

这个根本没啥难度,grpc会了这个很简略,鉴于go在微服务中去应用http场景不多,这里就不具体去做了,我之前一个版本写过一个简略的,然而没这个欠缺,有趣味能够去看下,不过那个barrier是本人基于go-zero的sqlx,将dtm的官网的做了批改,当初不须要了。

我的项目地址:https://github.com/Mikaelemmmm/dtmbarrier-go-zero

我的项目地址

https://github.com/zeromicro/go-zero

欢送应用 go-zerostar 反对咱们!

微信交换群

关注『微服务实际』公众号并点击 交换群 获取社区群二维码。