阿里开源分布式事务组件-seata-AT-模式的分支事务处理

53次阅读

共计 23789 个字符,预计需要花费 60 分钟才能阅读完成。

AT 模式与 TCC 模式

seata 的事务处理模式主要分为两种,分别是 AT 模式和 TCC 模式,其实在早期的 seata 版本里 (还叫 fescar 的时候),它把模式分为:AT 模式,MT 模式和混合模式。
那时还未深入了解 seata 时,对这样的分类模式感到有点奇怪,咱也不知道,更是不敢问。
不过现在这种只分为 AT 模式 和 TCC 模式的分类方法,我觉得是更合理的。
在王者荣耀这款游戏里,我听说过“万物皆可打野”,“万物皆可百穿”。
在算法的世界里,我听说过“万物皆可动态规划”。
在分布式事务的处理里,我觉得“万物皆可 TCC”。
seata 的 AT 模式主要是针对全局事务只涉及到数据库数据的场景,实际上 seata 的 AT 模式也是一种 TCC 模式,只不过,CC 阶段是程序自动完成的,不需要业务开发人员去关心如何去 Confirm,如何去 Cancel。在 AT 模式中,Try 阶段实际上已经完成本地事务的提交,而 Confirm 阶段则是删除 Undo log,Cancel 阶段则是利用 undo log,做数据的回放。
使用 seata 的朋友,如果 seata 给你一种分布式事务的接入很简单的感觉,你还是要记得你是在使用 TCC 模式。

AT 模式原理

seata 的 AT 模式,采用的是大量运用在数据库软件的 Write Ahead Log 思想,即把事务的信息以事务日志的方式记录下来。
这种处理方式,实际上是对传统两阶段提交的一种改进和优化,在《浅析阿里分布式事务组件 fescar/seata 2pc 的设计思想》这篇文章我也简单的分析了一下。主要有几个关键点:

  1. 传统两阶段提交协议是阻塞协议,性能差
  2. 传统两阶段提交协议高可用性不好
  3. 传统两阶段提交协议的全局事务隔离机制不支持
  4. 根据八二原则,80% 的涉及到全局事务的业务是能正常完成并提交的。

因此,在 AT 模式下,seata 采取的做法是,一个事务分支的数据库操作执行完后,马上进行本地事务的提交,从而释放相关的数据库资源。

本地事务执行流程

不过,进行本地提交的前提是,seata 会解析 SQL,获取数据库表的元数据,根绝 SQL 类型,选择性地生成数据的前置镜像和后置镜像,保存在 undo_log 表中,并且要求与保存 undo_log 与业务 SQL 在同一个本地事务内。这就保证了:

  1. 如果一个本地事务被提交,那么必定对应着相应的 undo_log
  2. 如果保存 undo_log 保存失败,那么业务 SQL 也会失败

全局事务提交流程

因为每个分支事务的本地事务都已经被提交,所以如果全局事务能够顺利进行到“提交“这一阶段,那么意味着所有事务分支的本地事务都已经被提交了,数据的一致性已经得到了保证。这个时候全局事务的提交就变得十分轻量级,就是把 undo_log 对应的记录删掉即可,即使是当时删除失败了,也已经不会影响全局事务的最终结果,这次删不了,那就待会再删,程序删不了,没事,顶多人工删。

全局事务回滚流程

如果全局事务的任何一个事务分支失败了,那么全局事务就进入“回滚“流程,回滚时依据先前保存好数据镜像,将原来的数据回放回去。如果全局回放成功,那么数据的一致性也就得到了保证,如果回放不成功,那么事务就进入异常。应对异常,可能需要重试,可能需要人工介入。

数据源代理机制

虽然执行一个正常的业务 SQL,需要附带解析 SQL,生成镜像等等这些操作,不过应用层编程时却不需要自己去实现这些细节。
这些细节 seata 已经自动帮你做好了。应用层看似简单的一个 JDBC 操作,实际上 seata 已经通过代理的方式在这个操作的前前后后加上了这些逻辑,seata 主要是通过代理模式来实现的。
在前面的文章《阿里开源分布式事务组件 seata:demo 环境搭建以及运行流程简析》中的 demo 里我们提到了 spring 的配置文件里对于数据源的配置方式,如下所示:

