关于flink:Flink集成Hive之Hive-Catalog与Hive-Dialect以Flink112

在上一篇分享Flink集成Hive之疾速入门–以Flink1.12为例中,介绍了Flink集成Hive的进本步骤。本文分享,将持续介绍Flink集成Hive的另外两个概念:Hive Catalog与Hive Dialect。本文包含以下内容,心愿对你有所帮忙。

  • 什么是Hive Catalog
  • 如何应用Hive Catalog
  • 什么是Hive Dialect
  • 如何应用Hive Dialect

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

什么是Hive Catalog

咱们晓得,Hive应用Hive Metastore(HMS)存储元数据信息,应用关系型数据库来长久化存储这些信息。所以,Flink集成Hive须要买通Hive的metastore,去治理Flink的元数据,这就是Hive Catalog的性能。

Hive Catalog的次要作用是应用Hive MetaStore去治理Flink的元数据。Hive Catalog能够将元数据进行长久化,这样后续的操作就能够重复应用这些表的元数据,而不必每次应用时都要从新注册。如果不去长久化catalog,那么在每个session中取解决数据,都要去反复地创立元数据对象,这样是十分耗时的。

如何应用Hive Catalog

HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就能够应用HiveCatalog。比方,咱们通过FlinkSQL 的DDL语句创立一张kafka的数据源表,立即就能查看该表的元数据信息。

HiveCatalog能够解决两种类型的表:一种是Hive兼容的表,另一种是一般表(generic table)。其中Hive兼容表是以兼容Hive的形式来存储的,所以,对于Hive兼容表而言,咱们既能够应用Flink去操作该表,又能够应用Hive去操作该表。

一般表是对Flink而言的,当应用HiveCatalog创立一张一般表,仅仅是应用Hive MetaStore将其元数据进行了长久化,所以能够通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),然而不能通过Hive去解决这些表,因为语法不兼容。

对于是否是一般表,Flink应用is_generic属性进行标识。默认状况下,创立的表是一般表,即is_generic=true,如果要创立Hive兼容表,须要在建表属性中指定is_generic=false

尖叫提醒:

因为依赖Hive Metastore,所以必须开启Hive MetaStore服务

代码中应用Hive Catalog

   EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 应用注册的catalog
        tableEnv.useCatalog("myhive");

Flink SQLCli中应用Hive Catalog

在FlinkSQL Cli中应用Hive Catalog很简略,只须要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:

catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf

在FlinkSQL Cli中创立一张kafka表,该表默认为一般表,即is_generic=true

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为产生的工夫戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
 ) WITH ( 
    'connector' = 'kafka', -- 应用 kafka connector
    'topic' = 'user_behavior', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格局为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

咱们能够在Hive客户端中查看该表的元数据信息

hive (default)> desc formatted  user_behavior;
Table Parameters:                
       ...
        is_generic              true                
      ...         

从下面的元数据信息能够看出,is_generic=true,阐明该表是一张一般表,如果在Hive中去查看该表,则会报错。

下面创立的表是一般表,该表不能应用Hive去查问。那么,该如何创立一张Hive兼容表呢?咱们只须要在建表的属性中显示指定is_generic=false即可,具体如下:

CREATE TABLE hive_compatible_tbl ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT -- 用户行为产生的工夫戳
 ) WITH ( 
    'connector' = 'kafka', -- 应用 kafka connector
    'topic' = 'user_behavior', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格局为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false',
    'is_generic' = 'false'
);

当咱们在Hive中查看该表的元数据信息时,能够看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:                
        ...           
        is_generic              false               
        ...

咱们能够应用FlinkSQL Cli或者HiveCli向该表中写入数据,而后别离通过FlinkSQL Cli和Hive Cli去查看该表数据的变动

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;

再在FlinkSQL Cli中查看该表,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
                   user_id                   item_id                    action
                      2020                      1221                       buy
    

同样,咱们能够在FlinkSQL Cli中去向该表中写入数据:

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;

                   user_id                   item_id                    action
                      2020                      1221                       buy
                      2020                      1222                       fav

尖叫提醒:

对于Hive兼容的表,须要留神数据类型,具体的数据类型对应关系以及留神点如下

