关于后端:10分钟教你写一个数据库

明天教大家借助一款框架疾速实现一个数据库,这个框架就是Calcite,上面会带大家通过两个例子疾速教会大家怎么实现,一个是能够通过 SQL 语句的形式能够间接查问文件内容,第二个是模仿 Mysql 查问性能,以及最初通知大家怎么实现 SQL 查问 Kafka 数据。

Calcite

Calcite 是一个用于优化异构数据源的查询处理的可插拔根底框架(他是一个框架),能够将任意数据(Any data, Anywhere)DML 转换成基于 SQL 的 DML 引擎,并且咱们能够选择性的应用它的局部性能。

Calcite能干什么

  1. 应用 SQL 拜访内存中某个数据
  2. 应用 SQL 拜访某个文件的数据
  3. 跨数据源的数据拜访、聚合、排序等(例如 Mysql 和 Redis 数据源中的数据进行join)

当咱们须要自建一个数据库的时候,数据能够为任何格局的,比方text、word、xml、mysql、es、csv、第三方接口数据等等,咱们只有数据,咱们想让这些数据反对 SQL 模式动静增删改查。

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等我的项目中,数据处理系统都是应用 Calcite 来做 SQL 解析和查问优化,当然,还有局部用来构建本人的 JDBC driver。

名词解释

Token

就是将规范 SQL(能够了解为Mysql)关键词以及关键词之间的字符串截取进去,每一个token,会被封装为一个SqlNodeSqlNode会衍生很多子类,比方Select会被封装为SqlSelect,以后 SqlNode 也能反解析为 SQL 文本。

RelDataTypeField

某个字段的名称和类型信息

RelDataType

多个 RelDataTypeField 组成了 RelDataType,能够了解为数据行

Table

一个残缺的表的信息

Schema

所有元数据的组合,能够了解为一组 Table 或者库的概念

开始应用

1. 引入包

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <!-- 目前最新版本 2022-09-10日更新-->
    <version>1.32.0</version>
</dependency>

2. 创立model.json文件和表构造csv

model.json 外面次要形容或者说通知 Calcite 如何创立 Schema,也就是通知框架怎么创立出库。

{
"version": "1.0",//疏忽
"defaultSchema": "CSV",//设置默认的schema
"schemas": [//可定义多个schema
        {
          "name": "CSV",//相当于namespace和下面的defaultSchema的值对应
          "type": "custom",//写死
          "factory": "csv.CsvSchemaFactory",//factory的类名必须是你本人实现的factory的包的全门路
          "operand": { //这里能够传递自定义参数,最终会以map的模式传递给factory的operand参数
          "directory": "csv"//directory代表calcite会在resources上面的csv目录上面读取所有的csv文件,factory创立的Schema会吧这些文件全副构建成Table,能够了解为读取数据文件的根目录,当然key的名称也不肯定非得用directory,你能够随便指定
                }
        }
      ]
}

接下来还须要定义一个 csv 文件,用来定义表构造。

NAME:string,MONEY:string
aixiaoxian,10000万
xiaobai,10000万
adong,10000万
maomao,10000万
xixi,10000万
zizi,10000万
wuwu,10000万
kuku,10000万

整个我的项目的构造大略就是这样:

3. 实现Schema的工厂类

在上述文件中指定的包门路上来编写 CsvSchemaFactory 类,实现 SchemaFactory 接口,并且实现外面惟一的办法 create 办法,创立Schema(库)。

public class CsvSchemaFactory implements SchemaFactory {
    /**
     * parentSchema 父节点,个别为root
     * name 为model.json中定义的名字
     * operand 为model.json中定于的数据,这里能够传递自定义参数
     *
     * @param parentSchema Parent schema
     * @param name         Name of this schema
     * @param operand      The "operand" JSON property
     * @return
     */
    @Override
    public Schema create(SchemaPlus parentSchema, String name,
                         Map<String, Object> operand) {
        final String directory = (String) operand.get("directory");
        File directoryFile = new File(directory);
        return new CsvSchema(directoryFile, "scannable");
    }
}