实际上,spring 的 jdbc template 的入参是一个由 seata 实现的 DataSource 的 Proxy,在 JDBC 的 API 设计中,Connection 通过 DataSource 获得,Statement 或者 PreparedStatement 则通过 Connection 获得。
seata 代理了 DataSource,相当于控制了源头,既然从源头代理了 DataSource,那么我也同样可以代理你的 Connection,同样可以代理你的 Statement 或者 PreparedStatement。(万物皆可代理 ……)
当应用层代码拿到一个普通的 JDBC Connection,进行着天真无邪的数据库操作时,其实这个 Connection 是个 proxy,而且由 Connection 生成的 Statement 或者 PreparedStatement,都是 proxy。如下图所示:

镜像生成机制

Template 模式

普通 SQL 执行时,织入的逻辑主要 StatementProxy 和 PreparedStatementProxy 里实现。当使用 StatementProxy 执行 executeUpdate 方法时:

@Override
public int executeUpdate(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, new StatementCallback<Integer, T>() {
        @Override
        public Integer execute(Statement statement, Object... args) throws SQLException {return statement.executeUpdate((String) args[0]);
        }
    }, sql);
}

这里使用 ExecuteTemplate 类去做主要的逻辑,这里其实也是一个 Template 模式的运用,seata 的很多地方都用了这种方式,在前面《阿里开源分布式事务组件 seata:demo 环境搭建以及运行流程简析》这篇文章中,我们就提到过一个 TransactionTemplate 的事务执行模板,通常这种机制运用在个性化的业务逻辑需要运行在固定的执行顺序中的时候。我们看一下 ExecuteTemplate 的 execute 方法是一个怎样的模板:

/**
 * Execute t.
 *
 * @param <T>               the type parameter
 * @param <S>               the type parameter
 * @param sqlRecognizer     the sql recognizer
 * @param statementProxy    the statement proxy
 * @param statementCallback the statement callback
 * @param args              the args
 * @return the t
 * @throws SQLException the sql exception
 */
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor = null;
    if (sqlRecognizer == null) {executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
    } else {switch (sqlRecognizer.getSQLType()) {
            case INSERT:
                executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case UPDATE:
                executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case DELETE:
                executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case SELECT_FOR_UPDATE:
                executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            default:
                executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                break;
        }
    }
    T rs = null;
    try {rs = executor.execute(args);
    } catch (Throwable ex) {if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException)ex;
    }
    return rs;
}

通过这个方法,我们可以看到一个语句的执行模板大致是怎样的:

  1. 如果当前不处于分布式事务的上下文中,那么使用原来的 Statement 执行
  2. 解析 sql,并结合数据库的类型,指派对应类型的执行器,例如对于 insert 语句,它就指派 InsertExecutor 来执行
  3. 使用对应的执行器,执行相应的 sql

SQL 解析器

不同的 SQL 类型,会使用不同类型的 SQL 执行器来执行,这需要在执行器执行之前,先对 SQL 进行解析,得到 SQL 的元信息,例如类型、表名、where 条件,涉及到的列等等,这些信息由 SQLRecognizer 来表示。

public interface SQLRecognizer {

    /**
     * Type of the SQL. INSERT/UPDATE/DELETE ...
     *
     * @return sql type
     */
    SQLType getSQLType();

    /**
     * 获取表的别名
     */
    String getTableAlias();

    String getTableName();

    String getOriginalSQL();}

但是这个接口定义的 API 得到的信息都较为基础,只是 SQL 类型,表名,表别名以及原 SQL 而已。实际上在应对不同类型的 SQL 时,需要获取的关于 SQL 的信息都不太一样,更具体更复杂的功能,其实是在具体实现的 XXXRecognizer 中。

