关于flink:Flink集成Hive之Hive-Catalog与Hive-Dialect以Flink112

65次阅读

共计 7081 个字符,预计需要花费 18 分钟才能阅读完成。

在上一篇分享 Flink 集成 Hive 之疾速入门 – 以 Flink1.12 为例中,介绍了 Flink 集成 Hive 的进本步骤。本文分享,将持续介绍 Flink 集成 Hive 的另外两个概念:Hive Catalog 与 Hive Dialect。本文包含以下内容,心愿对你有所帮忙。

  • 什么是 Hive Catalog
  • 如何应用 Hive Catalog
  • 什么是 Hive Dialect
  • 如何应用 Hive Dialect

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

什么是 Hive Catalog

咱们晓得,Hive 应用 Hive Metastore(HMS)存储元数据信息,应用关系型数据库来长久化存储这些信息。所以,Flink 集成 Hive 须要买通 Hive 的 metastore,去治理 Flink 的元数据,这就是 Hive Catalog 的性能。

Hive Catalog 的次要作用是应用 Hive MetaStore 去治理 Flink 的元数据。Hive Catalog 能够将元数据进行长久化,这样后续的操作就能够重复应用这些表的元数据,而不必每次应用时都要从新注册。如果不去长久化 catalog,那么在每个 session 中取解决数据,都要去反复地创立元数据对象,这样是十分耗时的。

如何应用 Hive Catalog

HiveCatalog 是开箱即用的,所以,一旦配置好 Flink 与 Hive 集成,就能够应用 HiveCatalog。比方,咱们通过 FlinkSQL 的 DDL 语句创立一张 kafka 的数据源表,立即就能查看该表的元数据信息。

HiveCatalog 能够解决两种类型的表:一种是 Hive 兼容的表,另一种是一般表(generic table)。其中 Hive 兼容表是以兼容 Hive 的形式来存储的,所以,对于 Hive 兼容表而言,咱们既能够应用 Flink 去操作该表,又能够应用 Hive 去操作该表。

一般表是对 Flink 而言的,当应用 HiveCatalog 创立一张一般表,仅仅是应用 Hive MetaStore 将其元数据进行了长久化,所以能够通过 Hive 查看这些表的元数据信息(通过 DESCRIBE FORMATTED 命令),然而不能通过 Hive 去解决这些表,因为语法不兼容。

对于是否是一般表,Flink 应用 is_generic 属性进行标识。默认状况下,创立的表是一般表,即is_generic=true,如果要创立 Hive 兼容表,须要在建表属性中指定is_generic=false

尖叫提醒:

因为依赖 Hive Metastore,所以必须开启 Hive MetaStore 服务

代码中应用 Hive Catalog

   EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 应用注册的 catalog
        tableEnv.useCatalog("myhive");

Flink SQLCli 中应用 Hive Catalog

在 FlinkSQL Cli 中应用 Hive Catalog 很简略,只须要配置一下 sql-cli-defaults.yaml 文件即可。配置内容如下:

catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf

