Flink应用HiveCatalog能够通过批或者流的形式来解决Hive中的表。这就意味着Flink既能够作为Hive的一个批处理引擎,也能够通过流解决的形式来读写Hive中的表,从而为实时数仓的利用和流批一体的落地实际奠定了松软的根底。本文将以Flink1.12为例,介绍Flink集成Hive的另外一个十分重要的方面——Hive维表JOIN(Temporal Table Join)与Flink读写Hive表的形式。以下是全文,心愿本文对你有所帮忙。
Flink写入Hive表
Flink反对以批处理(Batch)和流解决(Streaming)的形式写入Hive表。当以批处理的形式写入Hive表时,只有当写入作业完结时,才能够看到写入的数据。批处理的形式写入反对append模式和overwrite模式。
批处理模式写入
- 向非分区表写入数据
Flink SQL> use catalog myhive; -- 应用catalogFlink SQL> INSERT INTO users SELECT 2,'tom';Flink SQL> set execution.type=batch; -- 应用批处理模式Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';
- 向分区表写入数据
-- 向动态分区表写入数据Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;-- 向动静分区表写入数据Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
流解决模式写入
流式写入Hive表,不反对Insert overwrite 形式,否则报如下谬误:
[ERROR] Could not execute SQL statement. Reason:java.lang.IllegalStateException: Streaming mode not support overwrite.
上面的示例是将kafka的数据流式写入Hive的分区表
-- 应用流解决模式Flink SQL> set execution.type=streaming;-- 应用Hive方言Flink SQL> SET table.sql-dialect=hive; -- 创立一张Hive分区表CREATE TABLE user_behavior_hive_tbl ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品类id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT -- 用户行为产生的工夫戳) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='0S', 'sink.partition-commit.policy.kind'='metastore,success-file');-- 应用默认SQL方言Flink SQL> SET table.sql-dialect=default; -- 创立一张kafka数据源表CREATE TABLE user_behavior ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品类id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT, -- 用户行为产生的工夫戳 `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列 `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark ) WITH ( 'connector' = 'kafka', -- 应用 kafka connector 'topic' = 'user_behaviors', -- kafka主题 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 数据源格局为json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false');
对于Hive表的一些属性解释:
partition.time-extractor.timestamp-pattern
- 默认值:(none)
- 解释:分区工夫抽取器,与 DDL 中的分区字段保持一致,如果是按天分区,则能够是$dt,如果是按年(year)月(month)日(day)时(hour)进行分区,则该属性值为:
$year-$month-$day $hour:00:00
,如果是按地利进行分区,则该属性值为:$dt $hour:00:00
;
sink.partition-commit.trigger
- 默认值:process-time
解释:分区触发器类型,可选 process-time 或partition-time。
- process-time:不须要工夫提取器和水位线,当以后工夫大于分区创立工夫 + sink.partition-commit.delay 中定义的工夫,提交分区;
- partition-time:须要 Source 表中定义 watermark,当 watermark > 提取到的分区工夫 +sink.partition-commit.delay 中定义的工夫,提交分区;
sink.partition-commit.delay
- 默认值:0S
- 解释:分区提交的延时工夫,如果是按天分区,则该属性的值为:1d,如果是按小时分区,则该属性值为1h;
sink.partition-commit.policy.kind
- 默认值:(none)
解释:提交分区的策略,用于告诉上游的利用该分区曾经实现了写入,也就是说该分区的数据能够被拜访读取。可选的值如下:
- metastore:增加分区的元数据信息,仅Hive表反对该值配置
- success-file:在表的存储门路下增加一个
_SUCCESS
文件
能够同时配置下面的两个值,比方metastore,success-file
执行流式写入Hive表
-- streaming sql,将数据写入Hive表INSERT INTO user_behavior_hive_tbl SELECT user_id, item_id, cat_id, action, province, ts, FROM_UNIXTIME(ts, 'yyyy-MM-dd'), FROM_UNIXTIME(ts, 'HH'), FROM_UNIXTIME(ts, 'mm')FROM user_behavior;-- batch sql,查问Hive表的分区数据SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND hr='16' AND mi = '46';
同时查看Hive表的分区数据:
尖叫提醒:1.Flink读取Hive表默认应用的是batch模式,如果要应用流式读取Hive表,须要而外指定一些参数,见下文。
2.只有在实现 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成
_SUCCESS
文件,所以,Flink流式写入Hive表须要开启并配置 Checkpoint。对于Flink SQL Client而言,须要在flink-conf.yaml中开启CheckPoint,配置内容为:state.backend: filesystem
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints
Flink读取Hive表
Flink反对以批处理(Batch)和流解决(Streaming)的形式读取Hive中的表。批处理的形式与Hive的自身查问相似,即只在提交查问的时刻查问一次Hive表。流解决的形式将会继续地监控Hive表,并且会增量地提取新的数据。默认状况下,Flink是以批处理的形式读取Hive表。
对于流式读取Hive表,Flink既反对分区表又反对非分区表。对于分区表而言,Flink将会监控新产生的分区数据,并以增量的形式读取这些数据。对于非分区表,Flink会监控Hive表存储门路文件夹外面的新文件,并以增量的形式读取新的数据。
Flink读取Hive表能够配置一下参数:
streaming-source.enable
- 默认值:false
- 解释:是否开启流式读取 Hive 表,默认不开启。
streaming-source.partition.include
- 默认值:all
- 解释:配置读取Hive的分区,包含两种形式:all和latest。all意味着读取所有分区的数据,latest示意只读取最新的分区数据。值得注意的是,latest形式只能用于开启了流式读取Hive表,并用于维表JOIN的场景。
streaming-source.monitor-interval
- 默认值:None
- 解释:继续监控Hive表分区或者文件的工夫距离。值得注意的是,当以流的形式读取Hive表时,该参数的默认值是1m,即1分钟。当temporal join时,默认的值是60m,即1小时。另外,该参数配置不宜过短 ,最短是1 个小时,因为目前的实现是每个 task 都会查问 metastore,高频的查可能会对metastore 产生过大的压力。
streaming-source.partition-order
- 默认值:partition-name
- 解释:streaming source的分区程序。默认的是partition-name,示意应用默认分区名称程序加载最新分区,也是举荐应用的形式。除此之外还有两种形式,别离为:create-time和partition-time。其中create-time示意应用分区文件创建工夫程序。partition-time示意应用分区工夫程序。指的留神的是,对于非分区表,该参数的默认值为:create-time。
streaming-source.consume-start-offset
- 默认值:None
- 解释:流式读取Hive表的起始偏移量。
partition.time-extractor.kind
- 默认值:default
- 分区工夫提取器类型。用于从分区中提取工夫,反对default和自定义。如果应用default,则须要通过参数
partition.time-extractor.timestamp-pattern
配置工夫戳提取的正则表达式。
在 SQL Client 中须要显示地开启 SQL Hint 性能
Flink SQL> set table.dynamic-table-options.enabled= true;
应用SQLHint流式查问Hive表
SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */;
Hive维表JOIN
Flink 1.12 反对了 Hive 最新的分区作为时态表的性能,能够通过 SQL 的形式间接关联 Hive 分区表的最新分区,并且会主动监听最新的 Hive 分区,当监控到新的分区后,会主动地做维表数据的全量替换。
Flink反对的是processing-time的temporal join,也就是说总是与最新版本的时态表进行JOIN。另外,Flink既反对非分区表的temporal join,又反对分区表的temporal join。对于分区表而言,Flink会监听Hive表的最新分区数据。值得注意的是,Flink尚不反对 event-time temporal join。
Temporal Join最新分区
对于一张随着工夫变动的Hive分区表,Flink能够读取该表的数据作为一个无界流。如果Hive分区表的每个分区都蕴含全量的数据,那么每个分区将做为一个时态表的版本数据,行将最新的分区数据作为一个全量维表数据。值得注意的是,该性能特点仅反对Flink的STREAMING模式。
应用 Hive 最新分区作为 Tempmoral table 之前,须要设置必要的两个参数:
'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest'
除此之外还有一些其余的参数,对于参数的解释见下面的剖析。咱们在应用Hive维表的时候,既能够在创立Hive表时指定具体的参数,也能够应用SQL Hint的形式动静指定参数。一个Hive维表的创立模板如下:
-- 应用Hive的sql方言SET table.sql-dialect=hive;CREATE TABLE dimension_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP(3), update_user STRING, ...) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES ( -- 形式1:依照分区名排序来辨认最新分区(举荐应用该种形式) 'streaming-source.enable' = 'true', -- 开启Streaming source 'streaming-source.partition.include' = 'latest',-- 抉择最新分区 'streaming-source.monitor-interval' = '12 h',-- 每12小时加载一次最新分区数据 'streaming-source.partition-order' = 'partition-name', -- 依照分区名排序 -- 形式2:分区文件的创立工夫排序来辨认最新分区 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.partition-order' = 'create-time',-- 分区文件的创立工夫排序 'streaming-source.monitor-interval' = '12 h' -- 形式3:依照分区工夫排序来辨认最新分区 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '12 h', 'streaming-source.partition-order' = 'partition-time', -- 依照分区工夫排序 'partition.time-extractor.kind' = 'default', 'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' );
有了下面的Hive维表,咱们就能够应用该维表与Kafka的实时流数据进行JOIN,失去相应的宽表数据。
-- 应用default sql方言SET table.sql-dialect=default;-- kafka实时流数据表CREATE TABLE orders_table ( order_id STRING, order_amount DOUBLE, product_id STRING, log_ts TIMESTAMP(3), proctime as PROCTIME()) WITH (...);-- 将流表与hive最新分区数据关联 SELECT *FROM orders_table AS ordersJOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim ON orders.product_id = dim.product_id;
除了在定义Hive维表时指定相干的参数,咱们还能够通过SQL Hint的形式动静指定相干的参数,具体形式如下:
SELECT *FROM orders_table AS ordersJOIN dimension_table/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '1 h', 'streaming-source.partition-order' = 'partition-name') */FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 时态表(维表)ON orders.product_id = dim.product_id;
Temporal Join最新表
对于Hive的非分区表,当应用temporal join时,整个Hive表会被缓存到Slot内存中,而后依据流中的数据对应的key与其进行匹配。应用最新的Hive表进行temporal join不须要进行额定的配置,咱们只须要配置一个Hive表缓存的TTL工夫,该工夫的作用是:当缓存过期时,就会从新扫描Hive表并加载最新的数据。
lookup.join.cache.ttl
- 默认值:60min
- 解释:示意缓存工夫。因为 Hive 维表会把维表所有数据缓存在 TM 的内存中,当维表数据量很大时,很容易造成 OOM。当然TTL的工夫也不能太短,因为会频繁地加载数据,从而影响性能。
尖叫提醒:
当应用此种形式时,Hive表必须是有界的lookup表,即非Streaming Source的时态表,换句话说,该表的属性streaming-source.enable = false。
如果要应用Streaming Source的时态表,记得配置streaming-source.monitor-interval的值,即数据更新的工夫距离。
-- Hive维表数据应用批处理的形式按天装载SET table.sql-dialect=hive;CREATE TABLE dimension_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP(3), update_user STRING, ...) TBLPROPERTIES ( 'streaming-source.enable' = 'false', -- 敞开streaming source 'streaming-source.partition.include' = 'all', -- 读取所有数据 'lookup.join.cache.ttl' = '12 h');-- kafka事实表SET table.sql-dialect=default;CREATE TABLE orders_table ( order_id STRING, order_amount DOUBLE, product_id STRING, log_ts TIMESTAMP(3), proctime as PROCTIME()) WITH (...);-- Hive维表join,Flink会加载该维表的所有数据到内存中SELECT *FROM orders_table AS ordersJOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dimON orders.product_id = dim.product_id;
尖叫提醒:1.每一个子工作都须要缓存一份维表的全量数据,肯定要确保TM的task Slot 大小可能包容维表的数据量;
2.举荐将streaming-source.monitor-interval和lookup.join.cache.ttl的值设为一个较大的数,因为频繁的更新和加载数据会影响性能。
3.当缓存的维表数据须要从新刷新时,目前的做法是将整个表进行加载,因而不可能将新数据与旧数据辨别开来。
Hive维表JOIN示例
假如维表的数据是通过批处理的形式(比方每天)装载至Hive中,而Kafka中的事实流数据须要与该维表进行JOIN,从而构建一个宽表数据,这个时候就能够应用Hive的维表JOIN。
- 创立一张kafka数据源表,实时流
SET table.sql-dialect=default;CREATE TABLE fact_user_behavior ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT, -- 用户行为产生的工夫戳 `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列 `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark ) WITH ( 'connector' = 'kafka', -- 应用 kafka connector 'topic' = 'user_behaviors', -- kafka主题 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 数据源格局为json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false');
- 创立一张Hive维表
SET table.sql-dialect=hive;CREATE TABLE dim_item ( item_id BIGINT, item_name STRING, unit_price DECIMAL(10, 4)) PARTITIONED BY (dt STRING) TBLPROPERTIES ( 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '12 h', 'streaming-source.partition-order' = 'partition-name');
- 关联Hive维表的最新数据
SELECT fact.item_id, dim.item_name, count(*) AS buy_cntFROM fact_user_behavior AS factLEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dimON fact.item_id = dim.item_idWHERE fact.action = 'buy'GROUP BY fact.item_id,dim.item_name;
应用SQL Hint形式,关联非分区的Hive维表:
set table.dynamic-table-options.enabled= true; SELECT fact.item_id, dim.item_name, count(*) AS buy_cntFROM fact_user_behavior AS factLEFT JOIN dim_item1/*+ OPTIONS('streaming-source.enable'='false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '12 h') */FOR SYSTEM_TIME AS OF fact.proctime AS dimON fact.item_id = dim.item_idWHERE fact.action = 'buy'GROUP BY fact.item_id,dim.item_name;
总结
本文以最新版本的Flink1.12为例,介绍了Flink读写Hive的不同形式,并对每种形式给出了相应的应用示例。在理论利用中,通常有将实时数据流与 Hive 维表 join 来结构宽表的需要,Flink提供了Hive维表JOIN,能够简化用户应用的复杂度。本文在最初具体阐明了Flink进行Hive维表JOIN的根本步骤以及应用示例,心愿对你有所帮忙。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包