seata 的 SQL 解析器采用的是它们自家的 druid,druid 的核心功能是一个数据库连接池,但伴随着它 druid 也自带了标准的 SQL 解析器,支持多种 SQL 方言。并且 druid 的 SQL 解析器模块相对比较独立,基本上可以单独拿出来用。
关于 druid 的 SQL 解析器的原理,可以看看我之前写的文章:

  • 《提取 Druid 的 SQL 解析器》
  • 《Druid SQL 解析器概览》
  • 《Druid SQL 解析器的解析过程》

基本上,seata 对于 druid 的使用方式,就这两个核心的步骤:

  1. 利用 druid 的 API 解析 SQL,生成一棵 AST(通常类名为 xxxStatement)
  2. 可以直接通过 AST 获得的信息就直接获得
  3. 无法通过 AST 直接获得的,则编写 visitor,从 AST 中提取想要的信息

seata 中的 visitor 大多数都是 OutputVisitor,即可以让 druid 把想要的信息自动追加到你传进去的 StringBuilder 里面,获取信息时只需要调用 StringBuilder.toString 即可。
例如在 MySQLInsertRecognizer 这个类中,语法树的表示:

直接从语法树上获取 insert 对应的字段:

使用 visitor 获取信息:

其它类型 Recognizer 基本上大同小异,这里不展开了。

数据库表元信息

seata 在生成镜像时,需要生成前后镜像查询 SQL,这些 SQL 的 where 条件,并不是一味地依赖于原 SQL 的 where 条件,特别是原 SQL 的 where 条件十分复杂,且字段繁多的时候。因此 seata 会将数据库表的元信息查询出来,缓存在本地,在构造镜像查询 SQL 时,可以将 where 条件优化成带有主键字段或者带有索引字段的查询条件,提高查询的效率。
这些缓存下来的元信息主要包括:

  1. 表的主键信息
  2. 表的索引信息
  3. 表的字段信息

在 seata 中,用 TableMeta 表示表的元信息,ColumnMeta 表示字段的元信息,IndexMeta 表示索引的信息。除了将这些元信息缓存在本地,seata 还会定期去更新表的元信息。

private static final Cache<String, TableMeta> TABLE_META_CACHE = Caffeine.newBuilder().maximumSize(CACHE_SIZE)
    .expireAfterWrite(EXPIRE_TIME, TimeUnit.MILLISECONDS).softValues().build();


/**
 * Clear the table meta cache
 * @param dataSourceProxy
 */
public static void refresh(final DataSourceProxy dataSourceProxy){ConcurrentMap<String, TableMeta> tableMetaMap = TABLE_META_CACHE.asMap();
    for (Entry<String, TableMeta> entry : tableMetaMap.entrySet()) {
        try {TableMeta tableMeta = fetchSchema(dataSourceProxy, entry.getValue().getTableName());
            if (tableMeta == null){LOGGER.error("get table meta error");
            }
            if (!tableMeta.equals(entry.getValue())){TABLE_META_CACHE.put(entry.getKey(), tableMeta);
                LOGGER.info("table meta change was found, update table meta cache automatically.");
            }
        } catch (SQLException e) {LOGGER.error("get table meta error:{}", e.getMessage(), e);
        }
    }
}

SQL 执行器

不同的 SQL 类型,会使用不同类型的 SQL 执行器来执行,这个执行器的执行过程中,就会包含前置镜像和后置镜像的生成,undo_log 的插入等,它的代码层次结构是这样的:

其中,PlainExecutor 则是默认执行原本的操作,不添加任何其它逻辑,当本地事务不参与到全局事务的时候,使用这个执行器。

BaseTransactionExecutor 执行器定义了一些公共的执行逻辑,如下所示:

@Override
public Object execute(Object... args) throws Throwable {if (RootContext.inGlobalTransaction()) {String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
    }

    if (RootContext.requireGlobalLock()) {statementProxy.getConnectionProxy().setGlobalLockRequire(true);
    } else {statementProxy.getConnectionProxy().setGlobalLockRequire(false);
    }
    return doExecute(args);
}

1 判断当前是否处于全局事务中,如果是,将全局事务 id 绑定到当前连接上。

  1. 判断是否需要加锁
  2. 执行抽象方法 doExecute

