Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
Expand Down Expand Up @@ -78,8 +80,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -88,6 +92,7 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static org.apache.hudi.hadoop.CachingPath.getPathWithoutSchemeAndAuthority;

/**
* A validator with spark-submit to compare information, such as partitions, file listing, index, etc.,
Expand Down Expand Up @@ -750,6 +755,8 @@ private void validateFileSlices(
if (fileSliceListFromMetadataTable.size() != fileSliceListFromFS.size()) {
mismatch = true;
} else if (!fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) {
// In-memory cache for the set of committed files of commits of interest
Map<String, Set<String>> committedFilesMap = new HashMap<>();
for (int i = 0; i < fileSliceListFromMetadataTable.size(); i++) {
FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i);
FileSlice fileSlice2 = fileSliceListFromFS.get(i);
Expand All @@ -759,7 +766,8 @@ private void validateFileSlices(
mismatch = true;
break;
}
if (!areFileSliceCommittedLogFilesMatching(fileSlice1, fileSlice2, metaClient)) {
if (!areFileSliceCommittedLogFilesMatching(
fileSlice1, fileSlice2, metaClient, committedFilesMap)) {
mismatch = true;
break;
} else {
Expand All @@ -783,13 +791,17 @@ private void validateFileSlices(
/**
* Compares committed log files from two file slices.
*
* @param fs1 File slice 1
* @param fs2 File slice 2
* @param metaClient {@link HoodieTableMetaClient} instance
* @param fs1 File slice 1
* @param fs2 File slice 2
* @param metaClient {@link HoodieTableMetaClient} instance
* @param committedFilesMap In-memory map for caching committed files of commits
* @return {@code true} if matching; {@code false} otherwise.
*/
private boolean areFileSliceCommittedLogFilesMatching(
FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient) {
FileSlice fs1,
FileSlice fs2,
HoodieTableMetaClient metaClient,
Map<String, Set<String>> committedFilesMap) {
Set<String> fs1LogPathSet =
fs1.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet());
Set<String> fs2LogPathSet =
Expand All @@ -802,26 +814,31 @@ private boolean areFileSliceCommittedLogFilesMatching(
// Check if the remaining log files are uncommitted. If there is any log file
// that is committed, the committed log files of two file slices are different
FileSystem fileSystem = metaClient.getFs();
HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, commitsTimeline)) {
LOG.error("The first file slice has committed log files that cause mismatching: "
+ fs1);

if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, metaClient, committedFilesMap)) {
LOG.error("The first file slice has committed log files that cause mismatching: " + fs1
+ "; Different log files are: " + fs1LogPathSet);
return false;
}
if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, commitsTimeline)) {
LOG.error("The second file slice has committed log files that cause mismatching: "
+ fs2);
if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, metaClient, committedFilesMap)) {
LOG.error("The second file slice has committed log files that cause mismatching: " + fs2
+ "; Different log files are: " + fs2LogPathSet);
return false;
}
return true;
}

private boolean hasCommittedLogFiles(
FileSystem fs, Set<String> logFilePathSet, HoodieTimeline commitsTimeline) {
FileSystem fs,
Set<String> logFilePathSet,
HoodieTableMetaClient metaClient,
Map<String, Set<String>> committedFilesMap) {
if (logFilePathSet.isEmpty()) {
return false;
}

String basePath = metaClient.getBasePathV2().toString();
HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
AvroSchemaConverter converter = new AvroSchemaConverter();
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
Expand All @@ -843,13 +860,56 @@ private boolean hasCommittedLogFiles(
if (reader.hasNext()) {
HoodieLogBlock block = reader.next();
final String instantTime = block.getLogBlockHeader().get(INSTANT_TIME);
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
|| inflightInstantsTimeline.containsInstant(instantTime)) {
if (completedInstantsTimeline.containsInstant(instantTime)) {
// The instant is completed, in active timeline
// Checking commit metadata only as log files can only be written by COMMIT or DELTA_COMMIT
if (!committedFilesMap.containsKey(instantTime)) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
completedInstantsTimeline.getInstantDetails(
completedInstantsTimeline.filter(i -> i.getTimestamp().equals(instantTime))
.firstInstant().get()
).get(),
HoodieCommitMetadata.class
);
committedFilesMap.put(
instantTime,
commitMetadata.getWriteStats().stream()
.map(HoodieWriteStat::getPath).collect(Collectors.toSet())
);
}

// Here we check if the commit metadata contains the log file.
// Note that, a log file may be written by a successful write transaction
// leading to a delta commit, but such a log file can be uncommitted and
// not be part of any snapshot, due to Spark task retries for example.
// In such a case, the log file can stay in the file system, but the metadata
// table does not contain the log file for file listing, which is an expected
// behavior.
String relativeLogFilePathStr = getRelativePath(basePath, logFilePathStr);
if (committedFilesMap.get(instantTime).contains(relativeLogFilePathStr)) {
LOG.warn("Log file is committed in an instant in active timeline: instantTime="
+ instantTime + " " + logFilePathStr);
return true;
} else {
LOG.warn("Log file is uncommitted in a completed instant, likely due to retry: "
+ "instantTime=" + instantTime + " " + logFilePathStr);
}
} else if (completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) {
// The instant is in archived timeline
LOG.warn("Log file is committed in an instant in archived timeline: instantTime="
+ instantTime + " " + logFilePathStr);
return true;
} else if (inflightInstantsTimeline.containsInstant(instantTime)) {
// The instant is inflight in active timeline
// hit an uncommitted block possibly from a failed write
LOG.warn("Log file is uncommitted: " + logFilePathStr);
LOG.warn("Log file is uncommitted because of an inflight instant: instantTime="
+ instantTime + " " + logFilePathStr);
} else {
LOG.warn("Log file is committed: " + logFilePathStr);
return true;
// The instant is after the start of the active timeline,
// but it cannot be found in the active timeline
LOG.warn("Log file is uncommitted because the instant is after the start of the "
+ "active timeline but absent or in requested in the active timeline: instantTime="
+ instantTime + " " + logFilePathStr);
}
} else {
LOG.warn("There is no log block in " + logFilePathStr);
Expand All @@ -865,6 +925,19 @@ private boolean hasCommittedLogFiles(
return false;
}

private String getRelativePath(String basePath, String absoluteFilePath) {
String basePathStr = getPathWithoutSchemeAndAuthority(new Path(basePath)).toString();
String absoluteFilePathStr = getPathWithoutSchemeAndAuthority(new Path(absoluteFilePath)).toString();

if (!absoluteFilePathStr.startsWith(basePathStr)) {
throw new IllegalArgumentException("File path does not belong to the base path! basePath="
+ basePathStr + " absoluteFilePathStr=" + absoluteFilePathStr);
}

String relativePathStr = absoluteFilePathStr.substring(basePathStr.length());
return relativePathStr.startsWith("/") ? relativePathStr.substring(1) : relativePathStr;
}

public class AsyncMetadataTableValidateService extends HoodieAsyncService {
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

Expand Down