在 FlinkSQL Cli 中创立一张 kafka 表,该表默认为一般表,即 is_generic=true

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用户 id
    `item_id` BIGINT, -- 商品 id
    `cat_id` BIGINT, -- 品类 id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为产生的工夫戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义 watermark
 ) WITH ( 
    'connector' = 'kafka', -- 应用 kafka connector
    'topic' = 'user_behavior', -- kafka 主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格局为 json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

咱们能够在 Hive 客户端中查看该表的元数据信息

hive (default)> desc formatted  user_behavior;
Table Parameters:                
       ...
        is_generic              true                
      ...         

从下面的元数据信息能够看出,is_generic=true,阐明该表是一张一般表,如果在 Hive 中去查看该表,则会报错。

下面创立的表是一般表,该表不能应用 Hive 去查问。那么,该如何创立一张 Hive 兼容表呢?咱们只须要在建表的属性中显示指定 is_generic=false 即可,具体如下:

CREATE TABLE hive_compatible_tbl ( 
    `user_id` BIGINT, -- 用户 id
    `item_id` BIGINT, -- 商品 id
    `cat_id` BIGINT, -- 品类 id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT -- 用户行为产生的工夫戳
 ) WITH ( 
    'connector' = 'kafka', -- 应用 kafka connector
    'topic' = 'user_behavior', -- kafka 主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格局为 json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false',
    'is_generic' = 'false'
);

当咱们在 Hive 中查看该表的元数据信息时,能够看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:                
        ...           
        is_generic              false               
        ...

咱们能够应用 FlinkSQL Cli 或者 HiveCli 向该表中写入数据,而后别离通过 FlinkSQL Cli 和 Hive Cli 去查看该表数据的变动

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;

再在 FlinkSQL Cli 中查看该表,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
                   user_id                   item_id                    action
                      2020                      1221                       buy
    

同样,咱们能够在 FlinkSQL Cli 中去向该表中写入数据:

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;

                   user_id                   item_id                    action
                      2020                      1221                       buy
                      2020                      1222                       fav

尖叫提醒:

对于 Hive 兼容的表,须要留神数据类型,具体的数据类型对应关系以及留神点如下

Flink 数据类型 Hive 数据类型
CHAR(p) CHAR(p)
VARCHAR(p) VARCHAR(p)
STRING STRING
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT LONG
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(9) TIMESTAMP
BYTES BINARY
ARRAY<T> LIST<T>
MAP<K, V> MAP<K, V>
ROW STRUCT

留神

  • Hive CHAR(p) 类型的最大长度为 255
  • Hive VARCHAR(p)类型的最大长度为 65535
  • Hive MAP类型的 key 仅反对根本类型,而 Flink’s MAP 类型的 key 执行任意类型
  • Hive 不反对联结数据类型,比方 STRUCT
  • Hive’s TIMESTAMP 的精度是 9,Hive UDFs 函数只能解决 precision <= 9 的 TIMESTAMP
  • Hive 不反对 Flink 提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及 MULTISET 类型
  • FlinkINTERVAL 类型与 Hive INTERVAL 类型不一样

下面介绍了一般表和 Hive 兼容表,那么咱们该如何应用 Hive 的语法进行建表呢?这个时候就须要应用Hive Dialect

什么是 Hive Dialect

从 Flink1.11.0 开始,只有开启了 Hive dialect 配置,用户就能够应用 HiveQL 语法,这样咱们就能够在 Flink 中应用 Hive 的语法应用一些 DDL 和 DML 操作。

Flink 目前反对两种 SQL 方言(SQL dialects), 别离为:default 和 hive。默认的 SQL 方言是default,如果要应用 Hive 的语法,须要将 SQL 方言切换到hive

如何应用 Hive Dialect

在 SQL Cli 中应用 Hive dialect

应用 hive dialect 只须要配置一个参数即可,该参数名称为:table.sql-dialect。咱们就能够在 sql-client-defaults.yaml 配置文件中进行配置,也能够在具体的会话窗口中进行设定,对于 SQL dialect 的切换,不须要进行重启 session。

execution:
  planner: blink
  type: batch
  result-mode: table

configuration:
  table.sql-dialect: hive

如果咱们须要在 SQL Cli 中进行切换 hive dialect,能够应用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 应用 hive dialect
Flink SQL> set table.sql-dialect=default; -- 应用 default dialect

尖叫提醒:

一旦切换到了 hive dialect,就只能应用 Hive 的语法建表,如果尝试应用 Flink 的语法建表,则会报错

在 Table API 中配合 dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 应用 hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 应用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

操作示例

Flink SQL> set table.sql-dialect=hive;
-- 应用 Hive 语法创立一张表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
  `id` int COMMENT 'id',
  `name` string COMMENT '名称',
  `age` int COMMENT '年龄' 
)
COMMENT 'hive dialect 表测试'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

进入 Hive 客户端去查看该表的元数据信息

desc formatted hive_dialect_tbl;
col_name        data_type       comment
# col_name              data_type               comment             
                 
id                      int                                         
name                    string                                      
age                     int                                         
                 
# Detailed Table Information             
Database:               default                  
Owner:                  null                     
CreateTime:             Mon Dec 21 17:23:48 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl        
Table Type:             MANAGED_TABLE            
Table Parameters:                
        comment                 hive dialect 表测试     
        is_generic              false               
        transient_lastDdlTime   1608542628          
                 
# Storage Information            
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        field.delim             ,                   
        serialization.format    ,                   

很显著,该表是一张 Hive 兼容表,即is_generic=false

应用 FlinkSQLCli 向该表中写入一条数据:

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;

咱们也能够在 Hive 的 Cli 中去操作该表

hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;

以下是应用 Hive 方言的一些注意事项。

  • Hive dialect 只能用于操作 Hive 表,不能用于一般表。Hive 方言应与 HiveCatalog 一起应用。
  • 尽管所有 Hive 版本都反对雷同的语法,然而是否有特定性能依然取决于应用的 Hive 版本。例如,仅在 Hive-2.4.0 或更高版本中反对更新数据库地位。
  • Hive 和 Calcite 具备不同的保留关键字。例如,default在 Calcite 中是保留关键字,在 Hive 中是非保留关键字。所以,在应用 Hive dialect 时,必须应用反引号(`)援用此类关键字,能力将其用作标识符。
  • 在 Hive 中不能查问在 Flink 中创立的视图。

当然,一旦开启了 Hive dialect,咱们就能够依照 Hive 的操作形式在 Flink 中去解决 Hive 的数据了,具体的操作与 Hive 统一,本文不再赘述。

总结

本文次要介绍了 Hive Catalog 和 Hive Dialect。其中 Hive Catalog 的作用是长久化 Flink 的元数据信息,Hive Dialect 是反对 Hive 语法的一个配置参数,这两个概念是 Flink 集成 Hive 的要害。下一篇分享将介绍如何应用 Flink 读写 Hive。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

正文完
 0