关于云计算:Flink-SQL-Client综合实战

41次阅读

共计 5603 个字符,预计需要花费 15 分钟才能阅读完成。

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

在《Flink SQL Client 初探》一文中,咱们体验了 Flink SQL Client 的基本功能,明天来通过实战更深刻学习和体验 Flink SQL;

实战内容

本次实战次要是通过 Flink SQL Client 生产 kafka 的实时音讯,再用各种 SQL 操作对数据进行查问统计,内容汇总如下:

  1. DDL 创立 Kafka 表
  2. 窗口统计;
  3. 数据写入 ElasticSearch
  4. 联表操作

版本信息

  1. Flink:1.10.0
  2. Flink 所在操作系统:CentOS Linux release 7.7.1908
  3. JDK:1.8.0_211
  4. Kafka:2.4.0(scala:2.12)
  5. Mysql:5.7.29

数据源筹备

  1. 本次实战用的数据,起源是阿里云天池公开数据集的一份淘宝用户行为数据集,获取形式请参考《筹备数据集用于 flink 学习》
  2. 获取到数据集文件后转成 kafka 音讯收回,这样咱们应用 Flink SQL 时就依照实时生产 kafka 音讯的形式来操作,具体的操作形式请参考《将 CSV 的数据发送到 kafka》
  3. 上述操作实现后,一百零四万条淘宝用户行为数据就会通过 kafka 音讯程序收回,咱们的实战就有不间断实时数据可用 了,音讯内容如下:
{"user_id":1004080,"item_id":2258662,"category_id":79451,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
{"user_id":100814,"item_id":5071478,"category_id":1107469,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
{"user_id":114321,"item_id":4306269,"category_id":4756105,"behavior":"pv","ts":"2017-11-24T23:47:48Z"}
  1. 上述音讯中每个字段的含意如下表:
列名称 阐明
用户 ID 整数类型,序列化后的用户 ID
商品 ID 整数类型,序列化后的商品 ID
商品类目 ID 整数类型,序列化后的商品所属类目 ID
行为类型 字符串,枚举类型,包含 (‘pv’, ‘buy’, ‘cart’, ‘fav’)
工夫戳 行为产生的工夫戳
工夫字符串 依据工夫戳字段生成的工夫字符串

jar 筹备

实战过程中要用到上面这五个 jar 文件:

  1. flink-jdbc_2.11-1.10.0.jar
  2. flink-json-1.10.0.jar
  3. flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
  4. flink-sql-connector-kafka_2.11-1.10.0.jar
  5. mysql-connector-java-5.1.48.jar

我已将这些文件打包上传到 GitHub,下载地址:https://raw.githubusercontent…

请在 flink 装置目录下新建文件夹 <font color=”blue”>sql_lib</font>,而后将这五个 jar 文件放进去;

Elasticsearch 筹备

如果您装了 docker 和 docker-compose,那么上面的命令能够疾速部署 elasticsearch 和 head 工具:

wget https://raw.githubusercontent.com/zq2599/blog_demos/master/elasticsearch_docker_compose/docker-compose.yml && \
docker-compose up -d

筹备结束,开始操作吧;

DDL 创立 Kafka 表

  1. 进入 flink 目录,启动 flink:<font color=”blue”>bin/start-cluster.sh</font>
  2. 启动 Flink SQL Client:<font color=”blue”>bin/sql-client.sh embedded -l sql_lib</font>
  3. 启动胜利显示如下:

  1. 执行以下命令即可创立 kafka 表,请依照本人的信息调整参数:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- 解决工夫列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在 ts 上定义 watermark,ts 成为事件工夫列
) WITH (
    'connector.type' = 'kafka',  -- kafka connector
    'connector.version' = 'universal',  -- universal 反对 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
    'connector.properties.zookeeper.connect' = '192.168.50.43:2181',  -- zk 地址
    'connector.properties.bootstrap.servers' = '192.168.50.43:9092',  -- broker 地址
    'format.type' = 'json'  -- 数据源格局为 json
);
  1. 执行 <font color=”blue”>SELECT * FROM user_behavior;</font> 看看原始数据,如果音讯失常应该和下图相似:

窗口统计

  1. 上面的 SQL 是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START 返回的数据格式是 timestamp,这里再调用 DATE_FORMAT 函数将其格式化成了字符串:
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
COUNT(*)
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);
  1. 失去数据如下所示:

