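This article walks through one feature to get familiar with how Hudi's overall architecture is implemented; it does not discuss the implementation details of the algorithm.
I am new to Hudi, so corrections are welcome.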
spark : version, 3.1.2
hudi : branch, master
Time: 2022/02/06, first version
Goal: reduce the amount of data scanned by changing the data layout.
A simple example:
- A text table with two fields, id and name
- There are two data files, a.parquet and b.parquet
  - a.parquet data: 2,zs; 1,ls; 4,wu; 3,ts
  - b.parquet data: 1,ls; 2,zs; 4,wu; 5,ts
  - To count the rows with id = 2, both a.parquet and b.parquet must be scanned
- After sorting the data and recording a Min/Max index
  - a.parquet data: 1,ls; 1,ls; 2,zs; 2,zs   Min-id:1 | Max-id:2
  - b.parquet data: 3,ts; 4,wu; 4,wu; 5,ts   Min-id:3 | Max-id:5
  - To count the rows with id = 2, only a.parquet needs to be scanned
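The example above can be sketched in a few lines of Java. This is only an illustration of Min/Max pruning, not Hudi code: the classes FileStats and MinMaxPruning are hypothetical names introduced here, and the statistics are the ones from the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-file statistics: file name plus min/max of the id column. */
final class FileStats {
    final String file;
    final int minId;
    final int maxId;

    FileStats(String file, int minId, int maxId) {
        this.file = file;
        this.minId = minId;
        this.maxId = maxId;
    }
}

final class MinMaxPruning {
    /** Returns the files whose [minId, maxId] range can contain the given id. */
    static List<String> filesToScan(List<FileStats> stats, int id) {
        List<String> result = new ArrayList<>();
        for (FileStats s : stats) {
            if (s.minId <= id && id <= s.maxId) {
                result.add(s.file);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Before sorting: the id ranges of the two files overlap.
        List<FileStats> unsorted = List.of(
            new FileStats("a.parquet", 1, 4),
            new FileStats("b.parquet", 1, 5));
        // After sorting: the id ranges are disjoint.
        List<FileStats> sorted = List.of(
            new FileStats("a.parquet", 1, 2),
            new FileStats("b.parquet", 3, 5));
        System.out.println(filesToScan(unsorted, 2)); // both files
        System.out.println(filesToScan(sorted, 2));   // only a.parquet
    }
}
```

The statistics alone never prove that a file contains id = 2; they only prove that a file cannot contain it, which is why pruning is safe.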
Query phase
The entry class is DefaultSource; its createRelation method creates the Relation.
Type conditions:
- when hoodie.datasource.query.type is read_optimized, hoodie.table.type is cow or mor
- when hoodie.datasource.query.type is snapshot, hoodie.table.type is cow
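The two conditions can be read as a small decision function. The sketch below is a hypothetical helper, not the logic as written in DefaultSource; it uses the shorthand values cow and mor from the list above rather than the actual configuration constants.

```java
final class RelationChoice {
    /**
     * Returns true when the (query type, table type) pair matches one of the
     * two conditions listed above. Values are the article's shorthand.
     */
    static boolean matchesTypeConditions(String queryType, String tableType) {
        boolean cow = "cow".equals(tableType);
        boolean mor = "mor".equals(tableType);
        if ("read_optimized".equals(queryType)) {
            return cow || mor;
        }
        if ("snapshot".equals(queryType)) {
            return cow;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matchesTypeConditions("read_optimized", "mor")); // true
        System.out.println(matchesTypeConditions("snapshot", "mor"));       // false
    }
}
```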
When the conditions above are met, a HadoopFsRelation is created. This requires creating and initializing a HoodieFileIndex. During initialization, refresh0() is called to build the file slices to query. For a partitioned table it still lists the file paths and related information under all partition directories and caches them in cachedAllInputFileSlices.
private def refresh0(): Unit = {
  val startTime = System.currentTimeMillis()
  // Load the files of all partitions
  val partitionFiles = loadPartitionPathFiles()
  val allFiles = partitionFiles.values.reduceOption(_ ++ _)
    .getOrElse(Array.empty[FileStatus])
  metaClient.reloadActiveTimeline()
  val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
  val latestInstant = activeInstants.lastInstant()
  fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
  val queryInstant = if (specifiedQueryInstant.isDefined) {
    specifiedQueryInstant
  } else if (latestInstant.isPresent) {
    Some(latestInstant.get.getTimestamp)
  } else {
    None
  }
  (tableType, queryType) match {
    case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
      // Fetch and store latest base and log files, and their sizes
      // All FileSlices are cached here
      cachedAllInputFileSlices = partitionFiles.map(p => {
        val latestSlices = if (latestInstant.isPresent) {
          fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get)
            .iterator().asScala.toSeq
        } else {
          Seq()
        }
        (p._1, latestSlices)
      })
      cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => {
        if (fileSlice.getBaseFile.isPresent) {
          fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
        } else {
          fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
        }
      }).sum
    case (_, _) =>
      // Fetch and store latest base files and its sizes
      cachedAllInputFileSlices = partitionFiles.map(p => {
        val fileSlices = specifiedQueryInstant
          .map(instant =>
            fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true))
          .getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath))
          .iterator().asScala.toSeq
        (p._1, fileSlices)
      })
      cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum
  }
  // ... (rest of the method omitted in this excerpt)
}
HoodieFileIndex extends org.apache.spark.sql.execution.datasources.FileIndex and implements the listFiles interface, which reads the zIndex index and also performs partition pruning. HoodieFileIndex is the location implementation of HadoopFsRelation and therefore determines the data input.
HoodieFileIndex's listFiles implementation
lookupCandidateFilesInZIndex uses the Z-index to find the files that need to be read
// Code snippet
override def listFiles(partitionFilters: Seq[Expression],
                       dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
  // Look up candidate files names in the Z-index, if all of the following conditions are true
  //    - Data-skipping is enabled
  //    - Z-index is present
  //    - List of predicates (filters) is present
  val candidateFilesNamesOpt: Option[Set[String]] =
    lookupCandidateFilesInZIndex(dataFilters) match {
      case Success(opt) => opt
      case Failure(e) =>
        if (e.isInstanceOf[AnalysisException]) {
          logDebug("Failed to relay provided data filters to Z-index lookup", e)
        } else {
          logError("Failed to lookup candidate files in Z-index", e)
        }
        Option.empty
    }
  ...
}
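First, look at the data of a .zindex index file, /.zindex/20220202225600359/part-00000-3b9cdcd9-28ed-4cef-8f97-2bb6097b1445-c000.snappy.parquet (in this example the data is z-ordered by name,id). Each row corresponds to one data file and records the minimum and maximum values of the name and id columns in that file, which is what makes file-level filtering possible.
(Note: this is test data and may not illustrate the filtering logic well, because each data file holds only one row and the compaction condition has not been reached.)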
{"file": "859ccf12-253f-40d5-ba0b-a831e48e4f16-0_0-45-468_20220202155755496.parquet", "name_minValue": "cql", "name_maxValue": "wlq", "name_num_nulls": 0, "id_minValue": 1, "id_maxValue": 9, "id_num_nulls": 0}
{"file": "9f3054c8-5f57-45c8-8bdf-400707edd2d3-0_0-26-29_20220202223215267.parquet", "name_minValue": "cql", "name_maxValue": "wlq_update", "name_num_nulls": 0, "id_minValue": 1, "id_maxValue": 1, "id_num_nulls": 0}
{"file": "9c76a87b-8263-41c5-a830-08698b27ec0f-0_0-49-440_20220202160249241.parquet", "name_minValue": "cql", "name_maxValue": "cql", "name_num_nulls": 0, "id_minValue": 1, "id_maxValue": 1, "id_num_nulls": 0}
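To make the use of these rows concrete, here is a minimal sketch of evaluating an equality filter on name and id against the three sample rows. It is not Hudi's createZIndexLookupFilter; IndexRow and ZIndexLookupSketch are hypothetical names, the file names are shortened to their first eight characters, and strings are compared with String.compareTo, which matches byte-wise ordering only for ASCII values such as these.

```java
import java.util.ArrayList;
import java.util.List;

/** One row of the index table: a data file plus min/max of the name and id columns. */
final class IndexRow {
    final String file;
    final String nameMin;
    final String nameMax;
    final long idMin;
    final long idMax;

    IndexRow(String file, String nameMin, String nameMax, long idMin, long idMax) {
        this.file = file;
        this.nameMin = nameMin;
        this.nameMax = nameMax;
        this.idMin = idMin;
        this.idMax = idMax;
    }
}

final class ZIndexLookupSketch {
    /** Keeps the files whose ranges can contain a row with the given name AND id. */
    static List<String> candidates(List<IndexRow> rows, String name, long id) {
        List<String> out = new ArrayList<>();
        for (IndexRow r : rows) {
            boolean nameMayMatch = r.nameMin.compareTo(name) <= 0 && name.compareTo(r.nameMax) <= 0;
            boolean idMayMatch = r.idMin <= id && id <= r.idMax;
            if (nameMayMatch && idMayMatch) {
                out.add(r.file);
            }
        }
        return out;
    }

    /** The three sample rows shown above, with abbreviated file names. */
    static List<IndexRow> sampleRows() {
        return List.of(
            new IndexRow("859ccf12", "cql", "wlq", 1, 9),
            new IndexRow("9f3054c8", "cql", "wlq_update", 1, 1),
            new IndexRow("9c76a87b", "cql", "cql", 1, 1));
    }

    public static void main(String[] args) {
        // name = 'cql' AND id = 5: only the first file has an id range that covers 5.
        System.out.println(candidates(sampleRows(), "cql", 5));
    }
}
```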
lookupCandidateFilesInZIndex implementation
private def lookupCandidateFilesInZIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
  // Path of .zindex, located at tableName/.hoodie/.zindex
  val indexPath = metaClient.getZindexPath
  val fs = metaClient.getFs
  if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) {
    // scalastyle:off return
    return Success(Option.empty)
    // scalastyle:on return
  }
  // Collect all index tables present in `.zindex` folder
  val candidateIndexTables =
    fs.listStatus(new Path(indexPath))
      .filter(_.isDirectory)
      .map(_.getPath.getName)
      .filter(f => completedCommits.contains(f))
      .sortBy(x => x)
  if (candidateIndexTables.isEmpty) {
    // scalastyle:off return
    return Success(Option.empty)
    // scalastyle:on return
  }
  val dataFrameOpt = try {
    Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
  } catch {
    case t: Throwable =>
      logError("Failed to read Z-index; skipping", t)
      None
  }
  dataFrameOpt.map(df => {
    val indexSchema = df.schema
    // Build the filter expression from the index file schema and the pushed-down query filters
    val indexFilter =
      queryFilters.map(createZIndexLookupFilter(_, indexSchema))
        .reduce(And)
    logInfo(s"Index filter condition: $indexFilter")
    df.persist()
    // Collect all files that have an entry in the index
    val allIndexedFileNames =
      df.select("file")
        .collect()
        .map(_.getString(0))
        .toSet
    // Keep only the files that satisfy the filter
    val prunedCandidateFileNames =
      df.where(new Column(indexFilter))
        .select("file")
        .collect()
        .map(_.getString(0))
        .toSet
    df.unpersist()
    // NOTE: Z-index isn't guaranteed to have complete set of statistics for every
    //       base-file: since it's bound to clustering, which could occur asynchronously
    //       at arbitrary point in time, and is not likely to touching all of the base files.
    //
    //       To close that gap, we manually compute the difference b/w all indexed (Z-index)
    //       files and all outstanding base-files, and make sure that all base files not
    //       represented w/in Z-index are included in the output of this method
    // Not every historical file or partition is covered by the z-order index, so the
    // files returned by the pushed-down filter are not necessarily the complete result.
    // Example: a historical file a.parquet exists, z-order optimization is then enabled
    // without rewriting history, and new data files b.parquet and c.parquet are added.
    // The zindex now has entries for b.parquet and c.parquet but none for a.parquet,
    // so using the index result directly would lose data. If the zindex matches only
    // b.parquet, it is enough to exclude c.parquet: cachedAllInputFileSlices - c.parquet,
    // i.e. the query reads a.parquet + b.parquet and skips c.parquet.
    val notIndexedFileNames =
      lookupFileNamesMissingFromIndex(allIndexedFileNames)
    prunedCandidateFileNames ++ notIndexedFileNames
  })
}
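The final step of the method is plain set arithmetic: files matched by the index, plus every base file the index knows nothing about. A minimal sketch, using the a/b/c example from the comment above (CandidateFiles is a hypothetical name, not a Hudi class):

```java
import java.util.HashSet;
import java.util.Set;

final class CandidateFiles {
    /**
     * filesToRead = prunedCandidates + (allBaseFiles - indexedFiles).
     * Files without index entries must always be read, since nothing is known about them.
     */
    static Set<String> filesToRead(Set<String> allBaseFiles,
                                   Set<String> indexedFiles,
                                   Set<String> prunedCandidates) {
        Set<String> notIndexed = new HashSet<>(allBaseFiles);
        notIndexed.removeAll(indexedFiles);
        Set<String> result = new HashSet<>(prunedCandidates);
        result.addAll(notIndexed);
        return result;
    }

    public static void main(String[] args) {
        Set<String> all = Set.of("a.parquet", "b.parquet", "c.parquet");
        Set<String> indexed = Set.of("b.parquet", "c.parquet");
        Set<String> pruned = Set.of("b.parquet");
        // a.parquet is kept because it is not indexed; c.parquet is skipped.
        System.out.println(filesToRead(all, indexed, pruned));
    }
}
```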
Generation phase
// Run clustering after every write
hoodie.clustering.inline = 'true'
hoodie.clustering.inline.max.commits = '1'
hoodie.layout.optimize.strategy = 'z-order'
hoodie.layout.optimize.enable = 'true'
hoodie.clustering.plan.strategy.sort.columns = 'name,id'
After commitStats releases the lock, the class AbstractHoodieWriteClient calls runTableServicesInline to run compaction and the other table services, including the zindex generation flow. The flow has two main steps: first the data is sorted, then the zindex file is generated based on the replace commit.
Note: the sorting above does not happen in the core path of the Spark job (and it can run asynchronously), so it does not affect the commit of the next Spark data write.
The runTableServicesInline method checks whether inline clustering is enabled.
The method to focus on is inlineCluster, which ends up in cluster as implemented by the subclass SparkRDDWriteClient.
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
  HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
  preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
  HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
  HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
  if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
    rollbackInflightClustering(inflightInstant, table);
    table.getMetaClient().reloadActiveTimeline();
  }
  clusteringTimer = metrics.getClusteringCtx();
  LOG.info("Starting clustering at " + clusteringInstant);
  // Build a Partitioner as needed to sort the data, then return the write metadata
  HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = table.cluster(context, clusteringInstant);
  JavaRDD<WriteStatus> statuses = clusteringMetadata.getWriteStatuses();
  // TODO : Where is shouldComplete used ?
  if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
    // Generate the zindex
    completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), statuses, table, clusteringInstant);
  }
  return clusteringMetadata;
}
Sorting phase
table.cluster ultimately creates a SparkExecuteClusteringCommitActionExecutor to do the work.
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
  // Mark instant as clustering inflight
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
  table.getMetaClient().reloadActiveTimeline();
  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  // Use SparkSortAndSizeExecutionStrategy to perform the sort optimization via performClustering
  HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
      ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
          new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
      .performClustering(clusteringPlan, schema, instantTime);
  JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
  JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
  writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
  commitOnAutoCommit(writeMetadata);
  if (!writeMetadata.getCommitMetadata().isPresent()) {
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));
  }
  return writeMetadata;
}
SparkSortAndSizeExecutionStrategy's performClusteringWithRecordsRDD implementation
@Override
public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
    final String instantTime, final Map<String, String> strategyParams, final Schema schema,
    final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
  LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
  Properties props = getWriteConfig().getProps();
  props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
  // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
  props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
  props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
  HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
  // Note getPartitioner: if hoodie.layout.optimize.enable is on, it returns
  // RDDSpatialCurveOptimizationSortPartitioner, whose repartitionRecords is eventually called
  return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
      false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
}
Inside repartitionRecords, RDDSpatialCurveOptimizationSortPartitioner calls one of the createOptimizedDataFrameByXXX methods of OrderingIndexHelper, chosen by hoodie.layout.optimize.curve.build.method.
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
  Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
  int fieldNum = df.schema().fields().length;
  List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
  if (sortCols.size() != checkCols.size()) {
    return df;
  }
  // only one col to sort, no need to use z-order
  if (sortCols.size() == 1) {
    return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
  }
  Map<Integer, StructField> fieldMap = sortCols
      .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
  // do optimize
  // Two space-filling curves are currently supported: z-order and hilbert
  JavaRDD<Row> sortedRDD = null;
  switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
    case ZORDER:
      sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
      break;
    case HILBERT:
      sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
      break;
    default:
      throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode));
  }
  // create new StructType
  List<StructField> newFields = new ArrayList<>();
  newFields.addAll(Arrays.asList(df.schema().fields()));
  newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
  // create new DataFrame
  return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
}
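For readers unfamiliar with the z-curve, the core idea behind the ZORDER branch is to interleave the bits of the sort columns into a single sort key. The sketch below is a simplified illustration for two small non-negative integers only. It is not Hudi's createZCurveSortedRDD, which works on byte arrays and handles many column types; ZValue is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ZValue {
    /**
     * Interleaves the low 16 bits of x and y:
     * bit i of x goes to bit 2i, bit i of y goes to bit 2i + 1.
     */
    static long interleave(int x, int y) {
        long z = 0L;
        for (int i = 0; i < 16; i++) {
            z |= ((long) ((x >> i) & 1)) << (2 * i);
            z |= ((long) ((y >> i) & 1)) << (2 * i + 1);
        }
        return z;
    }

    public static void main(String[] args) {
        // Sort a few (x, y) points by their z-value instead of by x alone.
        List<int[]> points = new ArrayList<>();
        points.add(new int[] {3, 0});
        points.add(new int[] {0, 1});
        points.add(new int[] {1, 1});
        points.add(new int[] {0, 0});
        points.sort(Comparator.comparingLong(p -> interleave(p[0], p[1])));
        for (int[] p : points) {
            System.out.println(p[0] + "," + p[1] + " -> z=" + interleave(p[0], p[1]));
        }
    }
}
```

Because both columns contribute to the high bits of the key, rows that are close in either column tend to land in the same file, which tightens the per-file Min/Max ranges of both columns at once.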
Index generation phase
Back in SparkRDDWriteClient's cluster method, completeTableService is eventually called to perform the commit.
private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
                                HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                String clusteringCommitTime) {
  List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
      e.getValue().stream()).collect(Collectors.toList());
  if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
    throw new HoodieClusteringException("Clustering failed to write to files:"
        + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
  }
  try {
    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
    this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
    finalizeWrite(table, clusteringCommitTime, writeStats);
    writeTableMetadataForTableServices(table, metadata, clusteringInstant);
    // Update outstanding metadata indexes
    if (config.isLayoutOptimizationEnabled()
        && !config.getClusteringSortColumns().isEmpty()) {
      // Update the metadata indexes (this is where the zindex gets written)
      table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);
    }
    LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
    table.getActiveTimeline().transitionReplaceInflightToComplete(HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
        Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
  } catch (Exception e) {
    throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
  } finally {
    this.txnManager.endTransaction();
  }
  WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
      .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
  if (clusteringTimer != null) {
    long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
    try {
      metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
          durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
    } catch (ParseException e) {
      throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
          + config.getBasePath() + " at time " + clusteringCommitTime, e);
    }
  }
  LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
HoodieSparkCopyOnWriteTable's updateZIndex method
private void updateZIndex(
    @Nonnull HoodieEngineContext context,
    @Nonnull List<HoodieWriteStat> updatedFilesStats,
    @Nonnull String instantTime
) throws Exception {
  String sortColsList = config.getClusteringSortColumns();
  String basePath = metaClient.getBasePath();
  String indexPath = metaClient.getZindexPath();
  // Collect the instant times of all completed commits and replacecommits
  List<String> completedCommits =
      metaClient.getCommitsTimeline()
          .filterCompletedInstants()
          .getInstants()
          .map(HoodieInstant::getTimestamp)
          .collect(Collectors.toList());
  // Collect the paths of the newly written data files
  List<String> touchedFiles =
      updatedFilesStats.stream()
          .map(s -> new Path(basePath, s.getPath()).toString())
          .collect(Collectors.toList());
  if (touchedFiles.isEmpty() || StringUtils.isNullOrEmpty(sortColsList) || StringUtils.isNullOrEmpty(indexPath)) {
    return;
  }
  LOG.info(String.format("Updating Z-index table (%s)", indexPath));
  List<String> sortCols = Arrays.stream(sortColsList.split(","))
      .map(String::trim)
      .collect(Collectors.toList());
  HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
  // Fetch table schema to appropriately construct Z-index schema
  Schema tableWriteSchema =
      HoodieAvroUtils.createHoodieWriteSchema(new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields());
  // Update the index files
  ZOrderingIndexHelper.updateZIndexFor(sparkEngineContext.getSqlContext().sparkSession(),
      AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema),
      touchedFiles,
      sortCols,
      indexPath,
      instantTime,
      completedCommits
  );
  LOG.info(String.format("Successfully updated Z-index at instant (%s)", instantTime));
}
ZOrderingIndexHelper.updateZIndexFor
public static void updateZIndexFor(
    @Nonnull SparkSession sparkSession,
    @Nonnull StructType sourceTableSchema,
    @Nonnull List<String> sourceBaseFiles,
    @Nonnull List<String> zorderedCols,
    @Nonnull String zindexFolderPath,
    @Nonnull String commitTime,
    @Nonnull List<String> completedCommits
) {
  FileSystem fs = FSUtils.getFs(zindexFolderPath, sparkSession.sparkContext().hadoopConfiguration());
  // Compose new Z-index table for the given source base files
  // Read the metadata of the newly written data files and use it to build the index
  Dataset<Row> newZIndexDf =
      buildZIndexTableFor(
          sparkSession,
          sourceBaseFiles,
          zorderedCols.stream()
              .map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)])
              .collect(Collectors.toList())
      );
  try {
    //
    // Z-Index has the following folder structure:
    //
    // .hoodie/
    // ├── .zindex/
    // │   ├── <instant>/
    // │   │   ├── <part-...>.parquet
    // │   │   └── ...
    //
    // If index is currently empty (no persisted tables), we simply create one
    // using clustering operation's commit instance as it's name
    Path newIndexTablePath = new Path(zindexFolderPath, commitTime);
    // If .zindex does not exist yet (first write), write it directly in overwrite mode;
    // otherwise merge the new rows with the most recent existing index table
    if (!fs.exists(new Path(zindexFolderPath))) {
      newZIndexDf.repartition(1)
          .write()
          .format("parquet")
          .mode("overwrite")
          .save(newIndexTablePath.toString());
      return;
    }
    // Filter in all index tables (w/in {@code .zindex} folder)
    // List all instant folders under .zindex
    List<String> allIndexTables =
        Arrays.stream(fs.listStatus(new Path(zindexFolderPath)))
            .filter(FileStatus::isDirectory)
            .map(f -> f.getPath().getName())
            .collect(Collectors.toList());
    // Compile list of valid index tables that were produced as part
    // of previously successfully committed iterations
    List<String> validIndexTables =
        allIndexTables.stream()
            .filter(completedCommits::contains)
            .sorted()
            .collect(Collectors.toList());
    List<String> tablesToCleanup =
        allIndexTables.stream()
            .filter(f -> !completedCommits.contains(f))
            .collect(Collectors.toList());
    Dataset<Row> finalZIndexDf;
    // Before writing out new version of the Z-index table we need to merge it
    // with the most recent one that were successfully persisted previously
    if (validIndexTables.isEmpty()) {
      finalZIndexDf = newZIndexDf;
    } else {
      // NOTE: That Parquet schema might deviate from the original table schema (for ex,
      //       by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
      //       prior to merging, since merging might fail otherwise due to schemas incompatibility
      finalZIndexDf =
          tryMergeMostRecentIndexTableInto(
              sparkSession,
              newZIndexDf,
              // Load current most recent Z-index table
              sparkSession.read().load(new Path(zindexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString())
          );
      // Clean up all index tables (after creation of the new index)
      tablesToCleanup.addAll(validIndexTables);
    }
    // Persist new Z-index table
    finalZIndexDf
        .repartition(1)
        .write()
        .format("parquet")
        .save(newIndexTablePath.toString());
    // Clean up residual Z-index tables that might have been dangling since
    // previous iterations (due to intermittent failures during previous clean up)
    tablesToCleanup.forEach(f -> {
      try {
        fs.delete(new Path(zindexFolderPath, f), true);
      } catch (IOException ie) {
        // NOTE: Exception is deliberately swallowed to not affect overall clustering operation,
        //       since failing Z-index table will be attempted to be cleaned up upon subsequent
        //       clustering iteration
        LOG.warn(String.format("Failed to cleanup residual Z-index table: %s", f), ie);
      }
    });
  } catch (IOException e) {
    LOG.error("Failed to build new Z-index table", e);
    throw new HoodieException("Failed to build new Z-index table", e);
  }
}
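The bookkeeping in updateZIndexFor (which index folders are valid, which one is merged, which ones are cleaned up) can be isolated from Spark and the file system. The following is a minimal sketch under that simplification; IndexTableVersions is a hypothetical name, and apart from 20220202225600359 the instant values used in the demo are made up for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

final class IndexTableVersions {
    /** Index folders whose name is a completed commit, oldest first. */
    static List<String> validTables(List<String> allIndexTables, Set<String> completedCommits) {
        return allIndexTables.stream()
            .filter(completedCommits::contains)
            .sorted()
            .collect(Collectors.toList());
    }

    /** Index folders left behind by writes that never completed. */
    static List<String> danglingTables(List<String> allIndexTables, Set<String> completedCommits) {
        return allIndexTables.stream()
            .filter(t -> !completedCommits.contains(t))
            .collect(Collectors.toList());
    }

    /** The most recent valid index folder, i.e. the one the new rows are merged with. */
    static Optional<String> latestValid(List<String> allIndexTables, Set<String> completedCommits) {
        List<String> valid = validTables(allIndexTables, completedCommits);
        return valid.isEmpty() ? Optional.empty() : Optional.of(valid.get(valid.size() - 1));
    }

    public static void main(String[] args) {
        List<String> folders = List.of("20220202225600359", "20220201000000000", "20220203000000000");
        Set<String> completed = Set.of("20220202225600359", "20220201000000000");
        System.out.println(latestValid(folders, completed));    // folder to merge with
        System.out.println(danglingTables(folders, completed)); // folders to delete
    }
}
```

Sorting the folder names as strings works because instant times are fixed-width timestamps, so lexicographic order equals chronological order.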
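Summary
Read:
- With the read-optimized query type, both cow and mor tables can use the zorder index optimization; with the snapshot query type, only cow tables can
- Files scanned = all files - files that are covered by the zindex but did not match the filter
Write:
- The main write path of the job is not affected. Once the data is written and committed, 20220206105117793.commit in the figure above is generated, and other consumers can query normally
- The subsequent sorting and index generation can run synchronously after each write, or asynchronously in the background through a scheduler job. This test ran clustering inline after every write, which finally produced the instant 20220206105126886.replacecommit
Related
Iceberg's own metadata already records per-file column Min/Max statistics, so its implementation only needs to re-sort the data. There is a recent related PR, Spark: Spark3 ZOrder Rewrite Strategy. It is still an Action that must be triggered manually, and it adds a Zorder sort strategy. I will cover the details in a separate article, mainly to get familiar with the architectural differences between Iceberg and Hudi.
References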
- Hudi Z-Order and Hilbert Space Filling Curves
- parquet-mr
- Spark: Spark3 ZOrder Rewrite Strategy