diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index be0044f275ca2..832d942c86afc 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities; import org.apache.hudi.async.HoodieAsyncService; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.config.HoodieMetadataConfig; @@ -29,16 +30,21 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -63,6 +69,7 @@ import java.util.Comparator; 
import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -173,6 +180,9 @@ public static class Config implements Serializable { + "Can use --min-validate-interval-seconds to control validation frequency", required = false) public boolean continuous = false; + @Parameter(names = {"--skip-data-files-for-cleaning"}, description = "Skip to compare the data files which are under deletion by cleaner", required = false) + public boolean skipDataFilesForCleaning = false; + @Parameter(names = {"--validate-latest-file-slices"}, description = "Validate latest file slices for all partitions.", required = false) public boolean validateLatestFileSlices = false; @@ -230,6 +240,7 @@ public String toString() { + " --validate-all-column-stats " + validateAllColumnStats + ", \n" + " --validate-bloom-filters " + validateBloomFilters + ", \n" + " --continuous " + continuous + ", \n" + + " --skip-data-files-for-cleaning " + skipDataFilesForCleaning + ", \n" + " --ignore-failed " + ignoreFailed + ", \n" + " --min-validate-interval-seconds " + minValidateIntervalSeconds + ", \n" + " --parallelism " + parallelism + ", \n" @@ -252,6 +263,7 @@ public boolean equals(Object o) { Config config = (Config) o; return basePath.equals(config.basePath) && Objects.equals(continuous, config.continuous) + && Objects.equals(skipDataFilesForCleaning, config.skipDataFilesForCleaning) && Objects.equals(validateLatestFileSlices, config.validateLatestFileSlices) && Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles) && Objects.equals(validateAllFileGroups, config.validateAllFileGroups) @@ -269,7 +281,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles, + return Objects.hash(basePath, continuous, skipDataFilesForCleaning, 
validateLatestFileSlices, validateLatestBaseFiles, validateAllFileGroups, validateAllColumnStats, validateBloomFilters, minValidateIntervalSeconds, parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help); } @@ -345,6 +357,33 @@ public void doMetadataTableValidation() { boolean finalResult = true; metaClient.reloadActiveTimeline(); String basePath = metaClient.getBasePath(); + Set<String> baseFilesForCleaning = Collections.emptySet(); + + if (cfg.skipDataFilesForCleaning) { + HoodieTimeline inflightCleaningTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterInflights(); + + baseFilesForCleaning = inflightCleaningTimeline.getInstants().flatMap(instant -> { + try { + // convert inflight instant to requested and get clean plan + instant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp()); + HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, instant); + + return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList -> { + return cleanerFileInfoList.stream().map(fileInfo -> { + return new Path(fileInfo.getFilePath()).getName(); + }); + }); + + } catch (IOException e) { + throw new HoodieIOException("Error reading cleaner metadata for " + instant, e); + } + // only take care of base files here. 
+ }).filter(path -> { + String fileExtension = FSUtils.getFileExtension(path); + return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(fileExtension); + }).collect(Collectors.toSet()); + } + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); List allPartitions = validatePartitions(engineContext, basePath); HoodieMetadataValidationContext metadataTableBasedContext = @@ -352,9 +391,10 @@ public void doMetadataTableValidation() { HoodieMetadataValidationContext fsBasedContext = new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false); + Set finalBaseFilesForCleaning = baseFilesForCleaning; List result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> { try { - validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath); + validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning); LOG.info("Metadata table validation succeeded for " + partitionPath); return true; } catch (HoodieValidationException e) { @@ -410,42 +450,64 @@ private List validatePartitions(HoodieSparkEngineContext engineContext, * @param metadataTableBasedContext Validation context containing information based on metadata table * @param fsBasedContext Validation context containing information based on the file system * @param partitionPath Partition path String + * @param baseDataFilesForCleaning Base files for un-complete cleaner action */ private void validateFilesInPartition( HoodieMetadataValidationContext metadataTableBasedContext, - HoodieMetadataValidationContext fsBasedContext, String partitionPath) { + HoodieMetadataValidationContext fsBasedContext, String partitionPath, + Set baseDataFilesForCleaning) { if (cfg.validateLatestFileSlices) { - validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath); + validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning); } if 
(cfg.validateLatestBaseFiles) { - validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath); + validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning); } if (cfg.validateAllFileGroups) { - validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath); + validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning); } if (cfg.validateAllColumnStats) { - validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath); + validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning); } if (cfg.validateBloomFilters) { - validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath); + validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning); } } private void validateAllFileGroups( HoodieMetadataValidationContext metadataTableBasedContext, - HoodieMetadataValidationContext fsBasedContext, String partitionPath) { - List allFileSlicesFromMeta = metadataTableBasedContext - .getSortedAllFileGroupList(partitionPath).stream() - .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator()) - .collect(Collectors.toList()); - List allFileSlicesFromFS = fsBasedContext - .getSortedAllFileGroupList(partitionPath).stream() - .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator()) - .collect(Collectors.toList()); + HoodieMetadataValidationContext fsBasedContext, + String partitionPath, + Set baseDataFilesForCleaning) { + + List allFileSlicesFromMeta; + List allFileSlicesFromFS; + + if (!baseDataFilesForCleaning.isEmpty()) { + List fileSlicesFromMeta = metadataTableBasedContext + .getSortedAllFileGroupList(partitionPath).stream() + .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator()) + .collect(Collectors.toList()); + List fileSlicesFromFS = fsBasedContext + 
.getSortedAllFileGroupList(partitionPath).stream() + .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator()) + .collect(Collectors.toList()); + + allFileSlicesFromMeta = filterFileSliceBasedOnInflightCleaning(fileSlicesFromMeta, baseDataFilesForCleaning); + allFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fileSlicesFromFS, baseDataFilesForCleaning); + } else { + allFileSlicesFromMeta = metadataTableBasedContext + .getSortedAllFileGroupList(partitionPath).stream() + .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator()) + .collect(Collectors.toList()); + allFileSlicesFromFS = fsBasedContext + .getSortedAllFileGroupList(partitionPath).stream() + .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator()) + .collect(Collectors.toList()); + } LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath); LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". 
For partitions " + partitionPath); @@ -459,10 +521,20 @@ private void validateAllFileGroups( */ private void validateLatestBaseFiles( HoodieMetadataValidationContext metadataTableBasedContext, - HoodieMetadataValidationContext fsBasedContext, String partitionPath) { + HoodieMetadataValidationContext fsBasedContext, + String partitionPath, + Set baseDataFilesForCleaning) { - List latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath); - List latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath); + List latestFilesFromMetadata; + List latestFilesFromFS; + + if (!baseDataFilesForCleaning.isEmpty()) { + latestFilesFromMetadata = filterBaseFileBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning); + latestFilesFromFS = filterBaseFileBasedOnInflightCleaning(fsBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning); + } else { + latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath); + latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath); + } LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath); LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". 
For partitions " + partitionPath); @@ -483,10 +555,19 @@ private void validateLatestBaseFiles( */ private void validateLatestFileSlices( HoodieMetadataValidationContext metadataTableBasedContext, - HoodieMetadataValidationContext fsBasedContext, String partitionPath) { - - List latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath); - List latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath); + HoodieMetadataValidationContext fsBasedContext, + String partitionPath, + Set baseDataFilesForCleaning) { + List latestFileSlicesFromMetadataTable; + List latestFileSlicesFromFS; + + if (!baseDataFilesForCleaning.isEmpty()) { + latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning); + latestFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fsBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning); + } else { + latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath); + latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath); + } LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath); LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". 
For partition " + partitionPath); @@ -495,11 +576,31 @@ private void validateLatestFileSlices( LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath); } + private List filterFileSliceBasedOnInflightCleaning(List sortedLatestFileSliceList, Set baseDataFilesForCleaning) { + return sortedLatestFileSliceList.stream() + .filter(fileSlice -> { + if (!fileSlice.getBaseFile().isPresent()) { + return true; + } else { + return !baseDataFilesForCleaning.contains(fileSlice.getBaseFile().get().getFileName()); + } + }).collect(Collectors.toList()); + } + + private List filterBaseFileBasedOnInflightCleaning(List sortedBaseFileList, Set baseDataFilesForCleaning) { + return sortedBaseFileList.stream() + .filter(baseFile -> { + return !baseDataFilesForCleaning.contains(baseFile.getFileName()); + }).collect(Collectors.toList()); + } + private void validateAllColumnStats( HoodieMetadataValidationContext metadataTableBasedContext, - HoodieMetadataValidationContext fsBasedContext, String partitionPath) { - List latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath) - .stream().map(BaseFile::getFileName).collect(Collectors.toList()); + HoodieMetadataValidationContext fsBasedContext, + String partitionPath, + Set baseDataFilesForCleaning) { + + List latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning); List> metadataBasedColStats = metadataTableBasedContext .getSortedColumnStatsList(partitionPath, latestBaseFilenameList); List> fsBasedColStats = fsBasedContext @@ -512,9 +613,11 @@ private void validateAllColumnStats( private void validateBloomFilters( HoodieMetadataValidationContext metadataTableBasedContext, - HoodieMetadataValidationContext fsBasedContext, String partitionPath) { - List latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath) - .stream().map(BaseFile::getFileName).collect(Collectors.toList()); + HoodieMetadataValidationContext 
fsBasedContext, + String partitionPath, + Set baseDataFilesForCleaning) { + + List latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning); List metadataBasedBloomFilters = metadataTableBasedContext .getSortedBloomFilterList(partitionPath, latestBaseFilenameList); List fsBasedBloomFilters = fsBasedContext @@ -525,6 +628,19 @@ private void validateBloomFilters( LOG.info("Validation of bloom filters succeeded for partition " + partitionPath); } + private List getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set baseDataFilesForCleaning) { + List latestBaseFilenameList; + if (!baseDataFilesForCleaning.isEmpty()) { + List sortedLatestBaseFileList = fsBasedContext.getSortedLatestBaseFileList(partitionPath); + latestBaseFilenameList = filterBaseFileBasedOnInflightCleaning(sortedLatestBaseFileList, baseDataFilesForCleaning) + .stream().map(BaseFile::getFileName).collect(Collectors.toList()); + } else { + latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath) + .stream().map(BaseFile::getFileName).collect(Collectors.toList()); + } + return latestBaseFilenameList; + } + private void validate( List infoListFromMetadataTable, List infoListFromFS, String partitionPath, String label) { if (infoListFromMetadataTable.size() != infoListFromFS.size()