@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -29,16 +30,21 @@
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -63,6 +69,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -173,6 +180,9 @@ public static class Config implements Serializable {
         + "Can use --min-validate-interval-seconds to control validation frequency", required = false)
     public boolean continuous = false;
 
@Parameter(names = {"--skip-data-files-for-cleaning"}, description = "Skip to compare the data files which are under deletion by cleaner", required = false)
public boolean skipDataFilesForCleaning = false;

@Parameter(names = {"--validate-latest-file-slices"}, description = "Validate latest file slices for all partitions.", required = false)
public boolean validateLatestFileSlices = false;

@@ -230,6 +240,7 @@ public String toString() {
           + " --validate-all-column-stats " + validateAllColumnStats + ", \n"
           + " --validate-bloom-filters " + validateBloomFilters + ", \n"
           + " --continuous " + continuous + ", \n"
+          + " --skip-data-files-for-cleaning " + skipDataFilesForCleaning + ", \n"
           + " --ignore-failed " + ignoreFailed + ", \n"
           + " --min-validate-interval-seconds " + minValidateIntervalSeconds + ", \n"
           + " --parallelism " + parallelism + ", \n"
@@ -252,6 +263,7 @@ public boolean equals(Object o) {
       Config config = (Config) o;
       return basePath.equals(config.basePath)
           && Objects.equals(continuous, config.continuous)
+          && Objects.equals(skipDataFilesForCleaning, config.skipDataFilesForCleaning)
           && Objects.equals(validateLatestFileSlices, config.validateLatestFileSlices)
           && Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles)
           && Objects.equals(validateAllFileGroups, config.validateAllFileGroups)
@@ -269,7 +281,7 @@ public boolean equals(Object o) {
 
     @Override
     public int hashCode() {
-      return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles,
+      return Objects.hash(basePath, continuous, skipDataFilesForCleaning, validateLatestFileSlices, validateLatestBaseFiles,
           validateAllFileGroups, validateAllColumnStats, validateBloomFilters, minValidateIntervalSeconds,
           parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
     }
@@ -345,16 +357,44 @@ public void doMetadataTableValidation() {
     boolean finalResult = true;
     metaClient.reloadActiveTimeline();
     String basePath = metaClient.getBasePath();
+    Set<String> baseFilesForCleaning = Collections.emptySet();
+
+    if (cfg.skipDataFilesForCleaning) {
+      HoodieTimeline inflightCleaningTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterInflights();
+
+      baseFilesForCleaning = inflightCleaningTimeline.getInstants().flatMap(instant -> {
+        try {
+          // Convert the inflight instant to requested state; the clean plan is serialized with the requested meta file.
+          instant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp());
+          HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, instant);
+
+          return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList -> {
+            return cleanerFileInfoList.stream().map(fileInfo -> {
+              return new Path(fileInfo.getFilePath()).getName();
+            });
+          });
+
+        } catch (IOException e) {
+          throw new HoodieIOException("Error reading cleaner metadata for " + instant, e);
+        }
+        // Only base files are compared; log files are left out of this set.
+      }).filter(path -> {
+        String fileExtension = FSUtils.getFileExtension(path);
+        return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(fileExtension);
+      }).collect(Collectors.toSet());
+    }
+
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     List<String> allPartitions = validatePartitions(engineContext, basePath);
     HoodieMetadataValidationContext metadataTableBasedContext =
         new HoodieMetadataValidationContext(engineContext, cfg, metaClient, true);
     HoodieMetadataValidationContext fsBasedContext =
         new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false);
 
+    Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
     List<Boolean> result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> {
       try {
-        validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath);
+        validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
         LOG.info("Metadata table validation succeeded for " + partitionPath);
         return true;
       } catch (HoodieValidationException e) {
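Two details in this hunk are easy to miss. First, the clean plan is serialized with the requested meta file, which is why each inflight instant is rewritten to REQUESTED state before CleanerUtils.getCleanerPlan is called. Second, the copy into finalBaseFilesForCleaning exists because Java lambdas can only capture effectively final locals: baseFilesForCleaning is reassigned inside the if block, so it cannot be referenced from the map closure. The extension filter keeps base files only; a standalone sketch of its behavior, with made-up file names:

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;

public class ExtensionFilterSketch {
  public static void main(String[] args) {
    // Only names with a base-file extension survive the filter.
    Set<String> kept = Stream.of(
            "e2b1f7a0-0-1_20220310_001.parquet",   // base file -> kept
            ".e2b1f7a0-0-1_001.log.1_0-22-23",     // log file  -> dropped
            "c4d2a8b1-0-2_20220310_002.orc")       // base file -> kept
        .filter(name -> HoodieFileFormat.BASE_FILE_EXTENSIONS
            .contains(FSUtils.getFileExtension(name)))
        .collect(Collectors.toSet());
    System.out.println(kept);  // prints the .parquet and .orc names only
  }
}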
@@ -410,42 +450,64 @@ private List<String> validatePartitions(HoodieSparkEngineContext engineContext,
    * @param metadataTableBasedContext Validation context containing information based on metadata table
    * @param fsBasedContext Validation context containing information based on the file system
    * @param partitionPath Partition path String
+   * @param baseDataFilesForCleaning Base files pending deletion by inflight (incomplete) cleaner actions
    */
   private void validateFilesInPartition(
       HoodieMetadataValidationContext metadataTableBasedContext,
-      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+      HoodieMetadataValidationContext fsBasedContext, String partitionPath,
+      Set<String> baseDataFilesForCleaning) {
     if (cfg.validateLatestFileSlices) {
-      validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath);
+      validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
     }
 
     if (cfg.validateLatestBaseFiles) {
-      validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath);
+      validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
     }
 
     if (cfg.validateAllFileGroups) {
-      validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath);
+      validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
     }
 
     if (cfg.validateAllColumnStats) {
-      validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath);
+      validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
     }
 
     if (cfg.validateBloomFilters) {
-      validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath);
+      validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
     }
   }
 
   private void validateAllFileGroups(
       HoodieMetadataValidationContext metadataTableBasedContext,
-      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
-    List<FileSlice> allFileSlicesFromMeta = metadataTableBasedContext
-        .getSortedAllFileGroupList(partitionPath).stream()
-        .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
-        .collect(Collectors.toList());
-    List<FileSlice> allFileSlicesFromFS = fsBasedContext
-        .getSortedAllFileGroupList(partitionPath).stream()
-        .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
-        .collect(Collectors.toList());
+      HoodieMetadataValidationContext fsBasedContext,
+      String partitionPath,
+      Set<String> baseDataFilesForCleaning) {
+
+    List<FileSlice> allFileSlicesFromMeta;
+    List<FileSlice> allFileSlicesFromFS;
+
+    if (!baseDataFilesForCleaning.isEmpty()) {
+      List<FileSlice> fileSlicesFromMeta = metadataTableBasedContext
+          .getSortedAllFileGroupList(partitionPath).stream()
+          .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
+          .collect(Collectors.toList());
+      List<FileSlice> fileSlicesFromFS = fsBasedContext
+          .getSortedAllFileGroupList(partitionPath).stream()
+          .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
+          .collect(Collectors.toList());
+
+      allFileSlicesFromMeta = filterFileSliceBasedOnInflightCleaning(fileSlicesFromMeta, baseDataFilesForCleaning);
+      allFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fileSlicesFromFS, baseDataFilesForCleaning);
+    } else {
+      allFileSlicesFromMeta = metadataTableBasedContext
+          .getSortedAllFileGroupList(partitionPath).stream()
+          .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
+          .collect(Collectors.toList());
+      allFileSlicesFromFS = fsBasedContext
+          .getSortedAllFileGroupList(partitionPath).stream()
+          .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
+          .collect(Collectors.toList());
+    }
 
     LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
     LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
@@ -459,10 +521,20 @@ private void validateAllFileGroups(
    */
   private void validateLatestBaseFiles(
       HoodieMetadataValidationContext metadataTableBasedContext,
-      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+      HoodieMetadataValidationContext fsBasedContext,
+      String partitionPath,
+      Set<String> baseDataFilesForCleaning) {
 
-    List<HoodieBaseFile> latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
-    List<HoodieBaseFile> latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
+    List<HoodieBaseFile> latestFilesFromMetadata;
+    List<HoodieBaseFile> latestFilesFromFS;
+
+    if (!baseDataFilesForCleaning.isEmpty()) {
+      latestFilesFromMetadata = filterBaseFileBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning);
+      latestFilesFromFS = filterBaseFileBasedOnInflightCleaning(fsBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning);
+    } else {
+      latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
+      latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
+    }
 
     LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
     LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
@@ -483,10 +555,19 @@ private void validateLatestBaseFiles(
    */
   private void validateLatestFileSlices(
       HoodieMetadataValidationContext metadataTableBasedContext,
-      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
-
-    List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
-    List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
+      HoodieMetadataValidationContext fsBasedContext,
+      String partitionPath,
+      Set<String> baseDataFilesForCleaning) {
+    List<FileSlice> latestFileSlicesFromMetadataTable;
+    List<FileSlice> latestFileSlicesFromFS;
+
+    if (!baseDataFilesForCleaning.isEmpty()) {
+      latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
+      latestFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fsBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
+    } else {
+      latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
+      latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
+    }
 
     LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
     LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
@@ -495,11 +576,31 @@ private void validateLatestFileSlices(
     LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
   }
 
+  private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
+    return sortedLatestFileSliceList.stream()
+        .filter(fileSlice -> {
+          if (!fileSlice.getBaseFile().isPresent()) {
+            return true;
+          } else {
+            return !baseDataFilesForCleaning.contains(fileSlice.getBaseFile().get().getFileName());
+          }
+        }).collect(Collectors.toList());
+  }
+
+  private List<HoodieBaseFile> filterBaseFileBasedOnInflightCleaning(List<HoodieBaseFile> sortedBaseFileList, Set<String> baseDataFilesForCleaning) {
+    return sortedBaseFileList.stream()
+        .filter(baseFile -> {
+          return !baseDataFilesForCleaning.contains(baseFile.getFileName());
+        }).collect(Collectors.toList());
+  }
+
   private void validateAllColumnStats(
       HoodieMetadataValidationContext metadataTableBasedContext,
-      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
-    List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
-        .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+      HoodieMetadataValidationContext fsBasedContext,
+      String partitionPath,
+      Set<String> baseDataFilesForCleaning) {
+
+    List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
     List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
         .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
     List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
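Note the asymmetry between the two filters just added: filterBaseFileBasedOnInflightCleaning drops any base file whose name appears in the pending-clean set, while filterFileSliceBasedOnInflightCleaning always keeps a slice that has no base file at all (for example, a file group that so far holds only log files), since such a slice cannot be affected by cleaning base files. A small illustration — the file names and the FileSlice construction shown are assumptions for the sketch:

import java.util.Collections;
import java.util.Set;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;

public class FilterSemanticsSketch {
  public static void main(String[] args) {
    Set<String> pendingClean = Collections.singleton("f1_0-1-1_001.parquet");

    FileSlice withBase = new FileSlice("2022/03/10", "001", "f1");
    withBase.setBaseFile(new HoodieBaseFile("/tbl/2022/03/10/f1_0-1-1_001.parquet"));

    FileSlice logOnly = new FileSlice("2022/03/10", "002", "f2");  // no base file yet

    // filterFileSliceBasedOnInflightCleaning over (withBase, logOnly) with pendingClean
    // would drop `withBase` (its base file is pending clean) and keep `logOnly`.
  }
}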
@@ -512,9 +613,11 @@ private void validateAllColumnStats(
 
   private void validateBloomFilters(
       HoodieMetadataValidationContext metadataTableBasedContext,
-      HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
-    List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
-        .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+      HoodieMetadataValidationContext fsBasedContext,
+      String partitionPath,
+      Set<String> baseDataFilesForCleaning) {
+
+    List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
     List<BloomFilterData> metadataBasedBloomFilters = metadataTableBasedContext
         .getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
     List<BloomFilterData> fsBasedBloomFilters = fsBasedContext
@@ -525,6 +628,19 @@ private void validateBloomFilters(
     LOG.info("Validation of bloom filters succeeded for partition " + partitionPath);
   }
 
+  private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
+    List<String> latestBaseFilenameList;
+    if (!baseDataFilesForCleaning.isEmpty()) {
+      List<HoodieBaseFile> sortedLatestBaseFileList = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
+      latestBaseFilenameList = filterBaseFileBasedOnInflightCleaning(sortedLatestBaseFileList, baseDataFilesForCleaning)
+          .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+    } else {
+      latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
+          .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+    }
+    return latestBaseFilenameList;
+  }
+
   private <T> void validate(
       List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) {
     if (infoListFromMetadataTable.size() != infoListFromFS.size()
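Taken together, the flag guards against a transient race rather than a real inconsistency: while a clean action is inflight, a base file named in its plan may already be deleted from storage before the action completes, so the metadata-table view and the direct file-system view can legitimately disagree for a moment. Filtering every base file listed in an inflight clean plan out of both sides before comparing avoids that false positive. An abstract illustration of the failure mode (file names invented):

import java.util.Arrays;
import java.util.List;

public class SkipCleaningRationaleSketch {
  public static void main(String[] args) {
    List<String> fromMetadataTable = Arrays.asList("a.parquet", "b.parquet");
    List<String> fromDirectListing = Arrays.asList("a.parquet");  // b.parquet deleted mid-clean

    // Without --skip-data-files-for-cleaning: the size mismatch fails validation.
    // With it: "b.parquet" appears in the inflight clean plan, is filtered from
    // both lists, and the comparison passes.
    System.out.println(fromMetadataTable.size() == fromDirectListing.size());  // false
  }
}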