doExecute 方法是抽象的,留待子类实现。

DML 相关的执行器

AbstractDMLBaseExecutor 中实现了 doExecute 方法:

@Override
public T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);
    } else {return executeAutoCommitFalse(args);
    }
}

这里需要判断一下当前的连接是否是自动提交的,来决定不同的处理方式。
如果当前的连接设置已经是自动提交的,那么需要手动将它设置为非自动提交,也就是手动开启事务,目的是为了将原 SQL 与 undo_log 写数据的 SQL 保持在同一个本地事务里面。如下代码所示:

/**
 * Execute auto commit true t.
 *
 * @param args the args
 * @return the t
 * @throws Throwable the throwable
 */
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    T result = null;
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    LockRetryController lockRetryController = new LockRetryController();
    try {connectionProxy.setAutoCommit(false);
        while (true) {
            try {result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                break;
            } catch (LockConflictException lockConflict) {connectionProxy.getTargetConnection().rollback();
                lockRetryController.sleep(lockConflict);
            }
        }

    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("exception occur", e);
        throw e;
    } finally {connectionProxy.setAutoCommit(true);
    }
    return result;
}
  1. 首先将连接置为非自动提交的状态
  2. 构造一个无限循环,以便在锁冲突导致的执行失败后可以重试,循环退出条件是冲突次数达到一定次数或者正常执行完成。
  3. 循环体的主体内容是,以非自动提交的状态执行 SQL 处理逻辑,然后手动提交。

如果当前的连接设置是非自动提交的,说明当前已经处于一个本地事务中,不需要手动开启事务。如下代码所示:

/**
 * Execute auto commit false t.
 *
 * @param args the args
 * @return the t
 * @throws Throwable the throwable
 */
protected T executeAutoCommitFalse(Object[] args) throws Throwable {TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

这里的执行逻辑是

  1. 先生成前置镜像
  2. 执行原 SQL
  3. 生成后置镜像
  4. 暂时保存前置镜像和后置镜像作为一条 undo log.

生成前置镜像和后置镜像的方法 beforeImage 和 afterImage 都是抽象,留给具体的子类去实现。
prepareUndoLog 主要是利用前置镜像和后置镜像,生成一条 undo log:

/**
 * prepare undo log.
 *
 * @param beforeImage the before image
 * @param afterImage  the after image
 * @throws SQLException the sql exception
 */
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {return;}

    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    String lockKeys = buildLockKey(lockKeyRecords);
    connectionProxy.appendLockKey(lockKeys);

    SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
    connectionProxy.appendUndoLog(sqlUndoLog);
}

update 语句镜像生成

对于 update 语句,需要同时生成前置镜像和后置镜像,因为 update 是会修改已有数据的。

update 前置镜像

UpdateExecutor 封装了前置镜像和后置镜像的生成,下面是前置镜像的生成细节:


@Override
protected TableRecords beforeImage() throws SQLException {ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
    TableMeta tmeta = getTableMeta();
    String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
    logger.info("logged by beanlam, original sql = {}, before image query sql = {}", this.sqlRecognizer.getOriginalSQL(), selectSQL);
    return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}

private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
    List<String> updateColumns = recognizer.getUpdateColumns();
    StringBuilder prefix = new StringBuilder("SELECT");
    if (!tableMeta.containsPK(updateColumns)) {prefix.append(getColumnNameInSQL(tableMeta.getPkName()) + ",");
    }
    StringBuilder suffix = new StringBuilder("FROM" + getFromTableInSQL());
    String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
    if (StringUtils.isNotBlank(whereCondition)) {suffix.append("WHERE" + whereCondition);
    }
    suffix.append("FOR UPDATE");
    StringJoiner selectSQLJoin = new StringJoiner(",", prefix.toString(), suffix.toString());
    for (String updateColumn : updateColumns) {selectSQLJoin.add(updateColumn);
    }
    return selectSQLJoin.toString();}

