在上一篇分享Flink集成Hive之疾速入门--以Flink1.12为例中,介绍了Flink集成Hive的进本步骤。本文分享,将持续介绍Flink集成Hive的另外两个概念:Hive Catalog与Hive Dialect。本文包含以下内容,心愿对你有所帮忙。

  • 什么是Hive Catalog
  • 如何应用Hive Catalog
  • 什么是Hive Dialect
  • 如何应用Hive Dialect
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

什么是Hive Catalog

咱们晓得,Hive应用Hive Metastore(HMS)存储元数据信息,应用关系型数据库来长久化存储这些信息。所以,Flink集成Hive须要买通Hive的metastore,去治理Flink的元数据,这就是Hive Catalog的性能。

Hive Catalog的次要作用是应用Hive MetaStore去治理Flink的元数据。Hive Catalog能够将元数据进行长久化,这样后续的操作就能够重复应用这些表的元数据,而不必每次应用时都要从新注册。如果不去长久化catalog,那么在每个session中取解决数据,都要去反复地创立元数据对象,这样是十分耗时的。

如何应用Hive Catalog

HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就能够应用HiveCatalog。比方,咱们通过FlinkSQL 的DDL语句创立一张kafka的数据源表,立即就能查看该表的元数据信息。

HiveCatalog能够解决两种类型的表:一种是Hive兼容的表,另一种是一般表(generic table)。其中Hive兼容表是以兼容Hive的形式来存储的,所以,对于Hive兼容表而言,咱们既能够应用Flink去操作该表,又能够应用Hive去操作该表。

一般表是对Flink而言的,当应用HiveCatalog创立一张一般表,仅仅是应用Hive MetaStore将其元数据进行了长久化,所以能够通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),然而不能通过Hive去解决这些表,因为语法不兼容。

对于是否是一般表,Flink应用is_generic属性进行标识。默认状况下,创立的表是一般表,即is_generic=true,如果要创立Hive兼容表,须要在建表属性中指定is_generic=false

尖叫提醒:

因为依赖Hive Metastore,所以必须开启Hive MetaStore服务

代码中应用Hive Catalog

   EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();        TableEnvironment tableEnv = TableEnvironment.create(settings);        String name            = "myhive";        String defaultDatabase = "default";        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);        tableEnv.registerCatalog("myhive", hive);        // 应用注册的catalog        tableEnv.useCatalog("myhive");

Flink SQLCli中应用Hive Catalog

在FlinkSQL Cli中应用Hive Catalog很简略,只须要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:

catalogs:   - name: myhive     type: hive     default-database: default     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf

在FlinkSQL Cli中创立一张kafka表,该表默认为一般表,即is_generic=true

