作者:闻乃松

写过Spark应用程序的同学都晓得,通过上面这段代码就能够加载和拜访内部Hive数据源:

SparkSession.builder().    appName(TestSparkHive.class.getSimpleName()).    master("local[*]").    enableHiveSupport().    getOrCreate();List<Row> list= spark.sql("show databases").collectAsList();

兴许你会好奇,它是怎么找到并拜访内部Hive数据源的?

其实,Spark辨认Hive,也是须要依赖Hive配置项的,配置项的起源能够是$HIVE_HOME环境变量,也能够从Spark的运行环境的classpath下加载Hive相干的配置文件。

创立对Hive内部数据源的拜访,不得不提到Spark的两个类:SessionCatalog和ExternalCatalog。前者是对后者的封装,对外部数据源的拜访都是通过ExternalCatalog实现。而ExternalCatalog是一个Trait类型,提供了对表、函数和分区的增删改查根本接口。对Hive数据源来讲,别离继承上述两个类,提供了具体的实现:HiveSessionCatalog和HiveExternalCatalog。

随着新数据源(Spark中称为DataSourceV2)的呈现,原来的SessionCatalog暴露出弊病和有余,为了适应新的数据源个性,Spark推出了新的接口:CatalogPlugin,因为属于顶层接口,CatalogPlugin自身很简略,只有3个办法:

public interface CatalogPlugin {   void initialize(String name, CaseInsensitiveStringMap options);   String name();   default String[] defaultNamespace() { return new String[0]; }}

实现自定义Catalog,既能够间接实现CatalogPlugin,也能够扩大TableCatalog接口,TableCatalog扩大了CatalogPlugin并提供了表操作相干性能的接口。同理,实现函数相干的Catalog,也能够间接扩大FunctionCatalog,因为它提供了函数治理相干的接口。同SessionCatalog绝对应,CatalogPlugin接口体系也实现了V2SessionCatalog,整个CatalogPlugin类体系示意为下图所示:

V2SessionCatalog不同于SessionCataolog,次要体现在:

  1. V2SessionCatalog实现了CatalogPlugIn接口,CatalogPlugIn是针对新数据源(DatasourceV2)的元数据管理。
  2. SessionCatalog 只是一般类,封装了内部数据源的元数据管理接口ExternalCatalog。
  3. SessionCatalog 作为V2SessionCatalog的属性,或者说 V2SessionCatalog是SessionCatalog的代理实现。

对于第3点,能够从V2SessionCatalog的实现失去佐证,同时以一个办法listTables的实现为例来看:

/**translates calls to the v1 SessionCatalog. */class V2SessionCatalog(catalog: SessionCatalog) extends TableCatalogwith SupportsNamespaces with SQLConfHelper {    override def listTables(namespace: Array[String]): Array[Identifier] = {        namespace match {            case Array(db) => catalog .listTables(db)        .map(ident => Identifier.of(Array(ident.database.getOrElse("")),        ident.table)) .toArray            case _ => throw QueryCompilationErrors.noSuchNamespaceError(namespace)        }    }    ....}

有了SessionCatalog和V2SessionCatalog,Spark又是如何治理这些Catalog呢?

Spark提供了CatalogManager,其外部通过一个Map类型的内存数据结构保护注册的Catalog实例:

