欢送拜访我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;

对于Flink SQL Client

Flink Table & SQL的API实现了通过SQL语言解决实时技术算业务,但还是要编写局部Java代码(或Scala),并且还要编译构建能力提交到Flink运行环境,这对于不相熟Java或Scala的开发者就略有些不敌对了;
SQL Client的指标就是解决上述问题(官网原话<font color="blue">with a build tool before being submitted to a cluster.</font>)

局限性

遗憾的是,在Flink-1.10.0版本中,SQL Client只是个Beta版本(不适宜用于生产环境),并且只能连贯到本地Flink,不能像mysql、cassandra等客户端工具那样近程连贯server,这些在未来的版本会解决:

环境信息

接下来采纳实战的形式对Flink SQL Client做初步尝试,环境信息如下:

  1. 电脑:MacBook Pro2018 13寸,macOS Catalina 10.15.3
  2. Flink:1.10.0
  3. JDK:1.8.0_211

本地启动flink

  1. 下载flink包,地址:<font color="blue">http://ftp.kddilabs.jp/infosy...</font>
  2. 解压:<font color="blue">tar -zxvf flink-1.10.0-bin-scala_2.11.tgz</font>
  3. 进目录flink-1.10.0/bin/,执行命令<font color="blue">./start-cluster.sh</font>启动本地flink;
  4. 拜访该机器的8081端口,可见本地flink启动胜利:

启动SQL Client CLI

  1. 在目录flink-1.10.0/bin/执行./sql-client.sh即可启动SQL Client CLI,如下图所示,红框中的BETA揭示着在生产环境如果要用此工具:

  1. 第一个要把握的是HELP命令:

  1. 从hello world开始把,执行命令<font color="blue">select ‘Hello world!’;</font>,控制台输入如下图所示,输出Q可退出:

两种展现模式

  1. 第一种是table mode,成果像是对一般数据表的查问,设置该模式的命令:
SET execution.result-mode=table;
  1. 第二种是<font color="blue">changelog mode</font>,成果像是打印每一次数据变更的日志,设置该模式的命令:
SET execution.result-mode=changelog;
  1. 设置table mode后,执行以下命令作一次简略的分组查问:
SELECT name,   COUNT(*) AS cnt   FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob'))   AS NameTable(name)   GROUP BY name;
  1. 为了便于比照,下图同时贴上两种模式的查问后果,留神绿框中显示了该行记录是减少还是删除:

  1. 不论是哪种模式,查问构造都保留在SQL Client CLI过程的堆内存中;
  2. 在chenglog模式下,为了保障控制台能够失常输入输出,查问后果只展现最近1000条;
  3. table模式下,能够翻页查问更多后果,后果数量受配置项<font color="blue">max-table-result-rows</font>以及可用堆内存限度;

进一步体验

后面写了几行SQL,对Flink SQL Client有了最根本的感触,接下来做进一步的体验,内容如下:

  1. 创立CSV文件,这是个最简略的图书信息表,只有三个字段:名字、数量、类目,一共十条记录;
  2. 创立SQL Client用到的环境配置文件,该文件形容了数据源以及对应的表的信息;
  3. 启动SQL Client,执行SQL查问上述CSV文件;
  4. 整个操作步骤如下图所示:

操作

  1. 首先请确保Flink曾经启动;
  2. 创立名为<font color="blue">book-store.csv</font>的文件,内容如下:
name001,1,aaaname002,2,aaaname003,3,bbbname004,4,bbbname005,5,bbbname006,6,cccname007,7,cccname008,8,cccname009,9,cccname010,10,ccc
  1. 在<font color="blue">flink-1.10.0/conf</font>目录下创立名为<font color="blue">book-store.yaml</font>的文件,内容如下:
tables:  - name: BookStore    type: source-table    update-mode: append    connector:      type: filesystem      path: "/Users/zhaoqin/temp/202004/26/book-store.csv"    format:      type: csv      fields:        - name: BookName          type: VARCHAR        - name: BookAmount          type: INT        - name: BookCatalog          type: VARCHAR      line-delimiter: "\n"      comment-prefix: ","    schema:      - name: BookName        type: VARCHAR      - name: BookAmount        type: INT      - name: BookCatalog        type: VARCHAR  - name: MyBookView    type: view    query: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"execution:  planner: blink                    # optional: either 'blink' (default) or 'old'  type: streaming                   # required: execution mode either 'batch' or 'streaming'  result-mode: table                # required: either 'table' or 'changelog'  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in                                    #   'table' mode (1000000 by default, smaller 1 means unlimited)  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)  parallelism: 1                    # optional: Flink's parallelism (1 by default)  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)  min-idle-state-retention: 0       # optional: table program's minimum idle state time  max-idle-state-retention: 0       # optional: table program's maximum idle state time                                    #   (default database of the current catalog by default)  restart-strategy:                 # optional: restart strategy    type: fallback                  #   "fallback" to global restart strategy by default# Configuration options for adjusting and tuning table programs.# A full list of options and their default values can be found# on the dedicated "Configuration" page.configuration:  table.optimizer.join-reorder-enabled: true  table.exec.spill-compression.enabled: true  table.exec.spill-compression.block-size: 128kb# Properties that describe the cluster to which table programs are submitted to.deployment:  response-timeout: 5000
  1. 对于<font color="blue">book-store.yaml</font>文件,有以下几处须要留神:

a. tables.type等于source-table,表明这是数据源的配置信息;

b. tables.connector形容了具体的数据源信息,path是<font color="blue">book-store.csv</font>文件的残缺门路;

c. tables.format形容了文件内容;

d. tables.schema形容了数据源表的表构造;

e. type为view示意MyBookView是个视图(参考数据库的视图概念);

  1. 在<font color="blue">flink-1.10.0</font>目录执行以下命令,即可启动SQL Client,并指定<font color="blue">book-store.yaml</font>为环境配置:
bin/sql-client.sh embedded -d conf/book-store.yaml
  1. 查全表:
SELECT * FROM BookStore;

  1. 依照BookCatalog分组统计记录数:
SELECT BookCatalog, COUNT(*) AS BookCount FROM BookStore GROUP BY BookCatalog;

  1. 查问视图:
select * from MyBookView;

至此,Flink SQL Client的首次体验就实现了,咱们此工具算是有了根本理解,接下来的文章会进一步应用Flink SQL Client做些简单的操作;

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游Java世界...
https://github.com/zq2599/blog_demos