关于后端:PolarDBX-源码解读系列DML-之-INSERT-IGNORE-流程

30次阅读

共计 10426 个字符,预计需要花费 27 分钟才能阅读完成。

简介:本文将进一步介绍 PolarDB-X 中 INSERT IGNORE 的执行流程,其依据插入的表是否有 GSI 也有所变动。作者:潜璟在上一篇源码浏览中,咱们介绍了 INSERT 的执行流程。而 INSERT IGNORE 与 INSERT 不同,须要对插入值判断是否有 Unique Key 的抵触,并疏忽有抵触的插入值。因而本文将进一步介绍 PolarDB-X 中 INSERT IGNORE 的执行流程,其依据插入的表是否有 GSI 也有所变动。下推执行如果插入的表只有一张主表,没有 GSI,那么只须要将 INSERT IGNORE 间接发送到对应的物理表上,由 DN 自行疏忽存在抵触的值。在这种状况下,INSERT IGNORE 的执行过程和 INSERT 基本上雷同,读者能够参考之前的源码阅读文章。逻辑执行而在有 GSI 的状况下,就不能简略地将 INSERT IGNORE 别离下发到主表和 GSI 对应的物理分表上,否则有可能呈现主表和 GSI 数据不统一的状况。举个例子:create table t1 (a int primary key, b int, global index g1(b) dbpartition by hash(b)) dbpartition by hash(a);
insert ignore into t1 values (1,1),(1,2); 对于插入的两条记录,它们在主表上位于同一个物理表(a 雷同),然而在 GSI 上位于不同的物理表(b 不雷同),如果间接下发 INSERT IGNORE 的话,主表上只有 (1,1) 可能胜利插入(主键抵触),而在 GSI 上 (1,1) 和 (1,2) 都能胜利插入,于是 GSI 比主表多了一条数据。针对这种状况,一种解决方案是依据插入值中的 Unique Key,先到数据库中 SELECT 出有可能抵触的数据到 CN,而后在 CN 判断抵触的值并删除。进行 SELECT 的时候,最简略的形式就是将所有的 SELECT 间接发送到主表上,然而主表上可能没有对应的 Unique Key,这就导致 SELECT 的时候会进行全表扫描,影响性能。所以在优化器阶段,咱们会依据 Unique Key 是在主表还是 GSI 上定义的来确定相应的 SELECT 须要发送到主表还是 GSI,具体代码地位:com.alibaba.polardbx.optimizer.core.planner.rule.OptimizeLogicalInsertRule#groupUkByTableprotected Map<String, List<List<String>>> groupUkByTable(LogicalInsertIgnore insertIgnore,

                                                         ExecutionContext executionContext) {
    // 找到每个 Unique Key 在主表和哪些 GSI 中存在
    Map<Integer, List<String>> ukAllTableMap = new HashMap<>();
    for (int i = 0; i < uniqueKeys.size(); i++) {List<String> uniqueKey = uniqueKeys.get(i);
        for (Map.Entry<String, Map<String, Set<String>>> e : writableTableUkMap.entrySet()) {String currentTableName = e.getKey().toUpperCase();
            Map<String, Set<String>> currentUniqueKeys = e.getValue();
            boolean found = false;
            for (Set<String> currentUniqueKey : currentUniqueKeys.values()) {if (currentUniqueKey.size() != uniqueKey.size()) {continue;}
                boolean match = currentUniqueKey.containsAll(uniqueKey);
                if (match) {
                    found = true;
                    break;
                }
            }
            if (found) {ukAllTableMap.computeIfAbsent(i, k -> new ArrayList<>()).add(currentTableName);
            }
        }
    }

    // 确定是在哪一个表上进行 SELECT
    for (Map.Entry<Integer, List<String>> e : ukAllTableMap.entrySet()) {List<String> tableNames = e.getValue();

        if (tableNames.contains(primaryTableName.toUpperCase())) {tableUkMap.computeIfAbsent(primaryTableName.toUpperCase(), k -> new ArrayList<>())
                .add(uniqueKeys.get(e.getKey()));
        } else {
            final boolean onlyNonPublicGsi =
                tableNames.stream().noneMatch(tn -> GlobalIndexMeta.isPublished(executionContext, sm.getTable(tn)));

            boolean found = false;
            for (String tableName : tableNames) {if (!onlyNonPublicGsi && GlobalIndexMeta.isPublished(executionContext, sm.getTable(tableName))) {tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));
                    found = true;
                    break;
                } else if (onlyNonPublicGsi && GlobalIndexMeta.canWrite(executionContext, sm.getTable(tableName))) {tableUkMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(uniqueKeys.get(e.getKey()));
                    found = true;
                    break;
                }
            }
        }
    }

    return tableUkMap;
} 而到了执行阶段,咱们在 LogicalInsertIgnoreHandler 中解决 INSERT IGNORE。咱们首先会进入 getDuplicatedValues 函数,其通过下发 SELECT 的形式查找表中已有的抵触的 Unique Key 的记录。咱们将下发的 SELECT 语句中抉择的列设置为 (value_index, uk_index, pk)。其中 value_index 和 uk_index 均为的常量。举个例子,假如有表:CREATE TABLE `t` (`id` int(11) NOT NULL,
`a` int(11) NOT NULL,
`b` int(11) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)

) DBPARTITION BY HASH(id) 以及一条 INSERT IGNORE 语句:INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5); 假如在 PolarDB-X 中执行时,其会将 Unique Key 编号为 0: id
1: g_i_aINSERT IGNORE 语句中插入的每个值别离编号为 0: (1,2,3)
1: (2,3,4)
2: (3,4,5) 那么对于 (2,3,4) 的 UNIQUE KEY 结构的 GSI 上的 SELECT 即为:查问 GSISELECT 1 as value_index, 1 as uk_index, id
FROM g_i_a_xxxx
WHERE a in 3; 假如表中曾经存在 (5,3,6),那么这条 SELECT 的返回后果即为 (1,1,5)。此外,因为不同的 Unique Key 的 SELECT 返回格局是雷同的,所以咱们会将同一个物理库上不同的 SELECT 查问 UNION 起来发送,以一次性失去多个后果,缩小 CN 和 DN 之间的交互次数。只有某个 Unique Key 有反复值,咱们就能依据 value_index 和 uk_index 确定是插入值的哪一行的哪个 Unique Key 是反复的。当失去所有的返回后果之后,咱们对数据进行去重。咱们将上一步失去的抵触的的值放入一个 SET 中,而后程序扫描所有的每一行插入值,如果发现有反复的就跳过该行,否则就将该行也退出到 SET 中(因为插入值之间也有可能存在互相抵触)。去重结束之后,咱们就失去了所有不存在抵触的值,将这些值插入到表中之后就实现了一条 INSERT IGNORE 的执行。逻辑执行的执行流程:com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecuteprotected int doExecute(LogicalInsert insert, ExecutionContext executionContext,

                        LogicalInsert.HandlerParams handlerParams) {
    // ...

    try {Map<String, List<List<String>>> ukGroupByTable = insertIgnore.getUkGroupByTable();
        List<Map<Integer, ParameterContext>> deduplicated;
        List<List<Object>> duplicateValues;
        // 获取表中已有的 Unique Key 抵触值
        duplicateValues = getDuplicatedValues(insertIgnore, LockMode.SHARED_LOCK, executionContext, ukGroupByTable,
            (rowCount) -> memoryAllocator.allocateReservedMemory(MemoryEstimator.calcSelectValuesMemCost(rowCount, selectRowType)), selectRowType, true,
            handlerParams);

        final List<Map<Integer, ParameterContext>> batchParameters =
            executionContext.getParams().getBatchParameters();

        // 依据上一步失去的后果,去掉 INSERT IGNORE 中的抵触值
        deduplicated = getDeduplicatedParams(insertIgnore.getUkColumnMetas(), insertIgnore.getBeforeUkMapping(),
            insertIgnore.getAfterUkMapping(), RelUtils.getRelInput(insertIgnore), duplicateValues,
            batchParameters, executionContext);

        if (!deduplicated.isEmpty()) {insertEc.setParams(new Parameters(deduplicated));
        } else {
            // All duplicated
            return affectRows;
        }

        // 执行 INSERT
        try {if (gsiConcurrentWrite) {affectRows = concurrentExecute(insertIgnore, insertEc);
            } else {affectRows = sequentialExecute(insertIgnore, insertEc);
            }
        } catch (Throwable e) {handleException(executionContext, e, GeneralUtil.isNotEmpty(insertIgnore.getGsiInsertWriters()));
        }
    } finally {selectValuesPool.destroy();
    }
    return affectRows;
}RETURNING 优化上一节提到的 INSERT IGNORE 的逻辑执行形式,尽管保障了数据的正确性,然而也使得一条 INSERT IGNORE 语句至多须要 CN 和 DN 的两次交互能力实现(第一次 SELECT,第二次 INSERT),影响了 INSERT IGNORE 的执行性能。目前的 DN 曾经反对了 AliSQL 的 RETURNING 优化,其能够在 DN 的 INSERT IGNORE 执行结束之后返回胜利插入的值。利用这一性能,PolarDB-X 对 INSERT IGNORE 进行了进一步的优化:间接将 INSERT IGNORE 下发,如果在主表和 GSI 上全副胜利返回,那么就阐明插入值中没有抵触,于是就胜利实现该条 INSERT IGNORE 的执行;否则就将多插入的值删除。执行时,CN 首先会依据上文中的语法下发带有 RETURNING 的物理 INSERT IGNORE 语句到 DN,比方:call dbms_trans.returning("a", "insert into t1_xxxx values(1,1)"); 其中返回列是主键,用来标识插入的一批数据中哪些被胜利插入了;t1_xxxx 是逻辑表 t1 的一个物理分表。当主表和 GSI 上的所有 INSERT IGNORE 执行结束之后,咱们计算主表和 GSI 中胜利插入值的交加作为最初的后果,而后删除多插入的值。这部分代码在 com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#getRowsToBeRemovedprivate Map<String, List<List<Object>>> getRowsToBeRemoved(String tableName,
                                                           Map<String, List<List<Object>>> tableInsertedValues,
                                                           List<Integer> beforePkMapping,
                                                           List<ColumnMeta> pkColumnMetas) {final Map<String, Set<GroupKey>> tableInsertedPks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    final Map<String, List<Pair<GroupKey, List<Object>>>> tablePkRows =
        new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    tableInsertedValues.forEach((tn, insertedValues) -> {final Set<GroupKey> insertedPks = new TreeSet<>();
        final List<Pair<GroupKey, List<Object>>> pkRows = new ArrayList<>();
        for (List<Object> inserted : insertedValues) {final Object[] groupKeys = beforePkMapping.stream().map(inserted::get).toArray();
            final GroupKey pk = new GroupKey(groupKeys, pkColumnMetas);
            insertedPks.add(pk);
            pkRows.add(Pair.of(pk, inserted));
        }
        tableInsertedPks.put(tn, insertedPks);
        tablePkRows.put(tn, pkRows);
    });

    // Get intersect of inserted values
    final Set<GroupKey> distinctPks = new TreeSet<>();
    for (GroupKey pk : tableInsertedPks.get(tableName)) {if (tableInsertedPks.values().stream().allMatch(pks -> pks.contains(pk))) {distinctPks.add(pk);
        }
    }

    // Remove values which not exists in at least one insert results
    final Map<String, List<List<Object>>> tableDeletePks = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    tablePkRows.forEach((tn, pkRows) -> {final List<List<Object>> deletePks = new ArrayList<>();
        pkRows.forEach(pkRow -> {if (!distinctPks.contains(pkRow.getKey())) {deletePks.add(pkRow.getValue());
            }
        });
        if (!deletePks.isEmpty()) {tableDeletePks.put(tn, deletePks);
        }
    });
    return tableDeletePks;
} 与上一节的逻辑执行的“乐观执行”相比,应用 RETURNING 优化的 INSERT IGNORE 相当于“乐观执行”,如果插入的值自身没有抵触,那么一条 INSERT IGNORE 语句 CN 和 DN 间只须要一次交互即可;而在有抵触的状况下,咱们须要下发 DELETE 语句将主表或 GSI 中多插入的值删除,于是 CN 和 DN 间须要两次交互。能够看出,即使是有抵触的状况,CN 和 DN 间的交互次数也不会超过上一节的逻辑执行。因而在无奈间接下推的状况下,INSERT IGNORE 的执行策略是默认应用 RETURNING 优化执行。当然 RETURNING 优化的应用也有一些限度,比方插入的 Value 有反复主键时就不能应用,因为这种状况下无奈判断具体是哪一行被胜利插入,哪一行须要删除;具体能够浏览代码中的条件判断。当不能应用 RETURNING 优化时,零碎会主动抉择上一节中的逻辑执行形式执行该条 INSERT IGNORE 语句以保证数据的正确性。应用 RETURNING 优化的执行流程:com.alibaba.polardbx.repo.mysql.handler.LogicalInsertIgnoreHandler#doExecuteprotected int doExecute(LogicalInsert insert, ExecutionContext executionContext,
                        LogicalInsert.HandlerParams handlerParams) {
    // ...

    // 判断是否应用 RETURNING 优化
    boolean canUseReturning =
        executorContext.getStorageInfoManager().supportsReturning() && executionContext.getParamManager()
            .getBoolean(ConnectionParams.DML_USE_RETURNING) && allDnUseXDataSource && gsiCanUseReturning
            && !isBroadcast && !ComplexTaskPlanUtils.canWrite(tableMeta);

    if (canUseReturning) {canUseReturning = noDuplicateValues(insertIgnore, insertEc);
    }

    if (canUseReturning) {
        // 执行 INSERT IGNORE 并取得返回后果
        final List<RelNode> allPhyPlan =
            new ArrayList<>(replaceSeqAndBuildPhyPlan(insertIgnore, insertEc, handlerParams));
        getPhysicalPlanForGsi(insertIgnore.getGsiInsertIgnoreWriters(), insertEc, allPhyPlan);
        final Map<String, List<List<Object>>> tableInsertedValues =
            executeAndGetReturning(executionContext, allPhyPlan, insertIgnore, insertEc, memoryAllocator,
                selectRowType);

        // ...

        // 生成 DELETE
        final boolean removeAllInserted =
            targetTableNames.stream().anyMatch(tn -> !tableInsertedValues.containsKey(tn));

        if (removeAllInserted) {
            affectedRows -=
                removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableInsertedValues);
            if (returnIgnored) {ignoredRows = totalRows;}
        } else {final List<Integer> beforePkMapping = insertIgnore.getBeforePkMapping();
            final List<ColumnMeta> pkColumnMetas = insertIgnore.getPkColumnMetas();

            // 计算所有插入值的交加
            final Map<String, List<List<Object>>> tableDeletePks =
                getRowsToBeRemoved(tableName, tableInsertedValues, beforePkMapping, pkColumnMetas);

            affectedRows -=
                removeInserted(insertIgnore, schemaName, tableName, isBroadcast, insertEc, tableDeletePks);
            if (returnIgnored) {
                ignoredRows +=
                    Optional.ofNullable(tableDeletePks.get(insertIgnore.getLogicalTableName())).map(List::size)
                        .orElse(0);
            }

        }

        handlerParams.optimizedWithReturning = true;

        if (returnIgnored) {return ignoredRows;} else {return affectedRows;}
    } else {handlerParams.optimizedWithReturning = false;}

    // ... 
} 最初以一个例子来展示 RETURNING 优化的执行流程与逻辑执行的不同。通过 /+TDDL:CMD_EXTRA(DML_USE_RETURNING=TRUE)/ 这条 HINT,用户能够手动管制是否应用 RETURNING 优化。首先建表并插入一条数据:CREATE TABLE `t` (`id` int(11) NOT NULL,
`a` int(11) NOT NULL,
`b` int(11) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE GLOBAL KEY `g_i_a` (`a`) COVERING (`id`) DBPARTITION BY HASH(`a`)

) DBPARTITION BY HASH(id);