class CatalogManager( defaultSessionCatalog: CatalogPlugin, val v1SessionCatalog:                     SessionCatalog) extends SQLConfHelper with Logging {    //SESSION_CATALOG_NAME 常量:spark_catalog    import CatalogManager.SESSION_CATALOG_NAME    import CatalogV2Util._ private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]        def catalog(name: String): CatalogPlugin = synchronized {        if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {            v2SessionCatalog        }        else {            catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))        }    }}

CatalogManager保护了所有Catalog实例的键值对信息,可能依据catalog名称返回对应的Catalog实例,其中有一个固定的名字叫spark_catalog,用于以后默认的Catalog实例实现,该示例就是V2SessionCatalog,它代理了一般的SessionCatalog,因而,在应用时,即便什么Catalog都不注册,Spark也会依据默认的Catalog实例加载Hive数据源。然而V2SessionCatalog只是对SessionCatalog的简略代理,那么如何实现简单的数据源元数据管理性能呢?这就须要扩大V2SessionCatalog的实现,这里以Spark Iceberg的实现为例阐明:

public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>extends BaseCatalog implements CatalogExtension {    private static final String[] DEFAULT_NAMESPACE = new String[]{    "default"    };    private String catalogName = null;    private TableCatalog icebergCatalog = null;    private StagingTableCatalog asStagingCatalog = null;    private T sessionCatalog = null;    ...}

SparkSessionCatalog实现了CatalogExtension接口,而CatalogExtension接口扩大了SparkPlugIn。留神到类中有两个TableCatalog类型的属性:icebergCatalog和sessionCatalog。其中sessionCatalog就是下面介绍的V2SessionCatalog。

实际上,icebergCatalog和sessionCatalog是 Iceberg Runtime提供的两个类定义,别离是:

org.apache.iceberg.spark.SparkCatalog org.apache.iceberg.spark.SparkSessionCatalog

对于这两个类的区别,官网有这么一段解释:

什么意思?

就是说,SparkCatalog专用于Iceberg 治理,比方你能够这样在Spark Catalog Manager中注册hive和hadoop类型的Catalog:

set spark.sql.catalog.hive_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;set spark.sql.catalog.hive_iceberg_catalog_demo.type=hive;

或者

set spark.sql.catalog.hadoop_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;set spark.sql.catalog.hadoop_iceberg_catalog_demo.type=hadoop;

你能够应用如下的模式创立Iceberg表:

CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(     id bigint COMMENT 'unique id', data string) USING iceberg location 'your path' TBLPROPERTIES ('iceberg.catalog'='new_iceberg_catalog');

如果以后默认namespace在default下,你甚至能够将下面建表语句简写为:

CREATE TABLE sample_iceberg_table( id bigint COMMENT 'unique id', data string);

因为以后Catalog曾经明确定义为Iceberg表,它能主动创立Iceberg表,但无法访问一般的Hive表。而SparkSessionCatalog不仅能够定义下面的Iceberg Catalog,并在其中创立Iceberg类型的表,还能够创立非Iceberg类型的表,注册形式同上:

set spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;set spark.sql.catalog.spark_catalog.type=hive;

对SparkSessionCatalog类型的Catalog,其名称为固定的spark_catalog。它重写了Spark默认的V2SessionCatalog行为,SparkSessionCatalog可看做是对Hive数据源的兼容,对非Iceberg类型的表操作,跟一般的Hive表操作并无区别。以createTable这个办法为例:

public Table createTable(Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)     throws TableAlreadyExistsException, NoSuchNamespaceException {     String provider = (String)properties.get("provider");     return this.useIceberg(provider) ?         this.icebergCatalog.createTable(ident, schema, partitions, properties) :         this.getSessionCatalog().createTable(ident, schema, partitions, properties); }

如果是Iceberg表,它应用icebergCatalog创立表,否则就用SessionCatalog创立表。而listTables间接将申请转给了SessionCatalog,因为Hive Iceberg表和一般Hive表都基于HMS存储,所以能够应用SessionCatalog。

public Identifier[] listTables(String[] namespace)    throws NoSuchNamespaceException {    return this.getSessionCatalog().listTables(namespace);}

除了上述区别外,SparkSessionCatalog对Create Table AS Select或者Replace Table As Select无奈保障原子性,而SparkCatalog能够。

下面介绍了Spark Iceberg对多类型Catalog的反对,下一步天然要问,这有什么用?

举两个场景来阐明:

  1. Hive数据入湖
--定义catalogset spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;set spark.sql.catalog.spark_catalog.type=hive;set spark.sql.catalog.spark_catalog.uri=thrift://ip:9083;set spark.sql.catalog.spark_catalog.warehouse=s3a://mybucket/warehouse;CREATE TABLE spark_catalog.default.sample_iceberg_table(    id bigint COMMENT 'unique id', data string)USING iceberg;insert into spark_catalog.default.sample_iceberg_tableselect * from default.sample_hive_table;
  1. 不同Hive版本的数据湖数据迁徙,比方从低版本的HMS数据湖迁徙到高版本的数据湖,能够这样实现:
--定义新的数据湖SET spark.sql.catalog.new_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;SET spark.sql.catalog.new_iceberg_catalog.type=hive;SET spark.sql.catalog.new_iceberg_catalog.uri=thrift://ip-new:9083;SET spark.sql.catalog.new_iceberg_catalog.warehouse=s3a://mybucket/warehouse;--创立新数据湖的表 CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(    id bigint COMMENT 'unique id', data string) USING iceberg;    --定义旧的数据湖 SET spark.sql.catalog.old_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;SET spark.sql.catalog.old_iceberg_catalog.type=hive;SET spark.sql.catalog.old_iceberg_catalog.uri=thrift://ip-old:9083;SET spark.sql.catalog.old_iceberg_catalog.warehouse=hdfs://service/warehouse;insert into new_iceberg_catalog.default.sample_iceberg_tableselect * from old_iceberg_catalog.default.sample_iceberg_table;