首先通过 buildBeforeImageSQL 方法生成前置镜像查询语句,这里需要用到 SQL 解析器解析到的 SQL 信息,比如更新的字段,更新时的 where 条件是什么。也需要用到已经缓存的表的元数据的信息,比如说判断更新字段是否主键。
这里有两个关键的地方:

  1. 一个是如果主键不在更新字段列表里,那么构造后的查询 SQL 里,主键字段必须在查询的字段列表里
  2. 查询语句需要加 for update 进行加行锁

前置镜像查询语句构造完成后,便要通过 buildTableRecords 方法进行前置镜像的查询:

protected TableRecords buildTableRecords(TableMeta tableMeta, String selectSQL, ArrayList<List<Object>> paramAppenderList) throws SQLException {
    TableRecords tableRecords = null;
    PreparedStatement ps = null;
    Statement st = null;
    ResultSet rs = null;
    try {if (paramAppenderList.isEmpty()) {st = statementProxy.getConnection().createStatement();
            rs = st.executeQuery(selectSQL);
        } else {if (paramAppenderList.size() == 1) {ps = statementProxy.getConnection().prepareStatement(selectSQL);
                List<Object> paramAppender = paramAppenderList.get(0);
                for (int i = 0; i < paramAppender.size(); i++) {ps.setObject(i + 1, paramAppender.get(i));
                }
            } else {ps = statementProxy.getConnection().prepareStatement(selectSQL);
                List<Object> paramAppender = null;
                for (int i = 0; i < paramAppenderList.size(); i++) {paramAppender = paramAppenderList.get(i);
                    for (int j = 0; j < paramAppender.size(); j++) {ps.setObject(i * paramAppender.size() + j + 1, paramAppender.get(j));
                    }
                }
            }
            rs = ps.executeQuery();}
        tableRecords = TableRecords.buildRecords(tableMeta, rs);
    } finally {if (rs != null) {rs.close();
        }
        if (st != null) {st.close();
        }
        if (ps != null) {ps.close();
        }
    }
    return tableRecords;
}

这里,查出来的数据,以 TableRecords 类表示,一个 TableRecords 实例,会包含一次查询出来的一行或多行数据,并且包括一些元信息,比如说主键是哪个字段,每个字段的类型等等。TableRecords 是前置镜像的一个组成部分。

update 后置镜像

后置镜像的生成细节如下所示:

@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {TableMeta tmeta = getTableMeta();
    if (beforeImage == null || beforeImage.size() == 0) {return TableRecords.empty(getTableMeta());
    }
    String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
    logger.info("logged by beanlam, original sql = {}, after image query sql = {}", this.sqlRecognizer.getOriginalSQL(), selectSQL);
    TableRecords afterImage = null;
    PreparedStatement pst = null;
    ResultSet rs = null;
    try {pst = statementProxy.getConnection().prepareStatement(selectSQL);
        int index = 0;
        for (Field pkField : beforeImage.pkRows()) {
            index++;
            pst.setObject(index, pkField.getValue(), pkField.getType());
        }
        rs = pst.executeQuery();
        afterImage = TableRecords.buildRecords(tmeta, rs);

    } finally {if (rs != null) {rs.close();
        }
        if (pst != null) {pst.close();
        }
    }
    return afterImage;
}

private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
    List<String> updateColumns = recognizer.getUpdateColumns();
    StringBuilder prefix = new StringBuilder("SELECT");
    if (!tableMeta.containsPK(updateColumns)) {
        // PK should be included.
        prefix.append(getColumnNameInSQL(tableMeta.getPkName()) + ",");
    }
    String suffix = "FROM" + getFromTableInSQL() + "WHERE" + buildWhereConditionByPKs(beforeImage.pkRows());
    StringJoiner selectSQLJoiner = new StringJoiner(",", prefix.toString(), suffix);
    for (String column : updateColumns) {selectSQLJoiner.add(column);
    }
    return selectSQLJoiner.toString();}

基本上后置镜像的生成步骤与生成前置镜像的相同,但有个需要注意的点是,后置镜像的查询语句中,直接抛弃了原来的 update 条件。把 where 提交优化为仅包含主键,这能提高语句执行的效率。为什么能这么优化?因为前面的前置镜像的查询列表里包含了主键,所以能够知道 update 的那些数据里,它们的主键值是什么。