CREATE TABLE user_behavior (     `user_id` BIGINT, -- 用户id    `item_id` BIGINT, -- 商品id    `cat_id` BIGINT, -- 品类id    `action` STRING, -- 用户行为    `province` INT, -- 用户所在的省份    `ts` BIGINT, -- 用户行为产生的工夫戳    `proctime` AS PROCTIME(), -- 通过计算列产生一个解决工夫列    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件工夫     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark ) WITH (     'connector' = 'kafka', -- 应用 kafka connector    'topic' = 'user_behavior', -- kafka主题    'scan.startup.mode' = 'earliest-offset', -- 偏移量    'properties.group.id' = 'group1', -- 消费者组    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',     'format' = 'json', -- 数据源格局为json    'json.fail-on-missing-field' = 'true',    'json.ignore-parse-errors' = 'false');

咱们能够在Hive客户端中查看该表的元数据信息

hive (default)> desc formatted  user_behavior;Table Parameters:                       ...        is_generic              true                      ...         

从下面的元数据信息能够看出,is_generic=true,阐明该表是一张一般表,如果在Hive中去查看该表,则会报错。

下面创立的表是一般表,该表不能应用Hive去查问。那么,该如何创立一张Hive兼容表呢?咱们只须要在建表的属性中显示指定is_generic=false即可,具体如下:

CREATE TABLE hive_compatible_tbl (     `user_id` BIGINT, -- 用户id    `item_id` BIGINT, -- 商品id    `cat_id` BIGINT, -- 品类id    `action` STRING, -- 用户行为    `province` INT, -- 用户所在的省份    `ts` BIGINT -- 用户行为产生的工夫戳 ) WITH (     'connector' = 'kafka', -- 应用 kafka connector    'topic' = 'user_behavior', -- kafka主题    'scan.startup.mode' = 'earliest-offset', -- 偏移量    'properties.group.id' = 'group1', -- 消费者组    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',     'format' = 'json', -- 数据源格局为json    'json.fail-on-missing-field' = 'true',    'json.ignore-parse-errors' = 'false',    'is_generic' = 'false');

当咱们在Hive中查看该表的元数据信息时,能够看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl;Table Parameters:                        ...                   is_generic              false                       ...

咱们能够应用FlinkSQL Cli或者HiveCli向该表中写入数据,而后别离通过FlinkSQL Cli和Hive Cli去查看该表数据的变动

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;hive (default)> select * from hive_compatible_tbl;

再在FlinkSQL Cli中查看该表,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;                   user_id                   item_id                    action                      2020                      1221                       buy    

同样,咱们能够在FlinkSQL Cli中去向该表中写入数据:

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;Flink SQL> select user_id,item_id,action from hive_compatible_tbl;                   user_id                   item_id                    action                      2020                      1221                       buy                      2020                      1222                       fav
尖叫提醒:

对于Hive兼容的表,须要留神数据类型,具体的数据类型对应关系以及留神点如下

Flink 数据类型Hive 数据类型
CHAR(p)CHAR(p)
VARCHAR(p)VARCHAR(p)
STRINGSTRING
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DECIMAL(p, s)DECIMAL(p, s)
DATEDATE
TIMESTAMP(9)TIMESTAMP
BYTESBINARY
ARRAY<T>LIST<T>
MAP<K, V>MAP<K, V>
ROWSTRUCT

留神

  • Hive CHAR(p) 类型的最大长度为255
  • Hive VARCHAR(p)类型的最大长度为65535
  • Hive MAP类型的key仅反对根本类型,而Flink’s MAP 类型的key执行任意类型
  • Hive不反对联结数据类型,比方STRUCT
  • Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函数只能解决 precision <= 9的 TIMESTAMP
  • Hive 不反对 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型
  • FlinkINTERVAL 类型与 Hive INTERVAL 类型不一样

下面介绍了一般表和Hive兼容表,那么咱们该如何应用Hive的语法进行建表呢?这个时候就须要应用Hive Dialect

什么是Hive Dialect

从Flink1.11.0开始,只有开启了Hive dialect配置,用户就能够应用HiveQL语法,这样咱们就能够在Flink中应用Hive的语法应用一些DDL和DML操作。

Flink目前反对两种SQL方言(SQL dialects),别离为:default和hive。默认的SQL方言是default,如果要应用Hive的语法,须要将SQL方言切换到hive

如何应用Hive Dialect

在SQL Cli中应用Hive dialect

应用hive dialect只须要配置一个参数即可,该参数名称为:table.sql-dialect。咱们就能够在sql-client-defaults.yaml配置文件中进行配置,也能够在具体的会话窗口中进行设定,对于SQL dialect的切换,不须要进行重启session。

execution:  planner: blink  type: batch  result-mode: tableconfiguration:  table.sql-dialect: hive

如果咱们须要在SQL Cli中进行切换hive dialect,能够应用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 应用hive dialectFlink SQL> set table.sql-dialect=default; -- 应用default dialect
尖叫提醒:

一旦切换到了hive dialect,就只能应用Hive的语法建表,如果尝试应用Flink的语法建表,则会报错

在Table API中配合dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();TableEnvironment tableEnv = TableEnvironment.create(settings);// 应用hive dialecttableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// 应用 default dialecttableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

操作示例

Flink SQL> set table.sql-dialect=hive;-- 应用Hive语法创立一张表CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (  `id` int COMMENT 'id',  `name` string COMMENT '名称',  `age` int COMMENT '年龄' )COMMENT 'hive dialect表测试'ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

进入Hive客户端去查看该表的元数据信息

desc formatted hive_dialect_tbl;col_name        data_type       comment# col_name              data_type               comment                              id                      int                                         name                    string                                      age                     int                                                          # Detailed Table Information             Database:               default                  Owner:                  null                     CreateTime:             Mon Dec 21 17:23:48 CST 2020     LastAccessTime:         UNKNOWN                  Retention:              0                        Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl        Table Type:             MANAGED_TABLE            Table Parameters:                        comment                 hive dialect表测试             is_generic              false                       transient_lastDdlTime   1608542628                           # Storage Information            SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       InputFormat:            org.apache.hadoop.mapred.TextInputFormat         OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   Compressed:             No                       Num Buckets:            -1                       Bucket Columns:         []                       Sort Columns:           []                       Storage Desc Params:                     field.delim             ,                           serialization.format    ,                   

很显著,该表是一张Hive兼容表,即is_generic=false

应用FlinkSQLCli向该表中写入一条数据:

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;

咱们也能够在Hive的Cli中去操作该表

hive (default)> select * from hive_dialect_tbl;hive (default)> insert into hive_dialect_tbl select 2,'jack',22;

以下是应用Hive方言的一些注意事项。

  • Hive dialect只能用于操作Hive表,不能用于一般表。Hive方言应与HiveCatalog一起应用。
  • 尽管所有Hive版本都反对雷同的语法,然而是否有特定性能依然取决于应用的Hive版本。例如,仅在Hive-2.4.0或更高版本中反对更新数据库地位。
  • Hive和Calcite具备不同的保留关键字。例如,default在Calcite中是保留关键字,在Hive中是非保留关键字。所以,在应用Hive dialect时,必须应用反引号(`)援用此类关键字,能力将其用作标识符。
  • 在Hive中不能查问在Flink中创立的视图。

当然,一旦开启了Hive dialect,咱们就能够依照Hive的操作形式在Flink中去解决Hive的数据了,具体的操作与Hive统一,本文不再赘述。

总结

本文次要介绍了Hive Catalog和Hive Dialect。其中Hive Catalog的作用是长久化Flink的元数据信息,Hive Dialect是反对Hive语法的一个配置参数,这两个概念是Flink集成Hive的要害。下一篇分享将介绍如何应用Flink读写Hive。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包