乐趣区

关于Flink:CalciteApache-Calcite-校验流程源码解读

1. 外围构造与概念

Calcite 提供的 Validator 流程极为简单,但概括下来次要做了这么一件事,对每个 SqlNode 联合元数据校验其语义是否正确,这些语义包含:

  • 验证表名是否存在;
  • select 的列在对应表中是否存在,且该匹配到的列名是否惟一,比方 join 多表,两个表有雷同名字的字段,如果此时 select 的列不指定表名就会报错;
  • 如果是 insert,须要插入列和数据源进行校验,如列数、类型、权限等;
  • ……

Calcite 提供的 validator 和后面提到的 Catalog 关系严密,Calcite 定义了一个 CatalogReader 用于在校验过程中拜访元数据 (Table schema),并对元数据做了运行时的一些封装,最外围的两局部是 SqlValidatorNamespace 和 SqlValidatorScope。

  • SqlValidatorNamespace:形容了 SQL 查问返回的关系,一个 SQL 查问能够拆分为多个局部,查问的列组合,表名等等,当中每个局部都有一个对应的 SqlValidatorNamespace。
  • SqlValidatorScope:能够认为是校验流程中每个 SqlNode 的工作上下文,当校验表达式时,通过 SqlValidatorScope 的 resolve 办法进行解析,如果胜利的话会返回对应的 SqlValidatorNamespace 形容后果类型。

在此基础上,Calcite 提供了 SqlValidator 接口,该接口提供了所有与校验相干的外围逻辑,并提供了内置的默认实现类 SqlValidatorImpl 定义如下:

public class SqlValidatorImpl implements SqlValidatorWithHints {
    // ...
  
  final SqlValidatorCatalogReader catalogReader;
  
  /**
   * Maps {@link SqlNode query node} objects to the {@link SqlValidatorScope}
   * scope created from them.
   */
  protected final Map<SqlNode, SqlValidatorScope> scopes =
      new IdentityHashMap<>();

  /**
   * Maps a {@link SqlSelect} node to the scope used by its WHERE and HAVING
   * clauses.
   */
  private final Map<SqlSelect, SqlValidatorScope> whereScopes =
      new IdentityHashMap<>();

  /**
   * Maps a {@link SqlSelect} node to the scope used by its GROUP BY clause.
   */
  private final Map<SqlSelect, SqlValidatorScope> groupByScopes =
      new IdentityHashMap<>();

  /**
   * Maps a {@link SqlSelect} node to the scope used by its SELECT and HAVING
   * clauses.
   */
  private final Map<SqlSelect, SqlValidatorScope> selectScopes =
      new IdentityHashMap<>();

  /**
   * Maps a {@link SqlSelect} node to the scope used by its ORDER BY clause.
   */
  private final Map<SqlSelect, SqlValidatorScope> orderScopes =
      new IdentityHashMap<>();

  /**
   * Maps a {@link SqlSelect} node that is the argument to a CURSOR
   * constructor to the scope of the result of that select node
   */
  private final Map<SqlSelect, SqlValidatorScope> cursorScopes =
      new IdentityHashMap<>();

  /**
   * The name-resolution scope of a LATERAL TABLE clause.
   */
  private TableScope tableScope = null;

  /**
   * Maps a {@link SqlNode node} to the
   * {@link SqlValidatorNamespace namespace} which describes what columns they
   * contain.
   */
  protected final Map<SqlNode, SqlValidatorNamespace> namespaces =
      new IdentityHashMap<>();
  
  // ...
}

能够看到 SqlValidatorImpl 当中有许多 scopes 映射 (SqlNode -> SqlValidatorScope) 和 namespaces (SqlNode -> SqlValidatorNamespace),校验其实就是在一个个 SqlValidatorScope 中校验 SqlValidatorNamespace 的过程,另外 SqlValidatorImpl 有一个成员 catalogReader,也就是下面说到的 SqlValidatorCatalogReader,为 SqlValidatorImpl 提供了拜访元数据的入口。(注:为了简便,后续本文均以 scope 指代 SqlValidatorScope,应用 namespace 指代 SqlValidatorNamespace)。

2. 入口函数

在第 3 章对 SqlNode 的介绍中,咱们意识到 SqlNode 是一个嵌套的树结构,因而很天然的咱们能够想到用解决树数据结构的一些思路或算法来解决整个 SqlNode 树,Calcite 正是基于这个思路,通过递归的形式遍历到每一个 SqlNode,并对每一个 SqlNode 都校验元数据。

校验流程极为繁琐,为了可能聚焦最外围的逻辑,咱们上面代码块中的 SQL 为例 该 SQL 语法简略但同样是一个残缺的 ETL 流程,且该 SQL 笼罩了 INSERT 和 SELECT 这两个最外围罕用的 DML 语句

