共计 5489 个字符,预计需要花费 14 分钟才能阅读完成。
Flink 1.11 正式公布曾经三周了,其中最吸引我的个性就是 Hive Streaming。刚巧 Zeppelin-0.9-preview2 也在前不久公布了,所以就写了一篇 Zeppelin 上的 Flink Hive Streaming 的实战解析。本文次要从以下几局部跟大家分享:
- Hive Streaming 的意义
- Checkpoint & Dependency
- 写入 Kafka
- Hive Streaming Sink
- Hive Streaming Source
- Hive Temporal Table
Hive Streaming 的意义
很多同学可能会好奇,为什么 Flink 1.11 中,Hive Streaming 的位置这么高?它的呈现,到底能给咱们带来什么?其实在大数据畛域,始终存在两种架构 Lambda 和 Kappa:
- Lambda 架构——流批拆散,静态数据通过定时调度同步到 Hive 数仓,实时数据既会同步到 Hive,也会被实时计算引擎生产,这里就引出了一点问题。
- 数据口径问题
- 离线计算产出延时太大
- 数据冗余存储
- Kappa 架构——全副应用实时计算来产出数据,历史数据通过回溯音讯的生产位点计算,同样也有很多的问题,毕竟没有一劳永逸的架构。
- 消息中间件无奈保留全副历史数据,同样数据都是行式存储,占用空间太大
- 实时计算计算历史数据力不从心
- 无奈进行 Ad-Hoc 的剖析
为了解决这些问题,行业内推出了实时数仓,解决了大部分痛点,然而还是有些中央力不从心。比方波及到历史数据的计算怎么办?我想做 Ad-Hoc 的剖析又怎么玩?所以行业内当初都是实时数仓与离线数仓并行存在,而这又带来了更多的问题:模型须要多份、数据产出不统一、历史数据的计算等等。
而 Hive Streaming 的呈现就能够解决这些问题!再也不必多套模型了;也不须要同一个指标因为波及到历史数据,写一遍实时 SQL 再写一遍离线 SQL;Ad-Hoc 也能做了,怎么做?读 Hive Streaming 产出的表就行!
接下来,让咱们从参数配置开始,接着流式的写入 Hive,再到流式的读取 Hive 表,最初再 Join 上 Hive 维表吧。这一整套流程都体验后,想必大家对 Hive Streaming 肯定会有更深刻的理解,更可能领会到它的作用。
Checkpoint & Dependency
因为只有在实现 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,所以,咱们须要正当的去配置 Checkpoint,在 Zeppelin 中配置 Checkpoint 很简略。
%flink.conf
# checkpoint 配置
pipeline.time-characteristic EventTime
execution.checkpointing.interval 120000
execution.checkpointing.min-pause 60000
execution.checkpointing.timeout 60000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
# 依赖 jar 包配置
flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0
又因为咱们须要从 Kafka 中读取数据,所以将 Kafka 的依赖也退出进去了。
写入 Kafka
咱们的数据来自于天池数据集,是以 CSV 的格局存在于本地磁盘,所以须要先将他们写入 Kafka。
先建一下 CSV Source 和 Kafka Sink 的表:
%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS source_csv;
CREATE TABLE source_csv (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) WITH (
'connector' = 'filesystem',
'path' = 'file:///Users/dijie/Downloads/Cloud_Theme_Click/theme_click_log.csv',
'format' = 'csv'
)
%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string,
ts AS localtimestamp,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'theme_click_log',
'properties.bootstrap.servers' = '10.70.98.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
因为注册的表即能够读又能够写,于是我在建表时将 Watermark 加上了;又因为源数据中的工夫戳曾经很老了,所以我这里采纳以后工夫减去 5 秒作为我的 Watermark。
大家能够看到,我在语句一开始指定了 SQL 方言为 Default,这是为啥呢?还有别的方言吗?别急,听我慢慢说。
其实在之前的版本,Flink 就曾经能够和 Hive 买通,包含能够把表建在 Hive 上,然而很多语法和 Hive 不兼容,包含建的表在 Hive 中也无奈查看,次要起因就是方言不兼容。所以,在 Flink 1.11 中,为了缩小学习老本(语法不兼容),能够用 DDL 建 Hive 表并在 Hive 中查问,Flink 反对了方言,默认的就是 Default 了,就和之前一样,如果想建 Hive 表,并反对查问,请应用 Hive 方言,具体能够参考下方链接。
Hive 方言:
https://ci.apache.org/project…
再把数据从 CSV 中读取后写入 Kafka。
%flink.ssql(type=update)
insert into kafka_table select * from source_csv ;
再瞄一眼 Kafka,看看数据有没有被灌进去:
看来没问题,那么接下来让咱们写入 Hive。
Hive Streaming Sink
建一个 Hive Sink Table,记得将方言切换到 Hive,否则会有问题。
%flink.ssql
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS hive_table;
CREATE TABLE hive_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
参数给大家略微解释一下:
- partition.time-extractor.timestamp-pattern:分区工夫抽取器,与 DDL 中的分区字段保持一致;
- sink.partition-commit.trigger:分区触发器类型,可选 process-time 或 partition-time。process-time:不须要下面的参数,也不须要水印,当以后工夫大于分区创立工夫 +sink.partition-commit.delay 中定义的工夫,提交分区;partition-time:须要 Source 表中定义 watermark,当 watermark > 提取到的分区工夫 +sink.partition-commit.delay 中定义的工夫,提交分区;
- sink.partition-commit.delay:相当于延时工夫;
- sink.partition-commit.policy.kind:怎么提交,个别提交胜利之后,须要告诉 metastore,这样 Hive 能力读到你最新分区的数据;如果须要合并小文件,也能够自定义 Class,通过实现 PartitionCommitPolicy 接口。
接下来让咱们把数据插入刚刚创立的 Hive Table:
%flink.ssql
insert into hive_table select user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table
让程序再跑一会儿~ 咱们先去倒一杯 95 年的 Java☕️。
而后再看看咱们的 HDFS,看看门路下的货色。
大家也能够用 Hive 自行查问看看,我呢就先卖个关子,一会儿用 Hive Streaming 来读数据。
Hive Streaming Source
因为 Hive 表下面曾经创立过了,所以这边读数据的时候间接拿来用就行了,不同的中央是须要应用 Table Hints 去笼罩参数。
Hive Streaming Source 最大的有余是,无奈读取曾经读取过的分区下新增的文件。简略来说就是,读过的分区,就不会再读了。看似很坑,不过认真想想,这样才合乎流的个性。
依旧给大家说一下参数的意思:
- stream-source.enable:不言而喻,示意是否开启流模式。
- stream-source.monitor-interval:监控新文件 / 分区产生的距离。
- stream-source.consume-order:能够选 create-time 或者 partition-time;create-time 指的不是分区创立工夫,而是在 HDFS 中文件 / 文件夹的创立工夫;partition-time 指的是分区的工夫;对于非分区表,只能用 create-time。官网这边的介绍写的有点含糊,会让人误以为能够查到曾经读过的分区下新增的文件,其实通过我的测试和翻看源码发现并不能。
- stream-source.consume-start-offset:示意从哪个分区开始读。
光说不干假把式,让咱们捞一把数据看看~
SET 那一行得带着,不然无奈应用 Table Hints。
Hive Temporal Table
看完了 Streaming Source 和 Streaming Sink,让咱们最初再试一下 Hive 作为维表吧。
其实用 Hive 维表很简略,只有是在 Hive 中存在的表,都能够当做维表应用,参数齐全能够用 Table Hints 来笼罩。
- lookup.join.cache.ttl:示意缓存工夫;这里值得注意的是,因为 Hive 维表会把维表所有数据缓存在 TM 的内存中,如果维表量很大,那么很容易就 OOM;如果 ttl 工夫太短,那么会频繁的加载数据,性能会有很大影响。
因为是 LEFT JOIN,所以维表中不存在的数据会以 NULL 补全。再看一眼 DAG 图:
大家看一下画框的中央,能看到这边是应用的维表关联 LookupJoin。
如果大家 SQL 语句写错了,丢了 for system_time as of a.p,那么 DAG 图就会变成这样:
这种就不是维表 JOIN 其实更像是流和批在 JOIN。
写在最初
Hive Streaming 的欠缺意味着买通了流批一体的最初一道壁垒,既能够做到历史数据的 OLAP 剖析,又能够实时吐出后果,这无疑是 ETL 开发者的福音,想必接下来的日子,会有更多的企业实现他们实时数仓的建设。
参考文档:
[1]https://ci.apache.org/project…
[2]https://github.com/apache/zep…
Note 下载:
https://github.com/lonelyGhos…
最初,给大家介绍一下 Flink on Zeppelin 的钉钉群,大家有问题能够在外面探讨,Apache Zeppelin PMC 简锋大佬也在外面,有问题能够间接在钉群中发问交换~
作者介绍:
狄杰,蘑菇街资深数据专家,负责蘑菇街实时计算平台。目前 Focus 在 Flink on Zeppelin,Apache Zeppelin Contributor。