乐趣区

关于seata:Seata-AT-模式代码级详解

文|

刘月财

seata-go 我的项目负责人

北京小桔科技有限公司【滴滴】开发工程师

赵新(花名:于雨 )

蚂蚁团体 Seata 我的项目开源负责人

本文 5343 字 浏览 14分钟

背景

Seata 四种事务模式中,AT 事务模式是阿里体系独创的事务模式,对业务无侵入,也是 Seata 用户最多的一种事务模式,兼具易用性与高性能。

目前,Seata 社区正鼎力推动其多语言版本建设,Go、PHP、JS 和 Python 四个语言版本根本实现了 TCC 事务模式的实现。参照 Seata v1.5.2 版本的 AT 模式的实现,并联合 Seata 官网文档,本文尝试从代码角度详解 Seata AT 事务模式的具体流程,目标是梳理 Seata Java 版本 AT 模式的实现细节后,在多语言版本后续开发中,优先实现 AT 事务模式。

1、什么是 AT 模式?

AT 模式是一种二阶段提交的分布式事务模式,它采纳了本地 undo log 的形式来数据在批改前后的状态,并用它来实现回滚。从性能上来说,AT 模式因为有 undo log 的存在,一阶段执行完能够立刻开释锁和连贯资源,吞吐量比 XA 模式高。用户在应用 AT 模式的时候,只须要配置好对应的数据源即可,事务提交、回滚的流程都由 Seata 主动实现,对用户业务简直没有入侵,应用便当。

2、AT 模式与 ACID 和 CAP

议论数据库的事务模式,个别都会先议论事务相干的 ACID 个性,但在分布式场景下,还须要思考其 CAP 性质。

2.1 AT 与 ACID

数据库事务要满足原子性、一致性、持久性以及隔离性四个性质,即 ACID。在分布式事务场景下,个别地,首先保障原子性和持久性,其次保障一致性,隔离性则因为其应用的不同数据库的锁、数据 MVCC 机制以及相干事务模式的差别,具备多种隔离级别,如 MySQL 本身事务就有读未提交(Read Uncommitted)、读已提交(Read Committed)、可反复读(Repeatable Read)、序列化(Serializable)等四种隔离级别。

2.1.1 AT 模式的读隔离

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的根底上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)

如果利用在特定场景下,必须要求全局的 读已提交,目前 Seata 的形式是通过 SELECT FOR UPDATE 语句的代理。

SELECT FOR UPDATE 语句的执行会查问 全局锁 ,如果 全局锁 被其余事务持有,则开释本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查问是被 block 住的,直到 全局锁 拿到,即读取的相干数据是 已提交 的,才返回。

出于总体性能上的思考,Seata 目前的计划并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

具体例子参考 Seata 官网:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

2.1.2 AT 模式的写隔离

AT 会对写操作的 SQL 进行拦挡,提交本地事务前,会向 TC 获取全局锁,未获取到全局锁的状况下,不能进行写,以此来保障不会产生写抵触:

一阶段本地事务提交前,须要确保先拿到 全局锁

拿不到 全局锁,不能提交本地事务;

全局锁 的尝试被限度在肯定范畴内,超出范围将放弃,并回滚本地事务,开释本地锁。

具体例子参考 Seata 官网:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

2.2 AT 与 CAP

Seata 所有的事务模式在个别状况下,是须要保障 CP,即一致性和分区容错性,因为分布式事务的外围就是要保证数据的一致性(包含弱一致性)。比方,在一些交易场景下,波及到多个零碎的金额的变动,保障一致性能够防止零碎产生资损。

分布式系统不可避免地会呈现服务不可用的状况,如 Seata 的 TC 呈现不可用时,用户可能心愿通过服务降级,优先保障整个服务的可用性,此时 Seata 须要从 CP 零碎转换为一个保障 AP 的零碎。

比方,有一个服务是给用户端提供用户批改信息的性能,如果此时 TC 服务呈现问题,为了不影响用户的应用体验,咱们心愿服务依然可用,只不过所有的 SQL 的执行降级为不走全局事务,而是当做本地事务执行。