4. 自定义Schma类

有了 SchemaFactory,接下来须要自定义 Schema 类。

自定义的 Schema 须要实现 Schema 接口,然而间接实现要实现的办法太多,咱们去实现官网的 AbstractSchema 类,这样就只须要实现一个办法就行(如果有其余定制化需要能够实现原生接口)。

外围的逻辑就是createTableMap办法,用于创立出 Table 表。

他会扫描指定的Resource上面的所有 csv 文件,将每个文件映射成Table对象,最终以map模式返回,Schema接口的其余几个办法会用到这个对象。

        //实现这一个办法就行了
    @Override
    protected Map<String, Table> getTableMap() {
        if (tableMap == null) {
            tableMap = createTableMap();
        }
        return tableMap;
    }
        private Map<String, Table> createTableMap() {
        // Look for files in the directory ending in ".csv"
        final Source baseSource = Sources.of(directoryFile);
        //会主动过滤掉非指定文件后缀的文件,我这里写的csv
        File[] files = directoryFile.listFiles((dir, name) -> {
            final String nameSansGz = trim(name, ".gz");
            return nameSansGz.endsWith(".csv");
        });
        if (files == null) {
            System.out.println("directory " + directoryFile + " not found");
            files = new File[0];
        }
        // Build a map from table name to table; each file becomes a table.
        final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
        for (File file : files) {
            Source source = Sources.of(file);
            final Source sourceSansCsv = source.trimOrNull(".csv");
            if (sourceSansCsv != null) {
                final Table table = createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        }
        return builder.build();
    }

5. 自定义 Table

Schema 有了,并且数据文件 csv 也映射成 Table 了,一个 csv 文件对应一个 Table

接下来咱们去自定义 Table,自定义 Table 的外围是咱们要定义字段的类型和名称,以及如何读取 csv文件。

  1. 先获取数据类型和名称,即单表构造,从csv文件头中获取(以后文件头须要咱们本人定义,包含规定咱们也能够定制化)。
/**
 * Base class for table that reads CSV files.
 */
public abstract class CsvTable extends AbstractTable {
    protected final Source source;
    protected final @Nullable RelProtoDataType protoRowType;
    private @Nullable RelDataType rowType;
    private @Nullable List<RelDataType> fieldTypes;
​
    /**
     * Creates a CsvTable.
     */
    CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
        this.source = source;
        this.protoRowType = protoRowType;
    }
        /**
        * 创立一个CsvTable,继承AbstractTable,须要实现外面的getRowType办法,此办法就是获取以后的表构造。
            Table的类型有很多种,比方还有视图类型,AbstractTable类中帮咱们默认实现了Table接口的一些办法,比方getJdbcTableType            办法,默认为Table类型,如果有其余定制化需要可间接实现Table接口。
            和AbstractSchema很像
        */
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (protoRowType != null) {
            return protoRowType.apply(typeFactory);
        }
        if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    null);
        }
        return rowType;
    }
​
    /**
     * Returns the field types of this CSV table.
     */
    public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {
        if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    fieldTypes);
        }
        return fieldTypes;
    }
  
   public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                            Source source, @Nullable List<RelDataType> fieldTypes) {
        final List<RelDataType> types = new ArrayList<>();
        final List<String> names = new ArrayList<>();
        try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext();
            if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            }
            for (String string : strings) {
                final String name;
                final RelDataType fieldType;
                //就是简略的读取字符串冒号后面是名称,冒号前面是类型
                final int colon = string.indexOf(':');
                if (colon >= 0) {
                    name = string.substring(0, colon);
                    String typeString = string.substring(colon + 1);
                    Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
                    if (decimalMatcher.matches()) {
                        int precision = Integer.parseInt(decimalMatcher.group(1));
                        int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else {
                        switch (typeString) {
                            case "string":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                            case "boolean":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
                                break;
                            case "byte":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
                                break;
                            case "char":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
                                break;
                            case "short":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
                                break;
                            case "int":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
                                break;
                            case "long":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
                                break;
                            case "float":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
                                break;
                            case "double":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
                                break;
                            case "date":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
                                break;
                            case "timestamp":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
                                break;
                            case "time":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
                                break;
                            default:
                                LOGGER.warn(
                                        "Found unknown type: {} in file: {} for column: {}. Will assume the type of "
                                                + "column is string.",
                                        typeString, source.path(), name);
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                        }
                    }
                } else {
                    //  如果没定义,默认都是String类型,字段名称也是string
                    name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType);
                if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) {
            // ignore
        }
        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}
  1. 获取文件中的数据,下面把Table的表构造字段名称和类型都获取到了当前,就剩最初一步了,获取文件中的数据。咱们须要自定义一个类,实现 ScannableTable 接口,并且实现外面惟一的办法 scan 办法,其实实质上就是读文件,而后把文件的每一行的数据和上述获取的 fileType 进行匹配。
