@@ -244,16 +244,19 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
Stream<HoodieInstant> instantsStream;

HoodieTimeline timeline;
HoodieTimeline writeTimeline;
if (basefileOnly) {
timeline = metaClient.getActiveTimeline().getCommitTimeline();
writeTimeline = metaClient.getActiveTimeline().getCommitTimeline();
} else if (excludeCompaction) {
timeline = metaClient.getActiveTimeline().getCommitsTimeline();
writeTimeline = metaClient.getActiveTimeline().getCommitsTimeline();
} else {
timeline = metaClient.getActiveTimeline().getWriteTimeline();
writeTimeline = metaClient.getActiveTimeline().getWriteTimeline();
}

if (!includeInflight) {
timeline = timeline.filterCompletedInstants();
timeline = writeTimeline.filterCompletedInstants();
} else {
timeline = writeTimeline;
}

instantsStream = timeline.getInstantsAsStream();
@@ -270,6 +273,6 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m

HoodieTimeline filteredTimeline = new HoodieDefaultTimeline(instantsStream,
(Function<HoodieInstant, Option<byte[]>> & Serializable) metaClient.getActiveTimeline()::getInstantDetails);
return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new FileStatus[0]));
return new HoodieTableFileSystemView(metaClient, filteredTimeline, writeTimeline, statuses.toArray(new FileStatus[0]));
}
}
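Note (not part of the diff): a minimal sketch of the call pattern this change introduces, assuming the new four-argument constructor `HoodieTableFileSystemView(metaClient, visibleCompletedTimeline, visibleWriteTimeline, fileStatuses)`; `hadoopConf`, `basePath` and `partitionPath` are illustrative placeholders.

```java
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

// Sketch only: build both timelines and hand them to the file-system view.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)      // illustrative Hadoop Configuration
    .setBasePath(basePath)    // illustrative table base path
    .build();

// Write timeline: commits, delta commits and compactions, including inflight
// instants, so the view can recognize files written by uncommitted instants.
HoodieTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline();

// Completed timeline: only the instants whose data should be visible to readers.
HoodieTimeline completedTimeline = writeTimeline.filterCompletedInstants();

// Illustrative listing of one partition's files.
FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path(basePath + "/" + partitionPath));

HoodieTableFileSystemView fsView =
    new HoodieTableFileSystemView(metaClient, completedTimeline, writeTimeline, fileStatuses);
```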
@@ -78,7 +78,7 @@ class DedupeSparkJob(basePath: String,
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()

val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), metadata.getActiveTimeline.getCommitsTimeline(), allFiles)
val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
val filteredStatuses = latestFiles.map(f => f.getPath)
LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")
@@ -187,7 +187,7 @@ class DedupeSparkJob(basePath: String,
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()

val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles)
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), metadata.getActiveTimeline.getCommitsTimeline, allFiles)

val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())

@@ -81,7 +81,7 @@ public List<ValidationOpResult> validateCompactionPlan(HoodieTableMetaClient met
int parallelism) throws IOException {
HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant);
HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline());

if (plan.getOperations() != null) {
List<CompactionOperation> ops = plan.getOperations().stream()
@@ -203,7 +203,7 @@ public List<RenameOpResult> repairCompaction(String compactionInstant, int paral
}

final HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline());
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
failed.stream().flatMap(v -> getRenamingActionsToAlignWithCompactionOperation(metaClient, compactionInstant,
v.getOperation(), Option.of(fsView)).stream()).collect(Collectors.toList());
@@ -233,7 +233,7 @@ protected static List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsToAl
HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation op,
Option<HoodieTableFileSystemView> fsViewOpt) {
HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get()
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline());
HoodieInstant lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get();
FileSlice merged =
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp())
@@ -280,7 +280,7 @@ protected static void renameLogFile(HoodieTableMetaClient metaClient, HoodieLogF
private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient metaClient, String compactionInstant,
CompactionOperation operation, Option<HoodieTableFileSystemView> fsViewOpt) throws IOException {
HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get()
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline());
Option<HoodieInstant> lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant();
try {
if (lastInstant.isPresent()) {
@@ -388,7 +388,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
HoodieTableMetaClient metaClient, String compactionInstant, int parallelism,
Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? fsViewOpt.get()
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline());
HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant);
if (plan.getOperations() != null) {
LOG.info(
@@ -428,7 +428,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
List<Pair<HoodieLogFile, HoodieLogFile>> result = new ArrayList<>();
HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get()
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
: new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline());
if (!skipValidation) {
validateCompactionOperation(metaClient, compactionInstant, operation, Option.of(fileSystemView));
}
@@ -296,7 +296,7 @@ public Configuration getHadoopConf() {
* Get the view of the file system for this table.
*/
public TableFileSystemView getFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline(), metaClient.getCommitsTimeline());
}
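Aside (hedged, not part of the change): the two timelines passed above serve different roles; a small sketch of the distinction, using only calls that appear elsewhere in this diff.

```java
// Completed commits only: instants whose data files readers are allowed to see.
HoodieTimeline completedCommits =
    metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

// Commits timeline including inflight/requested instants: lets the view tell apart
// base files produced by commits that never completed, so it can skip them.
HoodieTimeline commitsIncludingInflight = metaClient.getCommitsTimeline();

// The table-level view pairs the two, mirroring getFileSystemView() above.
TableFileSystemView view =
    new HoodieTableFileSystemView(metaClient, completedCommits, commitsIncludingInflight);
```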

/**
@@ -34,7 +34,7 @@ public interface HoodieMetaClientProvider {
HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException;

default HoodieTableFileSystemView getHoodieTableFileSystemView(
HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) {
return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedTimeline, HoodieTimeline visibleWriteTimeline, FileStatus[] fileStatuses) {
return new HoodieTableFileSystemView(metaClient, visibleCompletedTimeline, visibleWriteTimeline, fileStatuses);
}
}
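A hedged sketch of a call site adapting to the widened default method, mirroring the pattern in the CLI change above; `metaClientProvider`, `hadoopConf`, `basePath`, `props` and `fileStatuses` are illustrative.

```java
// Sketch: callers that previously passed a single "visible active timeline"
// now supply the completed timeline plus the wider write timeline.
HoodieTableMetaClient metaClient =
    metaClientProvider.getHoodieMetaClient(hadoopConf, basePath, props);
HoodieTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline();
HoodieTimeline completedTimeline = writeTimeline.filterCompletedInstants();
HoodieTableFileSystemView view = metaClientProvider.getHoodieTableFileSystemView(
    metaClient, completedTimeline, writeTimeline, fileStatuses);
```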
@@ -240,7 +240,7 @@ private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(Compacti

Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet());
final HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline());
Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0])
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).flatMap(FileSlice::getLogFiles)
.collect(Collectors.toSet());
@@ -272,7 +272,7 @@ private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(Compacti

metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline());
// Expect each file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
@@ -312,7 +312,7 @@ private void validateUnScheduleFileId(CompactionAdminClient client, String inges

Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet());
final HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline());
Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0])
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId())).flatMap(FileSlice::getLogFiles)
@@ -333,7 +333,7 @@ private void validateUnScheduleFileId(CompactionAdminClient client, String inges

metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
@@ -711,7 +711,7 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I

final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = new HashMap<>();
metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline(), metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
if (COLUMN_STATS.getPartitionPath().equals(partition)) {
@@ -2522,7 +2522,7 @@ private void validateMetadata(SparkRDDWriteClient testClient) throws IOException
// Metadata table should automatically compact and clean
// versions are +1 as autoclean / compaction happens end of commits
int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline(), metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count()
@@ -28,9 +28,11 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -59,7 +61,9 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.ClusteringTestUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
@@ -2699,6 +2703,48 @@ public void testMultiOperationsPerCommit(boolean populateMetaFields) throws IOEx
"Must contain " + totalRecords + " records");
}

@Test
public void testFailedFirstCommit() throws IOException {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
HoodieWriteConfig cfg = cfgBuilder.withAutoCommit(false)
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).withAsyncClean(true).build())
.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
String firstInstantTime = "10000";
client.startCommitWithTime(firstInstantTime);
int numRecords = 100;
// commit the first instant, then delete its completed meta file below to mimic a partially failed commit
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
JavaRDD<WriteStatus> result1 = client.bulkInsert(writeRecords1, firstInstantTime);
assertTrue(client.commit(firstInstantTime, result1), "Commit should succeed");
// remove the completed meta file to mimic a partial failure.
metaClient.getFs().delete(new Path(basePath + "/.hoodie/" + firstInstantTime + ".commit"));
metaClient = HoodieTableMetaClient.reload(metaClient);

client = getHoodieWriteClient(cfg);
// let's add a 2nd commit, which succeeds.
String secondInstantTime = "20000";
client.startCommitWithTime(secondInstantTime);
// unlike the first commit, this one is committed and left intact
JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(dataGen.generateInserts(secondInstantTime, numRecords), 1);
JavaRDD<WriteStatus> result2 = client.bulkInsert(writeRecords2, secondInstantTime);
assertTrue(client.commit(secondInstantTime, result2), "Commit should succeed");

// File listing using fs based listing.
HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(context, basePath, false, true);

HoodieTableFileSystemView fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(context,
metaClient, HoodieMetadataConfig.newBuilder().enable(false).build());

for (String partitionPath: allPartitionPathsFromFS) {
List<HoodieBaseFile> baseFiles = fileSystemView.getLatestBaseFiles(partitionPath).collect(Collectors.toList());
boolean invalidFilesPresent = baseFiles.stream().anyMatch(baseFile -> baseFile.getCommitTime().equals(firstInstantTime));
assertFalse(invalidFilesPresent); // no data files from firstCommit should be returned.
}
}
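As a rough cross-check (not part of the test), the same invariant could be asserted by constructing the view directly with the two-timeline form this change introduces; `partitionFiles` stands for a hypothetical FileStatus[] listing of one partition.

```java
// Sketch only: completed + write timelines passed explicitly.
HoodieTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline();
HoodieTableFileSystemView directView = new HoodieTableFileSystemView(
    metaClient, writeTimeline.filterCompletedInstants(), writeTimeline, partitionFiles);

// Base files written by the failed first commit must not surface as latest base files.
boolean leaked = directView.getLatestBaseFiles(partitionPath)
    .anyMatch(baseFile -> baseFile.getCommitTime().equals(firstInstantTime));
assertFalse(leaked);
```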

/**
* Build Hoodie Write Config for small data file sizes.
*/
@@ -235,7 +235,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
// FILENAME_METADATA_FIELD payload (entailing that corresponding metadata is in-sync with
// the state of the table
HoodieTableFileSystemView tableView =
getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles());
getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles());

Set<String> latestBaseFileNames = tableView.getLatestBaseFiles()
.map(BaseFile::getFileName)