数据写入 ElasticSearch

  1. 确保 elasticsearch 已部署好;
  2. 执行以下语句即可创立 es 表,请依照您本人的 es 信息调整上面的参数:
CREATE TABLE pv_per_minute ( 
    start_time STRING,
    end_time STRING,
    pv_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- 类型
    'connector.version' = '6',  -- elasticsearch 版本
    'connector.hosts' = 'http://192.168.133.173:9200',  -- elasticsearch 地址
    'connector.index' = 'pv_per_minute',  -- 索引名,相当于数据库表名
    'connector.document-type' = 'user_behavior', -- type,相当于数据库库名
    'connector.bulk-flush.max-actions' = '1',  -- 每条数据都刷新
    'format.type' = 'json',  -- 输入数据格式 json
    'update-mode' = 'append'
);
  1. 执行以下语句,就会将每分钟的 pv 总数写入 es 的 pv_per_minute 索引:
INSERT INTO pv_per_minute
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time, 
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time, 
COUNT(*) AS pv_cnt
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
  1. 用 es-head 查看,发现数据已胜利写入:

联表操作

  1. 以后 user_behavior 表的 category_id 示意商品类目,例如 <font color=”blue”>11120</font> 示意计算机书籍,<font color=”blue”>61626</font> 示意牛仔裤,本次实战的数据集中,这样的类目共有五千多种;
  2. 如果咱们将这五千多品种目分成 6 个大类,例如 <font color=”blue”>11120</font> 属于教育类,<font color=”blue”>61626</font> 属于服装类,那么应该有个大类和类目标关系表;
  3. 这个大类和类目标关系表在 MySQL 创立,表名叫 <font color=”blue”>category_info</font>,建表语句如下:
CREATE TABLE `category_info`(`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
   `parent_id` bigint ,
   `category_id` bigint ,
   PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  1. 表 <font color=”blue”>category_info</font> 所有数据来自对原始数据中 <font color=”blue”>category_id</font> 字段的提取,并且随机将它们划分为 6 个大类,该表的数据请在我的 GitHub 下载:https://raw.githubusercontent…
  2. 请在 MySQL 上建表 <font color=”blue”>category_info</font>,并将上述数据全副写进去;
  3. 在 Flink SQL Client 执行以下语句创立这个维表,mysql 信息请按您本人配置调整:
CREATE TABLE category_info (
    parent_id BIGINT, -- 商品大类
    category_id BIGINT  -- 商品具体类目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo',
    'connector.table' = 'category_info',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);
  1. 尝试联表查问:
SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id;
  1. 如下图,联表查问胜利,每条记录都能对应大类:

  1. 再试试联表统计,每个大类的总浏览量:

SELECT C.parent_id, COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = ‘pv’
GROUP BY C.parent_id;


10. 如下图,数据是动静更新的:![在这里插入图片形容](/img/bVcKp6U)
11. 执行以下语句,能够在统计时将大类 ID 转成中文名:

SELECT CASE C.parent_id

WHEN 1 THEN '服饰鞋包'
WHEN 2 THEN '家装家饰'
WHEN 3 THEN '家电'
WHEN 4 THEN '美妆'
WHEN 5 THEN '母婴'
WHEN 6 THEN '3C 数码'
ELSE '其余'

END AS category_name,
COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = ‘pv’
GROUP BY C.parent_id;


12. 成果如下图:![在这里插入图片形容](/img/bVcKp6V)
至此,咱们借助 Flink SQL Client 体验了 Flink SQL 丰盛的性能,如果您也在学习 Flink SQL,心愿本文能给您一些参考;### 欢送关注公众号:程序员欣宸
> 微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界...

正文完
 0