@Override
    public Enumerable<Object[]> scan(DataContext root) {
        JavaTypeFactory typeFactory = root.getTypeFactory();
        final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
        final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<@Nullable Object[]>() {
            @Override
            public Enumerator<@Nullable Object[]> enumerator() {
                //返回咱们自定义的读取数据的类
                return new CsvEnumerator<>(source, cancelFlag, false, null,
                        CsvEnumerator.arrayConverter(fieldTypes, fields, false));
            }
        };
    }
 
 
 public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
                         @Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues == null ? null
                : ImmutableNullableList.copyOf(filterValues);
        try {
 
            this.reader = openCsv(source);
            //跳过第一行,因为第一行是定义类型和名称的
            this.reader.readNext(); // skip header row
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
//CsvEnumerator必须实现calcit本人的迭代器,外面有current、moveNext办法,current是返回以后游标所在的数据记录,moveNext是将游标指向下一个记录,官网中本人定义了一个类型转换器,是将csv文件中的数据转换成文件头指定的类型,这个须要咱们本人来实现
     @Override
    public E current() {
        return castNonNull(current);
    }
 
    @Override
    public boolean moveNext() {
        try {
            outer:
            for (; ; ) {
                if (cancelFlag.get()) {
                    return false;
                }
                final String[] strings = reader.readNext();
                if (strings == null) {
                    current = null;
                    reader.close();
                    return false;
                }
                if (filterValues != null) {
                    for (int i = 0; i < strings.length; i++) {
                        String filterValue = filterValues.get(i);
                        if (filterValue != null) {
                            if (!filterValue.equals(strings[i])) {
                                continue outer;
                            }
                        }
                    }
                }
                current = rowConverter.convertRow(strings);
                return true;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
 
        protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
            if (fieldType == null || string == null) {
                return string;
            }
            switch (fieldType.getSqlTypeName()) {
                case BOOLEAN:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Boolean.parseBoolean(string);
                case TINYINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Byte.parseByte(string);
                case SMALLINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Short.parseShort(string);
                case INTEGER:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Integer.parseInt(string);
                case BIGINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Long.parseLong(string);
                case FLOAT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Float.parseFloat(string);
                case DOUBLE:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Double.parseDouble(string);
                case DECIMAL:
                    if (string.length() == 0) {
                        return null;
                    }
                    return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
                case DATE:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_DATE.parse(string);
                        return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
                    } catch (ParseException e) {
                        return null;
                    }
                case TIME:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIME.parse(string);
                        return (int) date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case TIMESTAMP:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                        return date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case VARCHAR:
                default:
                    return string;
            }
        }

6. 最初

至此咱们须要筹备的货色:库、表名称、字段名称、字段类型都有了,接下来咱们去写咱们的 SQL 语句查问咱们的数据文件。

创立好几个测试的数据文件,例如下面我的项目构造中我创立 2 个 csv 文件USERINFO.csvASSET.csv,而后创立测试类。

这样跑起来,就能够通过 SQL 语句的形式间接查问数据了。

public class Test {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo "));
 
            print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
        } finally {
            connection.close();
        }
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查问后果:

这里在测试的时候踩到2个坑,大家如果本人试验的时候能够防止下。

  1. Calcite 默认会把你的 SQL 语句中的表名和类名全副转换为大写,因为默认的 csv(其余文件也一样)文件的名称就是表名,除非你自定义规定,所以你的文件名要写成大写。
  2. Calcite 有一些默认的关键字不能用作表名,不然会查问失败,比方我刚开始定的user.csv就始终查不进去,改成USERINFO就能够了,这点和Mysql 的内置关键字差不多,也能够通过个性化配置去改。

演示Mysql

  1. 首先,还是先筹备 Calcite 须要的货色:库、表名称、字段名称、字段类型。

如果数据源应用Mysql的话,这些都不必咱们去 JAVA 服务中去定义,间接在 Mysql 客户端创立好,这里间接创立两张表用于测试,就和咱们的csv文件一样。

CREATE TABLE `USERINFO1` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

CREATE TABLE `ASSET` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
  1. 上述 csv 案例中的 SchemaFactory 以及 Schema 这些都不须要创立,因为 Calcite 默认提供了 Mysql 的 Adapter适配器。
  2. 其实,上述两步都不须要做,咱们真正要做的是,通知 Calcite 你的 JDBC 的连贯信息就行了,也是在 model.json 文件中定义。
{
  "version": "1.0",
  "defaultSchema": "Demo",
  "schemas": [
    {
      "name": "Demo",
      "type": "custom",
    //  这里是calcite默认的SchemaFactory,外面的流程和咱们上述本人定义的雷同,上面会简略看看源码。
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        //  我用的是mysql8以上版本,所以这里留神包的名称
        "jdbcDriver": "com.mysql.cj.jdbc.Driver",
        "jdbcUrl": "jdbc:mysql://localhost:3306/irving",
        "jdbcUser": "root",
        "jdbcPassword": "123456"
      }
    }
  ]
}
  1. 在我的项目中引入 Mysql 的驱动包
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.30</version>
</dependency>
  1. 写好测试类,这样间接就相当于实现了所有的性能了。
public class TestMysql {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate(" insert into  userinfo1 values ('xxx',12) ");
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo1 "));
 
            print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo1 where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
        } finally {
            connection.close();
        }
 
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查问后果:

Mysql实现原理

上述咱们在 model.json 文件中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory类,能够看下这个类的代码。

这个类是把 FactorySchema 写在了一起,其实就是调用schemafactory类的create办法创立一个 schema 进去,和咱们下面自定义的流程是一样的。

其中JdbcSchema类也是 Schema 的子类,所以也会实现getTable办法(这个咱们上述也实现了,咱们过后是获取表构造和表的字段类型以及名称,是从csv文件头中读文件的),JdbcSchema的实现是通过连贯 Mysql 服务端查问元数据信息,再将这些信息封装成 Calcite须要的对象格局。

这里同样要留神 csv形式的2个留神点,大小写和关键字问题。

public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand) {
    DataSource dataSource;
    try {
      final String dataSourceName = (String) operand.get("dataSource");
      if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
      } else {
        //会走在这里来,这里就是咱们在model.json中指定的jdbc的连贯信息,最终会创立一个datasource
        final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String jdbcDriver = (String) operand.get("jdbcDriver");
        final String jdbcUser = (String) operand.get("jdbcUser");
        final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
 
    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
      return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
      SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
          SqlDialectFactory.class, sqlDialectFactory);
      return JdbcSchema.create(
          parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
  }
 
  @Override public @Nullable Table getTable(String name) {
    return getTableMap(false).get(name);
  }
 
  private synchronized ImmutableMap<String, JdbcTable> getTableMap(
      boolean force) {
    if (force || tableMap == null) {
      tableMap = computeTables();
    }
    return tableMap;
  }
 
  private ImmutableMap<String, JdbcTable> computeTables() {
    Connection connection = null;
    ResultSet resultSet = null;
    try {
      connection = dataSource.getConnection();
      final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
      final String catalog = catalogSchema.left;
      final String schema = catalogSchema.right;
      final Iterable<MetaImpl.MetaTable> tableDefs;
      Foo threadMetadata = THREAD_METADATA.get();
      if (threadMetadata != null) {
        tableDefs = threadMetadata.apply(catalog, schema);
      } else {
        final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
        //  获取元数据
        final DatabaseMetaData metaData = connection.getMetaData();
        resultSet = metaData.getTables(catalog, schema, null, null);
        while (resultSet.next()) {
        //获取库名,表明等信息
          final String catalogName = resultSet.getString(1);
          final String schemaName = resultSet.getString(2);
          final String tableName = resultSet.getString(3);
          final String tableTypeName = resultSet.getString(4);
          tableDefList.add(
              new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }
        tableDefs = tableDefList;
      }
 
      final ImmutableMap.Builder<String, JdbcTable> builder =
          ImmutableMap.builder();
      for (MetaImpl.MetaTable tableDef : tableDefs) {
        final String tableTypeName2 =
            tableDef.tableType == null
            ? null
            : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
        final TableType tableType =
            Util.enumVal(TableType.OTHER, tableTypeName2);
        if (tableType == TableType.OTHER  && tableTypeName2 != null) {
          System.out.println("Unknown table type: " + tableTypeName2);
        }
        //  最终封装成JdbcTable对象
        final JdbcTable table =
            new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
                tableDef.tableName, tableType);
        builder.put(tableDef.tableName, table);
      }
      return builder.build();
    } catch (SQLException e) {
      throw new RuntimeException(
          "Exception while reading tables", e);
    } finally {
      close(connection, null, resultSet);
    }
  }

SQL执行流程

OK,到这里基本上两个简略的案例曾经演示好了,最初补充一下整个Calcite架构和整个 SQL 的执行流程。

整个流程如下:SQL解析(Parser)=> SQL校验(Validator)=> SQL查问优化(optimizer)=> SQL生成 => SQL执行

SQL Parser

所有的 SQL 语句在执行前都须要经验 SQL 解析器解析,解析器的工作内容就是将 SQL 中的 Token 解析成形象语法树,每个树的节点都是一个 SqlNode,这个过程其实就是 Sql Text => SqlNode 的过程。

咱们后面的 Demo 没有自定义 Parser,是因为 Calcite 采纳了本人默认的 Parser(SqlParserImpl)。

SqlNode

SqlNode是整个解析中的外围,比方图中你能够发现,对于每个比方selectfromwhere关键字之后的内容其实都是一个SqlNode

parserConfig办法次要是设置 SqlParserFactory 的参数,比方咱们下面所说的我本地测试的时候踩的大小写的坑,就能够在这里设置。

间接调用setCaseSensitive=false即不会将 SQL 语句中的表名列名转为大写,上面是默认的,其余的参数能够按需配置。

SQL Validator

SQL 语句先通过 Parser,而后通过语法验证器,留神 Parser 并不会验证语法的正确性。

其实 Parser 只会验证 SQL 关键词的地位是否正确,咱们上述2个 Parser 的例子中都没有创立 schematable 这些,然而如果这样写,那就会报错,这个谬误就是 parser 检测后抛出来的(ParseLocationErrorTest)。

真正的校验在 validator 中,会去验证查问的表名是否存在,查问的字段是否存在,类型是否匹配,这个过程比较复杂,默认的 validatorSqlValidatorImpl

查问优化

比方关系代数,比方什么投影、笛卡尔积这些,Calcite提供了很多外部的优化器,也能够实现本人的优化器。

适配器

Calcite 是不蕴含存储层的,所以提供一种适配器的机制来拜访内部的数据存储或者存储引擎。

最初,进阶

官网外面写了将来会反对Kafka适配器到公共Api中,到时候应用起来就和上述集成Mysql一样不便,然而当初还没有反对,我这里给大家提供个本人实现的形式,这样就能够通过 SQL 的形式间接查问 Kafka 中的 Topic 数据等信息。

这里咱们外部集成实现了KSQL的能力,查问后果是OK的。

还是像上述步骤一样,咱们须要筹备库、表名称、字段名称、字段类型、数据源(多进去的中央)。

  1. 自定义Sql解析,之前咱们都没有自定义解析,这里须要自定义解析,是因为我须要动静解析sqlwhere条件外面的partation
  • 配置解析器,就是之前案例中提到的配置大小写之类的
  • 创立解析器,应用的默认SqlParseImpl
  • 开始解析,生成AST,咱们能够基于生成的SqlNode做一些业务相干的校验和参数解析
  1. 适配器获取数据源
   public class KafkaConsumerAdapter {
       public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {
           Properties props = new Properties();
           props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
           List<TopicPartition> topics = new ArrayList<>();
           for (Integer partition : kafkaSql.getPartition()) {
               TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
               topics.add(tp);
           }
           consumer.assign(topics);
           for (TopicPartition tp : topics) {
               Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
               long position = 500;
               if (offsets.get(tp).longValue() > position) {
                   consumer.seek(tp, offsets.get(tp).longValue() - 500);
               } else {
                   consumer.seek(tp, 0);
               }
           }
           List<KafkaResult> results = new ArrayList<>();
           boolean flag = true;
           while (flag) {
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
               for (ConsumerRecord<String, String> record : records) {
                   //转成我定义的对象汇合
                   KafkaResult result = new KafkaResult();
                   result.setPartition(record.partition());
                   result.setOffset(record.offset());
                   result.setMsg(record.value());
                   result.setKey(record.key());
                   results.add(result);
               }
               if (!records.isEmpty()) {
                   flag = false;
               }
           }
           consumer.close();
           return results;
       }
   
   }
  1. 执行查问,就能够失去咱们想要的成果了。
   public class TestKafka {
       public static void main(String[] args) throws Exception {
           KafkaService kafkaService = new KafkaService();
           //把解析到的参数放在我本人定义的kafkaSqlInfo对象中
           KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
           //适配器获取数据源,次要是从上述的sqlInfo对象中去poll数据
           List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
           //执行查问
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
           sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%'  limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
   
           sqlInfo = kafkaService.parseSql("select count(*) AS addad  from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
       }
   
       private static void query(String tableName, List<KafkaResult> results,
                                 String sql) throws Exception {
           //创立model.json,设置我的SchemaFactory,设置库名
           String model = createTempJson();
           //设置我的表构造,表名称和表字段名以及类型
           KafkaTableSchema.generateSchema(tableName, results);
           Properties info = new Properties();
           info.setProperty("lex", Lex.JAVA.toString());
           Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
           Statement st = connection.createStatement();
           //执行
           ResultSet result = st.executeQuery(sql);
           ResultSetMetaData rsmd = result.getMetaData();
           List<Map<String, Object>> ret = new ArrayList<>();
           while (result.next()) {
               Map<String, Object> map = new LinkedHashMap<>();
               for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                   map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
               }
               ret.add(map);
           }
           result.close();
           st.close();
           connection.close();
       }
   
       private static void print(ResultSet resultSet) throws SQLException {
           final ResultSetMetaData metaData = resultSet.getMetaData();
           final int columnCount = metaData.getColumnCount();
           while (resultSet.next()) {
               for (int i = 1; ; i++) {
                   System.out.print(resultSet.getString(i));
                   if (i < columnCount) {
                       System.out.print(", ");
                   } else {
                       System.out.println();
                       break;
                   }
               }
           }
       }
   
       private static String createTempJson() throws IOException {
           JSONObject object = new JSONObject();
           object.put("version", "1.0");
           object.put("defaultSchema", "QAKAFKA");
           JSONArray array = new JSONArray();
           JSONObject tmp = new JSONObject();
           tmp.put("name", "QAKAFKA");
           tmp.put("type", "custom");
           tmp.put("factory", "kafka.KafkaSchemaFactory");
           array.add(tmp);
           object.put("schemas", array);
           return object.toJSONString();
       }
   }
  • 生成长期的model.json,之前是基于文件,当初基于text字符串,mode=inline模式
  • 设置我的表构造、表名称、字段名、字段类型等,并搁置在内存中,同时将适配器查问进去的数据也放进去table外面
  • 获取连贯,执行查问,完满!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理