关于数据库:Spark-Catalog深入理解与实战DEEPNOVA开发者社区

30次阅读

共计 6626 个字符,预计需要花费 17 分钟才能阅读完成。

作者:闻乃松

写过 Spark 应用程序的同学都晓得,通过上面这段代码就能够加载和拜访内部 Hive 数据源:

SparkSession.builder().

    appName(TestSparkHive.class.getSimpleName()).

    master("local[*]").

    enableHiveSupport().

    getOrCreate();

List<Row> list= spark.sql("show databases").collectAsList();

兴许你会好奇,它是怎么找到并拜访内部 Hive 数据源的?

其实,Spark 辨认 Hive,也是须要依赖 Hive 配置项的,配置项的起源能够是 $HIVE_HOME 环境变量,也能够从 Spark 的运行环境的 classpath 下加载 Hive 相干的配置文件。

创立对 Hive 内部数据源的拜访,不得不提到 Spark 的两个类:SessionCatalog 和 ExternalCatalog。前者是对后者的封装,对外部数据源的拜访都是通过 ExternalCatalog 实现。而 ExternalCatalog 是一个 Trait 类型,提供了对表、函数和分区的增删改查根本接口。对 Hive 数据源来讲,别离继承上述两个类,提供了具体的实现:HiveSessionCatalog 和 HiveExternalCatalog。

随着新数据源(Spark 中称为 DataSourceV2)的呈现,原来的 SessionCatalog 暴露出弊病和有余,为了适应新的数据源个性,Spark 推出了新的接口:CatalogPlugin,因为属于顶层接口,CatalogPlugin 自身很简略,只有 3 个办法:

public interface CatalogPlugin {void initialize(String name, CaseInsensitiveStringMap options);

   String name();

   default String[] defaultNamespace() {return new String[0]; }

}

实现自定义 Catalog,既能够间接实现 CatalogPlugin,也能够扩大 TableCatalog 接口,TableCatalog 扩大了 CatalogPlugin 并提供了表操作相干性能的接口。同理,实现函数相干的 Catalog,也能够间接扩大 FunctionCatalog,因为它提供了函数治理相干的接口。同 SessionCatalog 绝对应,CatalogPlugin 接口体系也实现了 V2SessionCatalog,整个 CatalogPlugin 类体系示意为下图所示:

V2SessionCatalog 不同于 SessionCataolog,次要体现在:

  1. V2SessionCatalog 实现了 CatalogPlugIn 接口,CatalogPlugIn 是针对新数据源(DatasourceV2)的元数据管理。
  2. SessionCatalog 只是一般类,封装了内部数据源的元数据管理接口 ExternalCatalog。
  3. SessionCatalog 作为 V2SessionCatalog 的属性,或者说 V2SessionCatalog 是 SessionCatalog 的代理实现。

对于第 3 点,能够从 V2SessionCatalog 的实现失去佐证,同时以一个办法 listTables 的实现为例来看:

/**translates calls to the v1 SessionCatalog. */

class V2SessionCatalog(catalog: SessionCatalog) extends TableCatalog

with SupportsNamespaces with SQLConfHelper {override def listTables(namespace: Array[String]): Array[Identifier] = {

        namespace match {case Array(db) => catalog .listTables(db)

        .map(ident => Identifier.of(Array(ident.database.getOrElse("")),

        ident.table)) .toArray

            case _ => throw QueryCompilationErrors.noSuchNamespaceError(namespace)

        }

    }    

....

}

有了 SessionCatalog 和 V2SessionCatalog,Spark 又是如何治理这些 Catalog 呢?

Spark 提供了 CatalogManager,其外部通过一个 Map 类型的内存数据结构保护注册的 Catalog 实例:

class CatalogManager( defaultSessionCatalog: CatalogPlugin, val v1SessionCatalog:

                     SessionCatalog) extends SQLConfHelper with Logging {

    //SESSION_CATALOG_NAME 常量:spark_catalog

    import CatalogManager.SESSION_CATALOG_NAME

    import CatalogV2Util._ private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]

    

    def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog}

        else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))

        }

    }

}

CatalogManager 保护了所有 Catalog 实例的键值对信息,可能依据 catalog 名称返回对应的 Catalog 实例,其中有一个固定的名字叫 spark_catalog,用于以后默认的 Catalog 实例实现,该示例就是 V2SessionCatalog,它代理了一般的 SessionCatalog,因而,在应用时,即便什么 Catalog 都不注册,Spark 也会依据默认的 Catalog 实例加载 Hive 数据源。然而 V2SessionCatalog 只是对 SessionCatalog 的简略代理,那么如何实现简单的数据源元数据管理性能呢?这就须要扩大 V2SessionCatalog 的实现,这里以 Spark Iceberg 的实现为例阐明:

public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>

extends BaseCatalog implements CatalogExtension {private static final String[] DEFAULT_NAMESPACE = new String[]{"default"};

    private String catalogName = null;

    private TableCatalog icebergCatalog = null;

    private StagingTableCatalog asStagingCatalog = null;

    private T sessionCatalog = null;

    ...

}

