diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 3b5d12332145..a70bfd256c08 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -41,7 +41,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -118,17 +120,23 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) { context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName()); - Map>> cleanOpsWithPartitionMeta = context - .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism) - .stream() - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - Map> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))); - - List partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey) - .collect(Collectors.toList()); + Map> cleanOps = new HashMap<>(); + List partitionsToDelete = new ArrayList<>(); + for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) { + // Handles at most 'cleanerParallelism' number of partitions once at a time to avoid overlarge memory pressure to the timeline server + // (remote or local embedded), thus to reduce the risk of an OOM exception. 
+ List subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size())); + Map>> cleanOpsWithPartitionMeta = context + .map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism) + .stream() + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + cleanOps.putAll(cleanOpsWithPartitionMeta.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())))); + + partitionsToDelete.addAll(cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey) + .collect(Collectors.toList())); + } return new HoodieCleanerPlan(earliestInstant .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 112034fd877a..efbca863e507 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -254,7 +254,7 @@ private Pair> getFilesToCleanKeepingLatestVersions( // In other words, the file versions only apply to the active file groups. 
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty())); boolean toDeletePartition = false; - List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + List fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { int keepVersions = config.getCleanerFileVersionsRetained(); // do not cleanup slice required for pending compaction @@ -329,7 +329,7 @@ private Pair> getFilesToCleanKeepingLatestCommits(S // all replaced file groups before earliestCommitToRetain are eligible to clean deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetain)); // add active files - List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + List fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { List fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java index 2d2c2a36643d..0d07bed531a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java @@ -121,7 +121,7 @@ protected Stream getFileSlicesEligibleForClustering(String partition) .collect(Collectors.toSet()); fgIdsInPendingCompactionLogCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet())); - return 
hoodieTable.getSliceView().getLatestFileSlices(partition) + return hoodieTable.getSliceView().getLatestFileSlicesStateless(partition) // file ids already in clustering are not eligible .filter(slice -> !fgIdsInPendingCompactionLogCompactionAndClustering.contains(slice.getFileGroupId())); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java index 0f3beb136b22..2626bc599186 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java @@ -117,7 +117,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr Option instantRange = CompactHelpers.getInstance().getInstantRange(metaClient); List operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView - .getLatestFileSlices(partitionPath) + .getLatestFileSlicesStateless(partitionPath) .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering, instantRange)) .map(s -> { List logFiles = s.getLogFiles() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index b3dc0fbce0ad..14f55b76889d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -433,6 +433,19 @@ protected Map, FileStatus[]> listPartitions( return fileStatusMap; } + /** + * Returns all files situated at the 
given partition. + */ + private FileStatus[] getAllFilesInPartition(String relativePartitionPath) throws IOException { + Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), relativePartitionPath); + long beginLsTs = System.currentTimeMillis(); + FileStatus[] statuses = listPartition(partitionPath); + long endLsTs = System.currentTimeMillis(); + LOG.debug("#files found in partition (" + relativePartitionPath + ") =" + statuses.length + ", Time taken =" + + (endLsTs - beginLsTs)); + return statuses; + } + /** * Allows lazily loading the partitions if needed. * @@ -449,15 +462,7 @@ private void ensurePartitionLoadedCorrectly(String partition) { // Not loaded yet try { LOG.info("Building file system view for partition (" + partitionPathStr + ")"); - - Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr); - long beginLsTs = System.currentTimeMillis(); - FileStatus[] statuses = listPartition(partitionPath); - long endLsTs = System.currentTimeMillis(); - LOG.debug("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken =" - + (endLsTs - beginLsTs)); - List groups = addFilesToView(statuses); - + List groups = addFilesToView(getAllFilesInPartition(partitionPathStr)); if (groups.isEmpty()) { storePartitionView(partitionPathStr, new ArrayList<>()); } @@ -598,24 +603,32 @@ private FileSlice filterUncommittedLogs(FileSlice fileSlice) { } protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) { + return addBootstrapBaseFileIfPresent(fileGroup, this::getBootstrapBaseFile); + } + + protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup, Function> bootstrapBaseFileMappingFunc) { boolean hasBootstrapBaseFile = fileGroup.getAllFileSlices() .anyMatch(fs -> fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)); if (hasBootstrapBaseFile) { HoodieFileGroup newFileGroup = new HoodieFileGroup(fileGroup); 
newFileGroup.getAllFileSlices().filter(fs -> fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) .forEach(fs -> fs.setBaseFile( - addBootstrapBaseFileIfPresent(fs.getFileGroupId(), fs.getBaseFile().get()))); + addBootstrapBaseFileIfPresent(fs.getFileGroupId(), fs.getBaseFile().get(), bootstrapBaseFileMappingFunc))); return newFileGroup; } return fileGroup; } protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice) { + return addBootstrapBaseFileIfPresent(fileSlice, this::getBootstrapBaseFile); + } + + protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice, Function> bootstrapBaseFileMappingFunc) { if (fileSlice.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) { FileSlice copy = new FileSlice(fileSlice); copy.getBaseFile().ifPresent(dataFile -> { Option edf = getBootstrapBaseFile(copy.getFileGroupId()); - edf.ifPresent(e -> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile())); + bootstrapBaseFileMappingFunc.apply(copy.getFileGroupId()).ifPresent(e -> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile())); }); return copy; } @@ -623,10 +636,16 @@ protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice) { } protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGroupId, HoodieBaseFile baseFile) { + return addBootstrapBaseFileIfPresent(fileGroupId, baseFile, this::getBootstrapBaseFile); + } + + protected HoodieBaseFile addBootstrapBaseFileIfPresent( + HoodieFileGroupId fileGroupId, + HoodieBaseFile baseFile, + Function> bootstrapBaseFileMappingFunc) { if (baseFile.getCommitTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) { HoodieBaseFile copy = new HoodieBaseFile(baseFile); - Option edf = getBootstrapBaseFile(fileGroupId); - edf.ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile())); + bootstrapBaseFileMappingFunc.apply(fileGroupId).ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile())); return copy; } return baseFile; @@ -706,7 +725,6 @@ public final 
Stream getLatestBaseFilesBeforeOrOn(String partitio public final Map> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) { try { readLock.lock(); - List formattedPartitionList = ensureAllPartitionsLoadedCorrectly(); return formattedPartitionList.stream().collect(Collectors.toMap( Function.identity(), @@ -824,6 +842,31 @@ public final Stream getLatestFileSlices(String partitionStr) { } } + @Override + public final Stream getLatestFileSlicesStateless(String partitionStr) { + String partition = formatPartitionKey(partitionStr); + if (isPartitionAvailableInStore(partition)) { + return getLatestFileSlices(partition); + } else { + try { + Stream fileSliceStream = buildFileGroups(getAllFilesInPartition(partition), visibleCommitsAndCompactionTimeline, true).stream() + .filter(fg -> !isFileGroupReplaced(fg)) + .map(HoodieFileGroup::getLatestFileSlice) + .filter(Option::isPresent).map(Option::get) + .flatMap(slice -> this.filterUncommittedFiles(slice, true)); + if (bootstrapIndex.useIndex()) { + final Map bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition); + if (!bootstrapBaseFileMappings.isEmpty()) { + return fileSliceStream.map(fileSlice -> addBootstrapBaseFileIfPresent(fileSlice, fileGroupId -> Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId)))); + } + } + return fileSliceStream; + } catch (IOException e) { + throw new HoodieIOException("Failed to fetch all files in partition " + partition, e); + } + } + } + /** * Get Latest File Slice for a given fileId in a given partition. 
*/ @@ -1014,6 +1057,39 @@ public final Stream getAllFileGroups(String partitionStr) { return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg)); } + @Override + public final Stream getAllFileGroupsStateless(String partitionStr) { + String partition = formatPartitionKey(partitionStr); + if (isPartitionAvailableInStore(partition)) { + return getAllFileGroups(partition); + } else { + try { + Stream fileGroupStream = buildFileGroups(getAllFilesInPartition(partition), visibleCommitsAndCompactionTimeline, true).stream() + .filter(fg -> !isFileGroupReplaced(fg)); + if (bootstrapIndex.useIndex()) { + final Map bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition); + if (!bootstrapBaseFileMappings.isEmpty()) { + return fileGroupStream.map(fileGroup -> addBootstrapBaseFileIfPresent(fileGroup, fileGroupId -> Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId)))); + } + } + return fileGroupStream; + } catch (IOException e) { + throw new HoodieIOException("Failed to fetch all files in partition " + partition, e); + } + } + } + + private Map getBootstrapBaseFileMappings(String partition) { + try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) { + LOG.info("Bootstrap Index available for partition " + partition); + List sourceFileMappings = + reader.getSourceFileMappingForPartition(partition); + return sourceFileMappings.stream() + .map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(), + s.getFileId()), s.getBootstrapFileStatus())).collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId, s -> s)); + } + } + private Stream getAllFileGroupsIncludingReplaced(final String partitionStr) { try { readLock.lock(); @@ -1029,22 +1105,38 @@ private Stream getAllFileGroupsIncludingReplaced(final String p @Override public Stream getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { - return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> 
isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime)); + String partition = formatPartitionKey(partitionPath); + if (hasReplacedFilesInPartition(partition)) { + return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime)); + } + return Stream.empty(); } @Override public Stream getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) { - return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime)); + String partition = formatPartitionKey(partitionPath); + if (hasReplacedFilesInPartition(partition)) { + return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime)); + } + return Stream.empty(); } @Override public Stream getReplacedFileGroupsAfterOrOn(String minCommitTime, String partitionPath) { - return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime)); + String partition = formatPartitionKey(partitionPath); + if (hasReplacedFilesInPartition(partition)) { + return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime)); + } + return Stream.empty(); } @Override public Stream getAllReplacedFileGroups(String partitionPath) { - return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId())); + String partition = formatPartitionKey(partitionPath); + if (hasReplacedFilesInPartition(partition)) { + return getAllFileGroupsIncludingReplaced(partition).filter(fg -> isFileGroupReplaced(fg.getFileGroupId())); + } + return Stream.empty(); } @Override @@ -1263,6 +1355,11 @@ protected abstract Option> getPendingLogCompac */ protected abstract void removeReplacedFileIdsAtInstants(Set instants); + /** + * Returns whether there are replaced files within the given 
partition. + */ + protected abstract boolean hasReplacedFilesInPartition(String partitionPath); + /** * Track instant time for file groups replaced. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java index bb98c97e28d5..f1b56ebe5196 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java @@ -408,6 +408,11 @@ protected void removeReplacedFileIdsAtInstants(Set instants) { fgIdToReplaceInstants.entrySet().removeIf(entry -> instants.contains(entry.getValue().getTimestamp())); } + @Override + protected boolean hasReplacedFilesInPartition(String partitionPath) { + return fgIdToReplaceInstants.keySet().stream().anyMatch(fg -> fg.getPartitionPath().equals(partitionPath)); + } + @Override protected Option getReplaceInstant(final HoodieFileGroupId fileGroupId) { return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index e30b9f425d28..56d7c7cc25cf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -182,6 +182,11 @@ public Stream getLatestFileSlices(String partitionPath) { return execute(partitionPath, preferredView::getLatestFileSlices, secondaryView::getLatestFileSlices); } + @Override + public Stream getLatestFileSlicesStateless(String partitionPath) { + return execute(partitionPath, preferredView::getLatestFileSlicesStateless, secondaryView::getLatestFileSlicesStateless); + } + @Override public Stream 
getLatestUnCompactedFileSlices(String partitionPath) { return execute(partitionPath, preferredView::getLatestUnCompactedFileSlices, @@ -222,6 +227,11 @@ public Stream getAllFileGroups(String partitionPath) { return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); } + @Override + public Stream getAllFileGroupsStateless(String partitionPath) { + return execute(partitionPath, preferredView::getAllFileGroupsStateless, secondaryView::getAllFileGroupsStateless); + } + @Override public Stream getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index a6318608bcf7..4363a7daf271 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -68,6 +68,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, private static final String BASE_URL = "/v1/hoodie/view"; public static final String LATEST_PARTITION_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/"); + public static final String LATEST_PARTITION_SLICES_STATELESS_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/"); public static final String LATEST_PARTITION_SLICE_URL = String.format("%s/%s", BASE_URL, "slices/file/latest/"); public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/"); @@ -101,6 +102,9 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public static final 
String ALL_FILEGROUPS_FOR_PARTITION_URL = String.format("%s/%s", BASE_URL, "filegroups/all/partition/"); + public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL = + String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/"); + public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON = String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/"); @@ -332,6 +336,18 @@ public Stream getLatestFileSlices(String partitionPath) { } } + @Override + public Stream getLatestFileSlicesStateless(String partitionPath) { + Map paramsMap = getParamsWithPartitionPath(partitionPath); + try { + List dataFiles = executeRequest(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap, + new TypeReference>() {}, RequestMethod.GET); + return dataFiles.stream().map(FileSliceDTO::toFileSlice); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + @Override public Option getLatestFileSlice(String partitionPath, String fileId) { Map paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId); @@ -438,6 +454,18 @@ public Stream getAllFileGroups(String partitionPath) { } } + @Override + public Stream getAllFileGroupsStateless(String partitionPath) { + Map paramsMap = getParamsWithPartitionPath(partitionPath); + try { + List fileGroups = executeRequest(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap, + new TypeReference>() {}, RequestMethod.GET); + return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + @Override public Stream getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { Map paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index 267956563836..b2b05e324810 
100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -553,6 +553,12 @@ protected void removeReplacedFileIdsAtInstants(Set instants) { ); } + @Override + protected boolean hasReplacedFilesInPartition(String partitionPath) { + return rocksDB.prefixSearch(schemaHelper.getColFamilyForReplacedFileGroups(), schemaHelper.getPrefixForReplacedFileGroup(partitionPath)) + .findAny().isPresent(); + } + @Override protected Option getReplaceInstant(final HoodieFileGroupId fileGroupId) { String lookupKey = schemaHelper.getKeyForReplacedFileGroup(fileGroupId); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java index 6fedb8684c98..1bcd1de61bc5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java @@ -107,6 +107,19 @@ interface SliceViewWithLatestSlice { */ Stream getLatestFileSlices(String partitionPath); + /** + * Stream all the latest file slices in the given partition + * without caching the file group mappings. + * + *

This is useful for table services such as compaction and clustering. These services may search through files + * within old data partitions; if a full-table service run is triggered over an enormous number of partitions, caching them could + * put heavy memory pressure on the timeline server and induce an OOM exception. + * + *

The caching of these file groups does not benefit to writers most often because the writers + * write to recent data partitions usually. + */ + Stream getLatestFileSlicesStateless(String partitionPath); + /** * Get Latest File Slice for a given fileId in a given partition. */ @@ -168,6 +181,18 @@ interface SliceView extends SliceViewWithLatestSlice { */ Stream getAllFileGroups(String partitionPath); + /** + * Stream all the file groups for a given partition without caching the file group mappings. + * + *

This is useful for table services such as cleaning. The cleaning service may search for files to clean + * within old data partitions; if a full-table cleaning is triggered over an enormous number of partitions, caching them could + * put heavy memory pressure on the timeline server and induce an OOM exception. + * + *

The caching of these file groups does not benefit to writers most often because the writers + * write to recent data partitions usually. + */ + Stream getAllFileGroupsStateless(String partitionPath); + /** * Return Pending Compaction Operations. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java index 45b2a13eb72a..ff924e450135 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java @@ -87,6 +87,10 @@ public String getKeyForReplacedFileGroup(HoodieFileGroupId fgId) { return getPartitionFileIdBasedLookup(fgId); } + public String getPrefixForReplacedFileGroup(String partitionPath) { + return String.format("part=%s,id=", partitionPath); + } + public String getKeyForFileGroupsInPendingClustering(HoodieFileGroupId fgId) { return getPartitionFileIdBasedLookup(fgId); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index ab8a1fd3aa28..558dc59f40c9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -90,11 +90,12 @@ import static org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX; import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -328,6 +329,109 @@ public void testViewForFileSlicesWithPartitionMetadataFile() throws Exception { assertEquals(2, fsView.getAllFileGroups(partitionPath).count()); } + @Test + public void testViewForGetAllFileGroupsStateless() throws Exception { + String partitionPath1 = "2023/11/22"; + new File(basePath + "/" + partitionPath1).mkdirs(); + new File(basePath + "/" + partitionPath1 + "/" + HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs(); + String partitionPath2 = "2023/11/23"; + new File(basePath + "/" + partitionPath2).mkdirs(); + new File(basePath + "/" + partitionPath2 + "/" + HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs(); + + // create 2 fileId in partition1 + String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + String commitTime1 = "1"; + String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId1); + String fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId2); + new File(basePath + "/" + partitionPath1 + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath1 + "/" + fileName2).createNewFile(); + + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); + saveAsComplete(commitTimeline, instant1, Option.empty()); + + // create 2 fileId in partition2 + String fileId3 = UUID.randomUUID().toString(); + String fileId4 = UUID.randomUUID().toString(); + String commitTime2 = "2"; + String fileName3 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, fileId3); + String fileName4 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, fileId4); + new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile(); + new File(basePath + "/" + 
partitionPath2 + "/" + fileName4).createNewFile(); + + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2); + saveAsComplete(commitTimeline, instant2, Option.empty()); + + fsView.sync(); + // invokes the stateless API first then the normal API, assert the result equality with different file group objects + List actual1 = fsView.getAllFileGroupsStateless(partitionPath1).collect(Collectors.toList()); + List expected1 = fsView.getAllFileGroups(partitionPath1).collect(Collectors.toList()); + for (int i = 0; i < expected1.size(); i++) { + assertThat("The stateless API should return the same result", actual1.get(i).toString(), is(expected1.get(i).toString())); + assertNotSame(actual1.get(i), expected1.get(i), "The stateless API does not cache"); + } + + List expected2 = fsView.getAllFileGroupsStateless(partitionPath2).collect(Collectors.toList()); + List actual2 = fsView.getAllFileGroups(partitionPath2).collect(Collectors.toList()); + for (int i = 0; i < expected2.size(); i++) { + assertThat("The stateless API should return the same result", actual2.get(i).toString(), is(expected2.get(i).toString())); + assertNotSame(actual2.get(i), expected2.get(i), "The stateless API does not cache"); + } + } + + @Test + public void testViewForGetLatestFileSlicesStateless() throws Exception { + String partitionPath1 = "2023/11/22"; + new File(basePath + "/" + partitionPath1).mkdirs(); + new File(basePath + "/" + partitionPath1 + "/" + HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs(); + String partitionPath2 = "2023/11/23"; + new File(basePath + "/" + partitionPath2).mkdirs(); + new File(basePath + "/" + partitionPath2 + "/" + HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs(); + + // create 2 base files with distinct file IDs in partition1, named for commit time "1" + String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + String commitTime1 = "1"; + String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId1); + String 
fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId2); + new File(basePath + "/" + partitionPath1 + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath1 + "/" + fileName2).createNewFile(); + + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); + saveAsComplete(commitTimeline, instant1, Option.empty()); + + // create 2 base files with distinct file IDs in partition2, named for commit time "2" + String fileId3 = UUID.randomUUID().toString(); + String fileId4 = UUID.randomUUID().toString(); + String commitTime2 = "2"; + String fileName3 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, fileId3); + String fileName4 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, fileId4); + new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile(); + new File(basePath + "/" + partitionPath2 + "/" + fileName4).createNewFile(); + + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2); + saveAsComplete(commitTimeline, instant2, Option.empty()); + + fsView.sync(); + + // invoke the stateless API first, then the normal API; the lists must be equal (same size and elements) while holding distinct file slice instances + List<FileSlice> actual1 = fsView.getLatestFileSlicesStateless(partitionPath1).collect(Collectors.toList()); + List<FileSlice> expected1 = fsView.getLatestFileSlices(partitionPath1).collect(Collectors.toList()); + assertThat("The stateless API should return the same result", actual1, is(expected1)); + for (int i = 0; i < expected1.size(); i++) { + assertNotSame(actual1.get(i), expected1.get(i), "The stateless API does not cache"); + } + + List<FileSlice> actual2 = fsView.getLatestFileSlicesStateless(partitionPath2).collect(Collectors.toList()); + List<FileSlice> expected2 = fsView.getLatestFileSlices(partitionPath2).collect(Collectors.toList()); + assertThat("The stateless API should return the same result", 
actual2, is(expected2)); + for (int i = 0; i < expected2.size(); i++) { + assertNotSame(actual2.get(i), expected2.get(i), "The stateless API does not cache"); + } + } + @Test protected void testInvalidLogFiles() throws Exception { String partitionPath = "2016/05/01"; diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 91adda4ee858..c72491341fe4 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -337,6 +337,14 @@ private void registerFileSlicesAPI() { writeValueAsString(ctx, dtos); }, true)); + app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1); + List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless( + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + writeValueAsString(ctx, dtos); + }, true)); + + app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_SLICE", 1); List dtos = sliceHandler.getLatestFileSlice( @@ -421,6 +429,14 @@ private void registerFileSlicesAPI() { writeValueAsString(ctx, dtos); }, true)); + app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, new ViewHandler(ctx -> { + metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION_STATELESS", 1); + List<FileGroupDTO> dtos = sliceHandler.getAllFileGroupsStateless( + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + 
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + writeValueAsString(ctx, dtos); + }, true)); + app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> { metricsRegistry.add("REFRESH_TABLE", 1); boolean success = sliceHandler diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java index e8af55e69b38..c2b739c9f8bb 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java @@ -18,9 +18,6 @@ package org.apache.hudi.timeline.service.handlers; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; - import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO; import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO; @@ -30,6 +27,9 @@ import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.timeline.service.TimelineService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -90,6 +90,11 @@ public List getLatestFileSlices(String basePath, String partitionP .collect(Collectors.toList()); } + public List<FileSliceDTO> getLatestFileSlicesStateless(String basePath, String partitionPath) { + return viewManager.getFileSystemView(basePath).getLatestFileSlicesStateless(partitionPath).map(FileSliceDTO::fromFileSlice) + .collect(Collectors.toList()); + } + public List getLatestFileSlice(String basePath, String partitionPath, String fileId) { return viewManager.getFileSystemView(basePath).getLatestFileSlice(partitionPath, fileId) 
.map(FileSliceDTO::fromFileSlice).map(Arrays::asList).orElse(new ArrayList<>()); @@ -113,6 +118,12 @@ public List getAllFileGroups(String basePath, String partitionPath return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups); } + public List<FileGroupDTO> getAllFileGroupsStateless(String basePath, String partitionPath) { + List<HoodieFileGroup> fileGroups = viewManager.getFileSystemView(basePath).getAllFileGroupsStateless(partitionPath) + .collect(Collectors.toList()); + return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups); + } + public List getReplacedFileGroupsBeforeOrOn(String basePath, String maxCommitTime, String partitionPath) { List fileGroups = viewManager.getFileSystemView(basePath).getReplacedFileGroupsBeforeOrOn(maxCommitTime, partitionPath) .collect(Collectors.toList());