乐趣区

关于flink:FlinkIceberg和Hive的Catalog比较研究

所谓 Catalog 即数据目录,简略讲,Catalog 是企业用于治理数据资产的形式,Catalog 借助元数据来治理数据,包含数据收集、组织、拜访、发现和治理。可见,Catalog 在数据资产治理中处于外围地位。元数据自身内容十分丰盛,包含技术元数据、业务元数据和操作元数据,本文仅仅钻研大数据计算存储框架自身的技术元数据,比方数据库、数据表、分区、视图、函数等。限于篇幅,参加比拟的计算存储框架为 Flink、Iceberg 和 Hive,比拟维度为 Catalog 的定义、Catalog 的实现和生态拓展几个方面。

Catalog 接口定义

1 Flink Catalog

从 Flink Catalog 的接口定义来看,Flink Catalog 提供了根本的数据库、数据表、函数、视图、分区的增删改查基本操作。

2 Iceberg Catalog

从 Iceberg Catalog 的接口定义来看,Iceberg Catalog 提供了根本的表创立、表替换、表删除、表改名和表加载、查问等操作。在对外接口参数中,Iceberg 应用 TableIdentifier 来标识一个表,TableIdentifier 外部又蕴含一个 Namespace。在 Iceberg 中,一个表的残缺标识组成为:TableIdentifier=Namespace+table,其中 Namespace 是一个字符串数组,反对多层级的表润饰,第 0 层为 table,第 1 层为 database。

3 Hive Catalog

Hive Catalog 在 3.x 版本以前没有 Catalog 的概念,尔后的版本中才勉强在 Metastore 中加上了一张表,专门存储 Catalog,从 Hive Catalog 的类定义中可见,申明是相当简略的:

private String name; // required private

String description; // optional private

String locationUri; // required

其中 locationUri 指定了 Catalog 所属的数据存储门路,该属性必填。

小结

通过 Catalog 定义来看,Flink Catalog 性能绝对欠缺,Iceberg Catalog 跟 Flink Catalog 相比,没有明确的对数据库相干的操作,而且也没有像 Flink Catalog 那样明确的表的全名称(如用 database.table 来标识一个表)润饰概念,而是将表的标识概念泛化。相比 Flink、Iceberg 的 Catalog,Hive Catalog 显得”后知后觉“,因为 Hive 晚期设计是基于 database 和 table 两级命名来实现,也没有思考到以后蓬勃发展的联邦查问场景。

Catalog 实现

1 Flink Catalog

Flink Catalog 的实现有两种:Hive Catalog 和 Memory Catalog。两者都继承形象的 AbstractCatalog,区别是前者借助 HMS 来治理元数据,后者基于内存治理,Flink Job 进行之后,这些数据会失落,须要重建。

2 Iceberg Catalog

Iceberg Catalog 的实现类有 4 种,Hive Catalog、Hadoop Catalog、CacheCatalog 和 JDBC Catalog。它们都继承抽象类 BaseMetastoreCatalog。Hive Catalog 将表的元数据信息存储在 Hive Metastore,为了兼容 HMS,Namespace 必须蕴含 table 和 database。

Hadoop Catalog 将表的元数据信息存储在 Hadoop 之上,因为 Hadoop 反对存算拆散,因而底层的数据文件能够是 HDFS 或者是 S3 这样的对象零碎,对 Hadoop Catalog 来讲,定位一个表的地位,只须要提供表的门路即可,因为表的元信息都存储在文件中,比方 TableIdentifier 为 [“test_table”,”test_db”,”test_nm1″,”test_nm2″] 的表全门路为:

对 Hadoop Catalog 来讲,Namespace 能够只有一层,即 table 名称即可,它并不关怀数据库的概念,只关怀表的地位,但在理论利用中,为了标准治理表,倡议应用标准的组织形式,具体如何组织,要看企业的行为习惯,目前没有最佳实际。

须要揭示一点的是,Hadoop Catalog 构建在文件系统之上,一些文件系统不反对的操作,Catalog 也无奈实现,比方对表名的更改,波及到更改门路,对象存储系统无奈反对,有代码为证:

public class HadoopCatalog{

​    @Override  

​    public void renameTable(TableIdentifier from, TableIdentifier to) {​            throw new UnsupportedOperationException("Cannot rename Hadoop tables");    

​    }

 }

