@@ -21,23 +21,38 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadStat;

import org.apache.hadoop.conf.Configuration;
@@ -53,19 +68,31 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import scala.Tuple2;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* The test harness for resource initialization and cleanup.
*/
@@ -149,7 +176,7 @@ protected void initSparkContexts(String appName) {
}

/**
* Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext})
* Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext})
* with a default name matching the name of the class.
*/
protected void initSparkContexts() {
@@ -376,9 +403,9 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String base
}

public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileStatus[] fileStatuses) {
FileStatus[] fileStatuses) {
if (tableView == null) {
tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
} else {
tableView.init(metaClient, visibleActiveTimeline, fileStatuses);
}
@@ -418,4 +445,175 @@ public static Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(Jav
}
return Pair.of(partitionPathStatMap, globalStat);
}

/**
* Validate the metadata table's contents to ensure they match what is on the file system.
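*
* <p>A minimal usage sketch (assuming the harness's {@code basePath}, {@code context} and test table
* are already initialized; an empty list means no inflight commits are expected):
* <pre>{@code
* validateMetadata(testTable, Collections.emptyList(), writeConfig,
*     HoodieTableMetadata.getMetadataTableBasePath(basePath), true);
* }</pre>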
*/
public void validateMetadata(HoodieTestTable testTable, List<String> inflightCommits, HoodieWriteConfig writeConfig,
String metadataTableBasePath, boolean doFullValidation) throws IOException {
HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
assertNotNull(tableMetadata, "MetadataReader should have been initialized");
if (!writeConfig.isMetadataTableEnabled() || !writeConfig.getMetadataConfig().validateFileListingMetadata()) {
return;
}

assertEquals(inflightCommits, testTable.inflightCommits());

HoodieTimer timer = new HoodieTimer().startTimer();
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

// Partitions should match
List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
List<String> fsPartitions = new ArrayList<>();
fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString()));
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();

Collections.sort(fsPartitions);
Collections.sort(metadataPartitions);

assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
assertEquals(fsPartitions, metadataPartitions, "Partitions should match");

// Files within each partition should match
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
assertEquals(fsPartitions.size(), partitionToFilesMap.size());

fsPartitions.forEach(partition -> {
try {
validateFilesPerPartition(testTable, tableMetadata, tableView, partitionToFilesMap, partition);
} catch (IOException e) {
fail("Exception should not be raised: " + e);
}
});
if (doFullValidation) {
runFullValidation(writeConfig, metadataTableBasePath, engineContext);
}

LOG.info("Validation time=" + timer.endTimer());
}

public void syncTableMetadata(HoodieWriteConfig writeConfig) {
if (!writeConfig.getMetadataConfig().enableSync()) {
return;
}
// Open up the metadata table again for syncing; instantiating the writer performs the sync,
// and try-with-resources ensures it is closed afterwards
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context)) {
LOG.info("Successfully synced to metadata table");
} catch (Exception e) {
throw new HoodieMetadataException("Error syncing to metadata table.", e);
}
}

public HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig clientConfig) {
return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter
.create(hadoopConf, clientConfig, new HoodieSparkEngineContext(jsc));
}

public HoodieTableMetadata metadata(HoodieWriteConfig clientConfig, HoodieEngineContext hoodieEngineContext) {
return HoodieTableMetadata.create(hoodieEngineContext, clientConfig.getMetadataConfig(), clientConfig.getBasePath(),
clientConfig.getSpillableMapBasePath());
}

private void validateFilesPerPartition(HoodieTestTable testTable, HoodieTableMetadata tableMetadata, TableFileSystemView tableView,
Map<String, FileStatus[]> partitionToFilesMap, String partition) throws IOException {
Path partitionPath;
if (partition.equals("")) {
// Should be the non-partitioned case
partitionPath = new Path(basePath);
} else {
partitionPath = new Path(basePath, partition);
}

FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition);
FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
List<String> fsFileNames = Arrays.stream(fsStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
List<String> metadataFilenames = Arrays.stream(metaStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);

assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);

if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));

for (String fileName : fsFileNames) {
if (!metadataFilenames.contains(fileName)) {
LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
}
}
for (String fileName : metadataFilenames) {
if (!fsFileNames.contains(fileName)) {
LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
}
}
}

// Block sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
assertEquals(fsBlockSizes, metadataBlockSizes);

assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
assertEquals(fsFileNames, metadataFilenames, "Files within partition " + partition + " should match");

// FileSystemView should expose the same data
List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));

fileGroups.forEach(g -> LOG.info(g));
fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LOG.info(b)));
fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LOG.info(s)));

long numFiles = fileGroups.stream()
.mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
.sum();
assertEquals(metadataFilenames.size(), numFiles);
}

private void runFullValidation(HoodieWriteConfig writeConfig, String metadataTableBasePath, HoodieSparkEngineContext engineContext) {
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(writeConfig);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");

// Validate write config for metadata table
HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "Metadata table should not have its own metadata table enabled");

// Metadata table should be in sync with the dataset
assertTrue(metadata(writeConfig, engineContext).isInSync());
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();

// Metadata table is MOR
assertEquals(HoodieTableType.MERGE_ON_READ, metadataMetaClient.getTableType(), "Metadata Table should be MOR");

// Metadata table is HFile format
assertEquals(HoodieFileFormat.HFILE, metadataMetaClient.getTableConfig().getBaseFileFormat(),
"Metadata Table base file format should be HFile");

// The metadata table has a fixed number of partitions.
// FSUtils.getAllFoldersWithPartitionMetaFile cannot be used here because it filters out all
// directories under the .hoodie folder.
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath),
false, false, false);
Assertions.assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());

// Metadata table should automatically compact and clean
// file versions are +1 because auto-clean and compaction happen at the end of a commit
int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file");
assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice");
assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to "
+ numFileVersions + " but was " + latestSlices.size());
});
}
}
@@ -167,6 +167,9 @@ public static String getCommitFromCommitFile(String commitFileName) {
}

public static String getCommitTime(String fullFileName) {
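// Log files are named ".<fileId>_<baseInstantTime>.log.<version>_<writeToken>",
// so the instant time is the second "_"-separated token, just before ".log".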
if (isLogFile(new Path(fullFileName))) {
return fullFileName.split("_")[1].split("\\.")[0];
}
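// Base files are named "<fileId>_<writeToken>_<instantTime>.<extension>",
// so the instant time is the third "_"-separated token.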
return fullFileName.split("_")[2].split("\\.")[0];
}

@@ -50,6 +50,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -157,6 +158,9 @@ public void testGetCommitTime() {
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
// test log file name
fullFileName = FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
}

@Test
@@ -39,6 +39,8 @@
import org.apache.hudi.exception.HoodieException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.RandomAccessFile;
@@ -48,8 +50,11 @@
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan;
@@ -59,6 +64,8 @@

public class FileCreateUtils {

private static final Logger LOG = LogManager.getLogger(FileCreateUtils.class);

private static final String WRITE_TOKEN = "1-0-1";
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();

@@ -216,6 +223,10 @@ public static void createRequestedCompaction(String basePath, String instantTime
createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION);
}

public static void createInflightCompaction(String basePath, String instantTime) throws IOException {
createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
}

public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
Path parentPath = Paths.get(basePath, partitionPath);
Files.createDirectories(parentPath);
@@ -307,6 +318,13 @@ public static long getTotalMarkerFileCount(String basePath, String partitionPath
.endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count();
}

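/**
* Lists the top-level partition directories under the given base path, skipping the
* {@code .hoodie} metafolder; returns an empty list if the base path does not exist.
*/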
public static List<Path> getPartitionPaths(Path basePath) throws IOException {
if (Files.notExists(basePath)) {
return Collections.emptyList();
}
return Files.list(basePath).filter(entry -> !entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList());
}

/**
* Find total basefiles for passed in paths.
*/
@@ -75,7 +75,11 @@ public static void deleteFile(File fileToDelete) throws IOException {
}

public static List<FileStatus> listRecursive(FileSystem fs, Path path) throws IOException {
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
return listFiles(fs, path, true);
}

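/**
* Lists the files under the given path, optionally recursing into subdirectories.
*/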
public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive) throws IOException {
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, recursive);
List<FileStatus> statuses = new ArrayList<>();
while (itr.hasNext()) {
statuses.add(itr.next());