一个范例

下面是我自己随便跑的一个 update 语句以及它的前后镜像生成语句:
原 sql : update bank_account set money = (money + 10) where id = 1
前置镜像查询 SQL:SELECT id, money FROM bank_account WHERE id = 1 FOR UPDATE
后置镜像查询 SQL : SELECT id, money FROM bank_account WHERE id = ?
最终生成的 undo log(序列化方式为 JSON):

{
 "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
 "xid": "192.168.18.1:8091:2024768572",
 "branchId": 2024768574,
 "sqlUndoLogs": ["java.util.ArrayList", [{
  "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
  "sqlType": "UPDATE",
  "tableName": "bank_account",
  "beforeImage": {
   "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
   "tableName": "bank_account",
   "rows": ["java.util.ArrayList", [{
    "@class": "io.seata.rm.datasource.sql.struct.Row",
    "fields": ["java.util.ArrayList", [{
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "id",
     "keyType": "PrimaryKey",
     "type": 4,
     "value": 1
    }, {
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "money",
     "keyType": "NULL",
     "type": 4,
     "value": 100
    }]]
   }]]
  },
  "afterImage": {
   "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
   "tableName": "bank_account",
   "rows": ["java.util.ArrayList", [{
    "@class": "io.seata.rm.datasource.sql.struct.Row",
    "fields": ["java.util.ArrayList", [{
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "id",
     "keyType": "PrimaryKey",
     "type": 4,
     "value": 1
    }, {
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "money",
     "keyType": "NULL",
     "type": 4,
     "value": 90
    }]]
   }]]
  }
 }]]
}

delete 语句镜像生成

delete 语句是对数据进行删除,因此它只需要保存前置镜像,不需要保存后置镜像


@Override
protected TableRecords beforeImage() throws SQLException {SQLDeleteRecognizer visitor = (SQLDeleteRecognizer) sqlRecognizer;
    TableMeta tmeta = getTableMeta(visitor.getTableName());
    ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
    String selectSQL = buildBeforeImageSQL(visitor, tmeta, paramAppenderList);
    logger.info("logged by beanlam, original sql = {}, before image query sql = {}", this.sqlRecognizer.getOriginalSQL(), selectSQL);
    return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}

private String buildBeforeImageSQL(SQLDeleteRecognizer visitor, TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL);
    String whereCondition = buildWhereCondition(visitor, paramAppenderList);
    StringBuilder suffix = new StringBuilder("FROM" + keywordChecker.checkAndReplace(getFromTableInSQL()));
    if (StringUtils.isNotBlank(whereCondition)) {suffix.append("WHERE" + whereCondition);
    }
    suffix.append("FOR UPDATE");
    StringJoiner selectSQLAppender = new StringJoiner(",", "SELECT", suffix.toString());
    for (String column : tableMeta.getAllColumns().keySet()) {selectSQLAppender.add(getColumnNameInSQL(keywordChecker.checkAndReplace(column)));
    }
    return selectSQLAppender.toString();}

@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {return TableRecords.empty(getTableMeta());
}

insert 语句镜像生成

insert 语句插入数据到数据库里,因此不存在前置镜像,只需要保存后置镜像

@Override
protected TableRecords beforeImage() throws SQLException {return TableRecords.empty(getTableMeta());
}

@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    //Pk column exists or PK is just auto generated
    List<Object> pkValues = containsPK() ? getPkValuesByColumn() : getPkValuesByAuto();

    TableRecords afterImage = buildTableRecords(pkValues);

    if (afterImage == null) {throw new SQLException("Failed to build after-image for insert");
    }

    return afterImage;
}

SELECT FOR UPDATE 语句的执行

查询类的 SQL 操作是不带事务的,因此,对于普通的 select 语句,没有必要生成镜像这些东西,也没有必要为其单独开一个分支事务。
不过如果是 select for update,情况就不一样了,因为全局事务锁与单机事务锁是不一样的。在一个单库上希望使用 select for update 去做行锁时,还要检查一下这些数据是否在全局事务的范围内被锁住了。