INSERT INTO t VALUES (1,3,3); 再执行一条 INSERT IGNORE:INSERT IGNORE INTO t VALUES (1,2,3),(2,3,4),(3,4,5); 其中 (1,2,3) 与 (1,3,3) 主键抵触,(2,3,4) 与 (1,3,3) 对于 Unique Key g_i_a 抵触。如果是 RETURNING 优化:

能够看到 PolarDB-X 先进行了 INSERT IGNORE,再将多插入的数据删除:(1,2,3) 在主表上抵触在 UGSI 上胜利插入,(2,3,4) 在 UGSI 上抵触在主表上胜利插入,因而别离下发对应的 DELETE 到 UGSI 和主表上。如果敞开 RETURNING 优化,逻辑执行:

能够看到 PolarDB-X 先进行了 SELECT,再将没有抵触的数据 (3,4,5) 插入。小结本文介绍了 PolarDB-X 中 INSERT IGNORE 的执行流程。除了 INSERT IGNORE 之外,还有一些 DML 语句在执行时也须要进行反复值的判断,比方 REPLACE、INSERT ON DUPLICATE KEY UPDATE 等,这些语句在有 GSI 的状况下均采纳了逻辑执行的形式,即先进行 SELECT 再进行判重、更新等操作,感兴趣的读者能够自行浏览相干代码。欢送关注 PolarDB- X 知乎机构号,浏览更多技术好文。原文链接:https://click.aliyun.com/m/10… 本文为阿里云原创内容,未经容许不得转载。

正文完
 0