共计 22346 个字符,预计需要花费 56 分钟才能阅读完成。
明天教大家借助一款框架疾速实现一个数据库,这个框架就是Calcite
,上面会带大家通过两个例子疾速教会大家怎么实现,一个是能够通过 SQL 语句的形式能够间接查问文件内容,第二个是模仿 Mysql 查问性能,以及最初通知大家怎么实现 SQL 查问 Kafka 数据。
Calcite
Calcite
是一个用于优化异构数据源的查询处理的可插拔根底框架(他是一个框架),能够将任意数据(Any data, Anywhere)DML 转换成基于 SQL 的 DML 引擎,并且咱们能够选择性的应用它的局部性能。
Calcite 能干什么
- 应用 SQL 拜访内存中某个数据
- 应用 SQL 拜访某个文件的数据
- 跨数据源的数据拜访、聚合、排序等(例如 Mysql 和 Redis 数据源中的数据进行 join)
当咱们须要自建一个数据库的时候,数据能够为任何格局的,比方 text、word、xml、mysql、es、csv、第三方接口数据等等,咱们只有数据,咱们想让这些数据反对 SQL 模式动静增删改查。
另外,像 Hive、Drill、Flink、Phoenix 和 Storm 等我的项目中,数据处理系统都是应用 Calcite 来做 SQL 解析和查问优化,当然,还有局部用来构建本人的 JDBC driver。
名词解释
Token
就是将规范 SQL(能够了解为 Mysql)关键词以及关键词之间的字符串截取进去,每一个 token
,会被封装为一个SqlNode
,SqlNode
会衍生很多子类,比方 Select
会被封装为SqlSelect
,以后 SqlNode
也能反解析为 SQL 文本。
RelDataTypeField
某个字段的名称和类型信息
RelDataType
多个 RelDataTypeField 组成了 RelDataType,能够了解为数据行
Table
一个残缺的表的信息
Schema
所有元数据的组合,能够了解为一组 Table 或者库的概念
开始应用
1. 引入包
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<!-- 目前最新版本 2022-09-10 日更新 -->
<version>1.32.0</version>
</dependency>
2. 创立 model.json 文件和表构造 csv
model.json 外面次要形容或者说通知 Calcite
如何创立 Schema
,也就是通知框架怎么创立出库。
{
"version": "1.0",// 疏忽
"defaultSchema": "CSV",// 设置默认的 schema
"schemas": [// 可定义多个 schema
{
"name": "CSV",// 相当于 namespace 和下面的 defaultSchema 的值对应
"type": "custom",// 写死
"factory": "csv.CsvSchemaFactory",//factory 的类名必须是你本人实现的 factory 的包的全门路
"operand": { // 这里能够传递自定义参数,最终会以 map 的模式传递给 factory 的 operand 参数
"directory": "csv"//directory 代表 calcite 会在 resources 上面的 csv 目录上面读取所有的 csv 文件,factory 创立的 Schema 会吧这些文件全副构建成 Table,能够了解为读取数据文件的根目录,当然 key 的名称也不肯定非得用 directory,你能够随便指定
}
}
]
}
接下来还须要定义一个 csv
文件,用来定义表构造。
NAME:string,MONEY:string
aixiaoxian,10000 万
xiaobai,10000 万
adong,10000 万
maomao,10000 万
xixi,10000 万
zizi,10000 万
wuwu,10000 万
kuku,10000 万
整个我的项目的构造大略就是这样:
3. 实现 Schema 的工厂类
在上述文件中指定的包门路上来编写 CsvSchemaFactory
类,实现 SchemaFactory
接口,并且实现外面惟一的办法 create
办法,创立Schema
(库)。
public class CsvSchemaFactory implements SchemaFactory {
/**
* parentSchema 父节点,个别为 root
* name 为 model.json 中定义的名字
* operand 为 model.json 中定于的数据,这里能够传递自定义参数
*
* @param parentSchema Parent schema
* @param name Name of this schema
* @param operand The "operand" JSON property
* @return
*/
@Override
public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {final String directory = (String) operand.get("directory");
File directoryFile = new File(directory);
return new CsvSchema(directoryFile, "scannable");
}
}
4. 自定义 Schma 类
有了 SchemaFactory
,接下来须要自定义 Schema
类。
自定义的 Schema
须要实现 Schema
接口,然而间接实现要实现的办法太多,咱们去实现官网的 AbstractSchema
类,这样就只须要实现一个办法就行(如果有其余定制化需要能够实现原生接口)。
外围的逻辑就是 createTableMap
办法,用于创立出 Table
表。
他会扫描指定的 Resource
上面的所有 csv
文件,将每个文件映射成 Table
对象,最终以 map
模式返回,Schema
接口的其余几个办法会用到这个对象。
// 实现这一个办法就行了
@Override
protected Map<String, Table> getTableMap() {if (tableMap == null) {tableMap = createTableMap();
}
return tableMap;
}
private Map<String, Table> createTableMap() {
// Look for files in the directory ending in ".csv"
final Source baseSource = Sources.of(directoryFile);
// 会主动过滤掉非指定文件后缀的文件,我这里写的 csv
File[] files = directoryFile.listFiles((dir, name) -> {final String nameSansGz = trim(name, ".gz");
return nameSansGz.endsWith(".csv");
});
if (files == null) {System.out.println("directory" + directoryFile + "not found");
files = new File[0];
}
// Build a map from table name to table; each file becomes a table.
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (File file : files) {Source source = Sources.of(file);
final Source sourceSansCsv = source.trimOrNull(".csv");
if (sourceSansCsv != null) {final Table table = createTable(source);
builder.put(sourceSansCsv.relative(baseSource).path(), table);
}
}
return builder.build();}
5. 自定义 Table
Schema
有了,并且数据文件 csv
也映射成 Table
了,一个 csv
文件对应一个 Table
。
接下来咱们去自定义 Table
,自定义 Table
的外围是咱们要定义字段的类型和名称,以及如何读取 csv
文件。
- 先获取数据类型和名称,即单表构造,从
csv
文件头中获取(以后文件头须要咱们本人定义,包含规定咱们也能够定制化)。
/**
* Base class for table that reads CSV files.
*/
public abstract class CsvTable extends AbstractTable {
protected final Source source;
protected final @Nullable RelProtoDataType protoRowType;
private @Nullable RelDataType rowType;
private @Nullable List<RelDataType> fieldTypes;
/**
* Creates a CsvTable.
*/
CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
this.source = source;
this.protoRowType = protoRowType;
}
/**
* 创立一个 CsvTable,继承 AbstractTable,须要实现外面的 getRowType 办法,此办法就是获取以后的表构造。Table 的类型有很多种,比方还有视图类型,AbstractTable 类中帮咱们默认实现了 Table 接口的一些办法,比方 getJdbcTableType 办法,默认为 Table 类型,如果有其余定制化需要可间接实现 Table 接口。和 AbstractSchema 很像
*/
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {if (protoRowType != null) {return protoRowType.apply(typeFactory);
}
if (rowType == null) {rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
null);
}
return rowType;
}
/**
* Returns the field types of this CSV table.
*/
public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {if (fieldTypes == null) {fieldTypes = new ArrayList<>();
CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
fieldTypes);
}
return fieldTypes;
}
public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
Source source, @Nullable List<RelDataType> fieldTypes) {final List<RelDataType> types = new ArrayList<>();
final List<String> names = new ArrayList<>();
try (CSVReader reader = openCsv(source)) {String[] strings = reader.readNext();
if (strings == null) {strings = new String[]{"EmptyFileHasNoColumns:boolean"};
}
for (String string : strings) {
final String name;
final RelDataType fieldType;
// 就是简略的读取字符串冒号后面是名称,冒号前面是类型
final int colon = string.indexOf(':');
if (colon >= 0) {name = string.substring(0, colon);
String typeString = string.substring(colon + 1);
Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
if (decimalMatcher.matches()) {int precision = Integer.parseInt(decimalMatcher.group(1));
int scale = Integer.parseInt(decimalMatcher.group(2));
fieldType = parseDecimalSqlType(typeFactory, precision, scale);
} else {switch (typeString) {
case "string":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
case "boolean":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
break;
case "byte":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
break;
case "char":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
break;
case "short":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
break;
case "int":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
break;
case "long":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
break;
case "float":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
break;
case "double":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
break;
case "date":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
break;
case "timestamp":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
break;
case "time":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
break;
default:
LOGGER.warn("Found unknown type: {} in file: {} for column: {}. Will assume the type of"
+ "column is string.",
typeString, source.path(), name);
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
}
}
} else {
// 如果没定义,默认都是 String 类型,字段名称也是 string
name = string;
fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
names.add(name);
types.add(fieldType);
if (fieldTypes != null) {fieldTypes.add(fieldType);
}
}
} catch (IOException e) {// ignore}
if (names.isEmpty()) {names.add("line");
types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}
- 获取文件中的数据,下面把
Table
的表构造字段名称和类型都获取到了当前,就剩最初一步了,获取文件中的数据。咱们须要自定义一个类,实现ScannableTable
接口,并且实现外面惟一的办法scan
办法,其实实质上就是读文件,而后把文件的每一行的数据和上述获取的fileType
进行匹配。
@Override
public Enumerable<Object[]> scan(DataContext root) {JavaTypeFactory typeFactory = root.getTypeFactory();
final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<@Nullable Object[]>() {
@Override
public Enumerator<@Nullable Object[]> enumerator() {
// 返回咱们自定义的读取数据的类
return new CsvEnumerator<>(source, cancelFlag, false, null,
CsvEnumerator.arrayConverter(fieldTypes, fields, false));
}
};
}
public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
@Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
this.cancelFlag = cancelFlag;
this.rowConverter = rowConverter;
this.filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
try {this.reader = openCsv(source);
// 跳过第一行,因为第一行是定义类型和名称的
this.reader.readNext(); // skip header row} catch (IOException e) {throw new RuntimeException(e);
}
}
//CsvEnumerator 必须实现 calcit 本人的迭代器,外面有 current、moveNext 办法,current 是返回以后游标所在的数据记录,moveNext 是将游标指向下一个记录,官网中本人定义了一个类型转换器,是将 csv 文件中的数据转换成文件头指定的类型,这个须要咱们本人来实现
@Override
public E current() {return castNonNull(current);
}
@Override
public boolean moveNext() {
try {
outer:
for (; ;) {if (cancelFlag.get()) {return false;}
final String[] strings = reader.readNext();
if (strings == null) {
current = null;
reader.close();
return false;
}
if (filterValues != null) {for (int i = 0; i < strings.length; i++) {String filterValue = filterValues.get(i);
if (filterValue != null) {if (!filterValue.equals(strings[i])) {continue outer;}
}
}
}
current = rowConverter.convertRow(strings);
return true;
}
} catch (IOException e) {throw new RuntimeException(e);
}
}
protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {if (fieldType == null || string == null) {return string;}
switch (fieldType.getSqlTypeName()) {
case BOOLEAN:
if (string.length() == 0) {return null;}
return Boolean.parseBoolean(string);
case TINYINT:
if (string.length() == 0) {return null;}
return Byte.parseByte(string);
case SMALLINT:
if (string.length() == 0) {return null;}
return Short.parseShort(string);
case INTEGER:
if (string.length() == 0) {return null;}
return Integer.parseInt(string);
case BIGINT:
if (string.length() == 0) {return null;}
return Long.parseLong(string);
case FLOAT:
if (string.length() == 0) {return null;}
return Float.parseFloat(string);
case DOUBLE:
if (string.length() == 0) {return null;}
return Double.parseDouble(string);
case DECIMAL:
if (string.length() == 0) {return null;}
return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
case DATE:
if (string.length() == 0) {return null;}
try {Date date = TIME_FORMAT_DATE.parse(string);
return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
} catch (ParseException e) {return null;}
case TIME:
if (string.length() == 0) {return null;}
try {Date date = TIME_FORMAT_TIME.parse(string);
return (int) date.getTime();} catch (ParseException e) {return null;}
case TIMESTAMP:
if (string.length() == 0) {return null;}
try {Date date = TIME_FORMAT_TIMESTAMP.parse(string);
return date.getTime();} catch (ParseException e) {return null;}
case VARCHAR:
default:
return string;
}
}
6. 最初
至此咱们须要筹备的货色:库、表名称、字段名称、字段类型都有了,接下来咱们去写咱们的 SQL 语句查问咱们的数据文件。
创立好几个测试的数据文件,例如下面我的项目构造中我创立 2 个 csv 文件USERINFO.csv
、ASSET.csv
,而后创立测试类。
这样跑起来,就能够通过 SQL 语句的形式间接查问数据了。
public class Test {public static void main(String[] args) throws SQLException {
Connection connection = null;
Statement statement = null;
try {Properties info = new Properties();
info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
connection = DriverManager.getConnection("jdbc:calcite:", info);
statement = connection.createStatement();
print(statement.executeQuery("select * from asset"));
print(statement.executeQuery("select * from userinfo"));
print(statement.executeQuery("select age from userinfo where name ='aixiaoxian' "));
print(statement.executeQuery("select * from userinfo where age >60"));
print(statement.executeQuery("select * from userinfo where name like'a%' "));
} finally {connection.close();
}
}
private static void print(ResultSet resultSet) throws SQLException {final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {for (int i = 1; ; i++) {System.out.print(resultSet.getString(i));
if (i < columnCount) {System.out.print(",");
} else {System.out.println();
break;
}
}
}
}
}
查问后果:
这里在测试的时候踩到 2 个坑,大家如果本人试验的时候能够防止下。
Calcite
默认会把你的 SQL 语句中的表名和类名全副转换为大写,因为默认的 csv(其余文件也一样)文件的名称就是表名,除非你自定义规定,所以你的文件名要写成大写。Calcite
有一些默认的关键字不能用作表名,不然会查问失败,比方我刚开始定的user.csv
就始终查不进去,改成USERINFO
就能够了,这点和Mysql
的内置关键字差不多,也能够通过个性化配置去改。
演示 Mysql
- 首先,还是先筹备
Calcite
须要的货色:库、表名称、字段名称、字段类型。
如果数据源应用 Mysql
的话,这些都不必咱们去 JAVA 服务中去定义,间接在 Mysql 客户端创立好,这里间接创立两张表用于测试,就和咱们的 csv
文件一样。
CREATE TABLE `USERINFO1` (`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
`AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
CREATE TABLE `ASSET` (`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
`MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
- 上述
csv
案例中的SchemaFactory
以及Schema
这些都不须要创立,因为Calcite
默认提供了 Mysql 的 Adapter 适配器。 - 其实,上述两步都不须要做,咱们真正要做的是,通知
Calcite
你的 JDBC 的连贯信息就行了,也是在model.json
文件中定义。
{
"version": "1.0",
"defaultSchema": "Demo",
"schemas": [
{
"name": "Demo",
"type": "custom",
// 这里是 calcite 默认的 SchemaFactory,外面的流程和咱们上述本人定义的雷同,上面会简略看看源码。"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
// 我用的是 mysql8 以上版本,所以这里留神包的名称
"jdbcDriver": "com.mysql.cj.jdbc.Driver",
"jdbcUrl": "jdbc:mysql://localhost:3306/irving",
"jdbcUser": "root",
"jdbcPassword": "123456"
}
}
]
}
- 在我的项目中引入 Mysql 的驱动包
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
- 写好测试类,这样间接就相当于实现了所有的性能了。
public class TestMysql {public static void main(String[] args) throws SQLException {
Connection connection = null;
Statement statement = null;
try {Properties info = new Properties();
info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
connection = DriverManager.getConnection("jdbc:calcite:", info);
statement = connection.createStatement();
statement.executeUpdate("insert into userinfo1 values ('xxx',12)");
print(statement.executeQuery("select * from asset"));
print(statement.executeQuery("select * from userinfo1"));
print(statement.executeQuery("select age from userinfo1 where name ='aixiaoxian' "));
print(statement.executeQuery("select * from userinfo1 where age >60"));
print(statement.executeQuery("select * from userinfo1 where name like'a%' "));
} finally {connection.close();
}
}
private static void print(ResultSet resultSet) throws SQLException {final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {for (int i = 1; ; i++) {System.out.print(resultSet.getString(i));
if (i < columnCount) {System.out.print(",");
} else {System.out.println();
break;
}
}
}
}
}
查问后果:
Mysql 实现原理
上述咱们在 model.json
文件中指定了 org.apache.calcite.adapter.jdbc.JdbcSchema$Factory
类,能够看下这个类的代码。
这个类是把 Factory
和 Schema
写在了一起,其实就是调用 schemafactory
类的 create
办法创立一个 schema
进去,和咱们下面自定义的流程是一样的。
其中 JdbcSchema
类也是 Schema
的子类,所以也会实现 getTable
办法(这个咱们上述也实现了,咱们过后是获取表构造和表的字段类型以及名称,是从 csv 文件头中读文件的),JdbcSchema
的实现是通过连贯 Mysql 服务端查问元数据信息,再将这些信息封装成 Calcite
须要的对象格局。
这里同样要留神 csv
形式的 2 个留神点,大小写和关键字问题。
public static JdbcSchema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand) {
DataSource dataSource;
try {final String dataSourceName = (String) operand.get("dataSource");
if (dataSourceName != null) {
dataSource =
AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
} else {
// 会走在这里来,这里就是咱们在 model.json 中指定的 jdbc 的连贯信息,最终会创立一个 datasource
final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
final String jdbcDriver = (String) operand.get("jdbcDriver");
final String jdbcUser = (String) operand.get("jdbcUser");
final String jdbcPassword = (String) operand.get("jdbcPassword");
dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
}
} catch (Exception e) {throw new RuntimeException("Error while reading dataSource", e);
}
String jdbcCatalog = (String) operand.get("jdbcCatalog");
String jdbcSchema = (String) operand.get("jdbcSchema");
String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
return JdbcSchema.create(parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
} else {
SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(SqlDialectFactory.class, sqlDialectFactory);
return JdbcSchema.create(parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
}
}
@Override public @Nullable Table getTable(String name) {return getTableMap(false).get(name);
}
private synchronized ImmutableMap<String, JdbcTable> getTableMap(boolean force) {if (force || tableMap == null) {tableMap = computeTables();
}
return tableMap;
}
private ImmutableMap<String, JdbcTable> computeTables() {
Connection connection = null;
ResultSet resultSet = null;
try {connection = dataSource.getConnection();
final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
final String catalog = catalogSchema.left;
final String schema = catalogSchema.right;
final Iterable<MetaImpl.MetaTable> tableDefs;
Foo threadMetadata = THREAD_METADATA.get();
if (threadMetadata != null) {tableDefs = threadMetadata.apply(catalog, schema);
} else {final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
// 获取元数据
final DatabaseMetaData metaData = connection.getMetaData();
resultSet = metaData.getTables(catalog, schema, null, null);
while (resultSet.next()) {
// 获取库名,表明等信息
final String catalogName = resultSet.getString(1);
final String schemaName = resultSet.getString(2);
final String tableName = resultSet.getString(3);
final String tableTypeName = resultSet.getString(4);
tableDefList.add(
new MetaImpl.MetaTable(catalogName, schemaName, tableName,
tableTypeName));
}
tableDefs = tableDefList;
}
final ImmutableMap.Builder<String, JdbcTable> builder =
ImmutableMap.builder();
for (MetaImpl.MetaTable tableDef : tableDefs) {
final String tableTypeName2 =
tableDef.tableType == null
? null
: tableDef.tableType.toUpperCase(Locale.ROOT).replace('','_');
final TableType tableType =
Util.enumVal(TableType.OTHER, tableTypeName2);
if (tableType == TableType.OTHER && tableTypeName2 != null) {System.out.println("Unknown table type:" + tableTypeName2);
}
// 最终封装成 JdbcTable 对象
final JdbcTable table =
new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
tableDef.tableName, tableType);
builder.put(tableDef.tableName, table);
}
return builder.build();} catch (SQLException e) {
throw new RuntimeException("Exception while reading tables", e);
} finally {close(connection, null, resultSet);
}
}
SQL 执行流程
OK,到这里基本上两个简略的案例曾经演示好了,最初补充一下整个 Calcite
架构和整个 SQL 的执行流程。
整个流程如下:SQL 解析(Parser)=> SQL 校验(Validator)=> SQL 查问优化(optimizer)=> SQL 生成 => SQL 执行
SQL Parser
所有的 SQL 语句在执行前都须要经验 SQL 解析器解析,解析器的工作内容就是将 SQL 中的 Token 解析成形象语法树,每个树的节点都是一个 SqlNode,这个过程其实就是 Sql Text => SqlNode 的过程。
咱们后面的 Demo 没有自定义 Parser,是因为 Calcite 采纳了本人默认的 Parser(SqlParserImpl)。
SqlNode
SqlNode
是整个解析中的外围,比方图中你能够发现,对于每个比方 select
、from
、where
关键字之后的内容其实都是一个SqlNode
。
parserConfig
办法次要是设置 SqlParserFactory 的参数,比方咱们下面所说的我本地测试的时候踩的大小写的坑,就能够在这里设置。
间接调用 setCaseSensitive=false
即不会将 SQL 语句中的表名列名转为大写,上面是默认的,其余的参数能够按需配置。
SQL Validator
SQL 语句先通过 Parser,而后通过语法验证器,留神 Parser 并不会验证语法的正确性。
其实 Parser 只会验证 SQL 关键词的地位是否正确,咱们上述 2 个 Parser 的例子中都没有创立 schema
和 table
这些,然而如果这样写,那就会报错,这个谬误就是 parser
检测后抛出来的(ParseLocationErrorTest)。
真正的校验在 validator
中,会去验证查问的表名是否存在,查问的字段是否存在,类型是否匹配,这个过程比较复杂,默认的 validator
是SqlValidatorImpl
。
查问优化
比方关系代数,比方什么投影、笛卡尔积这些,Calcite
提供了很多外部的优化器,也能够实现本人的优化器。
适配器
Calcite
是不蕴含存储层的,所以提供一种适配器的机制来拜访内部的数据存储或者存储引擎。
最初,进阶
官网外面写了将来会反对 Kafka
适配器到公共 Api
中,到时候应用起来就和上述集成 Mysql
一样不便,然而当初还没有反对,我这里给大家提供个本人实现的形式,这样就能够通过 SQL 的形式间接查问 Kafka 中的 Topic 数据等信息。
这里咱们外部集成实现了 KSQL
的能力,查问后果是 OK 的。
还是像上述步骤一样,咱们须要筹备库、表名称、字段名称、字段类型、数据源(多进去的中央)。
- 自定义
Sql
解析,之前咱们都没有自定义解析,这里须要自定义解析,是因为我须要动静解析sql
的where
条件外面的partation
。
- 配置解析器,就是之前案例中提到的配置大小写之类的
- 创立解析器,应用的默认
SqlParseImpl
- 开始解析,生成
AST
,咱们能够基于生成的SqlNode
做一些业务相干的校验和参数解析
- 适配器获取数据源
public class KafkaConsumerAdapter {public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> topics = new ArrayList<>();
for (Integer partition : kafkaSql.getPartition()) {TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
topics.add(tp);
}
consumer.assign(topics);
for (TopicPartition tp : topics) {Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
long position = 500;
if (offsets.get(tp).longValue() > position) {consumer.seek(tp, offsets.get(tp).longValue() - 500);
} else {consumer.seek(tp, 0);
}
}
List<KafkaResult> results = new ArrayList<>();
boolean flag = true;
while (flag) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 转成我定义的对象汇合
KafkaResult result = new KafkaResult();
result.setPartition(record.partition());
result.setOffset(record.offset());
result.setMsg(record.value());
result.setKey(record.key());
results.add(result);
}
if (!records.isEmpty()) {flag = false;}
}
consumer.close();
return results;
}
}
- 执行查问,就能够失去咱们想要的成果了。
public class TestKafka {public static void main(String[] args) throws Exception {KafkaService kafkaService = new KafkaService();
// 把解析到的参数放在我本人定义的 kafkaSqlInfo 对象中
KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000");
// 适配器获取数据源,次要是从上述的 sqlInfo 对象中去 poll 数据
List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
// 执行查问
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like'%account%'limit 1000");
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql("select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000");
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
}
private static void query(String tableName, List<KafkaResult> results,
String sql) throws Exception {
// 创立 model.json,设置我的 SchemaFactory,设置库名
String model = createTempJson();
// 设置我的表构造,表名称和表字段名以及类型
KafkaTableSchema.generateSchema(tableName, results);
Properties info = new Properties();
info.setProperty("lex", Lex.JAVA.toString());
Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
Statement st = connection.createStatement();
// 执行
ResultSet result = st.executeQuery(sql);
ResultSetMetaData rsmd = result.getMetaData();
List<Map<String, Object>> ret = new ArrayList<>();
while (result.next()) {Map<String, Object> map = new LinkedHashMap<>();
for (int i = 1; i <= rsmd.getColumnCount(); i++) {map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
}
ret.add(map);
}
result.close();
st.close();
connection.close();}
private static void print(ResultSet resultSet) throws SQLException {final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {for (int i = 1; ; i++) {System.out.print(resultSet.getString(i));
if (i < columnCount) {System.out.print(",");
} else {System.out.println();
break;
}
}
}
}
private static String createTempJson() throws IOException {JSONObject object = new JSONObject();
object.put("version", "1.0");
object.put("defaultSchema", "QAKAFKA");
JSONArray array = new JSONArray();
JSONObject tmp = new JSONObject();
tmp.put("name", "QAKAFKA");
tmp.put("type", "custom");
tmp.put("factory", "kafka.KafkaSchemaFactory");
array.add(tmp);
object.put("schemas", array);
return object.toJSONString();}
}
- 生成长期的
model.json
,之前是基于文件,当初基于text
字符串,mode=inline
模式 - 设置我的表构造、表名称、字段名、字段类型等,并搁置在内存中,同时将适配器查问进去的数据也放进去
table
外面 - 获取连贯,执行查问,完满!