diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 2f88809879eb0..8dcc3904d5451 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -32,15 +32,19 @@
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -57,10 +61,13 @@
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import jline.internal.Log;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -70,6 +77,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -78,6 +86,8 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+
 /**
  * A validator with spark-submit to compare information, such as partitions, file listing, index, etc.,
  * between metadata table and filesystem.
@@ -578,9 +588,9 @@ private void validateAllFileGroups(
     LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
     LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
 
-    validate(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, "file slices");
-
-    LOG.info("Validation of all file groups succeeded for partition " + partitionPath);
+    validateFileSlices(
+        allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath,
+        fsBasedContext.getMetaClient(), "all file groups");
   }
 
   /**
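The hunk above replaces the strict list-equality check in `validateAllFileGroups` with the new `validateFileSlices` helper introduced further down in this patch. The motivation: a log file left behind by a failed, not-yet-rolled-back write can appear in one listing but not the other, and exact `List#equals` would flag that as a validation failure even though the committed state of both views agrees. A minimal, self-contained model of the lenient comparison follows; the types and names are illustrative stand-ins, not Hudi's `FileSlice` API:

    import java.util.Objects;
    import java.util.Set;
    import java.util.function.Predicate;

    // Stand-in for a file slice: identity fields plus a set of log-file paths.
    class SliceSketch {
      final String fileGroupId;
      final String baseInstantTime;
      final String baseFile;
      final Set<String> logFiles;

      SliceSketch(String fileGroupId, String baseInstantTime, String baseFile, Set<String> logFiles) {
        this.fileGroupId = fileGroupId;
        this.baseInstantTime = baseInstantTime;
        this.baseFile = baseFile;
        this.logFiles = logFiles;
      }

      // Lenient equality: identity fields must match exactly; log files may
      // differ only if every file unique to one side is uncommitted.
      static boolean lenientEquals(SliceSketch a, SliceSketch b, Predicate<String> isCommitted) {
        if (!Objects.equals(a.fileGroupId, b.fileGroupId)
            || !Objects.equals(a.baseInstantTime, b.baseInstantTime)
            || !Objects.equals(a.baseFile, b.baseFile)) {
          return false;
        }
        return a.logFiles.stream().filter(f -> !b.logFiles.contains(f)).noneMatch(isCommitted)
            && b.logFiles.stream().filter(f -> !a.logFiles.contains(f)).noneMatch(isCommitted);
      }
    }

The real helper added below walks the two lists index by index, which works because both sides arrive pre-sorted.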
For partitions " + partitionPath); - if (latestFilesFromMetadata.size() != latestFilesFromFS.size() - || !latestFilesFromMetadata.equals(latestFilesFromFS)) { - String message = "Validation of metadata get latest base file for partition " + partitionPath + " failed. " - + "Latest base file from metadata: " + latestFilesFromMetadata - + "Latest base file from direct listing: " + latestFilesFromFS; - LOG.error(message); - throw new HoodieValidationException(message); - } else { - LOG.info("Validation of getLatestBaseFiles succeeded for partition " + partitionPath); - } + + validate(latestFilesFromMetadata, latestFilesFromFS, partitionPath, "latest base files"); } /** @@ -639,8 +641,9 @@ private void validateLatestFileSlices( LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath); LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath); - validate(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, "file slices"); - LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath); + validateFileSlices( + latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, + fsBasedContext.getMetaClient(), "latest file slices"); } private List filterFileSliceBasedOnInflightCleaning(List sortedLatestFileSliceList, Set baseDataFilesForCleaning) { @@ -675,8 +678,6 @@ private void validateAllColumnStats( .getSortedColumnStatsList(partitionPath, latestBaseFilenameList); validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats"); - - LOG.info("Validation of column stats succeeded for partition " + partitionPath); } private void validateBloomFilters( @@ -692,8 +693,6 @@ private void validateBloomFilters( .getSortedBloomFilterList(partitionPath, latestBaseFilenameList); validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters"); - - LOG.info("Validation of bloom filters succeeded for partition " + partitionPath); } private List getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set baseDataFilesForCleaning) { @@ -723,6 +722,121 @@ private void validate( } } + private void validateFileSlices( + List fileSliceListFromMetadataTable, List fileSliceListFromFS, + String partitionPath, HoodieTableMetaClient metaClient, String label) { + boolean mismatch = false; + if (fileSliceListFromMetadataTable.size() != fileSliceListFromFS.size()) { + mismatch = true; + } else if (!fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) { + for (int i = 0; i < fileSliceListFromMetadataTable.size(); i++) { + FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i); + FileSlice fileSlice2 = fileSliceListFromFS.get(i); + if (!Objects.equals(fileSlice1.getFileGroupId(), fileSlice2.getFileGroupId()) + || !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime()) + || !Objects.equals(fileSlice1.getBaseFile(), fileSlice2.getBaseFile())) { + mismatch = true; + break; + } + if (!areFileSliceCommittedLogFilesMatching(fileSlice1, fileSlice2, metaClient)) { + mismatch = true; + break; + } else { + LOG.warn(String.format("There are uncommitted log files in the latest file slices " + + "but the committed log files match: %s %s", fileSlice1, fileSlice2)); + } + } + } + + if (mismatch) { + String message = String.format("Validation of %s for partition %s failed." 
+ + "\n%s from metadata: %s\n%s from file system and base files: %s", + label, partitionPath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS); + LOG.error(message); + throw new HoodieValidationException(message); + } else { + LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath)); + } + } + + /** + * Compares committed log files from two file slices. + * + * @param fs1 File slice 1 + * @param fs2 File slice 2 + * @param metaClient {@link HoodieTableMetaClient} instance + * @return {@code true} if matching; {@code false} otherwise. + */ + private boolean areFileSliceCommittedLogFilesMatching( + FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient) { + Set fs1LogPathSet = + fs1.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet()); + Set fs2LogPathSet = + fs2.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet()); + Set commonLogPathSet = new HashSet<>(fs1LogPathSet); + commonLogPathSet.retainAll(fs2LogPathSet); + // Only keep log file paths that differ + fs1LogPathSet.removeAll(commonLogPathSet); + fs2LogPathSet.removeAll(commonLogPathSet); + // Check if the remaining log files are uncommitted. If there is any log file + // that is committed, the committed log files of two file slices are different + FileSystem fileSystem = metaClient.getFs(); + HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline(); + if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, commitsTimeline)) { + LOG.error("The first file slice has committed log files that cause mismatching: " + + fs1); + return false; + } + if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, commitsTimeline)) { + LOG.error("The second file slice has committed log files that cause mismatching: " + + fs2); + return false; + } + return true; + } + + private boolean hasCommittedLogFiles( + FileSystem fs, Set logFilePathSet, HoodieTimeline commitsTimeline) { + if (logFilePathSet.isEmpty()) { + return false; + } + + AvroSchemaConverter converter = new AvroSchemaConverter(); + HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants(); + HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); + + for (String logFilePathStr : logFilePathSet) { + HoodieLogFormat.Reader reader = null; + try { + Schema readerSchema = + converter.convert(Objects.requireNonNull( + TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr)))); + reader = + HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema); + // read the avro blocks + if (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + final String instantTime = block.getLogBlockHeader().get(INSTANT_TIME); + if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) + || inflightInstantsTimeline.containsInstant(instantTime)) { + // hit an uncommitted block possibly from a failed write + LOG.warn("Log file is uncommitted: " + logFilePathStr); + } else { + LOG.warn("Log file is committed: " + logFilePathStr); + return true; + } + } else { + LOG.warn("There is no log block in " + logFilePathStr); + } + } catch (IOException e) { + throw new HoodieValidationException("Validation failed due to IOException", e); + } finally { + FileIOUtils.closeQuietly(reader); + } + } + return false; + } + public class AsyncMetadataTableValidateService extends HoodieAsyncService { private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -824,6 +938,10 @@ public 
@@ -824,6 +938,10 @@ public HoodieMetadataValidationContext(
       }
     }
 
+    public HoodieTableMetaClient getMetaClient() {
+      return metaClient;
+    }
+
     public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
       return fileSystemView.getLatestBaseFiles(partitionPath)
           .sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
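For context on exercising the changed code paths: this class ships as a spark-submit utility, and a one-off validation run looks roughly like the following. The bundle name and version here are illustrative, and the flag names are taken from the jcommander `Config` of this class, so verify them against the version you build:

    spark-submit \
      --class org.apache.hudi.utilities.HoodieMetadataTableValidator \
      hudi-utilities-bundle_2.12-0.11.0.jar \
      --base-path <base_path> \
      --validate-latest-file-slices \
      --validate-latest-base-files \
      --validate-all-file-groups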