Flink 数据类型 Hive 数据类型
CHAR(p) CHAR(p)
VARCHAR(p) VARCHAR(p)
STRING STRING
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT LONG
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(9) TIMESTAMP
BYTES BINARY
ARRAY<T> LIST<T>
MAP<K, V> MAP<K, V>
ROW STRUCT

留神

  • Hive CHAR(p) 类型的最大长度为255
  • Hive VARCHAR(p)类型的最大长度为65535
  • Hive MAP类型的key仅反对根本类型,而Flink’s MAP 类型的key执行任意类型
  • Hive不反对联结数据类型,比方STRUCT
  • Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函数只能解决 precision <= 9的 TIMESTAMP
  • Hive 不反对 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型
  • FlinkINTERVAL 类型与 Hive INTERVAL 类型不一样

下面介绍了一般表和Hive兼容表,那么咱们该如何应用Hive的语法进行建表呢?这个时候就须要应用Hive Dialect

什么是Hive Dialect

从Flink1.11.0开始,只有开启了Hive dialect配置,用户就能够应用HiveQL语法,这样咱们就能够在Flink中应用Hive的语法应用一些DDL和DML操作。

Flink目前反对两种SQL方言(SQL dialects),别离为:default和hive。默认的SQL方言是default,如果要应用Hive的语法,须要将SQL方言切换到hive

如何应用Hive Dialect

在SQL Cli中应用Hive dialect

应用hive dialect只须要配置一个参数即可,该参数名称为:table.sql-dialect。咱们就能够在sql-client-defaults.yaml配置文件中进行配置,也能够在具体的会话窗口中进行设定,对于SQL dialect的切换,不须要进行重启session。

execution:
  planner: blink
  type: batch
  result-mode: table

configuration:
  table.sql-dialect: hive

如果咱们须要在SQL Cli中进行切换hive dialect,能够应用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 应用hive dialect
Flink SQL> set table.sql-dialect=default; -- 应用default dialect

尖叫提醒:

一旦切换到了hive dialect,就只能应用Hive的语法建表,如果尝试应用Flink的语法建表,则会报错

在Table API中配合dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 应用hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 应用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

操作示例

Flink SQL> set table.sql-dialect=hive;
-- 应用Hive语法创立一张表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
  `id` int COMMENT 'id',
  `name` string COMMENT '名称',
  `age` int COMMENT '年龄' 
)
COMMENT 'hive dialect表测试'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

进入Hive客户端去查看该表的元数据信息

desc formatted hive_dialect_tbl;
col_name        data_type       comment
# col_name              data_type               comment             
                 
id                      int                                         
name                    string                                      
age                     int                                         
                 
# Detailed Table Information             
Database:               default                  
Owner:                  null                     
CreateTime:             Mon Dec 21 17:23:48 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl        
Table Type:             MANAGED_TABLE            
Table Parameters:                
        comment                 hive dialect表测试     
        is_generic              false               
        transient_lastDdlTime   1608542628          
                 
# Storage Information            
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        field.delim             ,                   
        serialization.format    ,                   

很显著,该表是一张Hive兼容表,即is_generic=false

应用FlinkSQLCli向该表中写入一条数据:

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;

咱们也能够在Hive的Cli中去操作该表

hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;

以下是应用Hive方言的一些注意事项。

  • Hive dialect只能用于操作Hive表,不能用于一般表。Hive方言应与HiveCatalog一起应用。
  • 尽管所有Hive版本都反对雷同的语法,然而是否有特定性能依然取决于应用的Hive版本。例如,仅在Hive-2.4.0或更高版本中反对更新数据库地位。
  • Hive和Calcite具备不同的保留关键字。例如,default在Calcite中是保留关键字,在Hive中是非保留关键字。所以,在应用Hive dialect时,必须应用反引号(`)援用此类关键字,能力将其用作标识符。
  • 在Hive中不能查问在Flink中创立的视图。

当然,一旦开启了Hive dialect,咱们就能够依照Hive的操作形式在Flink中去解决Hive的数据了,具体的操作与Hive统一,本文不再赘述。

总结

本文次要介绍了Hive Catalog和Hive Dialect。其中Hive Catalog的作用是长久化Flink的元数据信息,Hive Dialect是反对Hive语法的一个配置参数,这两个概念是Flink集成Hive的要害。下一篇分享将介绍如何应用Flink读写Hive。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理