This article aims to get familiar with Hudi's overall architecture by walking through a single feature; it does not dig into the algorithmic details.
I'm new to Hudi, so corrections are welcome if anything is wrong.
Spark: version 3.1.2
Hudi: branch master
Date: 2022/02/06, first draft
Goal: reduce the amount of data scanned by changing the data layout.
A simple example:
- A `text` table containing two fields, id and name
It has two data files, a.parquet and b.parquet
- a.parquet contains: (2,zs), (1,ls), (4,wu), (3,ts)
- b.parquet contains: (1,ls), (2,zs), (4,wu), (5,ts)
- To count the rows matching id = 2, both a.parquet and b.parquet must be scanned
After sorting the data and recording Min/Max index statistics:
- a.parquet contains: (1,ls), (1,ls), (2,zs), (2,zs); Min-id: 1 | Max-id: 2
- b.parquet contains: (3,ts), (4,wu), (4,wu), (5,ts); Min-id: 3 | Max-id: 5
- Now, to count the rows matching id = 2, only a.parquet needs to be scanned (see the sketch below)
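A minimal sketch of the Min/Max pruning idea behind this example, using a hypothetical `FileStat` structure rather than any Hudi API:

```scala
// Hypothetical illustration of Min/Max-based file pruning (not Hudi's actual API)
case class FileStat(file: String, minId: Int, maxId: Int)

val stats = Seq(FileStat("a.parquet", 1, 2), FileStat("b.parquet", 3, 5))

// Only files whose [minId, maxId] range may contain id = 2 need to be scanned
val candidates = stats.filter(s => s.minId <= 2 && 2 <= s.maxId).map(_.file)
// candidates == Seq("a.parquet")
```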
Query phase
The entry point is the DefaultSource class; its createRelation method creates the Relation.
Type conditions:
- hoodie.datasource.query.type is read_optimized and hoodie.table.type is cow or mor
- hoodie.datasource.query.type is snapshot and hoodie.table.type is cow
When one of the conditions above is met, a HadoopFsRelation is created. This requires creating and initializing a HoodieFileIndex. During initialization, refresh0() is called to build the FileSlices to be queried; for a partitioned table it still lists the file paths and related metadata under every partition directory and caches them in cachedAllInputFileSlices.
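As a hedged illustration, a snapshot query with data skipping enabled might be issued like this (the table path is a placeholder, and the exact data-skipping option key may differ across Hudi versions):

```scala
// Hypothetical read example; the table path is made up for illustration
val df = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "snapshot")
  .option("hoodie.enable.data.skipping", "true") // enables the Z-index lookup in HoodieFileIndex
  .load("/tmp/hudi/text_table")

df.filter("id = 2").count()
```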
```scala
private def refresh0(): Unit = {
  val startTime = System.currentTimeMillis()
  // Load the files of all partitions
  val partitionFiles = loadPartitionPathFiles()
  val allFiles = partitionFiles.values.reduceOption(_ ++ _)
    .getOrElse(Array.empty[FileStatus])

  metaClient.reloadActiveTimeline()
  val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
  val latestInstant = activeInstants.lastInstant()
  fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
  val queryInstant = if (specifiedQueryInstant.isDefined) {
    specifiedQueryInstant
  } else if (latestInstant.isPresent) {
    Some(latestInstant.get.getTimestamp)
  } else {
    None
  }

  (tableType, queryType) match {
    case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
      // Fetch and store latest base and log files, and their sizes
      // All FileSlices are cached here
      cachedAllInputFileSlices = partitionFiles.map(p => {
        val latestSlices = if (latestInstant.isPresent) {
          fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get)
            .iterator().asScala.toSeq
        } else {
          Seq()
        }
        (p._1, latestSlices)
      })
      cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => {
        if (fileSlice.getBaseFile.isPresent) {
          fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
        } else {
          fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
        }
      }).sum
    case (_, _) =>
      // Fetch and store latest base files and its sizes
      cachedAllInputFileSlices = partitionFiles.map(p => {
        val fileSlices = specifiedQueryInstant
          .map(instant => fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true))
          .getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath))
          .iterator().asScala.toSeq
        (p._1, fileSlices)
      })
      cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum
  }
  // ... (remainder of refresh0 omitted)
```
HoodieFileIndex extends org.apache.spark.sql.execution.datasources.FileIndex and implements the listFiles interface, which is where the Z-index is read and partition/file pruning takes effect. HoodieFileIndex is plugged in as the location of the HadoopFsRelation and therefore determines the data input.
The listFiles implementation of HoodieFileIndex:
lookupCandidateFilesInZIndex uses the Z-index to find the files that actually need to be read.
```scala
// Code excerpt
override def listFiles(partitionFilters: Seq[Expression],
                       dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
  // Look up candidate files names in the Z-index, if all of the following conditions are true
  //    - Data-skipping is enabled
  //    - Z-index is present
  //    - List of predicates (filters) is present
  val candidateFilesNamesOpt: Option[Set[String]] =
    lookupCandidateFilesInZIndex(dataFilters) match {
      case Success(opt) => opt
      case Failure(e) =>
        if (e.isInstanceOf[AnalysisException]) {
          logDebug("Failed to relay provided data filters to Z-index lookup", e)
        } else {
          logError("Failed to lookup candidate files in Z-index", e)
        }
        Option.empty
    }
  ...
}
```
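The remainder of listFiles (elided as `...` above) intersects the candidate file names with the cached file slices. A conceptual sketch, not the exact Hudi code:

```scala
// Conceptual sketch of how the candidate set is applied to the cached file slices;
// names are illustrative and the real code also handles log files and partition filters
val prunedFileSlices = cachedAllInputFileSlices.map { case (partition, slices) =>
  val kept = candidateFilesNamesOpt match {
    case Some(candidates) => slices.filter(s => candidates.contains(s.getBaseFile.get().getFileName))
    case None             => slices // no usable index info: fall back to scanning all files
  }
  (partition, kept)
}
```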
First, look at the index data under .zindex, e.g. /.zindex/20220202225600359/part-00000-3b9cdcd9-28ed-4cef-8f97-2bb6097b1445-c000.snappy.parquet (in this example the table is Z-order sorted by name and id). Each row corresponds to one data file and records the min and max values of the name and id columns for that file, which is what makes file-level filtering possible.
(Note: this is test data and may not look like a realistic pruning case, because each data file contains only one row and the compaction threshold has not been reached.)
{"file": "859ccf12-253f-40d5-ba0b-a831e48e4f16-0_0-45-468_20220202155755496.parquet", "name_minValue": "cql", "name_maxValue": "wlq", "name_num_nulls": 0, "id_minValue": 1, "id_maxValue": 9, "id_num_nulls": 0}{"file": "9f3054c8-5f57-45c8-8bdf-400707edd2d3-0_0-26-29_20220202223215267.parquet", "name_minValue": "cql", "name_maxValue": "wlq_update", "name_num_nulls": 0, "id_minValue": 1, "id_maxValue": 1, "id_num_nulls": 0}{"file": "9c76a87b-8263-41c5-a830-08698b27ec0f-0_0-49-440_20220202160249241.parquet", "name_minValue": "cql", "name_maxValue": "cql", "name_num_nulls": 0, "id_minValue": 1, "id_maxValue": 1, "id_num_nulls": 0}
The lookupCandidateFilesInZIndex implementation:
```scala
private def lookupCandidateFilesInZIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
  // The .zindex path is located at tableName/.hoodie/.zindex
  val indexPath = metaClient.getZindexPath
  val fs = metaClient.getFs

  if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) {
    // scalastyle:off return
    return Success(Option.empty)
    // scalastyle:on return
  }

  // Collect all index tables present in `.zindex` folder
  val candidateIndexTables =
    fs.listStatus(new Path(indexPath))
      .filter(_.isDirectory)
      .map(_.getPath.getName)
      .filter(f => completedCommits.contains(f))
      .sortBy(x => x)

  if (candidateIndexTables.isEmpty) {
    // scalastyle:off return
    return Success(Option.empty)
    // scalastyle:on return
  }

  val dataFrameOpt = try {
    Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
  } catch {
    case t: Throwable =>
      logError("Failed to read Z-index; skipping", t)
      None
  }

  dataFrameOpt.map(df => {
    val indexSchema = df.schema
    // Build the filter expression from the index table schema and the pushed-down query filters
    val indexFilter =
      queryFilters.map(createZIndexLookupFilter(_, indexSchema))
        .reduce(And)

    logInfo(s"Index filter condition: $indexFilter")

    df.persist()

    // Collect all indexed file names
    val allIndexedFileNames =
      df.select("file")
        .collect()
        .map(_.getString(0))
        .toSet

    // Filter out the files that satisfy the condition
    val prunedCandidateFileNames =
      df.where(new Column(indexFilter))
        .select("file")
        .collect()
        .map(_.getString(0))
        .toSet

    df.unpersist()

    // NOTE: Z-index isn't guaranteed to have complete set of statistics for every
    //       base-file: since it's bound to clustering, which could occur asynchronously
    //       at arbitrary point in time, and is not likely to touching all of the base files.
    //
    //       To close that gap, we manually compute the difference b/w all indexed (Z-index)
    //       files and all outstanding base-files, and make sure that all base files not
    //       represented w/in Z-index are included in the output of this method
    //
    // Not every historical file or partition is covered by the Z-order index, so the files
    // returned by the pushed-down filter are not necessarily the complete query result.
    // For example, suppose a.parquet already existed before Z-order was enabled (no backfill),
    // and b.parquet and c.parquet were written afterwards; the Z-index then only knows about
    // b.parquet and c.parquet, not a.parquet, and using its result directly would lose data.
    // So if the index matches b.parquet, only c.parquet needs to be excluded: the scanned set
    // is cachedAllInputFileSlices minus c.parquet, i.e. a.parquet + b.parquet.
    val notIndexedFileNames =
      lookupFileNamesMissingFromIndex(allIndexedFileNames)

    prunedCandidateFileNames ++ notIndexedFileNames
  })
}
```
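To make the translation done by createZIndexLookupFilter concrete, here is a rough sketch of what the lookup filter looks like for the predicate id = 2 when applied to the index table loaded in the earlier sketch (the real implementation handles many more operators and also the null counts):

```scala
import org.apache.spark.sql.functions.col

// Rough illustration only: a query predicate `id = 2` becomes a range check
// against the per-file min/max statistics stored in the Z-index table
val queryValue = 2
val indexFilter = col("id_minValue") <= queryValue && col("id_maxValue") >= queryValue

val prunedFiles = zindexDf.where(indexFilter)
  .select("file")
  .collect()
  .map(_.getString(0))
  .toSet
```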
Generation phase
```
// Run clustering after every write
hoodie.clustering.inline = 'true'
hoodie.clustering.inline.max.commits = '1'
hoodie.layout.optimize.strategy = 'z-order'
hoodie.layout.optimize.enable = 'true'
hoodie.clustering.plan.strategy.sort.columns = 'name,id'
```
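As a hedged example, these options might be attached to a Spark DataFrame write as follows (table name, path, record-key and precombine fields are placeholders):

```scala
// Hypothetical write example used in this test setup
df.write
  .format("hudi")
  .option("hoodie.table.name", "text_table")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "name")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  .option("hoodie.layout.optimize.enable", "true")
  .option("hoodie.layout.optimize.strategy", "z-order")
  .option("hoodie.clustering.plan.strategy.sort.columns", "name,id")
  .mode("append")
  .save("/tmp/hudi/text_table")
```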
After commitStats releases the lock, AbstractHoodieWriteClient calls runTableServicesInline to run the related table services such as compaction, which includes the Z-index generation flow. The flow has two main steps: first the data is sorted, then the Z-index file is generated from the replacecommit.
Note: the sorting above does not happen on the critical path of the Spark job (it can even run asynchronously), so it does not block the commit of the next Spark data write.
The runTableServicesInline method checks whether inline clustering is enabled.
The key method is inlineCluster, which ends up calling the cluster implementation of the subclass SparkRDDWriteClient.
```java
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
  HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
  preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
  HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
  HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
  if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
    rollbackInflightClustering(inflightInstant, table);
    table.getMetaClient().reloadActiveTimeline();
  }
  clusteringTimer = metrics.getClusteringCtx();
  LOG.info("Starting clustering at " + clusteringInstant);
  // Build the Partitioner as needed to sort the data, then return the write metadata
  HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = table.cluster(context, clusteringInstant);
  JavaRDD<WriteStatus> statuses = clusteringMetadata.getWriteStatuses();
  // TODO : Where is shouldComplete used ?
  if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
    // Generate the Z-index
    completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), statuses, table, clusteringInstant);
  }
  return clusteringMetadata;
}
```
Sorting phase
table.cluster eventually creates a SparkExecuteClusteringCommitActionExecutor to carry out the work.
```java
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
  // Mark instant as clustering inflight
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
  table.getMetaClient().reloadActiveTimeline();

  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  // Use SparkSortAndSizeExecutionStrategy (performClustering) to apply the sort optimization
  HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = (
      (ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
          ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
              new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
      .performClustering(clusteringPlan, schema, instantTime);
  JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
  JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
  writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
  commitOnAutoCommit(writeMetadata);
  if (!writeMetadata.getCommitMetadata().isPresent()) {
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(),
        writeMetadata.getPartitionToReplaceFileIds(), extraMetadata, operationType,
        getSchemaToStoreInCommit(), getCommitActionType());
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));
  }
  return writeMetadata;
}
```
The performClusteringWithRecordsRDD implementation of SparkSortAndSizeExecutionStrategy:
```java
@Override
public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
                                                            final String instantTime, final Map<String, String> strategyParams,
                                                            final Schema schema, final List<HoodieFileGroupId> fileGroupIdList,
                                                            final boolean preserveHoodieMetadata) {
  LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
  Properties props = getWriteConfig().getProps();
  props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
  // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
  props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
  props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
  HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
  // Note the getPartitioner call: when hoodie.layout.optimize.enable is set, it returns a
  // RDDSpatialCurveOptimizationSortPartitioner, whose repartitionRecords is eventually invoked
  return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
      false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
}
```
Inside repartitionRecords, RDDSpatialCurveOptimizationSortPartitioner dispatches, based on hoodie.layout.optimize.curve.build.method, to one of OrderingIndexHelper's createOptimizedDataFrameByXXX methods.
```java
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
  Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
  int fieldNum = df.schema().fields().length;
  List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
  if (sortCols.size() != checkCols.size()) {
    return df;
  }
  // only one col to sort, no need to use z-order
  if (sortCols.size() == 1) {
    return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
  }
  Map<Integer, StructField> fieldMap = sortCols
      .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
  // do optimize
  // Two space-filling curves are currently supported: z-order and hilbert
  JavaRDD<Row> sortedRDD = null;
  switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
    case ZORDER:
      sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
      break;
    case HILBERT:
      sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
      break;
    default:
      throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode));
  }
  // create new StructType
  List<StructField> newFields = new ArrayList<>();
  newFields.addAll(Arrays.asList(df.schema().fields()));
  newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));

  // create new DataFrame
  return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
}
```
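The curve construction itself is outside the scope of this article, but the following toy Scala snippet shows the basic Z-order idea of interleaving bits from the sort columns. It is only an illustration, not Hudi's implementation, which works on padded byte arrays and supports more column types:

```scala
// Toy Z-value for two non-negative ints: interleave their bits and sort rows by the result.
// Rows sorted this way stay clustered on both columns at once, which keeps per-file
// min/max ranges narrow for both sort columns.
def zValue(x: Int, y: Int, bits: Int = 16): Long = {
  var z = 0L
  var i = 0
  while (i < bits) {
    z |= ((x >> i) & 1L) << (2 * i)       // even bit positions come from x
    z |= ((y >> i) & 1L) << (2 * i + 1)   // odd bit positions come from y
    i += 1
  }
  z
}
```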
Index generation phase
Back in the cluster method of SparkRDDWriteClient, completeTableService is eventually called to perform the commit.
```java
private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
                                HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                String clusteringCommitTime) {
  List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream()
      .flatMap(e -> e.getValue().stream()).collect(Collectors.toList());

  if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
    throw new HoodieClusteringException("Clustering failed to write to files:"
        + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
  }

  try {
    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
    this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
    finalizeWrite(table, clusteringCommitTime, writeStats);
    writeTableMetadataForTableServices(table, metadata, clusteringInstant);
    // Update outstanding metadata indexes
    if (config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) {
      // Update the metadata indexes (Z-index)
      table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);
    }
    LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
    table.getActiveTimeline().transitionReplaceInflightToComplete(
        HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
        Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
  } catch (Exception e) {
    throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
  } finally {
    this.txnManager.endTransaction();
  }
  WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
      .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
  if (clusteringTimer != null) {
    long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
    try {
      metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
          durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
    } catch (ParseException e) {
      throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
          + config.getBasePath() + " at time " + clusteringCommitTime, e);
    }
  }
  LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
```
The updateZIndex method of HoodieSparkCopyOnWriteTable:
```java
private void updateZIndex(
    @Nonnull HoodieEngineContext context,
    @Nonnull List<HoodieWriteStat> updatedFilesStats,
    @Nonnull String instantTime
) throws Exception {
  String sortColsList = config.getClusteringSortColumns();
  String basePath = metaClient.getBasePath();
  String indexPath = metaClient.getZindexPath();

  // Collect the instant times of all completed commits and replacecommits
  List<String> completedCommits =
      metaClient.getCommitsTimeline()
          .filterCompletedInstants()
          .getInstants()
          .map(HoodieInstant::getTimestamp)
          .collect(Collectors.toList());

  // Collect the paths of the newly written data files
  List<String> touchedFiles =
      updatedFilesStats.stream()
          .map(s -> new Path(basePath, s.getPath()).toString())
          .collect(Collectors.toList());

  if (touchedFiles.isEmpty() || StringUtils.isNullOrEmpty(sortColsList) || StringUtils.isNullOrEmpty(indexPath)) {
    return;
  }

  LOG.info(String.format("Updating Z-index table (%s)", indexPath));

  List<String> sortCols = Arrays.stream(sortColsList.split(","))
      .map(String::trim)
      .collect(Collectors.toList());

  HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;

  // Fetch table schema to appropriately construct Z-index schema
  Schema tableWriteSchema =
      HoodieAvroUtils.createHoodieWriteSchema(
          new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields()
      );

  // Update the index file
  ZOrderingIndexHelper.updateZIndexFor(
      sparkEngineContext.getSqlContext().sparkSession(),
      AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema),
      touchedFiles,
      sortCols,
      indexPath,
      instantTime,
      completedCommits
  );

  LOG.info(String.format("Successfully updated Z-index at instant (%s)", instantTime));
}
```
The ZOrderingIndexHelper.updateZIndexFor implementation:
```java
public static void updateZIndexFor(
    @Nonnull SparkSession sparkSession,
    @Nonnull StructType sourceTableSchema,
    @Nonnull List<String> sourceBaseFiles,
    @Nonnull List<String> zorderedCols,
    @Nonnull String zindexFolderPath,
    @Nonnull String commitTime,
    @Nonnull List<String> completedCommits
) {
  FileSystem fs = FSUtils.getFs(zindexFolderPath, sparkSession.sparkContext().hadoopConfiguration());

  // Compose new Z-index table for the given source base files
  // Read the metadata (column statistics) of the newly written data files to build the index
  Dataset<Row> newZIndexDf =
      buildZIndexTableFor(
          sparkSession,
          sourceBaseFiles,
          zorderedCols.stream()
              .map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)])
              .collect(Collectors.toList())
      );

  try {
    //
    // Z-Index has the following folder structure:
    //
    // .hoodie/
    // ├── .zindex/
    // │   ├── <instant>/
    // │   │   ├── <part-...>.parquet
    // │   │   └── ...
    //
    // If index is currently empty (no persisted tables), we simply create one
    // using clustering operation's commit instance as it's name
    Path newIndexTablePath = new Path(zindexFolderPath, commitTime);

    // If .zindex does not exist yet (first write), write it directly (overwrite);
    // otherwise merge with the historical index and append
    if (!fs.exists(new Path(zindexFolderPath))) {
      newZIndexDf.repartition(1)
          .write()
          .format("parquet")
          .mode("overwrite")
          .save(newIndexTablePath.toString());
      return;
    }

    // Filter in all index tables (w/in {@code .zindex} folder)
    // List all instant folders under the .zindex directory
    List<String> allIndexTables =
        Arrays.stream(fs.listStatus(new Path(zindexFolderPath)))
            .filter(FileStatus::isDirectory)
            .map(f -> f.getPath().getName())
            .collect(Collectors.toList());

    // Compile list of valid index tables that were produced as part
    // of previously successfully committed iterations
    List<String> validIndexTables =
        allIndexTables.stream()
            .filter(completedCommits::contains)
            .sorted()
            .collect(Collectors.toList());

    List<String> tablesToCleanup =
        allIndexTables.stream()
            .filter(f -> !completedCommits.contains(f))
            .collect(Collectors.toList());

    Dataset<Row> finalZIndexDf;

    // Before writing out new version of the Z-index table we need to merge it
    // with the most recent one that were successfully persisted previously
    if (validIndexTables.isEmpty()) {
      finalZIndexDf = newZIndexDf;
    } else {
      // NOTE: That Parquet schema might deviate from the original table schema (for ex,
      //       by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
      //       prior to merging, since merging might fail otherwise due to schemas incompatibility
      finalZIndexDf =
          tryMergeMostRecentIndexTableInto(
              sparkSession,
              newZIndexDf,
              // Load current most recent Z-index table
              sparkSession.read().load(
                  new Path(zindexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
              )
          );

      // Clean up all index tables (after creation of the new index)
      tablesToCleanup.addAll(validIndexTables);
    }

    // Persist new Z-index table
    finalZIndexDf
        .repartition(1)
        .write()
        .format("parquet")
        .save(newIndexTablePath.toString());

    // Clean up residual Z-index tables that have might have been dangling since
    // previous iterations (due to intermittent failures during previous clean up)
    tablesToCleanup.forEach(f -> {
      try {
        fs.delete(new Path(zindexFolderPath, f), true);
      } catch (IOException ie) {
        // NOTE: Exception is deliberately swallowed to not affect overall clustering operation,
        //       since failing Z-index table will be attempted to be cleaned up upon subsequent
        //       clustering iteration
        LOG.warn(String.format("Failed to cleanup residual Z-index table: %s", f), ie);
      }
    });
  } catch (IOException e) {
    LOG.error("Failed to build new Z-index table", e);
    throw new HoodieException("Failed to build new Z-index table", e);
  }
}
```
Summary
Read:
- When the query type is read_optimized, both COW and MOR tables can use the Z-order index optimization; when the query type is snapshot, only COW can.
- Files to scan = all files - files that are covered by the Z-index but do not match the filter
Write:
- The main write path is not affected: once the data write is committed, the 20220206105117793.commit instant shown above is generated and other consumers can query the table as usual.
- The subsequent sorting and index generation can run synchronously after each write, or be run asynchronously in the background by a scheduler. In this test, inline clustering ran after every write and eventually produced the 20220206105126886.replacecommit instant.
Related
Iceberg already tracks per-file column Min/Max statistics in its metadata, so an implementation only needs to re-sort the data; there is a recent related PR, "Spark: Spark3 ZOrder Rewrite Strategy". It is still a manually triggered Action that adds a Z-order sort strategy. The details will be covered in a separate article, mainly to compare the Iceberg and Hudi architectures.
References
- Hudi Z-Order and Hilbert Space Filling Curves
- parquet-mr
- Spark: Spark3 ZOrder Rewrite Strategy