关于golang:跨MysqlRedisMongo的分布式事务

64次阅读

共计 4136 个字符,预计需要花费 11 分钟才能阅读完成。

Mysql、Redis、Mongo 都是十分风行的存储,并且各自有本人的劣势。在理论的利用中,经常会同时应用多种存储,也会遇见在多种存储中保证数据一致性的需要,例如保障数据库中的库存和 Redis 中的库存统一等。

本文基于分布式事务框架 https://github.com/dtm-labs/dtm 给出了一个跨 Mysql、Redis、Mongo 多种存储引擎的一个可运行的分布式事务实例,心愿可能帮忙大家解决这方面的问题。

这种灵便的组合多个存储引擎造成一个分布式事务的能力,也是 dtm 独创做到的,目前未看到其余的分布式事务框架有这样的能力。

问题场景

咱们先来看问题场景,假设当初用户加入一次流动,将本人的余额,充值话费,同时流动会赠送商城积分。其中余额存储在 Mysql,话费保留在 Redis,商城积分保留在 Mongo,并且因为流动限时,因而可能呈现加入流动失败的状况,所以须要反对回滚。

对于上述问题场景,能够应用 DTM 的 Saga 事务,上面咱们就来具体解说计划。

筹备数据

首先是筹备数据,为了不便用户疾速上手相干的例子,咱们曾经把相干的数据筹备好了,地址在 en.dtm.pub,外面包含 Mysql、Redis、Mongo,具体的连贯用户名明码能够在 https://github.com/dtm-labs/d… 找到。

如果您想要本人在本地筹备相干的数据环境,能够通过 https://github.com/dtm-labs/dtm/blob/main/helper/compose.store.yml 启动 Mysql、Redis、Mongo,而后通过 https://github.com/dtm-labs/dtm/tree/main/sqls 上面的脚本筹备本例子的数据,其中 busi.* 为业务数据,barrier.*为 DTM 应用的辅助表

编写业务代码

咱们先看最相熟的 Mysql 的业务代码

func SagaAdjustBalance(db dtmcli.DB, uid int, amount int) error {_, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
    return err
}

这段代码次要是进行数据库中用户余额的调整

对于 Saga 事务模式来说,当咱们回滚时,咱们须要反向调整余额,这部分的解决,咱们能够仍旧调用上述的SagaAdjustBalance,只须要传入正数的金额即可。

对于 Redis 和 Mongo,业务代码的解决也是相似的,只须要对相应的余额进行增减即可

如何做幂等

对于 Saga 事务模式来说,当咱们的子事务服务呈现长期故障,呈现故障就会进行重试,这个故障可能呈现在子事务提交前,也可能呈现在子事务提交之后,因而子事务服务就须要做到幂等。

DTM 提供了辅助表和辅助的函数,用于帮忙用户疾速实现幂等。对于 Mysql,他会在业务数据库中创立辅助表 barrier,当用户开启事务调整余额时,会先在 barrier 表中写入 gid,如果这是一个反复申请,那么写入 gid 时,会发现反复而失败,此时跳过用户业务上的余额调整,保障幂等。辅助函数的应用代码如下:

app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
    })
}))

Mongo 解决幂等的原理与 Mysql 相近,不再赘述

Redis 解决幂等的原理与 Mysql 不同,次要是因为事务的原理不同。Redis 的事务次要是通过 lua 的原子执行来保障的。DTM 的辅助函数会通过 lua 脚本来调整余额,调整余额前,会在 redis 中查问 gid,如果存在,则跳过业务上的余额调整;如果不存在,则执行业务上的余额调整。辅助函数的应用代码如下:

app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
}))

如何做弥补

对于 Saga 来说,咱们还须要解决弥补操作,但弥补操作并不是简略的反向调整,也有很多坑须要留神,否则很容易弥补出错。
一方面,弥补须要思考幂等,因为在弥补过程中,也同样须要思考故障重试的状况,与前一大节中的幂等解决一样。另一方面,弥补还须要思考空弥补,因为正向分支返回失败,这个失败可能是在正向的数据曾经调整实现提交之后的失败,也可能是还没有提交就返回了失败。对于数据已提交的失败,咱们须要执行反向操作,对于数据未提交的失败,咱们须要跳过反向操作,即解决空弥补。

DTM 提供的辅助表与辅助函数中,一方面会依据正向操作插入的 gid 判断是否为空弥补,另一方面还会再插入 gid+’compensate’,判断弥补是否为反复操作。如果是失常弥补操作,那么会执行业务上的弥补,如果是空弥补或者反复弥补,则会跳过弥补业务上的弥补。

Mysql 的代码如下:

app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
    })
}))

Redis 的代码如下:

app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
}))

弥补代码与后面的正向操作代码简直一样,仅仅是把金额乘以 -1。DTM 的辅助函数会在一个函数外部同时蕴含了幂等与弥补的相干逻辑

其余异样

编写子事务以及子事务的弥补时,其实还有一种异常情况是悬挂,可能呈现在全局事务超时回滚,或者重试达到上线后回滚,失常状况是先正向操作再弥补,然而极其状况可能呈现先弥补再正向操作,因而正向操作还须要判断弥补是否已执行,已执行的状况下,也须要跳过业务操作。

对于 DTM 的用户而言,这些异样都曾经被优雅的妥善处理,您作为用户,只须要依照上述的 MustBarrierFromGin(c).Call 进行调用即可,齐全不再须要关怀这些异样。DTM 解决这些异样的原理在这里进行了具体的讲述:异样与子事务屏障

发动分布式事务

后面编写完了各个子事务服务,上面这部分代码发动一个 Saga 全局事务:

saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)).
  Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", &busi.TransReq{Amount: 50}).
  Add(busi.Busi+"/SagaMongoTransIn", busi.Busi+"/SagaMongoTransInCom", &busi.TransReq{Amount: 30}).
  Add(busi.Busi+"/SagaRedisTransIn", busi.Busi+"/SagaRedisTransOutIn", &busi.TransReq{Amount: 20})
err := saga.Submit()

在这部分代码中,创立了一个 Saga 全局事务,该 Saga 事务包含 3 个子事务:

  • 从 Mysql 直达出 50
  • 向 Mongo 中转入 30
  • 向 Redis 中转入 20

在整个事务过程中,如果所有的子事务都顺利完成,那么全局事务胜利;如果有一个子事务返回了业务上的失败,那么全局事务回滚。

运行

如果您想要残缺运行一个下面的示例,步骤如下:

  1. 运行 dtm
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
  1. 运行胜利的例子
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb
  1. 运行失败的例子
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb_rollback

您能够对例子进行批改,模仿各种长期的故障,空弥补的状况,以及其余各种异样,当整个全局事务最终实现时,数据是统一的。

小结

本文给出了一个跨 Mysql、Redis、Mongo 的分布式事务例子,具体解说了其中须要解决的问题,以及解决方案。

本文的原理适宜于所有反对 ACID 事务的存储引擎,您能够将它疾速扩大,用于其余引擎,例如 TiKV 等。

欢送拜访 https://github.com/dtm-labs/dtm 并 star 反对咱们

正文完
 0