AT 模式默认优先保障 CP,但提供了配置通道让用户在 CP 和 AP 两种模式下进行切换:

配置文件的 tm.degrade-check 参数,其值为 true 则分支事务保障 AP,反之保障 CP;

手动批改配置核心的 service.disableGlobalTransaction 属性为 true,则敞开全局事务实现 AP。

3、AT 数据源代理

在 AT 模式中,用户只须要配置好 AT 的代理数据源即可,AT 的所有流程都在代理数据源中实现,对用户无感知。

AT 数据源代理的整体类构造如下图:

 AT 事务数据源代理类结构图【from https://seata.io/zh-cn/docs/dev/mode/xa-mode.html

AT 的数据源代理中,别离对指标数据库的 DataSource、Connection 和 Statement  进行了代理,在执行指标 SQL 动作之前,实现了 RM 资源注册、undo log 生成、分支事务注册、分支事务提交 / 回滚等操作,而这些操作对用户并无感知。

上面的时序图中,展现了 AT 模式在执行过程中,这几个代理类的动作细节:

注:图片倡议在 PC 端查看

4、AT 模式流程

以下是 AT 模式的整体流程,从这里能够看到分布式事务各个要害动作的执行机会,每个动作细节,咱们前面来探讨:

注:图片倡议在 PC 端查看

4.1 一阶段

在 AT 模式的第一阶段,Seata 会通过代理数据源,拦挡用户执行的业务 SQL,如果用户没有开启事务,会主动开启一个新事务。如果业务 SQL 是写操作(增、删、改操作)类型,会解析业务 SQL 的语法,生成 SELECT SQL 语句,把要被批改的记录查出来,保留为“before image”。而后执行业务 SQL,执行完后用同样的原理,将曾经被批改的记录查出来,保留为“after image”,至此一个 undo log 记录就残缺了。随后 RM 会向 TC 注册分支事务,TC 侧会新加锁记录,锁能够保障 AT 模式的读、写隔离。RM  再将 undo log 和业务 SQL 的本地事务提交,保障业务 SQL 和保留 undo log 记录 SQL 的原子性。

4.2 二阶段提交

AT 模式的二阶段提交,TC 侧会将该事务的锁删除,而后告诉 RM 异步删除 undo log 记录即可。

4.3 二阶段回滚

如果 AT 模式的二阶段是回滚,那么 RM 侧须要依据一阶段保留的 undo log 数据中的 before image 记录,通过逆向 SQL 的形式,对在一阶段批改过的业务数据进行还原即可。

然而在还原数据之前,须要进行脏数据校验。因为在一阶段提交后,到当初进行回滚的两头这段时间,该记录有可能被别的业务改变过。校验的形式,就是用 undo log 的 after image 和当初数据库的数据做比拟,如果数据统一,阐明没有脏数据;不统一则阐明有脏数据,呈现脏数据就须要人工进行解决了。

5、要害代码模块

如下是 AT 模式整个流程的次要模块,咱们从中能够理解开发 AT 模式须要做哪些事件:

5.1 Undo log 数据格式

undo log 存在表 undo_log 表中,undo_log 表的表构造如下:

rollback_info 寄存了业务数据批改前后的内容,数据表寄存的是通过压缩后的格局,他的明文格局如下:

{
    "branchId":2828558179596595558,
    "sqlUndoLogs":[
        {
            "afterImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PRIMARY_KEY",
                                "name":"id",
                                "type":4,
                                "value":3
                            },
                            {
                                "keyType":"NULL",
                                "name":"count",
                                "type":4,
                                "value":70
                            }
                        ]
                    }
                ],
                "tableName":"stock_tbl"
            },
            "beforeImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PRIMARY_KEY",
                                "name":"id",
                                "type":4,
                                "value":3
                            },
                            {
                                "keyType":"NULL",
                                "name":"count",
                                "type":4,
                                "value":100
                            }
                        ]
                    }
                ],
                "tableName":"stock_tbl"
            },
            "sqlType":"UPDATE",
            "tableName":"stock_tbl"
        }
    ],
    "xid":"192.168.51.102:8091:2828558179596595550"
}

5.2 UndoLogManager

