Flink: 1.12 (the engine version makes little difference)

Hudi: 0.11.0-SNAPSHOT

Time: 2022/03/14

The Spark code path follows the same idea.

Overall Flow

  1. Flink processes each incoming record and constructs its recordKey (along with the partition path).
  2. Through the Hudi Metadata table, fetch all qualifying small files (fileIds) under that partition path.
  3. For those small files, build an AssignState: using the historical average record size, compute how many more records each small file can still hold. The AssignState is cached per partition path (a minimal sketch of this assignment flow follows the list).
  4. Repeat the above for every record; if the partition path is already cached, fetch its AssignState directly and update the remaining number of records each small file can take.
  5. If no small file has remaining capacity, a new fileId is created to write into.
  6. The actual write-out happens when the checkpoint is triggered.
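To make the flow above concrete, here is a minimal, self-contained sketch of the per-partition small-file assignment idea. Names such as FileCapacity and assignFileId, and all the sizes, are illustrative only, not Hudi classes; the real logic lives in BucketAssigner / SmallFileAssign shown later.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Illustrative sketch only: FileCapacity and assignFileId are hypothetical names,
// not Hudi's actual classes.
public class SmallFileAssignSketch {

  // Remaining record slots of one small file.
  static class FileCapacity {
    final String fileId;
    long remaining; // how many more records this file can take

    FileCapacity(String fileId, long fileSizeBytes, long maxFileSizeBytes, long avgRecordSizeBytes) {
      this.fileId = fileId;
      // (max configured file size - current file size) / average record size
      this.remaining = Math.max(0, (maxFileSizeBytes - fileSizeBytes) / avgRecordSizeBytes);
    }
  }

  // Capacities cached per partition path, mirroring the per-partition AssignState cache.
  private final Map<String, List<FileCapacity>> cache = new HashMap<>();

  // Decide which fileId an incoming record of the given partition goes to.
  String assignFileId(String partitionPath, List<FileCapacity> smallFilesOfPartition) {
    List<FileCapacity> capacities =
        cache.computeIfAbsent(partitionPath, p -> smallFilesOfPartition);
    for (FileCapacity c : capacities) {
      if (c.remaining > 0) {   // a small file can still take this record
        c.remaining--;
        return c.fileId;       // reuse the existing fileId (an UPDATE bucket)
      }
    }
    // No small file has capacity left: create a brand-new fileId (an INSERT bucket).
    return UUID.randomUUID().toString();
  }
}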

Source Code Analysis

BucketAssignFunction.java

In Flink's processElement, Hudi derives the partitionPath from the recordKey computed for each record.

private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
    // The partitionPath is derived from the recordKey; fetch the small-file info of the
    // corresponding partition here (see the next code block).
    final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
    final HoodieRecordLocation location;
    switch (bucketInfo.getBucketType()) {
      case INSERT:
        // This is an insert bucket, use HoodieRecordLocation instant time as "I".
        // Downstream operators can then check the instant time to know whether
        // a record belongs to an insert bucket.
        location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
        break;
      case UPDATE:
        location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
        break;
      default:
        throw new AssertionError();
    }
    return location;
}

BucketAssigner.java

public BucketInfo addInsert(String partitionPath) {
    // Fetch the small files of this partition (see the next code block, then come back here).
    SmallFileAssign smallFileAssign = getSmallFileAssign(partitionPath);
    // assign() checks whether a small file can still take this record; it must not exceed totalUnassigned.
    if (smallFileAssign != null && smallFileAssign.assign()) {
      return new BucketInfo(BucketType.UPDATE, smallFileAssign.getFileId(), partitionPath);
    }
    // Below this point a new fileId is created to write into.
    ...
}

private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) {
    // Check whether the small-file info of this partition path is already cached.
    if (smallFileAssignMap.containsKey(partitionPath)) {
      return smallFileAssignMap.get(partitionPath);
    }
    // writeProfile.getSmallFiles fetches the small files (see the next code block, then come back here).
    List<SmallFile> smallFiles = smallFilesOfThisTask(writeProfile.getSmallFiles(partitionPath));
    if (smallFiles.size() > 0) {
      LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
      // Key point:
      // After the small files are returned, the assign states are built. When a SmallFileAssignState
      // is initialized, the historical average record size is used; if it is unavailable, 1024 bytes
      // per record is assumed.
      // Then (max configured file size - small file size) / average record size = the number of
      // records this small file can still take (totalUnassigned).
      // Each subsequent processElement call for the same partition path then calls assign() on the
      // SmallFileAssignState until that capacity is used up.
      SmallFileAssignState[] states = smallFiles.stream()
          .map(smallFile -> new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, writeProfile.getAvgSize()))
          .toArray(SmallFileAssignState[]::new);
      SmallFileAssign assign = new SmallFileAssign(states);
      smallFileAssignMap.put(partitionPath, assign);
      return assign;
    }
    smallFileAssignMap.put(partitionPath, null);
    return null;
}
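A quick worked example of the totalUnassigned formula in the comments above, with made-up numbers: a 90 MB base file under a 120 MB max file size and a 1 KB average record size can still take about 30k records.

public class CapacityExample {
  public static void main(String[] args) {
    // Worked example (made-up numbers) of the totalUnassigned calculation described above.
    long maxFileSize   = 120L * 1024 * 1024; // hoodie.parquet.max.file.size, default ~120 MB
    long smallFileSize =  90L * 1024 * 1024; // current size of the small base file
    long avgRecordSize = 1024L;              // historical average record size; falls back to 1024 bytes

    long totalUnassigned = (maxFileSize - smallFileSize) / avgRecordSize;
    System.out.println(totalUnassigned);     // prints 30720 -> roughly 30k more records fit in this file
  }
}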

WriteProfile.java

public synchronized List<SmallFile> getSmallFiles(String partitionPath) {
    // lookup the cache first
    if (smallFilesMap.containsKey(partitionPath)) {
      return smallFilesMap.get(partitionPath);
    }
    List<SmallFile> smallFiles = new ArrayList<>();
    if (config.getParquetSmallFileLimit() <= 0) {
      this.smallFilesMap.put(partitionPath, smallFiles);
      return smallFiles;
    }
    // Fetch the small files by delegating to smallFilesProfile.
    smallFiles = smallFilesProfile(partitionPath);
    this.smallFilesMap.put(partitionPath, smallFiles);
    return smallFiles;
}

// COW (non-MOR) implementation; MOR tables use DeltaWriteProfile.smallFilesProfile instead.
protected List<SmallFile> smallFilesProfile(String partitionPath) {
    // smallFiles only for partitionPath
    List<SmallFile> smallFileLocations = new ArrayList<>();
    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
    if (!commitTimeline.empty()) { // if we have some commits
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      // Get all base files under the given partition (from the fsView built on the Metadata table).
      List<HoodieBaseFile> allFiles = fsView
          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
      for (HoodieBaseFile file : allFiles) {
        // Keep only qualifying files:
        // smaller than hoodie.parquet.small.file.limit (default 100 MB) and larger than 0 bytes.
        if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
          String filename = file.getFileName();
          SmallFile sf = new SmallFile();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = file.getFileSize();
          smallFileLocations.add(sf);
        }
      }
    }
    return smallFileLocations;
}
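The thresholds referenced above are ordinary Hudi write configs. A hedged illustration of the relevant keys (values shown are the documented defaults; how you pass them depends on the job, e.g. Flink table options or Spark datasource options):

import java.util.HashMap;
import java.util.Map;

public class SmallFileConfigExample {
  public static void main(String[] args) {
    Map<String, String> hudiOptions = new HashMap<>();
    // Files below this size are treated as "small" and become candidates for more inserts (default 100 MB).
    hudiOptions.put("hoodie.parquet.small.file.limit", String.valueOf(100L * 1024 * 1024));
    // Target upper bound for a base file; also the numerator of the capacity formula (default ~120 MB).
    hudiOptions.put("hoodie.parquet.max.file.size", String.valueOf(120L * 1024 * 1024));
    // Estimated parquet size per byte of log data, used for MOR small-file sizing (default 0.35).
    hudiOptions.put("hoodie.logfile.to.parquet.compression.ratio", "0.35");
    hudiOptions.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}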

For a MOR table, the corresponding logic is in DeltaWriteProfile.java

@Override
protected List<SmallFile> smallFilesProfile(String partitionPath) {
    // smallFiles only for partitionPath
    List<SmallFile> smallFileLocations = new ArrayList<>();
    // Init here since this class (and member variables) might not have been initialized
    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
    // Find out all eligible small file slices
    if (!commitTimeline.empty()) {
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      // find the smallest file in partition and append to it
      List<FileSlice> allSmallFileSlices = new ArrayList<>();
      // If we can index log files, we can add more inserts to log files for fileIds including those under
      // pending compaction.
      // Fetch the file slices (base file + log files).
      List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
          .collect(Collectors.toList());
      for (FileSlice fileSlice : allFileSlices) {
        // Check whether the slice qualifies as a small file:
        // (baseFileSize + totalLogFileSize * ratio) < hoodie.parquet.max.file.size (default ~120 MB),
        // where ratio is hoodie.logfile.to.parquet.compression.ratio, default 0.35.
        if (isSmallFile(fileSlice)) {
          allSmallFileSlices.add(fileSlice);
        }
      }
      // Create SmallFiles from the eligible file slices
      for (FileSlice smallFileSlice : allSmallFileSlices) {
        SmallFile sf = new SmallFile();
        if (smallFileSlice.getBaseFile().isPresent()) {
          // TODO : Move logic of file name, file id, base commit time handling inside file slice
          String filename = smallFileSlice.getBaseFile().get().getFileName();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = getTotalFileSize(smallFileSlice);
          smallFileLocations.add(sf);
        } else {
          smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> {
            // in case there is something error, and the file slice has no log file
            sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
                FSUtils.getFileIdFromLogPath(logFile.getPath()));
            sf.sizeBytes = getTotalFileSize(smallFileSlice);
            smallFileLocations.add(sf);
          });
        }
      }
    }
    return smallFileLocations;
}
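A quick worked example of the isSmallFile check above, with made-up sizes: a file slice with a 60 MB base file and 100 MB of log files is estimated at 60 + 100 * 0.35 = 95 MB of parquet data, which is below the 120 MB hoodie.parquet.max.file.size, so the slice still counts as a small-file candidate.

public class MorSmallFileExample {
  public static void main(String[] args) {
    // Made-up sizes illustrating the MOR small-file check described above.
    double baseFileMb      = 60;
    double totalLogFilesMb = 100;
    double ratio           = 0.35;  // hoodie.logfile.to.parquet.compression.ratio (default)
    double maxFileSizeMb   = 120;   // hoodie.parquet.max.file.size (default ~120 MB)

    double estimatedSizeMb = baseFileMb + totalLogFilesMb * ratio; // 60 + 35 = 95 MB
    boolean isSmallFile = estimatedSizeMb < maxFileSizeMb;         // 95 < 120 -> true
    System.out.println("estimated = " + estimatedSizeMb + " MB, small file? " + isSmallFile);
  }
}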

Notes

  • Small files are not handled by appending to the existing file; instead, new data is written as a new version under the same fileId, so it may look as if the number of files has not gone down. Configure a suitable number of retained versions and let the clean service automatically remove historical versions (a sketch of such a cleaning configuration follows).
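To act on the note above, the usual approach is to bound the number of retained commits/file versions and let the cleaner remove superseded versions. A hedged sketch of the relevant Hudi cleaning options; the key names are standard Hudi configs, but the retention value is only an example and should be tuned to your rollback and incremental-query needs:

import java.util.HashMap;
import java.util.Map;

public class CleaningConfigExample {
  public static void main(String[] args) {
    Map<String, String> hudiOptions = new HashMap<>();
    // Run cleaning automatically after commits.
    hudiOptions.put("hoodie.clean.automatic", "true");
    // Keep only the latest N commits; older file versions produced by small-file rewrites get cleaned up.
    hudiOptions.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
    hudiOptions.put("hoodie.cleaner.commits.retained", "10"); // example value
    hudiOptions.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}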