SparkSessionCatalog 实现了 CatalogExtension 接口,而 CatalogExtension 接口扩大了 SparkPlugIn。留神到类中有两个 TableCatalog 类型的属性:icebergCatalog 和 sessionCatalog。其中 sessionCatalog 就是下面介绍的 V2SessionCatalog。

实际上,icebergCatalog 和 sessionCatalog 是 Iceberg Runtime 提供的两个类定义,别离是:

org.apache.iceberg.spark.SparkCatalog

 org.apache.iceberg.spark.SparkSessionCatalog

对于这两个类的区别,官网有这么一段解释:

什么意思?

就是说,SparkCatalog 专用于 Iceberg 治理,比方你能够这样在 Spark Catalog Manager 中注册 hive 和 hadoop 类型的 Catalog:

set spark.sql.catalog.hive_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;

set spark.sql.catalog.hive_iceberg_catalog_demo.type=hive;

或者

set spark.sql.catalog.hadoop_iceberg_catalog_demo=org.apache.iceberg.spark.SparkCatalog;

set spark.sql.catalog.hadoop_iceberg_catalog_demo.type=hadoop;

你能够应用如下的模式创立 Iceberg 表:

CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(id bigint COMMENT 'unique id', data string)

 USING iceberg

 location 'your path'

 TBLPROPERTIES ('iceberg.catalog'='new_iceberg_catalog');

如果以后默认 namespace 在 default 下,你甚至能够将下面建表语句简写为:

CREATE TABLE sample_iceberg_table(id bigint COMMENT 'unique id', data string);

因为以后 Catalog 曾经明确定义为 Iceberg 表,它能主动创立 Iceberg 表,但无法访问一般的 Hive 表。而 SparkSessionCatalog 不仅能够定义下面的 Iceberg Catalog,并在其中创立 Iceberg 类型的表,还能够创立非 Iceberg 类型的表,注册形式同上:

set spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;

set spark.sql.catalog.spark_catalog.type=hive;

对 SparkSessionCatalog 类型的 Catalog,其名称为固定的 spark_catalog。它重写了 Spark 默认的 V2SessionCatalog 行为,SparkSessionCatalog 可看做是对 Hive 数据源的兼容,对非 Iceberg 类型的表操作,跟一般的 Hive 表操作并无区别。以 createTable 这个办法为例:

public Table createTable(Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)

     throws TableAlreadyExistsException, NoSuchNamespaceException {String provider = (String)properties.get("provider");

     return this.useIceberg(provider) ?

         this.icebergCatalog.createTable(ident, schema, partitions, properties) :

         this.getSessionCatalog().createTable(ident, schema, partitions, properties);

 }

如果是 Iceberg 表,它应用 icebergCatalog 创立表,否则就用 SessionCatalog 创立表。而 listTables 间接将申请转给了 SessionCatalog,因为 Hive Iceberg 表和一般 Hive 表都基于 HMS 存储,所以能够应用 SessionCatalog。

public Identifier[] listTables(String[] namespace)

    throws NoSuchNamespaceException {return this.getSessionCatalog().listTables(namespace);

}

除了上述区别外,SparkSessionCatalog 对 Create Table AS Select 或者 Replace Table As Select 无奈保障原子性,而 SparkCatalog 能够。

下面介绍了 Spark Iceberg 对多类型 Catalog 的反对,下一步天然要问,这有什么用?

举两个场景来阐明:

  1. Hive 数据入湖
-- 定义 catalog

set spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;

set spark.sql.catalog.spark_catalog.type=hive;

set spark.sql.catalog.spark_catalog.uri=thrift://ip:9083;

set spark.sql.catalog.spark_catalog.warehouse=s3a://mybucket/warehouse;

CREATE TABLE spark_catalog.default.sample_iceberg_table(id bigint COMMENT 'unique id', data string)

USING iceberg;insert into spark_catalog.default.sample_iceberg_table

select * from default.sample_hive_table;
  1. 不同 Hive 版本的数据湖数据迁徙,比方从低版本的 HMS 数据湖迁徙到高版本的数据湖,能够这样实现:
-- 定义新的数据湖

SET spark.sql.catalog.new_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;

SET spark.sql.catalog.new_iceberg_catalog.type=hive;

SET spark.sql.catalog.new_iceberg_catalog.uri=thrift://ip-new:9083;

SET spark.sql.catalog.new_iceberg_catalog.warehouse=s3a://mybucket/warehouse;

-- 创立新数据湖的表 CREATE TABLE new_iceberg_catalog.default.sample_iceberg_table(id bigint COMMENT 'unique id', data string) USING iceberg;-- 定义旧的数据湖 SET spark.sql.catalog.old_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;

SET spark.sql.catalog.old_iceberg_catalog.type=hive;

SET spark.sql.catalog.old_iceberg_catalog.uri=thrift://ip-old:9083;

SET spark.sql.catalog.old_iceberg_catalog.warehouse=hdfs://service/warehouse;

insert into new_iceberg_catalog.default.sample_iceberg_table

select * from old_iceberg_catalog.default.sample_iceberg_table;

正文完
 0