UndoLogManager 负责 undo log 的新加、删除、回滚操作,不同的数据库有不同的实现(不同数据库的 SQL 语法会不同),公共逻辑放在了 AbstractUndoLogManager 抽象类中,整体的类继承关系如下图:

注:图片倡议在 PC 端查看

插入和删除 undo log 的逻辑都比较简单,间接操作数据表就行。这里重点看下回滚 undo log 的逻辑:

源码剖析如下:

@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;b
        ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ;) {
        try {conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            // 开启本地事务,确保删除 undo log 和复原业务数据的 SQL 在一个事务中 commit
            if (originalAutoCommit = conn.getAutoCommit()) {conn.setAutoCommit(false);
            }

            // Find UNDO LOG
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            // 查出 branchId 的所有 undo log 记录,用来复原业务数据
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;

                // It is possible that the server repeatedly sends a rollback request to roll back
                // the same branch transaction to multiple processes,
                // ensuring that only the undo_log in the normal state is processed.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                // 如果 state=1,阐明能够回滚;state= 1 阐明不能回滚
                if (!canUndo(state)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }

                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                // 依据 serializer 获取序列化工具类
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                // 反序列化 undo log,失去业务记录批改前后的明文
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();}
            }

            // If undo_log exists, it means that the branch transaction has completed the first phase,
            // we can directly roll back and clean the undo_log
            // Otherwise, it indicates that there is an exception in the branch transaction,
            // causing undo_log not to be written to the database.
            // For example, the business processing timeout, the global transaction is the initiator rolls back.
            // To ensure data consistency, we can insert an undo_log with GlobalFinished state
            // to prevent the local transaction of the first phase of other programs from being correctly submitted.
            // See https://github.com/seata/seata/issues/489

            if (exists) {deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                            State.GlobalFinished.name());
                }
            } else {
                // 如果不存在 undo log,可能是因为分支事务还未执行实现(比方,分支事务执行超时),TM 发动了回滚全局事务的申请。// 这个时候,往 undo_log 表插入一条记录,能够使分支事务提交的时候失败(undo log)insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                            State.GlobalFinished.name());
                }
            }

            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
            if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {if (conn != null) {
                try {conn.rollback();
                } catch (SQLException rollbackEx) {LOGGER.warn("Failed to close JDBC resource while undo ...", rollbackEx);
                }
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                    .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                        branchId, e.getMessage()), e);
        } finally {
            try {if (rs != null) {rs.close();
                }
                if (selectPST != null) {selectPST.close();
                }
                if (conn != null) {if (originalAutoCommit) {conn.setAutoCommit(true);
                    }
                    conn.close();}
            } catch (SQLException closeEx) {LOGGER.warn("Failed to close JDBC resource while undo ...", closeEx);
            }
        }
    }
}

备注:须要特地留神下,当回滚的时候,发现 undo log 不存在,须要往 undo_log 表新加一条记录,防止因为 RM 在 TM 收回回滚申请后,又胜利提交分支事务的场景。

5.3 Compressor 压缩算法

Compressor 接口定义了压缩算法的标准,用来压缩文本,节俭存储空间:


public interface Compressor {

    /**
     * compress byte[] to byte[].
     * @param bytes the bytes
     * @return the byte[]
     */
    byte[] compress(byte[] bytes);

    /**
     * decompress byte[] to byte[].
     * @param bytes the bytes
     * @return the byte[]
     */
    byte[] decompress(byte[] bytes);

}

目前曾经实现的压缩算法有如下这些:

5.4 UndoLogParser 序列化算法

Serializer 接口定义了序列化算法的标准,用来序列化代码:

public interface UndoLogParser {

    /**
     * Get the name of parser;
     * 
     * @return the name of parser
     */
    String getName();

    /**
     * Get default context of this parser
     * 
     * @return the default content if undo log is empty
     */
    byte[] getDefaultContent();

    /**
     * Encode branch undo log to byte array.
     *
     * @param branchUndoLog the branch undo log
     * @return the byte array
     */
    byte[] encode(BranchUndoLog branchUndoLog);

    /**
     * Decode byte array to branch undo log.
     *
     * @param bytes the byte array
     * @return the branch undo log
     */
    BranchUndoLog decode(byte[] bytes);
}