同 Flink Catalog 一样,Iceberg 也提供了 JDBC Catalog,JDBC Catalog 将表的元信息存储在反对 JDBC 协定的数据库中。然而两者还是有区别的:Flink 的 JDBC Catalog 可能查问注册和查问内部 JDBC 数据源,而 Iceberg 的 JDBC Catalog 只是将自身的元数据存储在 JDBC 数据库中,Iceberg 目前反对的数据起源也仅仅是 Hadoop。除了以上三类 Catalog,Iceberg 也提供了自定义 Catalog 实现,这类的典型实现是 AWS Glue。CacheCatalog 相似 Flink 的 MemoryCatalog。

3 Hive Catalog

尽管 Hive 没有跟 Flink、Iceberg 相似的 Catalog 相干接口定义,然而在 IMetaStoreClient 接口有类似的实现,而且更欠缺:

下面只是截取了局部接口定义,实际上还有很多,感兴趣的读者本人去翻阅 IMetaStoreClient 接口定义。

Catalog 治理

从后面介绍可知,Flink 和 Iceberg 别离实现了多种 Catalog 的实例,在理论应用当中,如何不便跟应用方交互呢?

1 Flink Catalog

Flink 通过定义类 CatalogManager 来组织以后零碎中可用的 Catalog 和设置、查问以后 Catalog 等的信息:

@Internal public final class CatalogManager {   

 // A map between names and catalogs.   

 private final Map<String, Catalog> catalogs;   

 // The name of the current catalog and database    

private String currentCatalogName;        

private String currentDatabaseName;

 }

在 Catalogmanager 的外围,通过 StreamTableEnvironment 裸露给用户罕用的操作接口,比方查问以后零碎所有的 Catalog,以后 Catalog 的所有表和数据库等:

其中 registerCatalog 办法将 Catalog 实现注册到 Catalogmanager,默认是 MemoryCatalog。何为 StreamTableEnvironment?文档中这么形容:

A table environment is responsible for:

  • Connecting to external systems.
  • Registering and retrieving Tables and other meta objects from a catalog.
  • Executing SQL statements.
  • Offering further configuration options.

StreamTableEnvironment 的根本用法如下:

tableEnv.registerCatalog(catalogName, hiveCatalog);

 tableEnv.useCatalog(catalogName); 

tableEnv.executeSql("CREATE TABLE...");

tableEnv.executeSql 会在注册的 Catalog 下创立数据表,如果没有指定 Catalog 和数据库,会在类型为 GenericInMemoryCatalog,默认名为 default_catalog 的默认 default_db 下创立表。比方上面的示例,在内部数据源 mysql 上创立了一张映射表,能够在 Flink 中对其进行读写操作:

-- register a MySQL table 'users' in Flink SQL 

CREATE TABLE MyUserTable (  

​    id BIGINT,  

​    name STRING, 

​    age INT,  

​    status BOOLEAN,  

​    PRIMARY KEY (id) NOT ENFORCED ) 

​    WITH ('connector' = 'jdbc',   'url' = 'jdbc:mysql://localhost:3306/mydatabase',   'table-name' = 'users');

再比方上面,将 Kafka 指定的 Topic user_behavior 映射到 Flink 中的表 KafkaTable,而后通过 Flink 对其进行读写:

CREATE TABLE KafkaTable (​    event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format  

​    origin_table STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format  

​    partition_id BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector 

​     offset BIGINT METADATA VIRTUAL,  -- from Kafka connector 

​     user_id BIGINT, 

​     item_id BIGINT,

​     behavior STRING ) 

WITH (

  'connector' = 'kafka', 

 'topic' = 'user_behavior', 

 'properties.bootstrap.servers' = 'localhost:9092',  

'properties.group.id' = 'testGroup', 

 'scan.startup.mode' = 'earliest-offset', 

 'value.format' = 'debezium-json' 

);

Currently, PostgresCatalog is the only implementation of JDBC Catalog at the moment。如需应用,须要引入依赖:

<dependency>  

​    <groupId>org.apache.flink</groupId> 

​     <artifactId>flink-connector-jdbc_2.11</artifactId>

​      <version>1.14.0</version>

 </dependency>

应用形式示例:

JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl); 

tableEnv.registerCatalog("mypg", catalog); 

// set the JdbcCatalog as the current catalog of the session tableEnv.useCatalog("mypg");

