共计 2207 个字符,预计需要花费 6 分钟才能阅读完成。
数栈是云原生—站式数据中台 PaaS,咱们在 github 和 gitee 上有一个乏味的开源我的项目:FlinkX,FlinkX 是一个基于 Flink 的批流对立的数据同步工具,既能够采集动态的数据,也能够采集实时变动的数据,是全域、异构、批流一体的数据同步引擎。大家喜爱的话请给咱们点个 star!star!star!
github 开源我的项目:https://github.com/DTStack/fl…
gitee 开源我的项目:https://gitee.com/dtstack_dev…
首先,本文所述均基于 flink 1.5.4。
一、咱们为什么扩大 Flink-SQL?
因为 Flink 自身 SQL 语法并不提供在对接输出源和输入目标的 SQL 语法。数据开发在应用的过程中须要依据其提供的 Api 接口编写 Source 和 Sink, 异样繁琐,不仅须要理解 FLink 各类 Operator 的 API, 还须要对各个组件的相干调用形式有理解(比方 kafka,redis,mongo,hbase 等),并且在须要关联到内部数据源的时候没有提供 SQL 相干的实现形式,因而数据开发间接应用 Flink 编写 SQL 作为实时的数据分析时须要较大的额定工作量。
咱们的目标是在应用 Flink-SQL 的时候只须要关怀做什么,而不须要关怀怎么做。不须要过多的关怀程序的实现,专一于业务逻辑。
接下来,咱们一起来看下 Flink-SQL 的扩大实现吧!
二、扩大了哪些 flink 相干 sql
1、创立源表语句
2、创立输出表语句
3、创立自定义函数
4、维表关联
三、各个模块是如何翻译到 flink 的实现
1、如何将创立源表的 sql 语句转换为 flink 的 operator
Flink 中表的都会映射到 Table 这个类。而后调用注册办法将 Table 注册到 environment。
StreamTableEnvironment.registerTable(tableName, table);
以后咱们只反对 kafka 数据源。Flink 自身有读取 kafka 的实现类, FlinkKafkaConsumer09, 所以只须要依据指定参数实例化出该对象。并调用注册办法注册即可。
另外须要留神在 flink sql 常常会须要用到 rowtime, proctime, 所以咱们在注册表构造的时候额定增加 rowtime,proctime。
当须要用到 rowtime 的应用须要额定指定 DataStream.watermarks(assignTimestampsAndWatermarks),自定义 watermark 次要做两个事件:1:如何从 Row 中获取工夫字段。2:设定最大延迟时间。
2、如何将创立的输出表 sql 语句转换为 flink 的 operator
Flink 输入 Operator 的基类是 OutputFormat, 咱们这里继承的是 RichOutputFormat, 该抽象类继承 OutputFormat,额定实现了获取运行环境的办法 getRuntimeContext(), 不便于咱们之后自定义 metric 等操作。
咱们以输入到 mysql 插件 mysql-sink 为例,分两局部:
将 create table 解析出表名称,字段信息,mysql 连贯信息。
该局部应用正则表达式的形式将 create table 语句转换为外部的一个实现类。该类存储了表名称,字段信息,插件类型,插件连贯信息。
继承 RichOutputFormat 将数据写到对应的内部数据源。
次要是实现 writeRecord 办法,在 mysql 插件中其实就是调用 jdbc 实现插入或者更新办法。
3、如何将自定义函数语句转换为 flink 的 operator;
Flink 对 udf 提供两种类型的实现形式:
1)继承 ScalarFunction
2)继承 TableFunction
须要做的将用户提供的 jar 增加到 URLClassLoader, 并加载指定的 class (实现上述接口的类门路), 而后调用 TableEnvironment.registerFunction(funcName, udfFunc);即实现了 udf 的注册。之后即可应用改定义的 udf;
4、维表性能是如何实现的?
流计算中一个常见的需要就是为数据流补齐字段。因为数据采集端采集到的数据往往比拟无限,在做数据分析之前,就要先将所需的维度信息补全,然而以后 flink 并未提供 join 内部数据源的 SQL 性能。
实现该性能须要留神的几个问题:
1)维表的数据是一直变动的
在实现的时候须要反对定时更新内存中的缓存的内部数据源,比方应用 LRU 等策略。
2)IO 吞吐问题
如果每接管到一条数据就串行到内部数据源去获取对应的关联记录的话,网络提早将会是零碎最大的瓶颈。这里咱们抉择阿里奉献给 flink 社区的算子 RichAsyncFunction。该算子应用异步的形式从内部数据源获取数据,大大减少了破费在网络申请上的工夫。
3)如何将 sql 中蕴含的维表解析到 flink operator
为了从 sql 中解析出指定的维表和过滤条件,应用正则显著不是一个适合的方法。须要匹配各种可能性。将是一个无穷无尽的过程。查看 flink 自身对 sql 的解析。它应用了 calcite 做为 sql 解析的工作。将 sql 解析出一个语法树,通过迭代的形式,搜寻到对应的维表;而后将维表和非维表构造离开。
通过上述步骤能够通过 SQL 实现罕用的从 kafka 源表,join 内部数据源,写入到指定的内部目标构造中。