目前曾经实现的序列化算法有如下这些:

5.5 Executor 执行器

Executor 是 SQL 执行的入口类,AT 在执行 SQL 前后,须要治理 undo log 的 image 记录,次要是构建 undo log,包含依据不同的业务 SQL,来组装查问 undo log 的 SQL 语句;执行查问 undo log 的 SQL,获取到镜像记录数据;执行插入 undo log 的逻辑(未提交事务)。

​public interface Executor<T> {​    /**     * Execute t.     *     * @param args the args     * @return the t     * @throws Throwable the throwable     */    T execute(Object... args) throws Throwable;}

针对不同的业务 SQL,有不同的 Executor 实现,次要是因为不同操作 / 不同数据库类型的业务 SQL,生成 undo log 的 SQL 的逻辑不同,所以都别离重写了 beforeImage() 和 afterImage() 办法。整体的继承关系如下图所示:

注:图片倡议在 PC 端查看

为了直观地看到不同类型的 SQL 生成的 before image SQL 和 after iamge SQL,这里做个梳理。如果指标数据表的构造如下:


public interface Executor<T> {

    /**
     * Execute t.
     *
     * @param args the args
     * @return the t
     * @throws Throwable the throwable
     */
    T execute(Object... args) throws Throwable;
}

注:图片倡议在 PC 端查看

5.6 AsyncWorker

AsyncWorker 是用来做异步执行的,用来做分支事务提交和 undo log 记录删除等操作。

6、对于性能

并不存在某一种完满的分布式事务机制能够适应所有场景,完满满足所有需要。无论 AT 模式、TCC 模式还是 Saga 模式,实质上都是对 XA 标准在各种场景下安全性或者性能的有余的改良。Seata 不同的事务模式是在一致性、可靠性、易用性、性能四个个性之间进行不同的取舍。

近期  Seata 社区发现有同行,在未详细分析 Java 版本 AT 模式的代码的具体实现的状况下,仅对某个晚期的 Go 版本的 Seata 进行短链接压测后,质疑 AT 模型的性能及其数据安全性,请具备肯定思辨能力的用户敌人们在承受这个论断前认真查阅其测试方法与测试对象,辨别好“李鬼”与“李逵”。

实际上,这个晚期的 Go 版本实现仅参照了 Seata v1.4.0,且未严格把 Seata AT 模式的所有性能都予以实现。话说回来,即使其推崇的 Seata XA 模式,其也依赖于单 DB 的 XA 模式。而当下最新版本的 MySQL XA 事务模式的 BUG 仍然很多,这个地基并没有其设想中的那样百分百巩固。

由阿里与蚂蚁团体共建的 Seata,是咱们多年外部分布式事务工程实际与技术教训的结晶,开源进去后失去了多达 150+ 以上行业同行生产环境的验证。开源小道既长且宽,这个路线上能够有机动车道也有非机动车道,还能够有人行道,大家携手把路线拓宽缩短,而非站在人行道上宣传机动车道危险性高且车速慢。

7、总结

Seata AT 模式依赖于各个 DB 厂商的不同版本的 DB Driver(数据库驱动),每种数据库公布新版本后,其 SQL 语义及其应用模式都可能产生扭转。随着近年 Seata 被其用户们广泛应用于多种业务场景,在开发者们的致力下,Seata AT 模式放弃了编程接口与其 XA 模式简直统一,适配了简直所有的支流数据库,并笼罩了这些数据库的次要风行版本的 Driver:真正做到了把分布式系统的“复杂性”留在了框架层面,把易用性和高性能交给了用户。

当然,Seata Java 版本的 XA 和 AT 模式还有许多须要欠缺与改良的中央,遑论其它多语言版本的实现。欢送对 Seata 及其多语言版本建设感兴趣的同行参加到 Seata 的建设中来,共同努力把 Seata 打造成一个标准化分布式事务平台。

本周举荐浏览

Go 内存透露,pprof 够用了么?

Go 原生插件应用问题全解析

Go 代码城市上云 –KusionStack 实际

Seata-php 半年布局

退出移动版