乐趣区

关于flink:Flink-on-Hive构建流批一体数仓

Flink 应用 HiveCatalog 能够通过 或者 的形式来解决 Hive 中的表。这就意味着 Flink 既能够作为 Hive 的一个批处理引擎,也能够通过流解决的形式来读写 Hive 中的表,从而为实时数仓的利用和流批一体的落地实际奠定了松软的根底。本文将以 Flink1.12 为例,介绍 Flink 集成 Hive 的另外一个十分重要的方面——Hive 维表 JOIN(Temporal Table Join)与 Flink 读写 Hive 表的形式。以下是全文,心愿本文对你有所帮忙。

Flink 写入 Hive 表

Flink 反对以 批处理 (Batch) 和流解决 (Streaming) 的形式写入 Hive 表。当以批处理的形式写入 Hive 表时,只有当写入作业完结时,才能够看到写入的数据。批处理的形式写入反对 append 模式和 overwrite 模式

批处理模式写入

  • 向非分区表写入数据
Flink SQL> use catalog myhive; -- 应用 catalog
Flink SQL> INSERT INTO users SELECT 2,'tom';
Flink SQL> set execution.type=batch; -- 应用批处理模式
Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';
  • 向分区表写入数据
-- 向动态分区表写入数据
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
-- 向动静分区表写入数据
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

流解决模式写入

流式写入 Hive 表,不反对 Insert overwrite 形式,否则报如下谬误:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.

上面的示例是将 kafka 的数据流式写入 Hive 的分区表

