@@ -32,15 +32,19 @@
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -57,10 +61,13 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import jline.internal.Log;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

@@ -70,6 +77,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -78,6 +86,8 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;

/**
 * A validator, run via spark-submit, that compares information such as partitions,
 * file listings, and indexes between the metadata table and the file system.
@@ -578,9 +588,9 @@ private void validateAllFileGroups(

LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
validate(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, "file slices");

LOG.info("Validation of all file groups succeeded for partition " + partitionPath);
validateFileSlices(
allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath,
fsBasedContext.getMetaClient(), "all file groups");
}

/**
@@ -605,16 +615,8 @@ private void validateLatestBaseFiles(

LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
if (latestFilesFromMetadata.size() != latestFilesFromFS.size()
|| !latestFilesFromMetadata.equals(latestFilesFromFS)) {
String message = "Validation of metadata get latest base file for partition " + partitionPath + " failed. "
+ "Latest base file from metadata: " + latestFilesFromMetadata
+ "Latest base file from direct listing: " + latestFilesFromFS;
LOG.error(message);
throw new HoodieValidationException(message);
} else {
LOG.info("Validation of getLatestBaseFiles succeeded for partition " + partitionPath);
}

validate(latestFilesFromMetadata, latestFilesFromFS, partitionPath, "latest base files");
}

/**
@@ -639,8 +641,9 @@ private void validateLatestFileSlices(
LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);

validate(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, "file slices");
LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
validateFileSlices(
latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath,
fsBasedContext.getMetaClient(), "latest file slices");
}

private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
@@ -675,8 +678,6 @@ private void validateAllColumnStats(
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);

validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");

LOG.info("Validation of column stats succeeded for partition " + partitionPath);
}

private void validateBloomFilters(
@@ -692,8 +693,6 @@ private void validateBloomFilters(
.getSortedBloomFilterList(partitionPath, latestBaseFilenameList);

validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters");

LOG.info("Validation of bloom filters succeeded for partition " + partitionPath);
}

private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
@@ -723,6 +722,121 @@ private <T> void validate(
}
}
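
// Note for readers of this excerpt: the generic validate helper is collapsed in
// this diff. Judging from its call sites and from the new validateFileSlices below,
// it compares two sorted lists and throws on any difference. The following is a
// minimal sketch of that assumed body, with illustrative parameter names; it is
// not part of this PR's changes.
//
//  private <T> void validate(
//      List<T> infoListFromMetadata, List<T> infoListFromFS, String partitionPath, String label) {
//    if (infoListFromMetadata.size() != infoListFromFS.size()
//        || !infoListFromMetadata.equals(infoListFromFS)) {
//      String message = String.format("Validation of %s for partition %s failed."
//              + "\n%s from metadata: %s\n%s from file system: %s",
//          label, partitionPath, label, infoListFromMetadata, label, infoListFromFS);
//      LOG.error(message);
//      throw new HoodieValidationException(message);
//    } else {
//      LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
//    }
//  }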

private void validateFileSlices(
List<FileSlice> fileSliceListFromMetadataTable, List<FileSlice> fileSliceListFromFS,
String partitionPath, HoodieTableMetaClient metaClient, String label) {
boolean mismatch = false;
if (fileSliceListFromMetadataTable.size() != fileSliceListFromFS.size()) {
mismatch = true;
} else if (!fileSliceListFromMetadataTable.equals(fileSliceListFromFS)) {
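      // Same length but unequal lists: compare slice by slice. A difference limited
      // to uncommitted log files (e.g., leftovers of a failed write that has not
      // been rolled back yet) is tolerated and only logged as a warning.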
for (int i = 0; i < fileSliceListFromMetadataTable.size(); i++) {
FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i);
FileSlice fileSlice2 = fileSliceListFromFS.get(i);
if (!Objects.equals(fileSlice1.getFileGroupId(), fileSlice2.getFileGroupId())
|| !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime())
|| !Objects.equals(fileSlice1.getBaseFile(), fileSlice2.getBaseFile())) {
mismatch = true;
break;
}
if (!areFileSliceCommittedLogFilesMatching(fileSlice1, fileSlice2, metaClient)) {
mismatch = true;
break;
} else {
LOG.warn(String.format("There are uncommitted log files in the latest file slices "
+ "but the committed log files match: %s %s", fileSlice1, fileSlice2));
}
}
}

if (mismatch) {
String message = String.format("Validation of %s for partition %s failed."
+ "\n%s from metadata: %s\n%s from file system and base files: %s",
label, partitionPath, label, fileSliceListFromMetadataTable, label, fileSliceListFromFS);
LOG.error(message);
throw new HoodieValidationException(message);
} else {
LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
}
}

/**
* Compares committed log files from two file slices.
*
* @param fs1 File slice 1
* @param fs2 File slice 2
* @param metaClient {@link HoodieTableMetaClient} instance
* @return {@code true} if matching; {@code false} otherwise.
*/
private boolean areFileSliceCommittedLogFilesMatching(
FileSlice fs1, FileSlice fs2, HoodieTableMetaClient metaClient) {
Set<String> fs1LogPathSet =
fs1.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet());
Set<String> fs2LogPathSet =
fs2.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toSet());
Set<String> commonLogPathSet = new HashSet<>(fs1LogPathSet);
commonLogPathSet.retainAll(fs2LogPathSet);
// Only keep log file paths that differ
fs1LogPathSet.removeAll(commonLogPathSet);
fs2LogPathSet.removeAll(commonLogPathSet);
    // Check whether the remaining log files are uncommitted. If any remaining
    // log file is committed, the committed log files of the two file slices differ
FileSystem fileSystem = metaClient.getFs();
HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, commitsTimeline)) {
LOG.error("The first file slice has committed log files that cause mismatching: "
+ fs1);
return false;
}
if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, commitsTimeline)) {
LOG.error("The second file slice has committed log files that cause mismatching: "
+ fs2);
return false;
}
return true;
}
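
// The set arithmetic above tolerates stray uncommitted log files on either side
// while still catching real divergence. Below is a standalone illustration of the
// rule, using hypothetical log file names and plain JDK collections; it is not
// code from this PR.
//
//  import java.util.Arrays;
//  import java.util.HashSet;
//  import java.util.Set;
//
//  class CommittedLogFileToleranceSketch {
//    public static void main(String[] args) {
//      Set<String> fromMetadata = new HashSet<>(Arrays.asList("f1.log.1", "f1.log.2"));
//      Set<String> fromListing = new HashSet<>(Arrays.asList("f1.log.1", "f1.log.2", "f1.log.3"));
//      Set<String> common = new HashSet<>(fromMetadata);
//      common.retainAll(fromListing);   // common = {f1.log.1, f1.log.2}
//      fromMetadata.removeAll(common);  // {} : nothing extra on the metadata side
//      fromListing.removeAll(common);   // {f1.log.3} : the only file needing a commit check
//      // If f1.log.3 is committed, the file slices genuinely differ (validation fails);
//      // if it is uncommitted, the difference is ignored.
//      System.out.println("metadata-only: " + fromMetadata + ", listing-only: " + fromListing);
//    }
//  }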

private boolean hasCommittedLogFiles(
FileSystem fs, Set<String> logFilePathSet, HoodieTimeline commitsTimeline) {
if (logFilePathSet.isEmpty()) {
return false;
}

AvroSchemaConverter converter = new AvroSchemaConverter();
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();

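    // A log file counts as committed when the instant time in its first log block
    // belongs to a completed instant on the timeline (or predates the start of the
    // timeline, e.g., after archival) and that instant is not still inflight.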
for (String logFilePathStr : logFilePathSet) {
HoodieLogFormat.Reader reader = null;
try {
Schema readerSchema =
converter.convert(Objects.requireNonNull(
TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr))));
reader =
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema);
        // Read only the first log block; its header records the instant time
        // of the write that produced this log file
if (reader.hasNext()) {
HoodieLogBlock block = reader.next();
final String instantTime = block.getLogBlockHeader().get(INSTANT_TIME);
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
|| inflightInstantsTimeline.containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write
LOG.warn("Log file is uncommitted: " + logFilePathStr);
} else {
LOG.warn("Log file is committed: " + logFilePathStr);
return true;
}
} else {
LOG.warn("There is no log block in " + logFilePathStr);
}
} catch (IOException e) {
throw new HoodieValidationException("Validation failed due to IOException", e);
} finally {
FileIOUtils.closeQuietly(reader);
}
}
return false;
}

public class AsyncMetadataTableValidateService extends HoodieAsyncService {
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

@@ -824,6 +938,10 @@ public HoodieMetadataValidationContext(
}
}

public HoodieTableMetaClient getMetaClient() {
return metaClient;
}

public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
return fileSystemView.getLatestBaseFiles(partitionPath)
.sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());