2 Iceberg Catalog

Iceberg Catalog 的 CatalogLoader 等价于 Catalog Mamager,接口中实现了 3 种 CatalogLoader:hadoop、Hive 和 custom,见下图:

在对外接口裸露中,通过 TableLoader:

一旦有了 TableLoader,就能够通过去 loadTable 办法加载相应的表。上述接口能够通过 API 不便的调用,那如何通过 SQL 形式跟 Iceberg 交互呢?上面列举了 Hive、Hadoop 和 Custom 三种 Catalog 的用法:

CREATE CATALOG hive_catalog WITH (

  'type'='iceberg',  

'catalog-type'='hive', 

 'uri'='thrift://localhost:9083',

  'clients'='5',  'property-version'='1', 

 'warehouse'='hdfs://nn:8020/warehouse/path' ); 



CREATE CATALOG hadoop_catalog 

WITH (

  'type'='iceberg',

 'catalog-type'='hadoop',  

'warehouse'='hdfs://nn:8020/warehouse/path',

  'property-version'='1' ); 


CREATE CATALOG my_catalog WITH ( 

 'type'='iceberg', 

 'catalog-impl'='com.my.custom.CatalogImpl',  

'my-additional-catalog-config'='my-value' );

有了 Catalog 就能够进行通常的 ddl 了,比方创立一张表 sample:


CREATE TABLE hive_catalog.default.sample (  

  id BIGINT COMMENT 'unique id',   

 data STRING );

3 Hive Catalog

HiveMetaStoreClient、HiveMetaStoreClientPreCatalog、CacheableHiveMetaStoreClient 和 SessionHiveMetaStoreClient 都是 IMetaStoreClient 接口类的实现,最罕用的是 HiveMetaStoreClient。在对外裸露接口方面,既能够应用 HiveMetaStoreClient,也能够应用 HCatalog。HCatalog 是 Hadoop 的表存储管理工具,扩大了 HiveMetaStoreClient 的性能,它将 Hive Metastore 的表格数据公开给其余 Hadoop 应用程序,使得具备不同数据处理工具(Pig,MapReduce)的用户可能轻松将数据写入表格。它确保用户不用放心数据存储在何处或以何种格局存储。Hcatalog 能够通过命令行及 REST API 来拜访 HiveMetaStore,容许你应用 HiveQLDDL 语法来定义表。

以下是 HCatalog 的三个根本用处:

  • 工具间通信——大多数简单的 Hadoop 应用程序都会应用多种工具来解决雷同的数据。它们可能将 Pig 和 MapReduce 的组合用于抽取、转换、加载 (ETL) 的实现,MapReduce 用于理论的数据处理,而 Hive 用于剖析查问。中心化元数据存储库的应用简化了数据共享,并确保了某个工具的执行后果总是对其余工具可见。
  • 数据发现——对于大型 Hadoop 集群来说,常见的情景是应用程序和数据具备多样性。通常,一个应用程序的数据能够被其余应用程序应用,但试图发现这些状况须要大量跨应用程序的信息。在这种状况下,能够将 HCatalog 用作对任何应用程序可见的注册表。将数据在 HCatalog 中公布就能够让其余应用程序发现它们。
  • 系统集成——HCatalog 所提供的 REST 服务,关上了 Hadoop 数据和解决的大门,使其能够利用在整体的企业级数据和解决基础设施中。Hadoop 以繁难 API 和相似 SQL 语言的模式提供了简略的接口。

小结

Flink 和 Iceberg 都实现了类似的 Catalog 治理性能,前者通过 CatalogManager,后者通过 CatalogLoader。两者相同点都反对 HMS 和 JDBC,然而 Iceberg 也有额定对 Hadoop 的反对。Hive Catalog 因为呈现较早,没有专门的 CatalogManager,然而它的 MetaStoreClient 接口实现了同样的性能,而且更为欠缺。不仅如此,HCatalog 的引入解决不同 Hadoop 利用之间,依照对立标准的形式拜访 HDFS 文件数据。另外,HMS 底层存储反对 derby、jdbc 数据库等。相比而言,Flink 和 Iceberg 是横向发展,而 Hive 是纵向倒退。

生态扩大

在 Catalog 之上,Flinl、Iceberg 和 Hive 通过不同的形式来拓展本人的生态。

1 Flink Catalog