@Override
public T doExecute(Object... args) throws Throwable {Connection conn = statementProxy.getConnection();
    T rs = null;
    Savepoint sp = null;
    LockRetryController lockRetryController = new LockRetryController();
    boolean originalAutoCommit = conn.getAutoCommit();
    ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
    String selectPKSQL = buildSelectSQL(paramAppenderList);
    try {if (originalAutoCommit) {conn.setAutoCommit(false);
        }
        sp = conn.setSavepoint();

        while (true) {
            try {
                // #870
                // execute return Boolean
                // executeQuery return ResultSet
                rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

                // Try to get global lock of those rows selected
                TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                String lockKeys = buildLockKey(selectPKRows);
                if (StringUtils.isNullOrEmpty(lockKeys)) {break;}

                if (RootContext.inGlobalTransaction()) {
                    //do as usual
                    statementProxy.getConnectionProxy().checkLock(lockKeys);
                } else if (RootContext.requireGlobalLock()) {
                    //check lock key before commit just like DML to avoid reentrant lock problem(no xid thus can
                    // not reentrant)
                    statementProxy.getConnectionProxy().appendLockKey(lockKeys);
                } else {throw new RuntimeException("Unknown situation!");
                }
                break;
            } catch (LockConflictException lce) {conn.rollback(sp);
                lockRetryController.sleep(lce);
            }
        }
    } finally {if (sp != null) {conn.releaseSavepoint(sp);
        }
        if (originalAutoCommit) {conn.setAutoCommit(true);
        }
    }
    return rs;
}

遇到 select for update 的时候,seata 会先老老实实把数据查出来,并检查相应的数据有没有存在于全局事务锁中。如果有,则查询失败,如果没有,那么正常返回查询到的数据。
关于全局事务锁以及事务隔离级别,打算后面单独用一篇文章来写,这里暂时不深入,传统两阶段提交的一个缺点就是缺乏全局事务锁的支持,这一块对于分布式事务的处理是很重要的。

undo log 的保存

前面说了数据镜像的生成,但是并没有讲到最终的 undo_log 是如何保存的,这涉及到:

  1. undo_log 的保存时机
  2. undo_log 的保存格式

时机

回顾一下前面的 DML SQL 执行器中,一个 DML 执行的流程模板如下所示:

/**
 * Execute auto commit false t.
 *
 * @param args the args
 * @return the t
 * @throws Throwable the throwable
 */
protected T executeAutoCommitFalse(Object[] args) throws Throwable {TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

前后镜像生成后,将会被组装成一个 undo log,这个 prepareUndoLog 方法的细节如下所示:

/**
 * prepare undo log.
 *
 * @param beforeImage the before image
 * @param afterImage  the after image
 * @throws SQLException the sql exception
 */
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {return;}

    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    String lockKeys = buildLockKey(lockKeyRecords);
    connectionProxy.appendLockKey(lockKeys);

    SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
    connectionProxy.appendUndoLog(sqlUndoLog);
}
  1. 首先获取这些 DML 语句可能需要的锁信息
  2. 将前置镜像和后置镜像组成一个 SQLUndoLog 实例
  3. 将 SQLUndoLog 暂存到 connection 代理对象中,也就是说,还在内存里。

什么时候应该真正把 undo log 保存到数据库呢,答案是本地事务提交的时候,我们来看一下 ConnectionProxy 的 commit 方法

@Override
public void commit() throws SQLException {if (context.inGlobalTransaction()) {processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();
    } else {targetConnection.commit();
    }
}

private void processGlobalTransactionCommit() throws SQLException {
    try {register();
    } catch (TransactionException e) {recognizeLockKeyConflictException(e);
    }

    try {if (context.hasUndoLog()) {if(JdbcConstants.ORACLE.equalsIgnoreCase(this.getDbType())) {UndoLogManagerOracle.flushUndoLogs(this);
           } else {UndoLogManager.flushUndoLogs(this);
           }
        }
        targetConnection.commit();} catch (Throwable ex) {report(false);
        if (ex instanceof SQLException) {throw new SQLException(ex);
        }
    }
    report(true);
    context.reset();}
  1. 先向 seata server 注册分支事务
  2. 保存 undo log
  3. 提交本地事务
  4. 向 seata server 汇报分支事务的执行状态

