所谓Catalog即数据目录,简略讲,Catalog是企业用于治理数据资产的形式,Catalog借助元数据来治理数据,包含数据收集、组织、拜访、发现和治理。可见,Catalog在数据资产治理中处于外围地位。元数据自身内容十分丰盛,包含技术元数据、业务元数据和操作元数据,本文仅仅钻研大数据计算存储框架自身的技术元数据,比方数据库、数据表、分区、视图、函数等。限于篇幅,参加比拟的计算存储框架为Flink、Iceberg和Hive,比拟维度为Catalog的定义、Catalog的实现和生态拓展几个方面。
Catalog接口定义
1 Flink Catalog
从Flink Catalog的接口定义来看,Flink Catalog提供了根本的数据库、数据表、函数、视图、分区的增删改查基本操作。
2 Iceberg Catalog
从Iceberg Catalog的接口定义来看,Iceberg Catalog提供了根本的表创立、表替换、表删除、表改名和表加载、查问等操作。在对外接口参数中,Iceberg应用TableIdentifier来标识一个表,TableIdentifier外部又蕴含一个Namespace。在Iceberg中,一个表的残缺标识组成为: TableIdentifier=Namespace+table,其中Namespace是一个字符串数组,反对多层级的表润饰,第0层为table,第1层为database。
3 Hive Catalog
Hive Catalog在3.x版本以前没有Catalog 的概念,尔后的版本中才勉强在Metastore中加上了一张表,专门存储Catalog,从Hive Catalog的类定义中可见,申明是相当简略的:
private String name; // required privateString description; // optional privateString locationUri; // required
其中locationUri指定了Catalog所属的数据存储门路,该属性必填。
小结
通过Catalog定义来看,Flink Catalog性能绝对欠缺,Iceberg Catalog跟Flink Catalog相比,没有明确的对数据库相干的操作,而且也没有像Flink Catalog那样明确的表的全名称(如用database.table来标识一个表)润饰概念,而是将表的标识概念泛化。相比Flink、Iceberg的 Catalog,Hive Catalog显得”后知后觉“,因为Hive晚期设计是基于database和table两级命名来实现,也没有思考到以后蓬勃发展的联邦查问场景。
Catalog实现
1 Flink Catalog
Flink Catalog的实现有两种:Hive Catalog和Memory Catalog。两者都继承形象的AbstractCatalog,区别是前者借助HMS来治理元数据,后者基于内存治理,Flink Job进行之后,这些数据会失落,须要重建。
2 Iceberg Catalog
Iceberg Catalog的实现类有4种,Hive Catalog、Hadoop Catalog、CacheCatalog和JDBC Catalog。它们都继承抽象类BaseMetastoreCatalog。Hive Catalog将表的元数据信息存储在Hive Metastore,为了兼容HMS,Namespace必须蕴含table和database。
Hadoop Catalog将表的元数据信息存储在Hadoop之上,因为Hadoop反对存算拆散,因而底层的数据文件能够是HDFS或者是S3这样的对象零碎,对Hadoop Catalog来讲,定位一个表的地位,只须要提供表的门路即可,因为表的元信息都存储在文件中,比方TableIdentifier为["test_table","test_db","test_nm1","test_nm2"]的表全门路为:
对Hadoop Catalog来讲,Namespace能够只有一层,即table 名称即可,它并不关怀数据库的概念,只关怀表的地位,但在理论利用中,为了标准治理表,倡议应用标准的组织形式,具体如何组织,要看企业的行为习惯,目前没有最佳实际。
须要揭示一点的是,Hadoop Catalog构建在文件系统之上,一些文件系统不反对的操作,Catalog也无奈实现,比方对表名的更改,波及到更改门路,对象存储系统无奈反对,有代码为证:
public class HadoopCatalog{ @Override public void renameTable(TableIdentifier from, TableIdentifier to) { throw new UnsupportedOperationException("Cannot rename Hadoop tables"); } }
同Flink Catalog一样,Iceberg也提供了JDBC Catalog,JDBC Catalog将表的元信息存储在反对JDBC协定的数据库中。然而两者还是有区别的:Flink的JDBC Catalog可能查问注册和查问内部JDBC数据源,而Iceberg的JDBC Catalog只是将自身的元数据存储在JDBC数据库中,Iceberg目前反对的数据起源也仅仅是Hadoop。除了以上三类Catalog,Iceberg也提供了自定义Catalog实现,这类的典型实现是AWS Glue。CacheCatalog相似Flink的MemoryCatalog。
3 Hive Catalog
尽管Hive 没有跟Flink、Iceberg相似的Catalog相干接口定义,然而在IMetaStoreClient接口有类似的实现,而且更欠缺:
下面只是截取了局部接口定义,实际上还有很多,感兴趣的读者本人去翻阅IMetaStoreClient接口定义。
Catalog治理
从后面介绍可知,Flink和Iceberg别离实现了多种Catalog的实例,在理论应用当中,如何不便跟应用方交互呢?
1 Flink Catalog
Flink通过定义类CatalogManager来组织以后零碎中可用的Catalog和设置、查问以后Catalog等的信息:
@Internal public final class CatalogManager { // A map between names and catalogs. private final Map<String, Catalog> catalogs; // The name of the current catalog and database private String currentCatalogName; private String currentDatabaseName; }
在Catalogmanager的外围,通过StreamTableEnvironment裸露给用户罕用的操作接口,比方查问以后零碎所有的Catalog,以后Catalog 的所有表和数据库等:
其中registerCatalog办法将Catalog实现注册到Catalogmanager,默认是MemoryCatalog。何为StreamTableEnvironment?文档中这么形容:
A table environment is responsible for:
- Connecting to external systems.
- Registering and retrieving Tables and other meta objects from a catalog.
- Executing SQL statements.
- Offering further configuration options.
StreamTableEnvironment的根本用法如下:
tableEnv.registerCatalog(catalogName, hiveCatalog); tableEnv.useCatalog(catalogName); tableEnv.executeSql("CREATE TABLE...");
tableEnv.executeSql会在注册的Catalog下创立数据表,如果没有指定Catalog和数据库,会在类型为GenericInMemoryCatalog,默认名为default_catalog 的默认default_db下创立表。比方上面的示例,在内部数据源mysql上创立了一张映射表,能够在Flink中对其进行读写操作:
-- register a MySQL table 'users' in Flink SQL CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'users' );
再比方上面,将Kafka指定的Topic user_behavior映射到Flink中的表KafkaTable,而后通过Flink对其进行读写:
CREATE TABLE KafkaTable ( event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format origin_table STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format partition_id BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector offset BIGINT METADATA VIRTUAL, -- from Kafka connector user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'debezium-json' );
Currently, PostgresCatalog is the only implementation of JDBC Catalog at the moment。如需应用,须要引入依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.14.0</version> </dependency>
应用形式示例:
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl); tableEnv.registerCatalog("mypg", catalog); // set the JdbcCatalog as the current catalog of the session tableEnv.useCatalog("mypg");
2 Iceberg Catalog
Iceberg Catalog的CatalogLoader等价于Catalog Mamager,接口中实现了3种CatalogLoader:hadoop、Hive和custom,见下图:
在对外接口裸露中,通过TableLoader:
一旦有了TableLoader,就能够通过去loadTable办法加载相应的表。上述接口能够通过API不便的调用,那如何通过SQL形式跟Iceberg交互呢?上面列举了Hive、Hadoop和Custom三种Catalog的用法:
CREATE CATALOG hive_catalog WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://localhost:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://nn:8020/warehouse/path' ); CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://nn:8020/warehouse/path', 'property-version'='1' ); CREATE CATALOG my_catalog WITH ( 'type'='iceberg', 'catalog-impl'='com.my.custom.CatalogImpl', 'my-additional-catalog-config'='my-value' );
有了Catalog就能够进行通常的ddl了,比方创立一张表sample:
CREATE TABLE hive_catalog.default.sample ( id BIGINT COMMENT 'unique id', data STRING );
3 Hive Catalog
HiveMetaStoreClient、HiveMetaStoreClientPreCatalog、CacheableHiveMetaStoreClient和SessionHiveMetaStoreClient都是IMetaStoreClient接口类的实现,最罕用的是HiveMetaStoreClient。在对外裸露接口方面,既能够应用HiveMetaStoreClient,也能够应用HCatalog。HCatalog是Hadoop的表存储管理工具,扩大了HiveMetaStoreClient的性能,它将Hive Metastore的表格数据公开给其余Hadoop应用程序,使得具备不同数据处理工具(Pig,MapReduce)的用户可能轻松将数据写入表格。它确保用户不用放心数据存储在何处或以何种格局存储。Hcatalog能够通过命令行及REST API来拜访HiveMetaStore,容许你应用HiveQLDDL语法来定义表。
以下是HCatalog的三个根本用处:
- 工具间通信——大多数简单的Hadoop应用程序都会应用多种工具来解决雷同的数据。它们可能将Pig和MapReduce的组合用于抽取、转换、加载(ETL)的实现,MapReduce用于理论的数据处理,而Hive用于剖析查问。中心化元数据存储库的应用简化了数据共享,并确保了某个工具的执行后果总是对其余工具可见。
- 数据发现——对于大型Hadoop集群来说,常见的情景是应用程序和数据具备多样性。通常,一个应用程序的数据能够被其余应用程序应用,但试图发现这些状况须要大量跨应用程序的信息。在这种状况下,能够将HCatalog用作对任何应用程序可见的注册表。将数据在HCatalog中公布就能够让其余应用程序发现它们。
- 系统集成——HCatalog所提供的REST服务,关上了Hadoop数据和解决的大门,使其能够利用在整体的企业级数据和解决基础设施中。Hadoop以繁难API和相似SQL语言的模式提供了简略的接口。
小结
Flink和Iceberg都实现了类似的Catalog 治理性能,前者通过CatalogManager,后者通过CatalogLoader。两者相同点都反对HMS和JDBC,然而Iceberg也有额定对Hadoop的反对。Hive Catalog因为呈现较早,没有专门的CatalogManager,然而它的 MetaStoreClient接口实现了同样的性能,而且更为欠缺。不仅如此,HCatalog的引入解决不同Hadoop利用之间,依照对立标准的形式拜访HDFS文件数据。另外,HMS 底层存储反对derby、jdbc数据库等。相比而言,Flink和Iceberg是横向发展,而Hive是纵向倒退。
生态扩大
在Catalog之上,Flinl、Iceberg和Hive通过不同的形式来拓展本人的生态。
1 Flink Catalog
Flink次要通过Connector来连贯不同类型的Source和Sink,Source比方Kafka、Kinesis、RabbitMQ、ActiveMQ等,Sink比方Kafka、Kinesis、RabbitMQ、ActiveMQ、HDFS、Redis、Elasticsearch、Cassandra等,应用这些Connector,须要引入相应的依赖包,对于零碎自带的比方Socket、Files、Collections、Std Out等能够间接应用,除了上述类型之外,为满足自定义应用场景,Flink还提供了SourceFunction和SinkFunction。残缺的零碎生态反对类型列举如下:
2 Iceberg Catalog
Iceberg实现了名叫FlinkCatalog的Catalog,该Catalog不同于Iceberg的Catalog,是Flink定义的类型。实践上,Fink Source反对的的数据源都能够写入Iceberg。因而Iceberg的生态拓展形式能够这样形容:
3 Hive Catalog
Hive没有相似Connector的概念,Hive是通过HiveStorageHandler接口来提供对不同数据格式的拜访,上面是Hive曾经实现的StorageHandler:
其中比拟罕用的是DefaultStorageHandler、HBaseStorageHandler、DruidStorageHandler和JdbcStorageHandler。DefaultStorageHandler罕用于基类,用于实现自定义StoargeHandler,JdbcStorageHandler较新。
引入Storage Handler,Hive用户应用SQL可读写内部数据源。ElasticSearch, Kafka,HBase等数据源的查问对非专业开发是有肯定门槛的,借助Storage Handler,他们有了一种方便快捷的伎俩查问数据。另外,Hive作为数仓的外围组件,借助Storage Handler,数据导入导出能够对立以SQL实现,缩小了大数据开发保护的技术栈。下图列举了罕用的一些Storage Handler实现:
小结
在生态拓展方面,Flink次要通过Connector机制反对不同数据源和数据目标的读取和写入;Iceberg则借助Flink的Source Connector能力,向下反对基于Hadoop的数据入湖,间接形式来拓展生态边界;Hive通过HiveStorageHandler接口,内部通过实现HiveStorageHandler来接入对不同数据源的反对。在积极性方面,Flink处于踊跃防御态势,Iceberg处于练内功阶段,Hive正在补短板,处于踊跃追赶态势。
总结
Catalog是大数据技术体系中至关重要的一环,本文从技术角度,从Catalog 的接口定义、Catalog的实现、Cataslog的治理和生态拓展几个方面比拟了Flink、Iceberg和Hive在这方面的利用和倒退,钻研结果表明Flink、Iceberg和Hive都具备了肯定的Catalog治理方面的能力,然而倒退节奏不同:Flink在流计算方面倒退较快,并且建设起了绝对欠缺的生态,Iceberg处于后到一方,目前专一于湖上数据的组织治理,在数据获取方面通过拥抱Flink大腿来拓展生态边界,而Hive属于传统离线数据的元数据管理的相对霸主,绝对Flink等计算引擎,Hive向内倒退,在纵向拓展方面倒退很欠缺,在横向发展方面,目前处于查漏补缺,踊跃追赶阶段,比方实现了Streaming解决,在联邦查问方面,通过内部三方零碎的反对,也拓展到了HBase、Kafka、Iceberg、JDBC等技术体系。而且Hive Metastore也被Flink、Iceberg深度依赖,看起来相当长时间都很难解脱对HMS的依赖(也没有解脱的必要),然而Hive还不具备对JDBC Catalog等主动元数据发现能力,因而如果须要开发新的Catalog Manager,较好的形式仍然是像Flink、Iceberg做的那样,将HMS集成到零碎之中,而后再横向拓展其余类型的Catalog。
参考链接
https://www.oracle.com/big-da...
https://iceberg.apache.org/#h...
https://nightlies.apache.org/...
https://trino.io/docs/current...
https://issues.apache.org/jir...
0505-应用Apache Hive3实现跨数据库的联邦查问
https://cwiki.apache.org/conf...
https://issues.apache.org/jir...