INSERT INTO sink_table SELECT id FROM source_table WHERE id > -1

SQL 校验的整体入口是 SqlValidatorImpl 的 validate(SqlNode topNode) 办法。

public SqlNode validate(SqlNode topNode) {SqlValidatorScope scope = new EmptyScope(this);
  scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
  final SqlNode topNode2 = validateScopedExpression(topNode, scope);
  final RelDataType type = getValidatedNodeType(topNode2);
  Util.discard(type);
  return topNode2;
}

首先,SqlValidatorImpl 会创立 CatalogScope 作为最外层的工作上下文,用于后续校验,这个 scope 也是前面一些 namespace 的 parentScope,创立 scope 结束后,Calcite 校验进入到 validateScopedExpression。(EmptyScope 的存在是为了更不便解决判空问题,并提供了一些外围的解析逻辑,它就相似一个 root scope

private SqlNode validateScopedExpression(
    SqlNode topNode,
    SqlValidatorScope scope) {
    // 1. 标准 SqlNode
  SqlNode outermostNode = performUnconditionalRewrites(topNode, false);

    // ...

    // 2. 注册 namespace 和 scope 信息
  if (outermostNode.isA(SqlKind.TOP_LEVEL)) {registerQuery(scope, null, outermostNode, outermostNode, null, false);
  }

    // 3. 进行校验
  outermostNode.validate(this, scope);
  
    // ...
  return outermostNode;
}

第一步的 performUnconditionalRewrites 是为了 对咱们的 SqlNode 进行标准,以简化后续校验解决,标准的内容包含不限于以下几点:

  • 如果一个 SELECT 子句带有 ORDER BY 关键字,SQL Parser 会将整个 SELECT 子句解析为 SqlOrderBy,在这一步会将 SqlOrderBy 转成 SqlSelect;
  • 给 SqlDelete 和 SqlUpdate 设置 sourceSelect(这是一个 SqlSelect),后续对这两类进行校验的时候就会对它们的 sourceSelect 进行校验(即 validateSelect);
  • ……

第二步的 registerQuery,会创立该 SqlNode 对应的 namespace 和 scope,以及将 namespace 注入到对应的 scope 中,以本例 SQL 进行调试可失去信息如下:

  1. 第一局部是 scopes 映射,这里蕴含了 SQL 文本每局部对应的名称解析空间,个别以 SELECT 为一个残缺的空间,像 where、groupBy 等子句的 scope 也是其所属的 SELECT 子句的 SelectScope;
  2. 第二局部是对于 namespace 注入到 scope,SelectScope 继承了 ListScope,有一个成员变量 children,这外面会存 SELECT 的数据源对应的 namespace,如该例存储的是 source_table 对应的 IdentifierNamespace;
  3. 最初一部分是 namespaces 映射,INSERT、SELECT 或某个具体表和视图,都会有绝对应的 namespace,用来示意他们执行后果的数据关系。

第三步校验调用 SqlNode 的 validate 办法,以后面的例子会走 SqlInsert 的 validate,而后调用 SqlValidatorImpl 的 validateInsert。

public void validateInsert(SqlInsert insert) {
    // 1. 校验 namespace
  final SqlValidatorNamespace targetNamespace = getNamespace(insert);
  validateNamespace(targetNamespace, unknownType);

    // ...
    // 计算 / 校验 insert 插入列,insert 语句插入有两种模式
    // `insert sink_table values(...):不指定插入列,默认为全部列
    // `insert sink_table(idx):指定插入列
    final RelDataType targetRowType = createTargetRowType(table, insert.getTargetColumnList(), false);

    // 2. 校验 source
    
  final SqlNode source = insert.getSource();
  if (source instanceof SqlSelect) {final SqlSelect sqlSelect = (SqlSelect) source;
    validateSelect(sqlSelect, targetRowType);
  } else {final SqlValidatorScope scope = scopes.get(source);
    validateQuery(source, scope, targetRowType);
  }

    // ...
    // 3. 校验 source 和 sink 是否兼容
  checkFieldCount(insert.getTargetTable(), table, source,
      logicalSourceRowType, logicalTargetRowType);

  checkTypeAssignment(logicalSourceRowType, logicalTargetRowType, insert);

  checkConstraint(table, source, logicalTargetRowType);

  validateAccess(insert.getTargetTable(), table, SqlAccessEnum.INSERT);
}

validateInsert 的外围逻辑有三块:

  1. 校验 namespace;
  2. 校验 SqlInsert 的 source,在该例里 source 是一个 SqlSelect,因而会走到 validateSelect;
  3. 校验 source(数据起源)和 target(指标表)是否兼容

3. 校验 namespace

Calcite 在校验 namespace 的元数据时采纳了 模板办法 设计模式,在 AbstractNamespace 的 validate 办法中定义了主校验流程,真正的校验逻辑(validateImpl)则交给每个具体的 namespace 各自实现。

AbstarctNamespace 中有一个成员变量 rowType,校验 namespace 其实就是解析失去 rowType 的值赋给对应的 namespace。

3.1 SqlValidatorImpl.validateNamespace

校验 namespace 的入口是 validateNamespace,做的是如下代码显示,即校验 namespace,并建设 SqlNode → RelDataType 的映射关系放到 nodeToTypeMap 中。。

protected void validateNamespace(final SqlValidatorNamespace namespace,
      RelDataType targetRowType) {
  // 1. 模板办法校验 namespace
  namespace.validate(targetRowType);
  if (namespace.getNode() != null) {
    // 2. 建设 SqlNode -> RelDataType 的映射关系
    setValidatedNodeType(namespace.getNode(), namespace.getType());
  }
}

3.2 AbstractNamespace.validate

namespace 的校验主流程由 validate 办法定义

public final void validate(RelDataType targetRowType) {switch (status) {
  // 1. 第一次进入该办法时,status 都是 UNVALIDATED
  case UNVALIDATED:
    try {
      // 2. 标记 status 为正在解决,防止反复解决
      status = SqlValidatorImpl.Status.IN_PROGRESS;
      Preconditions.checkArgument(rowType == null,
          "Namespace.rowType must be null before validate has been called");
      // 3. 调用各自实现的 validateImpl
      RelDataType type = validateImpl(targetRowType);
      Preconditions.checkArgument(type != null,
          "validateImpl() returned null");
      // 4. 记录解析失去的后果类型
      setType(type);
    } finally {
      // 5. 标记 status 已实现
      status = SqlValidatorImpl.Status.VALID;
    }
    break;
  case IN_PROGRESS:
    throw new AssertionError("Cycle detected during type-checking");
  case VALID:
    break;
  default:
    throw Util.unexpected(status);
  }
}

除去 status 的标记更新,validate 理论的步骤也是两步:

  1. 调用各自 namespace 实现的 validateImpl 办法获取对应的 type(RelDataType);
  2. 将解析失去的 type 赋值给 namespace 的 rowType。

以上述例子为例,SqlInsert 会首先开始校验其对应的 InsertNamespace,InsertNamespace 没有实现本人的 validateImpl 办法,但它继承了 IdentifierNamespace,会间接调用 IdentifierNamespace 的 validateImpl。因而 InsertNamespace 的校验即是解析 target table(这是一个 Identifier)的 rowType。

3.3 validateImpl(IdentifierNamespace)

IdentifierNamespace 有个成员 resolvedNamespace(也是一个 SqlValidatorNamespace),该 IdentifierNamespace 对应的 SqlNode 指向一个表时,resolvedNamespace 就是一个 TableNamespace,存有真正的类型信息。

public RelDataType validateImpl(RelDataType targetRowType) {
  // 1. 解析该 identifier 对应的 namespace,通常为 TableNamespace
    resolvedNamespace = Objects.requireNonNull(resolveImpl(id));

    // ...

    // 2. 获取 rowType,第一次执行时须要计算
    RelDataType rowType = resolvedNamespace.getRowType();
    
    // ...

    return rowType;
}

IdentifierNamespace 会先调用 resolveImpl,拿到对应的 TableNamespace,再调用 resolvedNamespace.getRowType() 失去 rowType。

3.3.1 IdentifierNamespace.resolveImpl

在 resolveImpl 中,IdentifierNamespace 会创立一个 SqlValidatorScope.ResolvedImpl 用于寄存解析失去的 TableNamespace。

private SqlValidatorNamespace resolveImpl(SqlIdentifier id) {
    // ...

    parentScope.resolveTable(names, nameMatcher,
        SqlValidatorScope.Path.EMPTY, resolved);

    // ...

    return resolve.namespace;
}

这里的 parentScope 其实就是最开始创立的 CatalogScope,简直是一个空实现,通过层层调用最初会调到 EmptyScope 的 resolve_ 办法。

3.3.2 EmptyScope.resolve_

在这里会调用 parentScope 中的 CalciteSchema(parentScope 中存有 SqlValidatorImpl,SqlValidatorImpl 中有 CalciteCatalogReader,CalciteCatalogReader 中有 CalciteSchema)的 getTable 拿到 TableEntryImpl 对象(定义如下),

public static class TableEntryImpl extends TableEntry {
  private final Table table;

  // ...

  public Table getTable() {return table;}
}

整个调用链如图所示:

拿到 TableEntryImpl 对象,通过 TableEntryImpl.getTable() 就能够拿到咱们注册进去的 Table 数据。

public static class TableEntryImpl extends TableEntry {
  private final Table table;

  // ...

  public Table getTable() {return table;}
}

在拿到 Table 后,会尝试将其转为一个 SqlValidatorTable(理论类型为 RelOptTableImpl),并注册到 TableNamespace 中。

最重要的是第 3 步,这里走了 Table 接口的 getRowType() 办法获取 rowType,如果咱们有自定义的表实现了 Table 接口,也能够通过重写 getRowType() 自定义返回的行类型。拿到 rowType 后,将 rowType 赋给 table2,并基于 table2 创立了 TableNamespace。

直到此时 resolveImpl 执行结束,咱们得拿到了 TableNamespace,它保留有一个 table 变量,和继承自 AbstractNamespace 的类型为 RelDataType 的 rowType。rowType 记录这个 namespace 对应的行数据类型,而此时 TableNamespace 刚创立,rowType 为 null。

须要留神的是,此时 rowType 仅赋值给了 TableNamespace 中的 table,TableNamespace 保留的 rowType 仍是 null(这可能是一个优化点,能够缩小后续额定对 TableNamespace 做校验的步骤,但也可能是思考到 TableNamespace 刚创立,status 仍为 UNVALIDATED)

3.4 计算失去 rowType

因而当 IdentifierNamesapce 的 validateNamespace 函数进行到第二步:RelDataType rowType = resolvedNamespace.getRowType();,因为 rowType 为空,又会触发 TableNamespace 的校验。

public RelDataType getRowType() {if (rowType == null) {validator.validateNamespace(this, validator.unknownType);
    Preconditions.checkArgument(rowType != null, "validate must set rowType");
  }
  return rowType;
}

校验 TableNamespace 完结后回到 AbstractNamespace.validate(此时该 AbstractNamespace 的实在类型为 IdentifierNamespace),咱们拿到了 IdentifierNamespace 对应的行类型信息(即 resolvedNamespace 的 rowType),通过 setType 注入到 IdentifierNamespace 中,此时就实现了 IdentifierNamespace 的校验。

3.4.1 validateImpl(TableNamespace)

TableNamespace 的 validateImpl 实现如下:

protected RelDataType validateImpl(RelDataType targetRowType) {if (extendedFields.isEmpty()) {return table.getRowType();
  }
  final RelDataTypeFactory.Builder builder =
      validator.getTypeFactory().builder();
  builder.addAll(table.getRowType().getFieldList());
  builder.addAll(extendedFields);
  return builder.build();}

前文提到,通过 EmptyScope.resolve_ 办法,TableNamespace 中绑定的 table 曾经注入了 rowType 信息,因而这里能够间接获取到并返回。

回到 AbstractNamespace.validate(此时该 AbstractNamespace 的实在类型为 TableNamespace),咱们拿到了 TableNamespace 对应的行类型信息(即它的 TableNamespace 的 rowType),通过 setType 注入到 TableNamespace 中,实现 TableNamespace 的校验。

3.5 总结

下图是 IdentifierNamespace 校验的残缺流程。

4. 校验 source

校验 source 就是一个校验 select 的过程,连续下面的例子,校验的 SqlNode(理论类型为 SqlSelect)对应的语句为 select id as idx from source_table where id > -1,SqlSelect 有本人绑定的 SelectNamespace,当然也有校验 namespace 的局部(给绑定的 SelectNamespace 注入 rowType 属性)。

校验 select 的入口函数为 validateSelect,下图是整个 validateSelect 的工作流程。

能够看到 validateSelect 对 select 语句的每个组成部分都做了 validate,当中 最重要的是 validateSelectList,这个办法校验 select 的列是否存在(能从数据源表中检索到),以及为这部分查出来的列建设对应的类型信息(RelDataType),也即 SelectNamespace 的 rowType

举个例子,针对 SELECT id FROM source_table WHERE id > -1,validateSelectList 则须要校验 id 这个列是否在 source_table 中存在。

5. 校验 source 和 target 是否兼容

这部分校验具体查看四块内容:

  1. checkFieldCount:查看插入数据列的个数是否非法,查看非空;
  2. checkTypeAssignment:查看起源列和插入表对应列的类型是否兼容;
  3. checkConstraint:束缚查看;
  4. validateAccess:权限校验,默认状况下都能通过。

6. 参考资料

  1. Apache Calcite 教程 -validate 校验:https://blog.csdn.net/QXC1281…
退出移动版