共计 7081 个字符,预计需要花费 18 分钟才能阅读完成。
在上一篇分享 Flink 集成 Hive 之疾速入门 – 以 Flink1.12 为例中,介绍了 Flink 集成 Hive 的进本步骤。本文分享,将持续介绍 Flink 集成 Hive 的另外两个概念:Hive Catalog 与 Hive Dialect。本文包含以下内容,心愿对你有所帮忙。
- 什么是 Hive Catalog
- 如何应用 Hive Catalog
- 什么是 Hive Dialect
- 如何应用 Hive Dialect
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包
什么是 Hive Catalog
咱们晓得,Hive 应用 Hive Metastore(HMS)存储元数据信息,应用关系型数据库来长久化存储这些信息。所以,Flink 集成 Hive 须要买通 Hive 的 metastore,去治理 Flink 的元数据,这就是 Hive Catalog 的性能。
Hive Catalog 的次要作用是应用 Hive MetaStore 去治理 Flink 的元数据。Hive Catalog 能够将元数据进行长久化,这样后续的操作就能够重复应用这些表的元数据,而不必每次应用时都要从新注册。如果不去长久化 catalog,那么在每个 session 中取解决数据,都要去反复地创立元数据对象,这样是十分耗时的。
如何应用 Hive Catalog
HiveCatalog 是开箱即用的,所以,一旦配置好 Flink 与 Hive 集成,就能够应用 HiveCatalog。比方,咱们通过 FlinkSQL 的 DDL 语句创立一张 kafka 的数据源表,立即就能查看该表的元数据信息。
HiveCatalog 能够解决两种类型的表:一种是 Hive 兼容的表,另一种是一般表(generic table)。其中 Hive 兼容表是以兼容 Hive 的形式来存储的,所以,对于 Hive 兼容表而言,咱们既能够应用 Flink 去操作该表,又能够应用 Hive 去操作该表。
一般表是对 Flink 而言的,当应用 HiveCatalog 创立一张一般表,仅仅是应用 Hive MetaStore 将其元数据进行了长久化,所以能够通过 Hive 查看这些表的元数据信息(通过 DESCRIBE FORMATTED 命令),然而不能通过 Hive 去解决这些表,因为语法不兼容。
对于是否是一般表,Flink 应用 is_generic 属性进行标识。默认状况下,创立的表是一般表,即is_generic=true,如果要创立 Hive 兼容表,须要在建表属性中指定is_generic=false。
尖叫提醒:
因为依赖 Hive Metastore,所以必须开启 Hive MetaStore 服务
代码中应用 Hive Catalog
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// 应用注册的 catalog
tableEnv.useCatalog("myhive");
Flink SQLCli 中应用 Hive Catalog
在 FlinkSQL Cli 中应用 Hive Catalog 很简略,只须要配置一下 sql-cli-defaults.yaml 文件即可。配置内容如下:
catalogs:
- name: myhive
type: hive
default-database: default
hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
在 FlinkSQL Cli 中创立一张 kafka 表,该表默认为一般表,即 is_generic=true
CREATE TABLE user_behavior (
`user_id` BIGINT, -- 用户 id
`item_id` BIGINT, -- 商品 id
`cat_id` BIGINT, -- 品类 id
`action` STRING, -- 用户行为
`province` INT, -- 用户所在的省份
`ts` BIGINT, -- 用户行为产生的工夫戳
`proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义 watermark
) WITH (
'connector' = 'kafka', -- 应用 kafka connector
'topic' = 'user_behavior', -- kafka 主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 数据源格局为 json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
咱们能够在 Hive 客户端中查看该表的元数据信息
hive (default)> desc formatted user_behavior;
Table Parameters:
...
is_generic true
...
从下面的元数据信息能够看出,is_generic=true,阐明该表是一张一般表,如果在 Hive 中去查看该表,则会报错。
下面创立的表是一般表,该表不能应用 Hive 去查问。那么,该如何创立一张 Hive 兼容表呢?咱们只须要在建表的属性中显示指定 is_generic=false 即可,具体如下:
CREATE TABLE hive_compatible_tbl (
`user_id` BIGINT, -- 用户 id
`item_id` BIGINT, -- 商品 id
`cat_id` BIGINT, -- 品类 id
`action` STRING, -- 用户行为
`province` INT, -- 用户所在的省份
`ts` BIGINT -- 用户行为产生的工夫戳
) WITH (
'connector' = 'kafka', -- 应用 kafka connector
'topic' = 'user_behavior', -- kafka 主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 数据源格局为 json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false',
'is_generic' = 'false'
);
当咱们在 Hive 中查看该表的元数据信息时,能够看出:is_generic =false
hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:
...
is_generic false
...
咱们能够应用 FlinkSQL Cli 或者 HiveCli 向该表中写入数据,而后别离通过 FlinkSQL Cli 和 Hive Cli 去查看该表数据的变动
hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;
再在 FlinkSQL Cli 中查看该表,
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
user_id item_id action
2020 1221 buy
同样,咱们能够在 FlinkSQL Cli 中去向该表中写入数据:
Flink SQL> insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
user_id item_id action
2020 1221 buy
2020 1222 fav
尖叫提醒:
对于 Hive 兼容的表,须要留神数据类型,具体的数据类型对应关系以及留神点如下
Flink 数据类型 | Hive 数据类型 |
---|---|
CHAR(p) | CHAR(p) |
VARCHAR(p) | VARCHAR(p) |
STRING | STRING |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
DATE | DATE |
TIMESTAMP(9) | TIMESTAMP |
BYTES | BINARY |
ARRAY<T> | LIST<T> |
MAP<K, V> | MAP<K, V> |
ROW | STRUCT |
留神:
- Hive
CHAR(p)
类型的最大长度为 255 - Hive
VARCHAR(p)
类型的最大长度为 65535 - Hive
MAP
类型的 key 仅反对根本类型,而 Flink’sMAP
类型的 key 执行任意类型 - Hive 不反对联结数据类型,比方 STRUCT
- Hive’s
TIMESTAMP
的精度是 9,Hive UDFs 函数只能解决 precision <= 9 的TIMESTAMP
值 - Hive 不反对 Flink 提供的
TIMESTAMP_WITH_TIME_ZONE
,TIMESTAMP_WITH_LOCAL_TIME_ZONE
, 及MULTISET
类型 - Flink
INTERVAL
类型与 HiveINTERVAL
类型不一样
下面介绍了一般表和 Hive 兼容表,那么咱们该如何应用 Hive 的语法进行建表呢?这个时候就须要应用Hive Dialect。
什么是 Hive Dialect
从 Flink1.11.0 开始,只有开启了 Hive dialect 配置,用户就能够应用 HiveQL 语法,这样咱们就能够在 Flink 中应用 Hive 的语法应用一些 DDL 和 DML 操作。
Flink 目前反对两种 SQL 方言(SQL dialects), 别离为:default 和 hive。默认的 SQL 方言是default,如果要应用 Hive 的语法,须要将 SQL 方言切换到hive。
如何应用 Hive Dialect
在 SQL Cli 中应用 Hive dialect
应用 hive dialect 只须要配置一个参数即可,该参数名称为:table.sql-dialect。咱们就能够在 sql-client-defaults.yaml 配置文件中进行配置,也能够在具体的会话窗口中进行设定,对于 SQL dialect 的切换,不须要进行重启 session。
execution:
planner: blink
type: batch
result-mode: table
configuration:
table.sql-dialect: hive
如果咱们须要在 SQL Cli 中进行切换 hive dialect,能够应用如下命令:
Flink SQL> set table.sql-dialect=hive; -- 应用 hive dialect
Flink SQL> set table.sql-dialect=default; -- 应用 default dialect
尖叫提醒:
一旦切换到了 hive dialect,就只能应用 Hive 的语法建表,如果尝试应用 Flink 的语法建表,则会报错
在 Table API 中配合 dialect
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 应用 hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 应用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
操作示例
Flink SQL> set table.sql-dialect=hive;
-- 应用 Hive 语法创立一张表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
`id` int COMMENT 'id',
`name` string COMMENT '名称',
`age` int COMMENT '年龄'
)
COMMENT 'hive dialect 表测试'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
进入 Hive 客户端去查看该表的元数据信息
desc formatted hive_dialect_tbl;
col_name data_type comment
# col_name data_type comment
id int
name string
age int
# Detailed Table Information
Database: default
Owner: null
CreateTime: Mon Dec 21 17:23:48 CST 2020
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl
Table Type: MANAGED_TABLE
Table Parameters:
comment hive dialect 表测试
is_generic false
transient_lastDdlTime 1608542628
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
field.delim ,
serialization.format ,
很显著,该表是一张 Hive 兼容表,即is_generic=false。
应用 FlinkSQLCli 向该表中写入一条数据:
Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;
咱们也能够在 Hive 的 Cli 中去操作该表
hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;
以下是应用 Hive 方言的一些注意事项。
- Hive dialect 只能用于操作 Hive 表,不能用于一般表。Hive 方言应与 HiveCatalog 一起应用。
- 尽管所有 Hive 版本都反对雷同的语法,然而是否有特定性能依然取决于应用的 Hive 版本。例如,仅在 Hive-2.4.0 或更高版本中反对更新数据库地位。
- Hive 和 Calcite 具备不同的保留关键字。例如,
default
在 Calcite 中是保留关键字,在 Hive 中是非保留关键字。所以,在应用 Hive dialect 时,必须应用反引号(`)援用此类关键字,能力将其用作标识符。 - 在 Hive 中不能查问在 Flink 中创立的视图。
当然,一旦开启了 Hive dialect,咱们就能够依照 Hive 的操作形式在 Flink 中去解决 Hive 的数据了,具体的操作与 Hive 统一,本文不再赘述。
总结
本文次要介绍了 Hive Catalog 和 Hive Dialect。其中 Hive Catalog 的作用是长久化 Flink 的元数据信息,Hive Dialect 是反对 Hive 语法的一个配置参数,这两个概念是 Flink 集成 Hive 的要害。下一篇分享将介绍如何应用 Flink 读写 Hive。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包