-- 应用流解决模式
Flink SQL> set execution.type=streaming;
-- 应用 Hive 方言
Flink SQL> SET table.sql-dialect=hive; 
-- 创立一张 Hive 分区表
CREATE TABLE user_behavior_hive_tbl (
   `user_id` BIGINT, -- 用户 id
    `item_id` BIGINT, -- 商品 id
    `cat_id` BIGINT, -- 品类 id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT -- 用户行为产生的工夫戳
) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet  TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- 应用默认 SQL 方言
Flink SQL> SET table.sql-dialect=default; 
-- 创立一张 kafka 数据源表
CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用户 id
    `item_id` BIGINT, -- 商品 id
    `cat_id` BIGINT, -- 品类 id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为产生的工夫戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义 watermark
 ) WITH ( 
    'connector' = 'kafka', -- 应用 kafka connector
    'topic' = 'user_behaviors', -- kafka 主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格局为 json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

对于 Hive 表的一些属性解释:

  • partition.time-extractor.timestamp-pattern

    • 默认值:(none)
    • 解释:分区工夫抽取器,与 DDL 中的分区字段保持一致, 如果是按天分区,则能够是 $dt,如果是按年(year) 月(month)日 (day) 时(hour)进行分区,则该属性值为:$year-$month-$day $hour:00:00,如果是按地利进行分区,则该属性值为:$dt $hour:00:00;
  • sink.partition-commit.trigger

    • 默认值:process-time
    • 解释:分区触发器类型,可选 process-time 或 partition-time

      • process-time:不须要工夫提取器和水位线,当以后工夫大于分区创立工夫 + sink.partition-commit.delay 中定义的工夫,提交分区;
      • partition-time:须要 Source 表中定义 watermark,当 watermark > 提取到的分区工夫 +sink.partition-commit.delay 中定义的工夫,提交分区;
  • sink.partition-commit.delay

    • 默认值:0S
    • 解释:分区提交的延时工夫,如果是按天分区,则该属性的值为:1d,如果是按小时分区,则该属性值为1h;
  • sink.partition-commit.policy.kind

    • 默认值:(none)
    • 解释:提交分区的策略,用于告诉上游的利用该分区曾经实现了写入,也就是说该分区的数据能够被拜访读取。可选的值如下:

      • metastore:增加分区的元数据信息,仅 Hive 表反对该值配置
      • success-file:在表的存储门路下增加一个 _SUCCESS 文件

      能够同时配置下面的两个值,比方metastore,success-file

执行流式写入 Hive 表

-- streaming sql, 将数据写入 Hive 表
INSERT INTO user_behavior_hive_tbl 
SELECT 
    user_id,
    item_id,
    cat_id,
    action,
    province,
    ts,
    FROM_UNIXTIME(ts, 'yyyy-MM-dd'),
    FROM_UNIXTIME(ts, 'HH'),
    FROM_UNIXTIME(ts, 'mm')
FROM user_behavior;

-- batch sql, 查问 Hive 表的分区数据
SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND  hr='16' AND mi = '46';

同时查看 Hive 表的分区数据:

尖叫提醒:

1.Flink 读取 Hive 表默认应用的是 batch 模式,如果要应用流式读取 Hive 表,须要而外指定一些参数,见下文。

2. 只有在实现 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成 _SUCCESS 文件,所以,Flink 流式写入 Hive 表须要开启并配置 Checkpoint。对于 Flink SQL Client 而言,须要在 flink-conf.yaml 中开启 CheckPoint,配置内容为:

state.backend: filesystem
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints

Flink 读取 Hive 表

Flink 反对以 批处理 (Batch) 和流解决 (Streaming) 的形式读取 Hive 中的表。批处理的形式与 Hive 的自身查问相似,即只在提交查问的时刻查问一次 Hive 表。流解决的形式将会继续地监控 Hive 表,并且会增量地提取新的数据。默认状况下,Flink 是以批处理的形式读取 Hive 表

对于流式读取 Hive 表,Flink 既反对分区表又反对非分区表。对于分区表而言,Flink 将会监控新产生的分区数据,并以增量的形式读取这些数据。对于非分区表,Flink 会监控 Hive 表存储门路文件夹外面的新文件,并以增量的形式读取新的数据。

Flink 读取 Hive 表能够配置一下参数:

  • streaming-source.enable

    • 默认值:false
    • 解释:是否开启流式读取 Hive 表,默认不开启。
  • streaming-source.partition.include

    • 默认值:all
    • 解释:配置读取 Hive 的分区,包含两种形式:all 和 latest。all 意味着读取所有分区的数据,latest 示意只读取最新的分区数据。值得注意的是,latest 形式只能用于开启了流式读取 Hive 表,并用于维表 JOIN 的场景。
  • streaming-source.monitor-interval

    • 默认值:None
    • 解释:继续监控 Hive 表分区或者文件的工夫距离。值得注意的是,当以流的形式读取 Hive 表时,该参数的默认值是1m,即 1 分钟。当 temporal join 时,默认的值是60m,即 1 小时。另外,该参数配置不宜过短,最短是 1 个小时,因为目前的实现是每个 task 都会查问 metastore,高频的查可能会对 metastore 产生过大的压力。
  • streaming-source.partition-order

    • 默认值:partition-name
    • 解释:streaming source 的分区程序。默认的是partition-name,示意应用默认分区名称程序加载最新分区,也是举荐应用的形式。除此之外还有两种形式,别离为:create-time 和 partition-time。其中 create-time 示意应用分区文件创建工夫程序。partition-time 示意应用分区工夫程序。指的留神的是,对于非分区表,该参数的默认值为:create-time
  • streaming-source.consume-start-offset

    • 默认值:None
    • 解释:流式读取 Hive 表的起始偏移量。
  • partition.time-extractor.kind
    • 默认值:default
    • 分区工夫提取器类型。用于从分区中提取工夫,反对 default 和自定义。如果应用 default,则须要通过参数 partition.time-extractor.timestamp-pattern 配置工夫戳提取的正则表达式。

    在 SQL Client 中须要显示地开启 SQL Hint 性能

Flink SQL> set table.dynamic-table-options.enabled= true;  

应用 SQLHint 流式查问 Hive 表


SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */;

Hive 维表 JOIN

Flink 1.12 反对了 Hive 最新的分区作为时态表的性能,能够通过 SQL 的形式间接关联 Hive 分区表的最新分区,并且会主动监听最新的 Hive 分区,当监控到新的分区后,会主动地做维表数据的全量替换。

Flink 反对的是 processing-time 的 temporal join,也就是说总是与最新版本的时态表进行 JOIN。另外,Flink 既反对非分区表的 temporal join,又反对分区表的 temporal join。对于分区表而言,Flink 会监听 Hive 表的最新分区数据。值得注意的是,Flink 尚不反对 event-time temporal join。

Temporal Join 最新分区

对于一张随着工夫变动的 Hive 分区表,Flink 能够读取该表的数据作为一个无界流。如果 Hive 分区表的每个分区都蕴含全量的数据,那么每个分区将做为一个时态表的版本数据,行将最新的分区数据作为一个全量维表数据。值得注意的是,该性能特点仅反对 Flink 的 STREAMING 模式。

应用 Hive 最新分区作为 Tempmoral table 之前,须要设置必要的两个参数:

'streaming-source.enable' = 'true',  
'streaming-source.partition.include' = 'latest'

除此之外还有一些其余的参数,对于参数的解释见下面的剖析。咱们在应用 Hive 维表的时候,既能够在创立 Hive 表时指定具体的参数,也能够应用 SQL Hint 的形式动静指定参数。一个 Hive 维表的创立模板如下:

-- 应用 Hive 的 sql 方言
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (-- 形式 1:依照分区名排序来辨认最新分区(举荐应用该种形式)
  'streaming-source.enable' = 'true', -- 开启 Streaming source
  'streaming-source.partition.include' = 'latest',-- 抉择最新分区
  'streaming-source.monitor-interval' = '12 h',-- 每 12 小时加载一次最新分区数据
  'streaming-source.partition-order' = 'partition-name',  -- 依照分区名排序

  -- 形式 2: 分区文件的创立工夫排序来辨认最新分区
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition-order' = 'create-time',-- 分区文件的创立工夫排序
  'streaming-source.monitor-interval' = '12 h'

  -- 形式 3: 依照分区工夫排序来辨认最新分区
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-time', -- 依照分区工夫排序
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' 
);

有了下面的 Hive 维表,咱们就能够应用该维表与 Kafka 的实时流数据进行 JOIN,失去相应的宽表数据。

-- 应用 default sql 方言
SET table.sql-dialect=default;
-- kafka 实时流数据表
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()) WITH (...);

