diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 17c46d14bc985..0b3c072c92a03 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -30,11 +30,13 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -78,8 +80,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -88,6 +92,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; +import static org.apache.hudi.hadoop.CachingPath.getPathWithoutSchemeAndAuthority; /** * A validator with spark-submit to compare information, such as partitions, file listing, index, etc., @@ -750,6 +755,8 @@ private void validateFileSlices( if (fileSliceListFromMetadataTable.size() != fileSliceListFromFS.size()) { mismatch = true; } else if 
(!fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) { + // In-memory cache for the set of committed files of commits of interest + Map<String, Set<String>> committedFilesMap = new HashMap<>(); for (int i = 0; i < fileSliceListFromMetadataTable.size(); i++) { FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i); FileSlice fileSlice2 = fileSliceListFromFS.get(i); @@ -759,7 +766,8 @@ private void validateFileSlices( mismatch = true; break; } - if (!areFileSliceCommittedLogFilesMatching(fileSlice1, fileSlice2, metaClient)) { + if (!areFileSliceCommittedLogFilesMatching( + fileSlice1, fileSlice2, metaClient, committedFilesMap)) { mismatch = true; break; } else { @@ -783,13 +791,17 @@ private void validateFileSlices( /** * Compares committed log files from two file slices. * - * @param fs1 File slice 1 - * @param fs2 File slice 2 - * @param metaClient {@link HoodieTableMetaClient} instance + * @param fs1 File slice 1 + * @param fs2 File slice 2 + * @param metaClient {@link HoodieTableMetaClient} instance + * @param committedFilesMap In-memory map for caching committed files of commits * @return {@code true} if matching; {@code false} otherwise. */ private boolean areFileSliceCommittedLogFilesMatching( - FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient) { + FileSlice fs1, + FileSlice fs2, + HoodieTableMetaClient metaClient, + Map<String, Set<String>> committedFilesMap) { Set<String> fs1LogPathSet = fs1.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet()); Set<String> fs2LogPathSet = @@ -802,26 +814,31 @@ private boolean areFileSliceCommittedLogFilesMatching( // Check if the remaining log files are uncommitted. 
If there is any log file // that is committed, the committed log files of two file slices are different FileSystem fileSystem = metaClient.getFs(); - HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline(); - if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, commitsTimeline)) { - LOG.error("The first file slice has committed log files that cause mismatching: " - + fs1); + + if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, metaClient, committedFilesMap)) { + LOG.error("The first file slice has committed log files that cause mismatching: " + fs1 + + "; Different log files are: " + fs1LogPathSet); return false; } - if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, commitsTimeline)) { - LOG.error("The second file slice has committed log files that cause mismatching: " - + fs2); + if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, metaClient, committedFilesMap)) { + LOG.error("The second file slice has committed log files that cause mismatching: " + fs2 + + "; Different log files are: " + fs2LogPathSet); return false; } return true; } private boolean hasCommittedLogFiles( - FileSystem fs, Set<String> logFilePathSet, HoodieTimeline commitsTimeline) { + FileSystem fs, + Set<String> logFilePathSet, + HoodieTableMetaClient metaClient, + Map<String, Set<String>> committedFilesMap) { if (logFilePathSet.isEmpty()) { return false; } + String basePath = metaClient.getBasePathV2().toString(); + HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline(); AvroSchemaConverter converter = new AvroSchemaConverter(); HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants(); HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); @@ -843,13 +860,56 @@ private boolean hasCommittedLogFiles( if (reader.hasNext()) { HoodieLogBlock block = reader.next(); final String instantTime = block.getLogBlockHeader().get(INSTANT_TIME); - if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) || 
inflightInstantsTimeline.containsInstant(instantTime)) { + if (completedInstantsTimeline.containsInstant(instantTime)) { + // The instant is completed, in active timeline + // Checking commit metadata only as log files can only be written by COMMIT or DELTA_COMMIT + if (!committedFilesMap.containsKey(instantTime)) { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + completedInstantsTimeline.getInstantDetails( + completedInstantsTimeline.filter(i -> i.getTimestamp().equals(instantTime)) + .firstInstant().get() + ).get(), + HoodieCommitMetadata.class + ); + committedFilesMap.put( + instantTime, + commitMetadata.getWriteStats().stream() + .map(HoodieWriteStat::getPath).collect(Collectors.toSet()) + ); + } + + // Here we check if the commit metadata contains the log file. + // Note that, a log file may be written by a successful write transaction + // leading to a delta commit, but such a log file can be uncommitted and + // not be part of any snapshot, due to Spark task retries for example. + // In such a case, the log file can stay in the file system, but the metadata + // table does not contain the log file for file listing, which is an expected + // behavior. 
+ String relativeLogFilePathStr = getRelativePath(basePath, logFilePathStr); + if (committedFilesMap.get(instantTime).contains(relativeLogFilePathStr)) { + LOG.warn("Log file is committed in an instant in active timeline: instantTime=" + + instantTime + " " + logFilePathStr); + return true; + } else { + LOG.warn("Log file is uncommitted in a completed instant, likely due to retry: " + + "instantTime=" + instantTime + " " + logFilePathStr); + } + } else if (completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) { + // The instant is in archived timeline + LOG.warn("Log file is committed in an instant in archived timeline: instantTime=" + + instantTime + " " + logFilePathStr); + return true; + } else if (inflightInstantsTimeline.containsInstant(instantTime)) { + // The instant is inflight in active timeline // hit an uncommitted block possibly from a failed write - LOG.warn("Log file is uncommitted: " + logFilePathStr); + LOG.warn("Log file is uncommitted because of an inflight instant: instantTime=" + + instantTime + " " + logFilePathStr); } else { - LOG.warn("Log file is committed: " + logFilePathStr); - return true; + // The instant is after the start of the active timeline, + // but it cannot be found in the active timeline + LOG.warn("Log file is uncommitted because the instant is after the start of the " + + "active timeline but absent or in requested state in the active timeline: instantTime=" + + instantTime + " " + logFilePathStr); } } else { LOG.warn("There is no log block in " + logFilePathStr); @@ -865,6 +925,19 @@ private boolean hasCommittedLogFiles( return false; } + private String getRelativePath(String basePath, String absoluteFilePath) { + String basePathStr = getPathWithoutSchemeAndAuthority(new Path(basePath)).toString(); + String absoluteFilePathStr = getPathWithoutSchemeAndAuthority(new Path(absoluteFilePath)).toString(); + + if (!absoluteFilePathStr.startsWith(basePathStr)) { + throw new IllegalArgumentException("File path does not belong to
the base path! basePath=" + + basePathStr + " absoluteFilePathStr=" + absoluteFilePathStr); + } + + String relativePathStr = absoluteFilePathStr.substring(basePathStr.length()); + return relativePathStr.startsWith("/") ? relativePathStr.substring(1) : relativePathStr; + } + public class AsyncMetadataTableValidateService extends HoodieAsyncService { private final transient ExecutorService executor = Executors.newSingleThreadExecutor();