因此到了这里,undo_log 保存到数据库的时机,我们已经搞清楚了。

格式

接下来让我们来看看 undo_log 的保存格式,进入到 UndoLogManager 的 flushUndoLogs 方法里:

/**
 * Flush undo logs.
 *
 * @param cp the cp
 * @throws SQLException the sql exception
 */
public static void flushUndoLogs(ConnectionProxy cp) throws SQLException {assertDbSupport(cp.getDbType());

    ConnectionContext connectionContext = cp.getContext();
    String xid = connectionContext.getXid();
    long branchID = connectionContext.getBranchId();

    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchID);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);

    if (LOGGER.isDebugEnabled()) {LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
    }

    insertUndoLogWithNormal(xid, branchID, buildContext(parser.getName()), undoLogContent,
        cp.getTargetConnection());
}
private static void insertUndoLog(String xid, long branchID, String rollbackCtx,
                                  byte[] undoLogContent, State state, Connection conn) throws SQLException {
    PreparedStatement pst = null;
    try {pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL);
        pst.setLong(1, branchID);
        pst.setString(2, xid);
        pst.setString(3, rollbackCtx);
        pst.setBlob(4, BlobUtils.bytes2Blob(undoLogContent));
        pst.setInt(5, state.getValue());
        pst.executeUpdate();} catch (Exception e) {if (!(e instanceof SQLException)) {e = new SQLException(e);
        }
        throw (SQLException) e;
    } finally {if (pst != null) {pst.close();
        }
    }
}
  1. 先前所有保存的 SQLUndoLog 会被整合进一个 BranchUndoLog 对象里
  2. 对这个 BranchUndoLog 对象进行序列化(一般用 json)
  3. 插入到 undo_log 表

BranchUndoLog 对象被序列化后,以 JSON 为例子,从我们前文看到 update undo log 能看到它的结构:

{
 "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
 "xid": "192.168.18.1:8091:2024768572",
 "branchId": 2024768574,
 "sqlUndoLogs": ["java.util.ArrayList", [{
  "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
  "sqlType": "UPDATE",
  "tableName": "bank_account",
  "beforeImage": {
   "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
   "tableName": "bank_account",
   "rows": ["java.util.ArrayList", [{
    "@class": "io.seata.rm.datasource.sql.struct.Row",
    "fields": ["java.util.ArrayList", [{
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "id",
     "keyType": "PrimaryKey",
     "type": 4,
     "value": 1
    }, {
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "money",
     "keyType": "NULL",
     "type": 4,
     "value": 100
    }]]
   }]]
  },
  "afterImage": {
   "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
   "tableName": "bank_account",
   "rows": ["java.util.ArrayList", [{
    "@class": "io.seata.rm.datasource.sql.struct.Row",
    "fields": ["java.util.ArrayList", [{
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "id",
     "keyType": "PrimaryKey",
     "type": 4,
     "value": 1
    }, {
     "@class": "io.seata.rm.datasource.sql.struct.Field",
     "name": "money",
     "keyType": "NULL",
     "type": 4,
     "value": 90
    }]]
   }]]
  }
 }]]
}

除了保存 BranchUndoLog,数据库里的 UndoLog 表还保留着其它的信息,UndoLog 当前版本的表结构如下所示:

CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,
                          `branch_id` bigint(20) NOT NULL,
                          `xid` varchar(100) NOT NULL,
                          `context` varchar(128) NOT NULL,
                          `rollback_info` longblob NOT NULL,
                          `log_status` int(11) NOT NULL,
                          `log_created` datetime NOT NULL,
                          `log_modified` datetime NOT NULL,
                          `ext` varchar(100) DEFAULT NULL,
                          PRIMARY KEY (`id`),
                          UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

正文完
 0