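Goal: locate the Spark-related classes in the Iceberg code base; finding them counts as success.
Plan nodes obtained: ReplaceData, MergeInto, DynamicFileFilterExec, ExtendedBatchScan
Versions: Spark 3.0.1, Iceberg 0.11.0
Data source path: file:///Users/bjhl/tmp/icebergData
Create a Maven project with the following pom.xml: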
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-3.x-worker</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<scala.version>2.12.8</scala.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark3-runtime -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3-runtime</artifactId>
<version>0.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
SparkSession configuration
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // use a Hadoop (file-system) catalog
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog") // Iceberg catalog implementation; must be SparkCatalog for the cast used when creating the table
  .config("spark.sql.catalog.hadoop_prod.warehouse", "file:///Users/bjhl/tmp/icebergData") // warehouse location (local)
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") // enable the extensions that support merge into and similar features
  .appName(this.getClass.getSimpleName)
  .master("local[*]")
  .getOrCreate()
Create a table
import java.util
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Get the catalog
val hdpCatalog = spark.sessionState.catalogManager.catalog("hadoop_prod").asInstanceOf[SparkCatalog]
val namespaces = Array("test")
// Custom Identifier implementation (the author's own class, not shown here)
val identifier = new SimpleLocalIdentifierImpl("/Users/bjhl/tmp/icebergData/test/table_a", namespaces)
val options = new util.HashMap[String, String]()
// Build the schema from a Spark StructType
val schema = new StructType()
  .add("c1", IntegerType, true)
  .add("c2", StringType, true)
  .add("c3", StringType, true)
// Create the table (no partition transforms are passed)
hdpCatalog.createTable(identifier, schema, null, options)
// Insert one row
spark.sql("insert into hadoop_prod.test.table_a VALUES (1, \"wlq\",\"zyc\")")
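The generated directory structure is as follows
It contains both metadata and data; test plays the role of a database name, and table_a is the table name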
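To inspect that layout without a file browser, the table directory can be walked from code. This is a minimal sketch using only the JDK file API; the object name `WarehouseLayout` and the helper `listTree` are made up for illustration, and the default path is simply the table directory used in this post.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

object WarehouseLayout {
  // Returns every file and directory below `root`, as paths relative to `root`, sorted.
  def listTree(root: Path): Seq[String] = {
    val stream = Files.walk(root)
    try {
      stream.iterator().asScala
        .filter(_ != root)                      // skip the root entry itself
        .map(p => root.relativize(p).toString)
        .toList
        .sorted
    } finally stream.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: the table directory from this post; pass another path as the first argument.
    val tableDir = Paths.get(args.headOption.getOrElse("/Users/bjhl/tmp/icebergData/test/table_a"))
    if (Files.isDirectory(tableDir)) listTree(tableDir).foreach(println)
    else println(s"Not a directory: $tableDir")
  }
}
```

Running it once after the insert and again after the update makes the newly added metadata and data files easy to spot by comparing the two listings.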
Read and update, printing the execution plan
// Read the table, then print its schema and contents
val df = spark.table("hadoop_prod.test.table_a")
df.printSchema()
df.show()
// val dfTableA = spark.read.format("iceberg").load("/Users/bjhl/tmp/icebergData/default/table_a")
// dfTableA.show()
// Each fragment ends with a space so that the concatenated SQL stays valid
spark.sql("merge into hadoop_prod.test.table_a t " +
  "using (select 1 as c1, \"zyc\" as c2, \"wlq\" as c3) s " +
  "on t.c1 = s.c1 " +
  "when matched " +
  "then update set t.c3 = s.c3").explain()
df.show()
println("Finished reading and writing Iceberg data")
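Note: loading the table directly with read.format always went through HiveCatalog to look up the table and kept failing; I have not yet tracked down the cause
The result is as follows:
After the update, the storage directory changes as follows
Both the metadata and the data gained new versions; presumably the update is implemented with snapshots?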
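One way to test the snapshot guess is to query the table's `snapshots` metadata table, which the Iceberg documentation describes for Spark 3 catalogs. The sketch below assumes that metadata tables can be reached through the `hadoop_prod` catalog configured earlier; the object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SnapshotInspection {
  // Builds the name of an Iceberg metadata table, e.g. "<table>.snapshots".
  def metadataTable(table: String, kind: String): String = s"$table.$kind"

  // Returns one row per snapshot recorded for the table.
  // Assumption: the catalog exposes the "snapshots" metadata table under the table name.
  def snapshots(spark: SparkSession, table: String): DataFrame =
    spark.sql(
      "SELECT committed_at, snapshot_id, parent_id, operation " +
        s"FROM ${metadataTable(table, "snapshots")}")
}
```

Calling `SnapshotInspection.snapshots(spark, "hadoop_prod.test.table_a").show(false)` before and after the merge should show whether the update added a snapshot whose `parent_id` points at the earlier one.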
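Table schema
Data before the update
Data after the update
Key point: the physical execution plan is as follows
Tying this back to Iceberg
Clone the Iceberg source and build it; the classes above come from iceberg-spark3-extensions
What follows is the process of verifying the guesses against the code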
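When mapping plan nodes back to source modules, it can help to print the class of each physical operator together with the jar it was loaded from. This is a rough sketch: `PlanNodes` and `physicalNodeOrigins` are illustrative names, and it assumes the DataFrame returned by `spark.sql` still carries the plan that `explain()` prints.

```scala
import org.apache.spark.sql.DataFrame

object PlanNodes {
  // Returns (operator class name, code source location) for every node of the physical plan.
  def physicalNodeOrigins(df: DataFrame): Seq[(String, String)] =
    df.queryExecution.executedPlan.collect { case node =>
      val cls = node.getClass
      // The code source can be missing for some class loaders, so fall back to a placeholder.
      val origin = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("<unknown>")
      (cls.getName, origin)
    }.distinct
}
```

For example, `PlanNodes.physicalNodeOrigins(mergeDf).foreach(println)`, where `mergeDf` stands for the result of the `merge into` statement, lists each operator class next to its jar, which narrows down where to look in the Iceberg repository.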
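Closing remarks
Note: "spark.sql.extensions" must be set to "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" for merge into and similar features to work
Open questions:
How does Iceberg perform updates?
Given Iceberg's storage layout, in which stages of a Spark job does performance improve or degrade?
Given Iceberg's implementation, which optimizations has Spark built on top of it?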