-- 将流表与 hive 最新分区数据关联 
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim 
ON orders.product_id = dim.product_id;

除了在定义 Hive 维表时指定相干的参数,咱们还能够通过 SQL Hint 的形式动静指定相干的参数,具体形式如下:

SELECT *
FROM orders_table AS orders
JOIN dimension_table
/*+ OPTIONS('streaming-source.enable'='true',             
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '1 h',
    'streaming-source.partition-order' = 'partition-name') */
FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 时态表(维表)
ON orders.product_id = dim.product_id;

Temporal Join 最新表

对于 Hive 的非分区表,当应用 temporal join 时,整个 Hive 表会被缓存到 Slot 内存中,而后依据流中的数据对应的 key 与其进行匹配。应用最新的 Hive 表进行 temporal join 不须要进行额定的配置,咱们只须要配置一个 Hive 表缓存的 TTL 工夫,该工夫的作用是:当缓存过期时,就会从新扫描 Hive 表并加载最新的数据。

  • lookup.join.cache.ttl

    • 默认值:60min
    • 解释:示意缓存工夫。因为 Hive 维表会把维表所有数据缓存在 TM 的内存中,当维表数据量很大时,很容易造成 OOM。当然 TTL 的工夫也不能太短,因为会频繁地加载数据,从而影响性能。

    尖叫提醒:

    当应用此种形式时,Hive 表必须是有界的 lookup 表,即非 Streaming Source 的时态表,换句话说,该表的属性streaming-source.enable = false

    如果要应用 Streaming Source 的时态表,记得配置 streaming-source.monitor-interval 的值,即数据更新的工夫距离。

-- Hive 维表数据应用批处理的形式按天装载
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) TBLPROPERTIES (
  'streaming-source.enable' = 'false', -- 敞开 streaming source
  'streaming-source.partition.include' = 'all',  -- 读取所有数据
  'lookup.join.cache.ttl' = '12 h'
);
-- kafka 事实表
SET table.sql-dialect=default;
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()) WITH (...);

-- Hive 维表 join,Flink 会加载该维表的所有数据到内存中
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim
ON orders.product_id = dim.product_id;

尖叫提醒:

1. 每一个子工作都须要缓存一份维表的全量数据,肯定要确保 TM 的 task Slot 大小可能包容维表的数据量;

2. 举荐将 streaming-source.monitor-interval 和 lookup.join.cache.ttl 的值设为一个较大的数,因为频繁的更新和加载数据会影响性能。

3. 当缓存的维表数据须要从新刷新时,目前的做法是将整个表进行加载,因而不可能将新数据与旧数据辨别开来。

Hive 维表 JOIN 示例

假如维表的数据是通过批处理的形式 (比方每天) 装载至 Hive 中,而 Kafka 中的事实流数据须要与该维表进行 JOIN,从而构建一个宽表数据,这个时候就能够应用 Hive 的维表 JOIN。

  • 创立一张 kafka 数据源表, 实时流
SET table.sql-dialect=default;
CREATE TABLE fact_user_behavior ( 
    `user_id` BIGINT, -- 用户 id
    `item_id` BIGINT, -- 商品 id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为产生的工夫戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义 watermark
 ) WITH ( 
    'connector' = 'kafka', -- 应用 kafka connector
    'topic' = 'user_behaviors', -- kafka 主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格局为 json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);
  • 创立一张 Hive 维表
SET table.sql-dialect=hive;
CREATE TABLE dim_item (
  item_id BIGINT,
  item_name STRING,
  unit_price DECIMAL(10, 4)
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name'
);
  • 关联 Hive 维表的最新数据
SELECT 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;

应用 SQL Hint 形式,关联非分区的 Hive 维表:

set table.dynamic-table-options.enabled= true; 
SELECT 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item1
/*+ OPTIONS('streaming-source.enable'='false',             
    'streaming-source.partition.include' = 'all',
    'lookup.join.cache.ttl' = '12 h') */
FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;

总结

本文以最新版本的 Flink1.12 为例,介绍了 Flink 读写 Hive 的不同形式,并对每种形式给出了相应的应用示例。在理论利用中,通常有将实时数据流与 Hive 维表 join 来结构宽表的需要,Flink 提供了 Hive 维表 JOIN,能够简化用户应用的复杂度。本文在最初具体阐明了 Flink 进行 Hive 维表 JOIN 的根本步骤以及应用示例,心愿对你有所帮忙。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版