共计 4430 个字符,预计需要花费 12 分钟才能阅读完成。
欢送拜访我的 GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;
对于 Flink SQL Client
Flink Table & SQL 的 API 实现了通过 SQL 语言解决实时技术算业务,但还是要编写局部 Java 代码 (或 Scala),并且还要编译构建能力提交到 Flink 运行环境,这对于不相熟 Java 或 Scala 的开发者就略有些不敌对了;
SQL Client 的指标就是解决上述问题(官网原话 <font color=”blue”>with a build tool before being submitted to a cluster.</font>)
局限性
遗憾的是,在 Flink-1.10.0 版本中,SQL Client 只是个 Beta 版本 (不适宜用于生产环境),并且只能连贯到本地 Flink,不能像 mysql、cassandra 等客户端工具那样近程连贯 server,这些在未来的版本会解决:
环境信息
接下来采纳实战的形式对 Flink SQL Client 做初步尝试,环境信息如下:
- 电脑:MacBook Pro2018 13 寸,macOS Catalina 10.15.3
- Flink:1.10.0
- JDK:1.8.0_211
本地启动 flink
- 下载 flink 包,地址:<font color=”blue”>http://ftp.kddilabs.jp/infosy…</font>
- 解压:<font color=”blue”>tar -zxvf flink-1.10.0-bin-scala_2.11.tgz</font>
- 进目录 flink-1.10.0/bin/,执行命令 <font color=”blue”>./start-cluster.sh</font> 启动本地 flink;
- 拜访该机器的 8081 端口,可见本地 flink 启动胜利:
启动 SQL Client CLI
- 在目录 flink-1.10.0/bin/ 执行./sql-client.sh 即可启动 SQL Client CLI,如下图所示,红框中的 BETA 揭示着在生产环境如果要用此工具:
- 第一个要把握的是 HELP 命令:
- 从 hello world 开始把,执行命令 <font color=”blue”>select‘Hello world!’;</font>,控制台输入如下图所示,输出 Q 可退出:
两种展现模式
- 第一种是 table mode,成果像是对一般数据表的查问,设置该模式的命令:
SET execution.result-mode=table;
- 第二种是 <font color=”blue”>changelog mode</font>,成果像是打印每一次数据变更的日志,设置该模式的命令:
SET execution.result-mode=changelog;
- 设置 table mode 后,执行以下命令作一次简略的分组查问:
SELECT name,
COUNT(*) AS cnt
FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob'))
AS NameTable(name)
GROUP BY name;
- 为了便于比照,下图同时贴上两种模式的查问后果,留神绿框中显示了该行记录是减少还是删除:
- 不论是哪种模式,查问构造都保留在 SQL Client CLI 过程的堆内存中;
- 在 chenglog 模式下,为了保障控制台能够失常输入输出,查问后果只展现最近 1000 条;
- table 模式下,能够翻页查问更多后果,后果数量受配置项 <font color=”blue”>max-table-result-rows</font> 以及可用堆内存限度;
进一步体验
后面写了几行 SQL,对 Flink SQL Client 有了最根本的感触,接下来做进一步的体验,内容如下:
- 创立 CSV 文件,这是个最简略的图书信息表,只有三个字段:名字、数量、类目,一共十条记录;
- 创立 SQL Client 用到的环境配置文件,该文件形容了数据源以及对应的表的信息;
- 启动 SQL Client,执行 SQL 查问上述 CSV 文件;
- 整个操作步骤如下图所示:
操作
- 首先请确保 Flink 曾经启动;
- 创立名为 <font color=”blue”>book-store.csv</font> 的文件,内容如下:
name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc
- 在 <font color=”blue”>flink-1.10.0/conf</font> 目录下创立名为 <font color=”blue”>book-store.yaml</font> 的文件,内容如下:
tables:
- name: BookStore
type: source-table
update-mode: append
connector:
type: filesystem
path: "/Users/zhaoqin/temp/202004/26/book-store.csv"
format:
type: csv
fields:
- name: BookName
type: VARCHAR
- name: BookAmount
type: INT
- name: BookCatalog
type: VARCHAR
line-delimiter: "\n"
comment-prefix: ","
schema:
- name: BookName
type: VARCHAR
- name: BookAmount
type: INT
- name: BookCatalog
type: VARCHAR
- name: MyBookView
type: view
query: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"
execution:
planner: blink # optional: either 'blink' (default) or 'old'
type: streaming # required: execution mode either 'batch' or 'streaming'
result-mode: table # required: either 'table' or 'changelog'
max-table-result-rows: 1000000 # optional: maximum number of maintained rows in
# 'table' mode (1000000 by default, smaller 1 means unlimited)
time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default)
parallelism: 1 # optional: Flink's parallelism (1 by default)
periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
min-idle-state-retention: 0 # optional: table program's minimum idle state time
max-idle-state-retention: 0 # optional: table program's maximum idle state time
# (default database of the current catalog by default)
restart-strategy: # optional: restart strategy
type: fallback # "fallback" to global restart strategy by default
# Configuration options for adjusting and tuning table programs.
# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
table.optimizer.join-reorder-enabled: true
table.exec.spill-compression.enabled: true
table.exec.spill-compression.block-size: 128kb
# Properties that describe the cluster to which table programs are submitted to.
deployment:
response-timeout: 5000
- 对于 <font color=”blue”>book-store.yaml</font> 文件,有以下几处须要留神:
a. tables.type 等于 source-table,表明这是数据源的配置信息;
b. tables.connector 形容了具体的数据源信息,path 是 <font color=”blue”>book-store.csv</font> 文件的残缺门路;
c. tables.format 形容了文件内容;
d. tables.schema 形容了数据源表的表构造;
e. type 为 view 示意 MyBookView 是个视图 (参考数据库的视图概念);
- 在 <font color=”blue”>flink-1.10.0</font> 目录执行以下命令,即可启动 SQL Client,并指定 <font color=”blue”>book-store.yaml</font> 为环境配置:
bin/sql-client.sh embedded -d conf/book-store.yaml
- 查全表:
SELECT * FROM BookStore;
- 依照 BookCatalog 分组统计记录数:
SELECT BookCatalog, COUNT(*) AS BookCount FROM BookStore GROUP BY BookCatalog;
- 查问视图:
select * from MyBookView;
至此,Flink SQL Client 的首次体验就实现了,咱们此工具算是有了根本理解,接下来的文章会进一步应用 Flink SQL Client 做些简单的操作;
欢送关注公众号:程序员欣宸
微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos