From d53a320408f02a2c87fed46e1776b40bc218ea84 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Mon, 21 Feb 2022 17:11:03 -0800 Subject: [PATCH 1/2] [HUDI-3465] Add validation of column stats and bloom filters in HoodieMetadataTableValidator --- .../model/HoodieColumnRangeMetadata.java | 2 +- .../apache/hudi/common/util/ParquetUtils.java | 32 +- .../HoodieMetadataTableValidator.java | 364 +++++++++++++++--- .../hudi/utilities/util/BloomFilterData.java | 104 +++++ 4 files changed, 423 insertions(+), 79 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/util/BloomFilterData.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java index f25d76813357e..acf5b2298987a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -105,7 +105,7 @@ public int hashCode() { public String toString() { return "HoodieColumnRangeMetadata{" + "filePath ='" + filePath + '\'' - + "columnName='" + columnName + '\'' + + ", columnName='" + columnName + '\'' + ", minValue=" + minValue + ", maxValue=" + maxValue + ", nullCount=" + nullCount diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index f67cf65258371..e74f4f77703d0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -18,10 +18,6 @@ package org.apache.hudi.common.util; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; @@ -30,6 +26,11 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.MetadataNotFoundException; import org.apache.hudi.keygen.BaseKeyGenerator; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; @@ -46,6 +47,7 @@ import org.apache.parquet.schema.PrimitiveType; import javax.annotation.Nonnull; + import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; @@ -80,6 +82,17 @@ public Set filterRowKeys(Configuration configuration, Path filePath, Set return filterParquetRowKeys(configuration, filePath, filter, HoodieAvroUtils.getRecordKeySchema()); } + public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { + ParquetMetadata footer; + try { + // TODO(vc): Should we use the parallel reading version here? + footer = ParquetFileReader.readFooter(FSUtils.getFs(parquetFilePath.toString(), conf).getConf(), parquetFilePath); + } catch (IOException e) { + throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e); + } + return footer; + } + /** * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will * return all the rowkeys. 
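(Reviewer note, not part of the patch: a minimal sketch of how the new static `ParquetUtils.readMetadata` helper can be used to inspect per-column statistics in a parquet footer. The driver class and file path below are hypothetical; the helper itself throws the unchecked `HoodieIOException` on failure, as defined above.)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Hypothetical driver class, for illustration only.
public class ReadFooterExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical base file path inside a Hudi table partition.
    Path parquetFile = new Path("/tmp/hudi_table/2022/02/21/some-base-file.parquet");
    ParquetMetadata footer = ParquetUtils.readMetadata(conf, parquetFile);
    // Each row group (block) carries min/max/null-count statistics per column chunk,
    // which is the raw material for the column-stats validation in this patch.
    footer.getBlocks().forEach(block ->
        block.getColumns().forEach(column ->
            System.out.println(column.getPath() + " -> " + column.getStatistics())));
  }
}
```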
@@ -184,17 +197,6 @@ public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePat
 }
 }
 
- public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
- ParquetMetadata footer;
- try {
- // TODO(vc): Should we use the parallel reading version here?
- footer = ParquetFileReader.readFooter(FSUtils.getFs(parquetFilePath.toString(), conf).getConf(), parquetFilePath);
- } catch (IOException e) {
- throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
- }
- return footer;
- }
-
 /**
 * Get the schema of the given parquet file.
 */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b67a7ff07b5ba..7d3a5cc234f30 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -20,29 +20,44 @@
 import org.apache.hudi.async.HoodieAsyncService;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.utilities.util.BloomFilterData;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import jline.internal.Log;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -54,10 +69,17 @@
 import java.util.stream.Collectors;
 
 /**
- * A validator with spark-submit to compare list partitions and list files between metadata table and filesystem.
- *
+ * A validator, run with spark-submit, that compares information such as partitions, file listings,
+ * and indexes between the metadata table and the file system.
+ *

+ * There are five validation tasks that can be enabled independently through the following CLI options:
+ * - `--validate-latest-file-slices`: validate the latest file slices for all partitions.
+ * - `--validate-latest-base-files`: validate the latest base files for all partitions.
+ * - `--validate-all-file-groups`: validate all file groups, and all file slices within file groups.
+ * - `--validate-all-column-stats`: validate column stats for all columns in the schema.
+ * - `--validate-bloom-filters`: validate bloom filters of base files.
 *

- * - Default : This validator will compare the result of listing partitions/listing files between metadata table and filesystem only once.
+ * - Default: This validator compares the results between the metadata table and the file system only once.
 *

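 * Each partition is validated in parallel through a Spark job; whether a failed partition
 * aborts the run or is only logged is controlled by the `--ignore-failed` option.
 *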
* Example command: * ``` @@ -160,6 +182,12 @@ public static class Config implements Serializable { @Parameter(names = {"--validate-all-file-groups"}, description = "Validate all file groups, and all file slices within file groups.", required = false) public boolean validateAllFileGroups = false; + @Parameter(names = {"--validate-all-column-stats"}, description = "Validate column stats for all columns in the schema", required = false) + public boolean validateAllColumnStats = false; + + @Parameter(names = {"--validate-bloom-filters"}, description = "Validate bloom filters of base files", required = false) + public boolean validateBloomFilters = false; + @Parameter(names = {"--min-validate-interval-seconds"}, description = "the min validate interval of each validate when set --continuous, default is 10 minutes.") public Integer minValidateIntervalSeconds = 10 * 60; @@ -199,6 +227,8 @@ public String toString() { + " --validate-latest-file-slices " + validateLatestFileSlices + ", \n" + " --validate-latest-base-files " + validateLatestBaseFiles + ", \n" + " --validate-all-file-groups " + validateAllFileGroups + ", \n" + + " --validate-all-column-stats " + validateAllColumnStats + ", \n" + + " --validate-bloom-filters " + validateBloomFilters + ", \n" + " --continuous " + continuous + ", \n" + " --ignore-failed " + ignoreFailed + ", \n" + " --min-validate-interval-seconds " + minValidateIntervalSeconds + ", \n" @@ -225,6 +255,8 @@ public boolean equals(Object o) { && Objects.equals(validateLatestFileSlices, config.validateLatestFileSlices) && Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles) && Objects.equals(validateAllFileGroups, config.validateAllFileGroups) + && Objects.equals(validateAllColumnStats, config.validateAllColumnStats) + && Objects.equals(validateBloomFilters, config.validateBloomFilters) && Objects.equals(minValidateIntervalSeconds, config.minValidateIntervalSeconds) && Objects.equals(parallelism, config.parallelism) && Objects.equals(ignoreFailed, config.ignoreFailed) @@ -237,8 +269,9 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles, validateAllFileGroups, - minValidateIntervalSeconds, parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help); + return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles, + validateAllFileGroups, validateAllColumnStats, validateBloomFilters, minValidateIntervalSeconds, + parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help); } } @@ -314,12 +347,14 @@ public void doMetadataTableValidation() { String basePath = metaClient.getBasePath(); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); List allPartitions = validatePartitions(engineContext, basePath); - HoodieTableFileSystemView metaFsView = createHoodieTableFileSystemView(engineContext, true); - HoodieTableFileSystemView fsView = createHoodieTableFileSystemView(engineContext, false); + HoodieMetadataValidationContext metadataTableBasedContext = + new HoodieMetadataValidationContext(engineContext, cfg, metaClient, true); + HoodieMetadataValidationContext fsBasedContext = + new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false); List result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> { try { - validateFilesInPartition(metaFsView, fsView, 
partitionPath);
+ validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath);
 LOG.info("Metadata table validation succeeded for " + partitionPath);
 return true;
 } catch (HoodieValidationException e) {
@@ -364,65 +399,73 @@ private List<String> validatePartitions(HoodieSparkEngineContext engineContext,
 }
 
 /**
- * Compare the listing files result between metadata table and fileSystem.
- * For now, validate two kinds of apis:
- * 1. getLatestFileSlices
- * 2. getLatestBaseFiles
- * 3. getAllFileGroups and getAllFileSlices
- * @param metaFsView
- * @param fsView
- * @param partitionPath
+ * Compare the file listing and index data between the metadata table and the file system.
+ * For now, five kinds of APIs are validated:
+ * 1. HoodieMetadataFileSystemView::getLatestFileSlices
+ * 2. HoodieMetadataFileSystemView::getLatestBaseFiles
+ * 3. HoodieMetadataFileSystemView::getAllFileGroups and HoodieMetadataFileSystemView::getAllFileSlices
+ * 4. HoodieBackedTableMetadata::getColumnStats
+ * 5. HoodieBackedTableMetadata::getBloomFilters
+ *
+ * @param metadataTableBasedContext Validation context containing information based on the metadata table
+ * @param fsBasedContext Validation context containing information based on the file system
+ * @param partitionPath Partition path String
 */
- private void validateFilesInPartition(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
+ private void validateFilesInPartition(
+ HoodieMetadataValidationContext metadataTableBasedContext,
+ HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
 if (cfg.validateLatestFileSlices) {
- validateLatestFileSlices(metaFsView, fsView, partitionPath);
+ validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath);
 }
 
 if (cfg.validateLatestBaseFiles) {
- validateLatestBaseFiles(metaFsView, fsView, partitionPath);
+ validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath);
 }
 
 if (cfg.validateAllFileGroups) {
- validateAllFileGroups(metaFsView, fsView, partitionPath);
+ validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath);
 }
- }
 
- private void validateAllFileGroups(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
- List<HoodieFileGroup> fileGroupsFromMetadata = metaFsView.getAllFileGroups(partitionPath).sorted(new HoodieFileGroupCompactor()).collect(Collectors.toList());
- List<HoodieFileGroup> fileGroupsFromFS = fsView.getAllFileGroups(partitionPath).sorted(new HoodieFileGroupCompactor()).collect(Collectors.toList());
+ if (cfg.validateAllColumnStats) {
+ validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath);
+ }
 
- List<FileSlice> allFileSlicesFromMeta = fileGroupsFromMetadata.stream().flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceCompactor()).collect(Collectors.toList());
- List<FileSlice> allFileSlicesFromFS = fileGroupsFromFS.stream().flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceCompactor()).collect(Collectors.toList());
+ if (cfg.validateBloomFilters) {
+ validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath);
+ }
+ }
 
- LOG.info("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
- LOG.info("All file slices from direct listing: " + allFileSlicesFromFS + ". 
For partitions " + partitionPath);
- validateFileSlice(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath);
+ private void validateAllFileGroups(
+ HoodieMetadataValidationContext metadataTableBasedContext,
+ HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+ List<FileSlice> allFileSlicesFromMeta = metadataTableBasedContext
+ .getSortedAllFileGroupList(partitionPath).stream()
+ .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
+ .collect(Collectors.toList());
+ List<FileSlice> allFileSlicesFromFS = fsBasedContext
+ .getSortedAllFileGroupList(partitionPath).stream()
+ .flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
+ .collect(Collectors.toList());
+
+ LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
+ LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
+ validate(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, "file slices");
 LOG.info("Validation of all file groups succeeded for partition " + partitionPath);
 }
 
- private void validateFileSlice(List<FileSlice> fileSlicesFromMeta, List<FileSlice> fileSlicesFromFS, String partitionPath) {
- if (fileSlicesFromMeta.size() != fileSlicesFromFS.size() || !fileSlicesFromMeta.equals(fileSlicesFromFS)) {
- String message = "Validation of metadata file slices for partition " + partitionPath + " failed. "
- + "File slices from metadata: " + fileSlicesFromMeta
- + "File slices from direct listing: " + fileSlicesFromFS;
- LOG.error(message);
- throw new HoodieValidationException(message);
- } else {
- LOG.info("Validation of file slices succeeded for partition " + partitionPath);
- }
- }
-
 /**
 * Compare getLatestBaseFiles between metadata table and fileSystem.
 */
- private void validateLatestBaseFiles(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
+ private void validateLatestBaseFiles(
+ HoodieMetadataValidationContext metadataTableBasedContext,
+ HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
 
- List<HoodieBaseFile> latestFilesFromMetadata = metaFsView.getLatestBaseFiles(partitionPath).sorted(new HoodieBaseFileCompactor()).collect(Collectors.toList());
- List<HoodieBaseFile> latestFilesFromFS = fsView.getLatestBaseFiles(partitionPath).sorted(new HoodieBaseFileCompactor()).collect(Collectors.toList());
+ List<HoodieBaseFile> latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
+ List<HoodieBaseFile> latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
 
- LOG.info("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
- LOG.info("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
+ LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
+ LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
 
 if (latestFilesFromMetadata.size() != latestFilesFromFS.size() || !latestFilesFromMetadata.equals(latestFilesFromFS)) {
 String message = "Validation of metadata get latest base file for partition " + partitionPath + " failed. "
@@ -438,27 +481,62 @@ private void validateLatestBaseFiles(HoodieTableFileSystemView metaFsView, Hoodi
 
 /**
 * Compare getLatestFileSlices between metadata table and fileSystem.
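 * Both sides are sorted with {@link FileSliceComparator} before comparison, so a
 * difference in listing order alone does not fail the validation.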
 */
- private void validateLatestFileSlices(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
+ private void validateLatestFileSlices(
+ HoodieMetadataValidationContext metadataTableBasedContext,
+ HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
 
- List<FileSlice> latestFileSlicesFromMetadataTable = metaFsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
- List<FileSlice> latestFileSlicesFromFS = fsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
+ List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
+ List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
 
- LOG.info("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
- LOG.info("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
+ LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
+ LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
 
- validateFileSlice(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath);
+ validate(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, "file slices");
 LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
 }
 
- private HoodieTableFileSystemView createHoodieTableFileSystemView(HoodieSparkEngineContext engineContext, boolean enableMetadataTable) {
+ private void validateAllColumnStats(
+ HoodieMetadataValidationContext metadataTableBasedContext,
+ HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+ List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
+ .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+ List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
+ .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
+ List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
+ .getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
 
- HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
- .enable(enableMetadataTable)
- .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
- .build();
+ validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
 
- return FileSystemViewManager.createInMemoryFileSystemView(engineContext,
- metaClient, metadataConfig);
+ LOG.info("Validation of column stats succeeded for partition " + partitionPath);
+ }
+
+ private void validateBloomFilters(
+ HoodieMetadataValidationContext metadataTableBasedContext,
+ HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
+ List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
+ .stream().map(BaseFile::getFileName).collect(Collectors.toList());
+ List<BloomFilterData> metadataBasedBloomFilters = metadataTableBasedContext
+ .getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
+ List<BloomFilterData> fsBasedBloomFilters = fsBasedContext
+ .getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
+
+ validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters");
+
+ LOG.info("Validation of bloom filters succeeded for partition " + partitionPath);
+ }
+
+ private <T> void validate(
+ List<T> infoListFromMetadataTable, List<T> 
infoListFromFS, String partitionPath, String label) {
+ if (infoListFromMetadataTable.size() != infoListFromFS.size()
+ || !infoListFromMetadataTable.equals(infoListFromFS)) {
+ String message = String.format("Validation of %s for partition %s failed."
+ + "\n%s from metadata: %s\n%s from file system and base files: %s",
+ label, partitionPath, label, infoListFromMetadataTable, label, infoListFromFS);
+ LOG.error(message);
+ throw new HoodieValidationException(message);
+ } else {
+ LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
+ }
 }
 
 public class AsyncMetadataTableValidateService extends HoodieAsyncService {
@@ -491,7 +569,7 @@ protected Pair startService() {
 }
 }
 
- public static class FileSliceCompactor implements Comparator<FileSlice>, Serializable {
+ public static class FileSliceComparator implements Comparator<FileSlice>, Serializable {
 
 @Override
 public int compare(FileSlice o1, FileSlice o2) {
@@ -500,7 +578,7 @@ public int compare(FileSlice o1, FileSlice o2) {
 }
 }
 
- public static class HoodieBaseFileCompactor implements Comparator<HoodieBaseFile>, Serializable {
+ public static class HoodieBaseFileComparator implements Comparator<HoodieBaseFile>, Serializable {
 
 @Override
 public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
@@ -508,11 +586,171 @@ public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
 }
 }
 
- public static class HoodieFileGroupCompactor implements Comparator<HoodieFileGroup>, Serializable {
+ public static class HoodieFileGroupComparator implements Comparator<HoodieFileGroup>, Serializable {
 
 @Override
 public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
 return o1.getFileGroupId().compareTo(o2.getFileGroupId());
 }
 }
-}
\ No newline at end of file
+
+ public static class HoodieColumnRangeMetadataComparator
+ implements Comparator<HoodieColumnRangeMetadata<String>>, Serializable {
+
+ @Override
+ public int compare(HoodieColumnRangeMetadata<String> o1, HoodieColumnRangeMetadata<String> o2) {
+ return o1.toString().compareTo(o2.toString());
+ }
+ }
+
+ /**
+ * Class for storing relevant information for metadata table validation.
+ *

+ * If the metadata table is disabled, the APIs provide the information, e.g., file listings
+ * and indexes, from the file system and base files. If the metadata table is enabled, the
+ * APIs provide the same information from the metadata table. The same API is expected to
+ * return the same information regardless of whether the metadata table is enabled, which is
+ * verified in the {@link HoodieMetadataTableValidator}.
+ */
+ private static class HoodieMetadataValidationContext implements Serializable {
+ private HoodieTableMetaClient metaClient;
+ private HoodieTableFileSystemView fileSystemView;
+ private HoodieTableMetadata tableMetadata;
+ private boolean enableMetadataTable;
+ private List<String> allColumnNameList;
+
+ public HoodieMetadataValidationContext(
+ HoodieEngineContext engineContext, Config cfg, HoodieTableMetaClient metaClient,
+ boolean enableMetadataTable) {
+ this.metaClient = metaClient;
+ this.enableMetadataTable = enableMetadataTable;
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(enableMetadataTable)
+ .withMetadataIndexBloomFilter(enableMetadataTable)
+ .withMetadataIndexColumnStats(enableMetadataTable)
+ .withMetadataIndexForAllColumns(enableMetadataTable)
+ .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
+ .build();
+ this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+ metaClient, metadataConfig);
+ this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
+ FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+ if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
+ this.allColumnNameList = getAllColumnNames();
+ }
+ }
+
+ public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
+ return fileSystemView.getLatestBaseFiles(partitionPath)
+ .sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
+ }
+
+ public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
+ return fileSystemView.getLatestFileSlices(partitionPath)
+ .sorted(new FileSliceComparator()).collect(Collectors.toList());
+ }
+
+ public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
+ return fileSystemView.getAllFileGroups(partitionPath)
+ .sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
+ }
+
+ public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
+ String partitionPath, List<String> baseFileNameList) {
+ LOG.info("All column names for getting column stats: " + allColumnNameList);
+ if (enableMetadataTable) {
+ List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+ .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
+ return allColumnNameList.stream()
+ .flatMap(columnName ->
+ tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
+ .map(stats -> new HoodieColumnRangeMetadata<>(
+ stats.getFileName(),
+ columnName,
+ stats.getMinValue(),
+ stats.getMaxValue(),
+ stats.getNullCount(),
+ stats.getValueCount(),
+ stats.getTotalSize(),
+ stats.getTotalUncompressedSize()))
+ .collect(Collectors.toList())
+ .stream())
+ .sorted(new HoodieColumnRangeMetadataComparator())
+ .collect(Collectors.toList());
+ } else {
+ return baseFileNameList.stream().flatMap(filename ->
+ new ParquetUtils().readRangeFromParquetMetadata(
+ metaClient.getHadoopConf(),
+ new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
+ allColumnNameList).stream())
+ .map(rangeMetadata -> new HoodieColumnRangeMetadata<String>(
+ rangeMetadata.getFilePath(),
+ rangeMetadata.getColumnName(),
+ // Note: 
here we ignore the type in the validation,
+ // since column stats from metadata table store the min/max values as String
+ rangeMetadata.getMinValue().toString(),
+ rangeMetadata.getMaxValue().toString(),
+ rangeMetadata.getNullCount(),
+ rangeMetadata.getValueCount(),
+ rangeMetadata.getTotalSize(),
+ rangeMetadata.getTotalUncompressedSize()
+ ))
+ .sorted(new HoodieColumnRangeMetadataComparator())
+ .collect(Collectors.toList());
+ }
+ }
+
+ public List<BloomFilterData> getSortedBloomFilterList(
+ String partitionPath, List<String> baseFileNameList) {
+ if (enableMetadataTable) {
+ List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
+ .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
+ return tableMetadata.getBloomFilters(partitionFileNameList).entrySet().stream()
+ .map(entry -> BloomFilterData.builder()
+ .setPartitionPath(entry.getKey().getKey())
+ .setFilename(entry.getKey().getValue())
+ .setBloomFilter(entry.getValue())
+ .build())
+ .sorted()
+ .collect(Collectors.toList());
+ } else {
+ return baseFileNameList.stream()
+ .map(filename -> readBloomFilterFromFile(partitionPath, filename))
+ .filter(Option::isPresent)
+ .map(Option::get)
+ .collect(Collectors.toList());
+ }
+ }
+
+ private List<String> getAllColumnNames() {
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+ try {
+ return schemaResolver.getTableAvroSchema().getFields().stream()
+ .map(entry -> entry.name()).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath(), e);
+ }
+ }
+
+ private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
+ Path path = new Path(new Path(metaClient.getBasePath(), partitionPath), filename);
+ HoodieFileReader<IndexedRecord> fileReader;
+ try {
+ fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);
+ } catch (IOException e) {
+ Log.error("Failed to get file reader for " + path + " " + e.getMessage());
+ return Option.empty();
+ }
+ final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+ if (fileBloomFilter == null) {
+ Log.error("Failed to read bloom filter for " + path);
+ return Option.empty();
+ }
+ return Option.of(BloomFilterData.builder()
+ .setPartitionPath(partitionPath)
+ .setFilename(filename)
+ .setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
+ .build());
+ }
+ }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/util/BloomFilterData.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/util/BloomFilterData.java
new file mode 100644
index 0000000000000..1d4f0539136b4
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/util/BloomFilterData.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.util;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Includes partition path, filename, and bloom filter for validation.
+ */
+public class BloomFilterData implements Comparable<BloomFilterData> {
+ private final String partitionPath;
+ private final String filename;
+ private final ByteBuffer bloomFilter;
+
+ private BloomFilterData(
+ String partitionPath, String filename, ByteBuffer bloomFilter) {
+ this.partitionPath = partitionPath;
+ this.filename = filename;
+ this.bloomFilter = bloomFilter;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public int compareTo(@NotNull BloomFilterData o) {
+ return this.toString().compareTo(o.toString());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BloomFilterData that = (BloomFilterData) o;
+ return partitionPath.equals(that.partitionPath) && filename.equals(that.filename)
+ && bloomFilter.equals(that.bloomFilter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionPath, filename, bloomFilter);
+ }
+
+ @Override
+ public String toString() {
+ String bloomFilterString = new String(bloomFilter.array());
+ return "BloomFilterData{"
+ + "partitionPath='" + partitionPath + '\''
+ + ", filename='" + filename + '\''
+ + ", bloomFilter="
+ + (bloomFilterString.length() > 50 ? bloomFilterString.substring(0, 50) + "..." : bloomFilterString)
+ + '}';
+ }
+
+ public static class Builder {
+ private String partitionPath;
+ private String filename;
+ private ByteBuffer bloomFilter;
+
+ public Builder setPartitionPath(String partitionPath) {
+ this.partitionPath = partitionPath;
+ return this;
+ }
+
+ public Builder setFilename(String filename) {
+ this.filename = filename;
+ return this;
+ }
+
+ public Builder setBloomFilter(ByteBuffer bloomFilter) {
+ this.bloomFilter = bloomFilter;
+ return this;
+ }
+
+ public BloomFilterData build() {
+ return new BloomFilterData(partitionPath, filename, bloomFilter);
+ }
+ }
+}

From c8c95ab2e3028486524bcaf48d2f0e34a2846aca Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Mon, 28 Feb 2022 11:06:19 -0800
Subject: [PATCH 2/2] Fix sorting

---
 .../org/apache/hudi/utilities/HoodieMetadataTableValidator.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 7d3a5cc234f30..f9b0e1a86d6af 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -718,6 +718,7 @@ public List<BloomFilterData> getSortedBloomFilterList(
 .map(filename -> readBloomFilterFromFile(partitionPath, filename))
 .filter(Option::isPresent)
 .map(Option::get)
+ .sorted()
 .collect(Collectors.toList());
 }
 }
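(Reviewer note, not part of the patch: a small self-contained sketch of the ordering semantics that the second commit relies on. `BloomFilterData.compareTo` delegates to `toString()`, so `getSortedBloomFilterList` sorts lexicographically by partition path, then filename, then the possibly truncated serialized filter; the partition path and filenames below are hypothetical.)

```java
import org.apache.hudi.utilities.util.BloomFilterData;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BloomFilterDataSortExample {
  public static void main(String[] args) {
    List<BloomFilterData> filters = new ArrayList<>();
    // Hypothetical filenames; the serialized filter bytes are placeholders.
    filters.add(BloomFilterData.builder()
        .setPartitionPath("2022/02/21")
        .setFilename("fg-2_0-1-1_001.parquet")
        .setBloomFilter(ByteBuffer.wrap("serialized-filter".getBytes()))
        .build());
    filters.add(BloomFilterData.builder()
        .setPartitionPath("2022/02/21")
        .setFilename("fg-1_0-1-1_001.parquet")
        .setBloomFilter(ByteBuffer.wrap("serialized-filter".getBytes()))
        .build());
    // compareTo is toString()-based, so this orders by partition path, then filename.
    filters.sort(Comparator.naturalOrder());
    filters.forEach(System.out::println);  // fg-1... prints before fg-2...
  }
}
```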