Flink 次要通过 Connector 来连贯不同类型的 Source 和 Sink,Source 比方 Kafka、Kinesis、RabbitMQ、ActiveMQ 等,Sink 比方 Kafka、Kinesis、RabbitMQ、ActiveMQ、HDFS、Redis、Elasticsearch、Cassandra 等,应用这些 Connector,须要引入相应的依赖包,对于零碎自带的比方 Socket、Files、Collections、Std Out 等能够间接应用,除了上述类型之外,为满足自定义应用场景,Flink 还提供了 SourceFunction 和 SinkFunction。残缺的零碎生态反对类型列举如下:

2 Iceberg Catalog

Iceberg 实现了名叫 FlinkCatalog 的 Catalog,该 Catalog 不同于 Iceberg 的 Catalog,是 Flink 定义的类型。实践上,Fink Source 反对的的数据源都能够写入 Iceberg。因而 Iceberg 的生态拓展形式能够这样形容:

3 Hive Catalog

Hive 没有相似 Connector 的概念,Hive 是通过 HiveStorageHandler 接口来提供对不同数据格式的拜访,上面是 Hive 曾经实现的 StorageHandler:

其中比拟罕用的是 DefaultStorageHandler、HBaseStorageHandler、DruidStorageHandler 和 JdbcStorageHandler。DefaultStorageHandler 罕用于基类,用于实现自定义 StoargeHandler,JdbcStorageHandler 较新。

引入 Storage Handler,Hive 用户应用 SQL 可读写内部数据源。ElasticSearch, Kafka,HBase 等数据源的查问对非专业开发是有肯定门槛的,借助 Storage Handler,他们有了一种方便快捷的伎俩查问数据。另外,Hive 作为数仓的外围组件,借助 Storage Handler,数据导入导出能够对立以 SQL 实现,缩小了大数据开发保护的技术栈。下图列举了罕用的一些 Storage Handler 实现:

小结

在生态拓展方面,Flink 次要通过 Connector 机制反对不同数据源和数据目标的读取和写入;Iceberg 则借助 Flink 的 Source Connector 能力,向下反对基于 Hadoop 的数据入湖,间接形式来拓展生态边界;Hive 通过 HiveStorageHandler 接口,内部通过实现 HiveStorageHandler 来接入对不同数据源的反对。在积极性方面,Flink 处于踊跃防御态势,Iceberg 处于练内功阶段,Hive 正在补短板,处于踊跃追赶态势。

总结

Catalog 是大数据技术体系中至关重要的一环,本文从技术角度,从 Catalog 的接口定义、Catalog 的实现、Cataslog 的治理和生态拓展几个方面比拟了 Flink、Iceberg 和 Hive 在这方面的利用和倒退,钻研结果表明 Flink、Iceberg 和 Hive 都具备了肯定的 Catalog 治理方面的能力,然而倒退节奏不同:Flink 在流计算方面倒退较快,并且建设起了绝对欠缺的生态,Iceberg 处于后到一方,目前专一于湖上数据的组织治理,在数据获取方面通过拥抱 Flink 大腿来拓展生态边界,而 Hive 属于传统离线数据的元数据管理的相对霸主,绝对 Flink 等计算引擎,Hive 向内倒退,在纵向拓展方面倒退很欠缺,在横向发展方面,目前处于查漏补缺,踊跃追赶阶段,比方实现了 Streaming 解决,在联邦查问方面,通过内部三方零碎的反对,也拓展到了 HBase、Kafka、Iceberg、JDBC 等技术体系。而且 Hive Metastore 也被 Flink、Iceberg 深度依赖,看起来相当长时间都很难解脱对 HMS 的依赖(也没有解脱的必要),然而 Hive 还不具备对 JDBC Catalog 等主动元数据发现能力,因而如果须要开发新的 Catalog Manager,较好的形式仍然是像 Flink、Iceberg 做的那样,将 HMS 集成到零碎之中,而后再横向拓展其余类型的 Catalog。

参考链接

https://www.oracle.com/big-da…

https://iceberg.apache.org/#h…

https://nightlies.apache.org/…

https://trino.io/docs/current…

https://issues.apache.org/jir…

0505- 应用 Apache Hive3 实现跨数据库的联邦查问

https://cwiki.apache.org/conf…

https://issues.apache.org/jir…

退出移动版