diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java index 08c892dde4bb8..cad55560adac1 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java @@ -244,16 +244,19 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m Stream instantsStream; HoodieTimeline timeline; + HoodieTimeline writeTimeline; if (basefileOnly) { - timeline = metaClient.getActiveTimeline().getCommitTimeline(); + writeTimeline = metaClient.getActiveTimeline().getCommitTimeline(); } else if (excludeCompaction) { - timeline = metaClient.getActiveTimeline().getCommitsTimeline(); + writeTimeline = metaClient.getActiveTimeline().getCommitsTimeline(); } else { - timeline = metaClient.getActiveTimeline().getWriteTimeline(); + writeTimeline = metaClient.getActiveTimeline().getWriteTimeline(); } if (!includeInflight) { - timeline = timeline.filterCompletedInstants(); + timeline = writeTimeline.filterCompletedInstants(); + } else { + timeline = writeTimeline; } instantsStream = timeline.getInstantsAsStream(); @@ -270,6 +273,6 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m HoodieTimeline filteredTimeline = new HoodieDefaultTimeline(instantsStream, (Function> & Serializable) metaClient.getActiveTimeline()::getInstantDetails); - return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new FileStatus[0])); + return new HoodieTableFileSystemView(metaClient, filteredTimeline, writeTimeline, statuses.toArray(new FileStatus[0])); } } diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala index 00e96a3487504..17ae748ce1e9a 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala @@ -78,7 +78,7 @@ class DedupeSparkJob(basePath: String, val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build() val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath")) - val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles) + val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), metadata.getActiveTimeline.getCommitsTimeline(), allFiles) val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]()) val filteredStatuses = latestFiles.map(f => f.getPath) LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}") @@ -187,7 +187,7 @@ class DedupeSparkJob(basePath: String, val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build() val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath")) - val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), allFiles) + val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitsTimeline.filterCompletedInstants(), metadata.getActiveTimeline.getCommitsTimeline, allFiles) val latestFiles: java.util.List[HoodieBaseFile] = 
fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index a394c6d905543..cf39a85839d16 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -81,7 +81,7 @@ public List validateCompactionPlan(HoodieTableMetaClient met int parallelism) throws IOException { HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant); HoodieTableFileSystemView fsView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline()); if (plan.getOperations() != null) { List ops = plan.getOperations().stream() @@ -203,7 +203,7 @@ public List repairCompaction(String compactionInstant, int paral } final HoodieTableFileSystemView fsView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline()); List> renameActions = failed.stream().flatMap(v -> getRenamingActionsToAlignWithCompactionOperation(metaClient, compactionInstant, v.getOperation(), Option.of(fsView)).stream()).collect(Collectors.toList()); @@ -233,7 +233,7 @@ protected static List> getRenamingActionsToAl HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation op, Option fsViewOpt) { HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() - : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline()); HoodieInstant lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get(); FileSlice merged = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp()) @@ -280,7 +280,7 @@ protected static void renameLogFile(HoodieTableMetaClient metaClient, HoodieLogF private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation operation, Option fsViewOpt) throws IOException { HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() - : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline()); Option lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant(); try { if (lastInstant.isPresent()) { @@ -388,7 +388,7 @@ public List> getRenamingActionsForUnschedulin HoodieTableMetaClient metaClient, String compactionInstant, int parallelism, Option fsViewOpt, boolean skipValidation) throws IOException { HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? 
fsViewOpt.get() - : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline()); HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant); if (plan.getOperations() != null) { LOG.info( @@ -428,7 +428,7 @@ public List> getRenamingActionsForUnschedulin Option fsViewOpt, boolean skipValidation) throws IOException { List> result = new ArrayList<>(); HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() - : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getCommitsAndCompactionTimeline()); if (!skipValidation) { validateCompactionOperation(metaClient, compactionInstant, operation, Option.of(fileSystemView)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 3a31c80d4289e..a83f05c5d4a90 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -296,7 +296,7 @@ public Configuration getHadoopConf() { * Get the view of the file system for this table. */ public TableFileSystemView getFileSystemView() { - return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline()); + return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline(), metaClient.getCommitsTimeline()); } /** diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java index 0cd7ed5a71504..682acc0ae5048 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java @@ -34,7 +34,7 @@ public interface HoodieMetaClientProvider { HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException; default HoodieTableFileSystemView getHoodieTableFileSystemView( - HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) { - return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); + HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedTimeline, HoodieTimeline visibleWriteTimeline, FileStatus[] fileStatuses) { + return new HoodieTableFileSystemView(metaClient, visibleCompletedTimeline, visibleWriteTimeline, fileStatuses); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java index 477c377b63645..6899079524d06 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java @@ -240,7 +240,7 @@ private List> validateUnSchedulePlan(Compacti Set gotLogFilesToBeRenamed = 
renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet()); final HoodieTableFileSystemView fsView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline()); Set expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).flatMap(FileSlice::getLogFiles) .collect(Collectors.toSet()); @@ -272,7 +272,7 @@ private List> validateUnSchedulePlan(Compacti metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); final HoodieTableFileSystemView newFsView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline()); // Expect each file-slice whose base-commit is same as compaction commit to contain no new Log files newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) @@ -312,7 +312,7 @@ private void validateUnScheduleFileId(CompactionAdminClient client, String inges Set gotLogFilesToBeRenamed = renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet()); final HoodieTableFileSystemView fsView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline()); Set expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) .filter(fs -> fs.getFileId().equals(op.getFileId())).flatMap(FileSlice::getLogFiles) @@ -333,7 +333,7 @@ private void validateUnScheduleFileId(CompactionAdminClient client, String inges metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); final HoodieTableFileSystemView newFsView = - new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline(), metaClient.getActiveTimeline().getWriteTimeline()); // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 26c8ef3273437..d5b65dfcb2098 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -711,7 +711,7 @@ public void testMetadataTableDeletePartition(HoodieTableType tableType) throws I final Map metadataEnabledPartitionTypes = new HashMap<>(); 
metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e)); - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline(), metadataMetaClient.getActiveTimeline()); metadataTablePartitions.forEach(partition -> { List latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList()); if (COLUMN_STATS.getPartitionPath().equals(partition)) { @@ -2522,7 +2522,7 @@ private void validateMetadata(SparkRDDWriteClient testClient) throws IOException // Metadata table should automatically compact and clean // versions are +1 as autoclean / compaction happens end of commits int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1; - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline(), metadataMetaClient.getActiveTimeline()); metadataTablePartitions.forEach(partition -> { List latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList()); assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index ba32aea0b3463..a2b44fae01a58 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -28,9 +28,11 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy; import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy; +import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.validator.SparkPreCommitValidator; import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator; import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; @@ -59,7 +61,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.testutils.ClusteringTestUtils; import org.apache.hudi.common.testutils.FileCreateUtils; @@ -2699,6 +2703,48 @@ public void testMultiOperationsPerCommit(boolean populateMetaFields) throws IOEx "Must contain " + totalRecords + " records"); } + @Test + public void testFailedFirstCommit() throws IOException { + 
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
+    HoodieWriteConfig cfg = cfgBuilder.withAutoCommit(false)
+        .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).withAsyncClean(true).build())
+        .build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstInstantTime = "10000";
+    client.startCommitWithTime(firstInstantTime);
+    int numRecords = 100;
+    // write and commit the first batch; its completed meta file is deleted below to mimic a failed first commit.
+    JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
+    JavaRDD<WriteStatus> result1 = client.bulkInsert(writeRecords1, firstInstantTime);
+    assertTrue(client.commit(firstInstantTime, result1), "Commit should succeed");
+    // remove the completed meta file to mimic a partial failure.
+    metaClient.getFs().delete(new Path(basePath + "/.hoodie/" + firstInstantTime + ".commit"));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    client = getHoodieWriteClient(cfg);
+    // let's add a 2nd commit which succeeds.
+    String secondInstantTime = "20000";
+    client.startCommitWithTime(secondInstantTime);
+    // this commit completes normally.
+    JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(dataGen.generateInserts(secondInstantTime, numRecords), 1);
+    JavaRDD<WriteStatus> result2 = client.bulkInsert(writeRecords2, secondInstantTime);
+    assertTrue(client.commit(secondInstantTime, result2), "Commit should succeed");
+
+    // File listing using fs based listing.
+    HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
+    List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(context, basePath, false, true);
+
+    HoodieTableFileSystemView fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(context,
+        metaClient, HoodieMetadataConfig.newBuilder().enable(false).build());
+
+    for (String partitionPath : allPartitionPathsFromFS) {
+      List<HoodieBaseFile> baseFiles = fileSystemView.getLatestBaseFiles(partitionPath).collect(Collectors.toList());
+      boolean invalidFilesPresent = baseFiles.stream().anyMatch(baseFile -> baseFile.getCommitTime().equals(firstInstantTime));
+      assertFalse(invalidFilesPresent); // no data files from the failed first commit should be returned.
+    }
+  }
+
   /**
    * Build Hoodie Write Config for small data file sizes.
*/ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 05a4de483c11f..7c7f085d886e5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -235,7 +235,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap // FILENAME_METADATA_FIELD payload (entailing that corresponding metadata is in-sync with // the state of the table HoodieTableFileSystemView tableView = - getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles()); + getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles()); Set latestBaseFileNames = tableView.getLatestBaseFiles() .map(BaseFile::getFileName) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 3dfcf24ff5981..e8a26512bb7d8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -159,12 +159,12 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient, - metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles); + metaClient.getCommitsTimeline().filterCompletedInstants(), metaClient.getCommitsTimeline(), allFiles); Stream dataFilesToRead = roView.getLatestBaseFiles(); Map fileIdToSize = dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize)); - roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), metaClient.getCommitsTimeline(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); List dataFilesList = dataFilesToRead.collect(Collectors.toList()); assertTrue(dataFilesList.size() > 0, @@ -193,7 +193,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { allFiles = listAllBaseFilesInPath(hoodieTable); roView = getHoodieTableFileSystemView(metaClient, - hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles); + hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), hoodieTable.getActiveTimeline().reload().getCommitsTimeline(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); List newDataFilesList = dataFilesToRead.collect(Collectors.toList()); Map fileIdToNewSize = @@ -635,11 +635,11 @@ public void testHandleUpdateWithMultiplePartitions() throws Exception { FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); BaseFileOnlyView roView = - getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), metaClient.getCommitsTimeline(), allFiles); Stream 
dataFilesToRead = roView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), metaClient.getCommitsTimeline(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), "should list the base files we wrote in the delta commit"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java index 551533bb894cd..bac89da4494ea 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java @@ -258,13 +258,13 @@ protected List createNextDeltaCommit(String instantTime, List getCurrentLatestBaseFiles(HoodieTable table) throws IOException { FileStatus[] allBaseFiles = HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(); HoodieTableFileSystemView view = - getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allBaseFiles); + getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), table.getMetaClient().getActiveTimeline().getWriteTimeline(), allBaseFiles); return view.getLatestBaseFiles().collect(Collectors.toList()); } protected List getCurrentLatestFileSlices(HoodieTable table) { HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(), - table.getMetaClient().getActiveTimeline().reload().getWriteTimeline()); + table.getMetaClient().getActiveTimeline().reload().getWriteTimeline(), table.getMetaClient().getActiveTimeline().reload().getWriteTimeline()); return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) .flatMap(view::getLatestFileSlices).collect(Collectors.toList()); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 73b1da95648e2..b8c7f8b06604c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -118,7 +118,8 @@ public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean popul HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); hoodieTable.getHoodieView().sync(); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), + metaClient.getCommitsTimeline(), allFiles); Stream dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent()); @@ -271,11 +272,13 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E assertFalse(commit.isPresent()); FileStatus[] allFiles = 
listAllBaseFilesInPath(hoodieTable); - HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), + metaClient.getCommitsTimeline(), allFiles); Stream dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), + metaClient.getCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), "should list the base files we wrote in the delta commit"); @@ -312,7 +315,7 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E assertFalse(commit.isPresent()); allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), metaClient.getCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index d2927f2be5a94..56806b19c8aaa 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -139,7 +139,8 @@ void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exc metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), + metaClient.getCommitsTimeline(), allFiles); final String absentCommit = newCommitTime; assertAll(tableView.getLatestBaseFiles().map(file -> () -> assertNotEquals(absentCommit, file.getCommitTime()))); @@ -191,11 +192,13 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro assertFalse(commit.isPresent()); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), + metaClient.getCommitsTimeline(), allFiles); Stream dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, 
hoodieTable.getCompletedCommitsTimeline(), + metaClient.getCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), "should list the base files we wrote in the delta commit"); @@ -274,7 +277,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), metaClient.getCommitsTimeline(), allFiles); inputPaths = tableView.getLatestBaseFiles() .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) .collect(Collectors.toList()); @@ -311,7 +314,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime)); allFiles = listAllBaseFilesInPath(hoodieTable); metaClient = HoodieTableMetaClient.reload(metaClient); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), metaClient.getCommitsTimeline(), allFiles); assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); assertAll(tableView.getLatestBaseFiles().map(file -> () -> assertNotEquals(compactedCommitTime, file.getCommitTime()))); @@ -369,11 +372,12 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), + metaClient.getCommitsTimeline(), allFiles); Stream dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), metaClient.getCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), "Should list the base files we wrote in the delta commit"); @@ -459,7 +463,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { allFiles = listAllBaseFilesInPath(hoodieTable); metaClient = HoodieTableMetaClient.reload(metaClient); - tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), metaClient.getCommitsTimeline(), allFiles); final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp(); @@ -488,10 +492,10 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { metaClient = HoodieTableMetaClient.reload(metaClient); allFiles = listAllBaseFilesInPath(hoodieTable); - tableView = getHoodieTableFileSystemView(metaClient, 
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), metaClient.getCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.findAny().isPresent()); - TableFileSystemView.SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + TableFileSystemView.SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), metaClient.getCommitsTimeline(), allFiles); List fileGroups = ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList()); assertTrue(fileGroups.isEmpty()); @@ -576,7 +580,8 @@ void testRestoreWithCleanedUpCommits() throws Exception { // verify that no files are present after 002. every data file should have been cleaned up HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), + metaClient.getCommitsTimeline(), allFiles); Stream dataFilesToRead = tableView.getLatestBaseFiles(); assertFalse(dataFilesToRead.anyMatch(file -> HoodieTimeline.compareTimestamps("002", HoodieTimeline.GREATER_THAN, file.getCommitTime()))); @@ -670,7 +675,8 @@ private void validateRecords(HoodieWriteConfig cfg, HoodieTableMetaClient metaCl HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); - HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), + metaClient.getCommitsTimeline(), allFiles); List inputPaths = tableView.getLatestBaseFiles() .map(hf -> new Path(hf.getPath()).getParent().toString()) .collect(Collectors.toList()); @@ -997,6 +1003,7 @@ private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean rollbackUsi private SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) { try { return new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline(), metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseAndLogFiles() ); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 9d305777adf9d..6208bad2f6e35 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -438,12 +438,12 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String base return metaClient; } - public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, - FileStatus[] fileStatuses) { + public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient 
metaClient, HoodieTimeline visibleCompletedWriteTimeline, + HoodieTimeline visibleWriteTimeline, FileStatus[] fileStatuses) { if (tableView == null) { - tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); + tableView = new HoodieTableFileSystemView(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline, fileStatuses); } else { - tableView.init(metaClient, visibleActiveTimeline, fileStatuses); + tableView.init(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline, fileStatuses); } return tableView; } @@ -657,7 +657,7 @@ private void runFullValidation(HoodieMetadataConfig metadataConfig, // Metadata table should automatically compact and clean // versions are +1 as autoClean / compaction happens end of commits int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1; - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline(), metadataMetaClient.getActiveTimeline()); metadataTablePartitions.forEach(partition -> { MetadataPartitionType partitionType = partitionTypeMap.get(partition); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index f2e630eaa36d1..daf6931b247c9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -202,7 +202,7 @@ public static List getLatestBaseFiles(String basePath, FileSyste HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); for (String path : paths) { BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, - metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path))); + metaClient.getCommitsTimeline().filterCompletedInstants(), metaClient.getCommitsTimeline(), fs.globStatus(new Path(path))); latestFiles.addAll(fileSystemView.getLatestBaseFiles().collect(Collectors.toList())); } } catch (Exception e) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 0b4fc38dfed9e..5e3d93600ee6c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -265,11 +265,11 @@ protected Stream insertRecordsToMORTable(HoodieTableMetaClient m FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); TableFileSystemView.BaseFileOnlyView roView = - getHoodieTableFileSystemView(reloadedMetaClient, reloadedMetaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + getHoodieTableFileSystemView(reloadedMetaClient, reloadedMetaClient.getCommitTimeline().filterCompletedInstants(), reloadedMetaClient.getCommitsTimeline(), allFiles); Stream dataFilesToRead = roView.getLatestBaseFiles(); assertTrue(!dataFilesToRead.findAny().isPresent()); - roView = 
getHoodieTableFileSystemView(reloadedMetaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+    roView = getHoodieTableFileSystemView(reloadedMetaClient, hoodieTable.getCompletedCommitsTimeline(), reloadedMetaClient.getCommitsTimeline(), allFiles);
     dataFilesToRead = roView.getLatestBaseFiles();
     return dataFilesToRead;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 681b05f2508c0..c3afd814a1bf5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -245,7 +245,7 @@ private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<PartitionPath> partitions) {
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
     HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+        new HoodieTableFileSystemView(metaClient, activeTimeline, metaClient.getCommitsAndCompactionTimeline(), allFiles);
 
     Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index 9b5e8c1dd6f02..9c4e5f00aca58 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -54,7 +54,12 @@ public static Comparator getReverseCommitTimeComparator() {
   /**
    * Timeline, based on which all getter work.
    */
-  private final HoodieTimeline timeline;
+  private final HoodieTimeline completedTimeline;
+
+  /**
+   * Write timeline (completed plus inflight and requested instants), used to tell uncommitted file slices apart from archived ones.
+   */
+  private final HoodieTimeline writeTimeline;
 
   /**
    * The last completed instant, that acts as a high watermark for all getters.
@@ -62,21 +67,23 @@ public static Comparator getReverseCommitTimeComparator() { private final Option lastInstant; public HoodieFileGroup(HoodieFileGroup fileGroup) { - this.timeline = fileGroup.timeline; + this.completedTimeline = fileGroup.completedTimeline; + this.writeTimeline = fileGroup.writeTimeline; this.fileGroupId = fileGroup.fileGroupId; this.fileSlices = new TreeMap<>(fileGroup.fileSlices); this.lastInstant = fileGroup.lastInstant; } - public HoodieFileGroup(String partitionPath, String id, HoodieTimeline timeline) { - this(new HoodieFileGroupId(partitionPath, id), timeline); + public HoodieFileGroup(String partitionPath, String id, HoodieTimeline completedTimeline, HoodieTimeline writeTimeline) { + this(new HoodieFileGroupId(partitionPath, id), completedTimeline, writeTimeline); } - public HoodieFileGroup(HoodieFileGroupId fileGroupId, HoodieTimeline timeline) { + public HoodieFileGroup(HoodieFileGroupId fileGroupId, HoodieTimeline completedTimeline, HoodieTimeline writeTimeline) { this.fileGroupId = fileGroupId; this.fileSlices = new TreeMap<>(HoodieFileGroup.getReverseCommitTimeComparator()); - this.timeline = timeline; - this.lastInstant = timeline.lastInstant(); + this.completedTimeline = completedTimeline; + this.writeTimeline = writeTimeline; + this.lastInstant = completedTimeline.lastInstant(); } /** @@ -125,8 +132,16 @@ private boolean isFileSliceCommitted(FileSlice slice) { if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) { return false; } - - return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()); + // if its part of completed timeline, return true. + if (completedTimeline.containsInstant(slice.getBaseInstantTime())) { + return true; + } + // if its part of inflight, return false + if (writeTimeline.filterInflightsAndRequested().containsInstant(slice.getBaseInstantTime())) { + return false; + } + // else, if its part of archived, return true + return completedTimeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()); } /** @@ -147,7 +162,7 @@ public Option getLatestFileSlicesIncludingInflight() { * Provides a stream of committed file slices, sorted reverse base commit time. 
*/ public Stream getAllFileSlices() { - if (!timeline.empty()) { + if (!completedTimeline.empty()) { return fileSlices.values().stream().filter(this::isFileSliceCommitted); } return Stream.empty(); @@ -223,7 +238,11 @@ public Stream getAllRawFileSlices() { return fileSlices.values().stream(); } - public HoodieTimeline getTimeline() { - return timeline; + public HoodieTimeline getCompletedTimeline() { + return completedTimeline; + } + + public HoodieTimeline getWriteTimeline() { + return writeTimeline; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java index daf475623d18c..5f743a11a333a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java @@ -191,6 +191,7 @@ private HoodieTableFileSystemView initFSView() { return new HoodieTableFileSystemView( metaClient, metaClient.getCommitsTimeline().filterCompletedInstants(), + metaClient.getCommitsTimeline(), touchedFiles.toArray(new FileStatus[0]) ); } catch (Exception e) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java index dfbd40126c07a..8ae62175c3149 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java @@ -50,13 +50,13 @@ public static FileGroupDTO fromFileGroup(HoodieFileGroup fileGroup) { dto.partition = fileGroup.getPartitionPath(); dto.id = fileGroup.getFileGroupId().getFileId(); dto.slices = fileGroup.getAllRawFileSlices().map(FileSliceDTO::fromFileSlice).collect(Collectors.toList()); - dto.timeline = TimelineDTO.fromTimeline(fileGroup.getTimeline()); + dto.timeline = TimelineDTO.fromTimeline(fileGroup.getCompletedTimeline()); return dto; } public static HoodieFileGroup toFileGroup(FileGroupDTO dto, HoodieTableMetaClient metaClient) { HoodieFileGroup fileGroup = - new HoodieFileGroup(dto.partition, dto.id, TimelineDTO.toTimeline(dto.timeline, metaClient)); + new HoodieFileGroup(dto.partition, dto.id, TimelineDTO.toTimeline(dto.timeline, metaClient), metaClient.getActiveTimeline().getWriteTimeline()); dto.slices.stream().map(FileSliceDTO::toFileSlice).forEach(fileSlice -> fileGroup.addFileSlice(fileSlice)); return fileGroup; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index de1b39dccf99b..fa92cff60e618 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -84,7 +84,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV // This is the commits timeline that will be visible for all views extending this view // This is nothing but the write timeline, which contains both ingestion and compaction(major and minor) writers. 
- private HoodieTimeline visibleCommitsAndCompactionTimeline; + private HoodieTimeline visibleCompletedWriteTimeline; + private HoodieTimeline visibleWriteTimeline; // Used to concurrently load and populate partition views private final ConcurrentHashMap addedPartitions = new ConcurrentHashMap<>(4096); @@ -104,10 +105,10 @@ private String getPartitionPathFor(HoodieBaseFile baseFile) { /** * Initialize the view. */ - protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { + protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedWriteTimeline, HoodieTimeline visibleWriteTimeline) { this.metaClient = metaClient; - refreshTimeline(visibleActiveTimeline); - resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline); + refreshTimeline(visibleCompletedWriteTimeline, visibleWriteTimeline); + resetFileGroupsReplaced(visibleCompletedWriteTimeline); this.bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient); // Load Pending Compaction Operations resetPendingCompactionOperations(CompactionUtils.getAllPendingCompactionOperations(metaClient).values().stream() @@ -123,10 +124,12 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi /** * Refresh commits timeline. * - * @param visibleActiveTimeline Visible Active Timeline + * @param visibleCompletedWriteTimeline Visible Active completed Timeline. + * @param visibleWriteTimeline visible active write timeline. */ - protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { - this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline(); + protected void refreshTimeline(HoodieTimeline visibleCompletedWriteTimeline, HoodieTimeline visibleWriteTimeline) { + this.visibleCompletedWriteTimeline = visibleCompletedWriteTimeline.getWriteTimeline(); + this.visibleWriteTimeline = visibleWriteTimeline; } /** @@ -134,7 +137,7 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { */ public List addFilesToView(FileStatus[] statuses) { HoodieTimer timer = HoodieTimer.start(); - List fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true); + List fileGroups = buildFileGroups(statuses, visibleCompletedWriteTimeline, visibleWriteTimeline, true); long fgBuildTimeTakenMs = timer.endTimer(); timer.startTimer(); // Group by partition for efficient updates for both InMemory and DiskBased stuctures. @@ -163,14 +166,14 @@ public List addFilesToView(FileStatus[] statuses) { /** * Build FileGroups from passed in file-status. 
*/ - protected List buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline, + protected List buildFileGroups(FileStatus[] statuses, HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, boolean addPendingCompactionFileSlice) { - return buildFileGroups(convertFileStatusesToBaseFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline, + return buildFileGroups(convertFileStatusesToBaseFiles(statuses), convertFileStatusesToLogFiles(statuses), completedTimeline, writeTimeline, addPendingCompactionFileSlice); } protected List buildFileGroups(Stream baseFileStream, - Stream logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) { + Stream logFileStream, HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, boolean addPendingCompactionFileSlice) { Map, List> baseFiles = baseFileStream.collect(Collectors.groupingBy(baseFile -> { String partitionPathStr = getPartitionPathFor(baseFile); @@ -190,7 +193,7 @@ protected List buildFileGroups(Stream baseFileS fileIdSet.forEach(pair -> { String fileId = pair.getValue(); String partitionPath = pair.getKey(); - HoodieFileGroup group = new HoodieFileGroup(partitionPath, fileId, timeline); + HoodieFileGroup group = new HoodieFileGroup(partitionPath, fileId, completedTimeline, writeTimeline); if (baseFiles.containsKey(pair)) { baseFiles.get(pair).forEach(group::addBaseFile); } @@ -269,7 +272,7 @@ public void reset() { writeLock.lock(); clear(); // Initialize with new Hoodie timeline. - init(metaClient, getTimeline()); + init(metaClient, getTimeline(), getWriteTimeline()); } finally { writeLock.unlock(); } @@ -612,7 +615,7 @@ public final Stream getAllBaseFiles(String partitionStr) { ensurePartitionLoadedCorrectly(partitionPath); return fetchAllBaseFiles(partitionPath) .filter(df -> !isFileGroupReplaced(partitionPath, df.getFileId())) - .filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime())) + .filter(df -> visibleCompletedWriteTimeline.containsOrBeforeTimelineStarts(df.getCommitTime())) .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)) .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df)); } finally { @@ -1251,16 +1254,22 @@ public Option getLastInstant() { @Override public HoodieTimeline getTimeline() { - return visibleCommitsAndCompactionTimeline; + return visibleCompletedWriteTimeline; + } + + public HoodieTimeline getWriteTimeline() { + return visibleWriteTimeline; } @Override public void sync() { - HoodieTimeline oldTimeline = getTimeline(); - HoodieTimeline newTimeline = metaClient.reloadActiveTimeline().filterCompletedOrMajorOrMinorCompactionInstants(); + HoodieTimeline oldTimeline = getWriteTimeline(); + HoodieTimeline newTimeline = metaClient.reloadActiveTimeline().getWriteTimeline(); + HoodieTimeline oldCompletedTimeline = getTimeline(); + HoodieTimeline newCompletedTimeline = newTimeline.filterCompletedOrMajorOrMinorCompactionInstants(); try { writeLock.lock(); - runSync(oldTimeline, newTimeline); + runSync(oldCompletedTimeline, newCompletedTimeline, oldTimeline, newTimeline); } finally { writeLock.unlock(); } @@ -1270,14 +1279,15 @@ public void sync() { * Performs complete reset of file-system view. 
Subsequent partition view calls will load file slices against latest * timeline * - * @param oldTimeline Old Hoodie Timeline - * @param newTimeline New Hoodie Timeline + * @param oldCompletedTimeline Old Hoodie Timeline + * @param newCompletedTimeline New Hoodie Timeline */ - protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { - refreshTimeline(newTimeline); + protected void runSync(HoodieTimeline oldCompletedTimeline, HoodieTimeline newCompletedTimeline, + HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { + refreshTimeline(newCompletedTimeline, newTimeline); clear(); // Initialize with new Hoodie timeline. - init(metaClient, newTimeline); + init(metaClient, newCompletedTimeline, newTimeline); } /** @@ -1285,7 +1295,7 @@ protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { * * @return {@code HoodieTimeline} */ - public HoodieTimeline getVisibleCommitsAndCompactionTimeline() { - return visibleCommitsAndCompactionTimeline; + public HoodieTimeline getVisibleCompletedWriteTimeline() { + return visibleCompletedWriteTimeline; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 48023d50463d2..31808cd1fe0d1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -137,8 +137,8 @@ public void close() { */ private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(SerializableConfiguration conf, FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) { - HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - return new RocksDbBasedFileSystemView(metaClient, timeline, viewConf); + return new RocksDbBasedFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), + metaClient.getActiveTimeline().getWriteTimeline(), viewConf); } /** @@ -152,8 +152,8 @@ private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(Seria private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf, FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient, HoodieCommonConfig commonConfig) { LOG.info("Creating SpillableMap based view for basePath " + metaClient.getBasePath()); - HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf, commonConfig); + return new SpillableMapBasedFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), + metaClient.getActiveTimeline().getWriteTimeline(), viewConf, commonConfig); } /** @@ -166,7 +166,7 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMeta HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); if (metadataConfig.enabled()) { ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. 
Cannot instantiate metadata file system view"); - return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), + return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), metaClient.getActiveTimeline().getWriteTimeline(), metadataSupplier.get()); } if (metaClient.getMetastoreConfig().enableMetastore()) { @@ -174,31 +174,31 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMeta new Class[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class}, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig()); } - return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled()); + return new HoodieTableFileSystemView(metaClient, timeline, metaClient.getActiveTimeline().getWriteTimeline(), viewConf.isIncrementalTimelineSyncEnabled()); } public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieEngineContext engineContext, HoodieTableMetaClient metaClient, HoodieMetadataConfig metadataConfig) { return createInMemoryFileSystemViewWithTimeline(engineContext, metaClient, metadataConfig, - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); - + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getActiveTimeline().getCommitsTimeline()); } public static HoodieTableFileSystemView createInMemoryFileSystemViewWithTimeline(HoodieEngineContext engineContext, HoodieTableMetaClient metaClient, HoodieMetadataConfig metadataConfig, - HoodieTimeline timeline) { + HoodieTimeline completedWriteTimeline, + HoodieTimeline writeTimeline) { LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath()); if (metadataConfig.enabled()) { - return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig); + return new HoodieMetadataFileSystemView(engineContext, metaClient, completedWriteTimeline, writeTimeline, metadataConfig); } if (metaClient.getMetastoreConfig().enableMetastore()) { return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS, new Class[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetadataConfig.class}, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig()); } - return new HoodieTableFileSystemView(metaClient, timeline); + return new HoodieTableFileSystemView(metaClient, completedWriteTimeline, writeTimeline); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java index ea72f305b0738..a857329aaa972 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java @@ -94,28 +94,28 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem /** * Create a file system view, as of the given timeline. 
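Illustrative aside (not part of the patch): with the FileSystemViewManager changes above, callers of createInMemoryFileSystemViewWithTimeline now supply both the completed write timeline and the timeline it was filtered from. A hedged sketch of such a call; the wrapper class and variable names are assumptions, not from the patch.

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.table.view.FileSystemViewManager;
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

    class ViewManagerCallSketch {
      static HoodieTableFileSystemView example(HoodieEngineContext engineContext,
                                               HoodieTableMetaClient metaClient,
                                               HoodieMetadataConfig metadataConfig) {
        HoodieTimeline commitsTimeline = metaClient.getActiveTimeline().getCommitsTimeline();
        // Completed instants only as the first timeline; the unfiltered commits timeline as the second.
        return FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
            engineContext, metaClient, metadataConfig,
            commitsTimeline.filterCompletedInstants(), commitsTimeline);
      }
    }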
*/ - public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { - this(metaClient, visibleActiveTimeline, false); + public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedWriteTimeline, HoodieTimeline visibleWriteTimeline) { + this(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline, false); } /** * Create a file system view, as of the given timeline. */ - public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, + public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedWriteTimeline, HoodieTimeline visibleWriteTimeline, boolean enableIncrementalTimelineSync) { super(enableIncrementalTimelineSync); - init(metaClient, visibleActiveTimeline); + init(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline); } @Override - public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { + public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedWriteTimeline, HoodieTimeline visibleWriteTimeline) { this.partitionToFileGroupsMap = createPartitionToFileGroups(); - super.init(metaClient, visibleActiveTimeline); + super.init(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline); } - public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, + public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedWriteTimeline, HoodieTimeline visibleWriteTimeline, FileStatus[] fileStatuses) { - init(metaClient, visibleActiveTimeline); + init(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline); addFilesToView(fileStatuses); } @@ -171,9 +171,9 @@ protected Map createFileIdToPendingClusteringM /** * Create a file system view, as of the given timeline, with the provided file statuses. 
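Illustrative aside (not part of the patch): the constructor change above is the migration most call sites in this patch make. Where one visible timeline was passed before, the completed timeline and the write timeline it was derived from are passed now. A minimal sketch with illustrative variable names:

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

    class ConstructorMigrationSketch {
      static HoodieTableFileSystemView example(HoodieTableMetaClient metaClient) {
        // Before this patch: new HoodieTableFileSystemView(metaClient, completedCommitsTimeline)
        HoodieTimeline commitsTimeline = metaClient.getActiveTimeline().getCommitsTimeline();
        // After: pass the completed timeline together with the timeline it was filtered from.
        return new HoodieTableFileSystemView(
            metaClient, commitsTimeline.filterCompletedInstants(), commitsTimeline);
      }
    }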
*/ - public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, + public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedTimeline, HoodieTimeline visibleTimeline, FileStatus[] fileStatuses) { - this(metaClient, visibleActiveTimeline); + this(metaClient, visibleCompletedTimeline, visibleTimeline); addFilesToView(fileStatuses); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java index ea49cfb54a82b..30ddf51633793 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java @@ -65,29 +65,31 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl private final boolean incrementalTimelineSyncEnabled; // This is the visible active timeline used only for incremental view syncing - private HoodieTimeline visibleActiveTimeline; + private HoodieTimeline visibleCompletedWriteTimeline; + private HoodieTimeline visibleWriteTimeline; protected IncrementalTimelineSyncFileSystemView(boolean enableIncrementalTimelineSync) { this.incrementalTimelineSyncEnabled = enableIncrementalTimelineSync; } @Override - protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { - this.visibleActiveTimeline = visibleActiveTimeline; - super.refreshTimeline(visibleActiveTimeline); + protected void refreshTimeline(HoodieTimeline visibleCompletedWriteTimeline, HoodieTimeline visibleWriteTimeline) { + this.visibleCompletedWriteTimeline = visibleCompletedWriteTimeline; + this.visibleWriteTimeline = visibleWriteTimeline; + super.refreshTimeline(visibleCompletedWriteTimeline, visibleWriteTimeline); } @Override - protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { + protected void runSync(HoodieTimeline oldCompletedTimeline, HoodieTimeline newCompletedTimeline, HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { try { if (incrementalTimelineSyncEnabled) { - TimelineDiffResult diffResult = TimelineDiffHelper.getNewInstantsForIncrementalSync(oldTimeline, newTimeline); + TimelineDiffResult diffResult = TimelineDiffHelper.getNewInstantsForIncrementalSync(oldCompletedTimeline, newCompletedTimeline); if (diffResult.canSyncIncrementally()) { LOG.info("Doing incremental sync"); - runIncrementalSync(newTimeline, diffResult); + runIncrementalSync(newCompletedTimeline, newTimeline, diffResult); LOG.info("Finished incremental sync"); // Reset timeline to latest - refreshTimeline(newTimeline); + refreshTimeline(newCompletedTimeline, newTimeline); return; } } @@ -95,23 +97,23 @@ protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { LOG.error("Got exception trying to perform incremental sync. Reverting to complete sync", ioe); } - super.runSync(oldTimeline, newTimeline); + super.runSync(oldCompletedTimeline, newCompletedTimeline, oldTimeline, newTimeline); } /** * Run incremental sync based on the diff result produced. 
* - * @param timeline New Timeline + * @param completedTimeline New Timeline * @param diffResult Timeline Diff Result */ - private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diffResult) { + private void runIncrementalSync(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, TimelineDiffResult diffResult) { LOG.info("Timeline Diff Result is :" + diffResult); // First remove pending compaction instants which were completed diffResult.getFinishedCompactionInstants().stream().forEach(instant -> { try { - removePendingCompactionInstant(timeline, instant); + removePendingCompactionInstant(completedTimeline, instant); } catch (IOException e) { throw new HoodieException(e); } @@ -120,7 +122,7 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff // Now remove pending log compaction instants which were completed or removed diffResult.getFinishedOrRemovedLogCompactionInstants().stream().forEach(instant -> { try { - removePendingLogCompactionInstant(timeline, instant); + removePendingLogCompactionInstant(completedTimeline, instant); } catch (IOException e) { throw new HoodieException(e); } @@ -135,19 +137,19 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff try { if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) { - addCommitInstant(timeline, instant); + addCommitInstant(completedTimeline, writeTimeline, instant); } else if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) { - addRestoreInstant(timeline, instant); + addRestoreInstant(completedTimeline, writeTimeline, instant); } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) { - addCleanInstant(timeline, instant); + addCleanInstant(completedTimeline, writeTimeline, instant); } else if (instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) { - addPendingCompactionInstant(timeline, instant); + addPendingCompactionInstant(completedTimeline, writeTimeline, instant); } else if (instant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION)) { addPendingLogCompactionInstant(instant); } else if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) { - addRollbackInstant(timeline, instant); + addRollbackInstant(completedTimeline, writeTimeline, instant); } else if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - addReplaceInstant(timeline, instant); + addReplaceInstant(completedTimeline, writeTimeline, instant); } } catch (IOException ioe) { throw new HoodieException(ioe); @@ -187,10 +189,10 @@ private void removePendingLogCompactionInstant(HoodieTimeline timeline, HoodieIn /** * Add newly found compaction instant. 
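Illustrative aside (not part of the patch): the incremental-sync hunks here thread the extra write timeline down to HoodieFileGroup, matching the buildFileGroups() change earlier in this diff and the TestHoodieFileGroup updates later. A minimal sketch of the widened constructor; the parameters are assumed to be supplied by the caller.

    import org.apache.hudi.common.model.HoodieFileGroup;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    class FileGroupSketch {
      static HoodieFileGroup example(String partitionPath, String fileId,
                                     HoodieTimeline completedTimeline, HoodieTimeline writeTimeline) {
        // The completed timeline still decides which file slices count as committed;
        // the file group now also receives the unfiltered write timeline (see buildFileGroups above).
        return new HoodieFileGroup(partitionPath, fileId, completedTimeline, writeTimeline);
      }
    }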
* - * @param timeline Hoodie Timeline + * @param completedTimeline Hoodie Timeline * @param instant Compaction Instant */ - private void addPendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException { + private void addPendingCompactionInstant(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, HoodieInstant instant) throws IOException { LOG.info("Syncing pending compaction instant (" + instant + ")"); HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp()); List> pendingOps = @@ -203,7 +205,7 @@ private void addPendingCompactionInstant(HoodieTimeline timeline, HoodieInstant Map>> partitionToFileGroups = pendingOps.stream().map(opPair -> { String compactionInstantTime = opPair.getKey(); - HoodieFileGroup fileGroup = new HoodieFileGroup(opPair.getValue().getFileGroupId(), timeline); + HoodieFileGroup fileGroup = new HoodieFileGroup(opPair.getValue().getFileGroupId(), completedTimeline, writeTimeline); fileGroup.addNewFileSliceAtInstant(compactionInstantTime); return Pair.of(compactionInstantTime, fileGroup); }).collect(Collectors.groupingBy(x -> x.getValue().getPartitionPath())); @@ -236,19 +238,20 @@ private void addPendingLogCompactionInstant(HoodieInstant instant) throws IOExce /** * Add newly found commit/delta-commit instant. * - * @param timeline Hoodie Timeline + * @param completedTimeline Hoodie Timeline * @param instant Instant */ - private void addCommitInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException { + private void addCommitInstant(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, HoodieInstant instant) throws IOException { LOG.info("Syncing committed instant (" + instant + ")"); HoodieCommitMetadata commitMetadata = - HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); - updatePartitionWriteFileGroups(commitMetadata.getPartitionToWriteStats(), timeline, instant); + HoodieCommitMetadata.fromBytes(completedTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + updatePartitionWriteFileGroups(commitMetadata.getPartitionToWriteStats(), completedTimeline, writeTimeline, instant); LOG.info("Done Syncing committed instant (" + instant + ")"); } private void updatePartitionWriteFileGroups(Map> partitionToWriteStats, - HoodieTimeline timeline, + HoodieTimeline completedTimeline, + HoodieTimeline writeTimeline, HoodieInstant instant) { partitionToWriteStats.entrySet().stream().forEach(entry -> { String partition = entry.getKey(); @@ -260,7 +263,7 @@ private void updatePartitionWriteFileGroups(Map> p return status; }).toArray(FileStatus[]::new); List fileGroups = - buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false); + buildFileGroups(statuses, completedTimeline.filterCompletedAndCompactionInstants(), writeTimeline, false); applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.ADD); } else { LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded"); @@ -272,13 +275,13 @@ private void updatePartitionWriteFileGroups(Map> p /** * Add newly found restore instant. 
* - * @param timeline Hoodie Timeline + * @param completedTimeline Hoodie Timeline * @param instant Restore Instant */ - private void addRestoreInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException { + private void addRestoreInstant(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, HoodieInstant instant) throws IOException { LOG.info("Syncing restore instant (" + instant + ")"); HoodieRestoreMetadata metadata = - TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class); + TimelineMetadataUtils.deserializeAvroMetadata(completedTimeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class); Map>> partitionFiles = metadata.getHoodieRestoreMetadata().entrySet().stream().flatMap(entry -> { @@ -287,7 +290,7 @@ private void addRestoreInstant(HoodieTimeline timeline, HoodieInstant instant) t })); }).collect(Collectors.groupingBy(Pair::getKey)); partitionFiles.entrySet().stream().forEach(e -> { - removeFileSlicesForPartition(timeline, instant, e.getKey(), + removeFileSlicesForPartition(completedTimeline, writeTimeline, instant, e.getKey(), e.getValue().stream().map(x -> x.getValue()).collect(Collectors.toList())); }); @@ -303,16 +306,16 @@ private void addRestoreInstant(HoodieTimeline timeline, HoodieInstant instant) t /** * Add newly found rollback instant. * - * @param timeline Hoodie Timeline + * @param completedTimeline Hoodie Timeline * @param instant Rollback Instant */ - private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException { + private void addRollbackInstant(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, HoodieInstant instant) throws IOException { LOG.info("Syncing rollback instant (" + instant + ")"); HoodieRollbackMetadata metadata = - TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class); + TimelineMetadataUtils.deserializeAvroMetadata(completedTimeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class); metadata.getPartitionMetadata().entrySet().stream().forEach(e -> { - removeFileSlicesForPartition(timeline, instant, e.getKey(), e.getValue().getSuccessDeleteFiles()); + removeFileSlicesForPartition(completedTimeline, writeTimeline, instant, e.getKey(), e.getValue().getSuccessDeleteFiles()); }); LOG.info("Done Syncing rollback instant (" + instant + ")"); } @@ -320,14 +323,14 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant) /** * Add newly found REPLACE instant. 
* - * @param timeline Hoodie Timeline + * @param completedTimeline Hoodie Timeline * @param instant REPLACE Instant */ - private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException { + private void addReplaceInstant(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, HoodieInstant instant) throws IOException { LOG.info("Syncing replace instant (" + instant + ")"); HoodieReplaceCommitMetadata replaceMetadata = - HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); - updatePartitionWriteFileGroups(replaceMetadata.getPartitionToWriteStats(), timeline, instant); + HoodieReplaceCommitMetadata.fromBytes(completedTimeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); + updatePartitionWriteFileGroups(replaceMetadata.getPartitionToWriteStats(), completedTimeline, writeTimeline, instant); replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().forEach(entry -> { String partition = entry.getKey(); Map replacedFileIds = entry.getValue().stream() @@ -343,10 +346,10 @@ private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) t * Add newly found clean instant. Note that cleaner metadata (.clean.completed) * contains only relative paths unlike clean plans (.clean.requested) which contains absolute paths. * - * @param timeline Timeline + * @param completedTimeline Timeline * @param instant Clean instant */ - private void addCleanInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException { + private void addCleanInstant(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, HoodieInstant instant) throws IOException { LOG.info("Syncing cleaner instant (" + instant + ")"); HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, instant); cleanMetadata.getPartitionMetadata().entrySet().stream().forEach(entry -> { @@ -356,12 +359,12 @@ private void addCleanInstant(HoodieTimeline timeline, HoodieInstant instant) thr .stream().map(fileName -> new Path(FSUtils .getPartitionPath(basePath, partitionPath), fileName).toString()) .collect(Collectors.toList()); - removeFileSlicesForPartition(timeline, instant, entry.getKey(), fullPathList); + removeFileSlicesForPartition(completedTimeline, writeTimeline, instant, entry.getKey(), fullPathList); }); LOG.info("Done Syncing cleaner instant (" + instant + ")"); } - private void removeFileSlicesForPartition(HoodieTimeline timeline, HoodieInstant instant, String partition, + private void removeFileSlicesForPartition(HoodieTimeline completedTimeline, HoodieTimeline writeTimeline, HoodieInstant instant, String partition, List paths) { if (isPartitionAvailableInStore(partition)) { LOG.info("Removing file slices for partition (" + partition + ") for instant (" + instant + ")"); @@ -371,7 +374,7 @@ private void removeFileSlicesForPartition(HoodieTimeline timeline, HoodieInstant return status; }).toArray(FileStatus[]::new); List fileGroups = - buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false); + buildFileGroups(statuses, completedTimeline.filterCompletedAndCompactionInstants(), writeTimeline, false); applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.REMOVE); } else { LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded"); @@ -438,14 +441,15 @@ protected void applyDeltaFileSlicesToPartitionView(String partition, List getFileGroups(Stream sliceStream) { return 
sliceStream.map(s -> Pair.of(Pair.of(s.getPartitionPath(), s.getFileId()), s)) .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream().map(slicePair -> { HoodieFileGroup fg = new HoodieFileGroup(slicePair.getKey().getKey(), slicePair.getKey().getValue(), - getVisibleCommitsAndCompactionTimeline()); + getVisibleCompletedWriteTimeline(), getWriteTimeline()); slicePair.getValue().forEach(e -> fg.addFileSlice(e.getValue())); return fg; }); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index 3bfa8dd04bd07..cb8b808661081 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -59,8 +59,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { private final ExternalSpillableMap.DiskMapType diskMapType; private final boolean isBitCaskDiskMapCompressionEnabled; - public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, - FileSystemViewStorageConfig config, HoodieCommonConfig commonConfig) { + public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedWriteTimeline, + HoodieTimeline visibleWriteTimeline, FileSystemViewStorageConfig config, HoodieCommonConfig commonConfig) { super(config.isIncrementalTimelineSyncEnabled()); this.maxMemoryForFileGroupMap = config.getMaxMemoryForFileGroupMap(); this.maxMemoryForPendingCompaction = config.getMaxMemoryForPendingCompaction(); @@ -71,12 +71,13 @@ public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieT this.baseStoreDir = config.getSpillableDir(); diskMapType = commonConfig.getSpillableDiskMapType(); isBitCaskDiskMapCompressionEnabled = commonConfig.isBitCaskDiskMapCompressionEnabled(); - init(metaClient, visibleActiveTimeline); + init(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline); } - public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, - FileStatus[] fileStatuses, FileSystemViewStorageConfig config, HoodieCommonConfig commonConfig) { - this(metaClient, visibleActiveTimeline, config, commonConfig); + public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleCompletedWriteTimeline, + HoodieTimeline visibleWriteTimeline, FileStatus[] fileStatuses, FileSystemViewStorageConfig config, + HoodieCommonConfig commonConfig) { + this(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline, config, commonConfig); addFilesToView(fileStatuses); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java index ab5b5f6b4db82..4dcbfcd5c7dbe 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java @@ -39,17 +39,19 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView { private final HoodieTableMetadata tableMetadata; public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, - HoodieTimeline visibleActiveTimeline, + HoodieTimeline visibleCompletedWriteTimeline, + HoodieTimeline 
visibleWriteTimeline, HoodieTableMetadata tableMetadata) { - super(metaClient, visibleActiveTimeline); + super(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline); this.tableMetadata = tableMetadata; } public HoodieMetadataFileSystemView(HoodieEngineContext engineContext, HoodieTableMetaClient metaClient, - HoodieTimeline visibleActiveTimeline, + HoodieTimeline visibleCompletedWriteTimeline, + HoodieTimeline visibleWriteTimeline, HoodieMetadataConfig metadataConfig) { - super(metaClient, visibleActiveTimeline); + super(metaClient, visibleCompletedWriteTimeline, visibleWriteTimeline); this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java index 0bccf02be5342..a03c3396aeee2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java @@ -71,7 +71,7 @@ public HoodieMetadataMetrics(Registry metricsRegistry) { public Map getStats(boolean detailed, HoodieTableMetaClient metaClient, HoodieTableMetadata metadata) { try { metaClient.reloadActiveTimeline(); - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), metaClient.getActiveTimeline()); return getStats(fsView, detailed, metadata); } catch (IOException ioe) { throw new HoodieIOException("Unable to get metadata stats.", ioe); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index af5f2214cc0ff..b428810e04e64 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1034,7 +1034,7 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient HoodieActiveTimeline.createNewInstantTime()); timeline = new HoodieDefaultTimeline(Stream.of(instant), metaClient.getActiveTimeline()::getInstantDetails); } - return new HoodieTableFileSystemView(metaClient, timeline); + return new HoodieTableFileSystemView(metaClient, timeline, timeline); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java index a7cdf22f8020f..050974eb7408e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java @@ -44,7 +44,7 @@ public void testCommittedFileSlices() { Stream inflight = Arrays.asList("002").stream(); MockHoodieTimeline activeTimeline = new MockHoodieTimeline(completed, inflight); HoodieFileGroup fileGroup = new HoodieFileGroup("", "data", - activeTimeline.getCommitsTimeline().filterCompletedInstants()); + activeTimeline.getCommitsTimeline().filterCompletedInstants(), activeTimeline.getCommitsTimeline()); for (int i = 0; i < 3; i++) { HoodieBaseFile baseFile = new HoodieBaseFile("data_1_00" + i); fileGroup.addBaseFile(baseFile); @@ -65,7 +65,8 @@ public void 
testCommittedFileSlicesWithSavepointAndHoles() { new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"), new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT as well ).collect(Collectors.toList())); - HoodieFileGroup fileGroup = new HoodieFileGroup("", "data", activeTimeline.filterCompletedAndCompactionInstants()); + HoodieFileGroup fileGroup = new HoodieFileGroup("", "data", activeTimeline.filterCompletedAndCompactionInstants(), + activeTimeline.getWriteTimeline()); for (int i = 0; i < 7; i++) { HoodieBaseFile baseFile = new HoodieBaseFile("data_1_0" + i); fileGroup.addBaseFile(baseFile); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java index b297d320c7a6b..ab0e3f2868b4e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java @@ -512,7 +512,7 @@ public void testGetAllFileGroups() { String partitionPath = "/table2"; Stream expected = Collections.singleton( new HoodieFileGroup(partitionPath, "id1", - new MockHoodieTimeline(Stream.empty(), Stream.empty()))).stream(); + new MockHoodieTimeline(Stream.empty(), Stream.empty()), new MockHoodieTimeline(Stream.empty(), Stream.empty()))).stream(); when(primary.getAllFileGroups(partitionPath)).thenReturn(expected); actual = fsView.getAllFileGroups(partitionPath); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDBBasedIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDBBasedIncrementalFSViewSync.java index 082277e71c96a..b5ffd3589fa8b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDBBasedIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDBBasedIncrementalFSViewSync.java @@ -33,7 +33,7 @@ public class TestRocksDBBasedIncrementalFSViewSync extends TestIncrementalFSView protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline) throws IOException { String subdirPath = Files.createTempDirectory(tempDir, null).toAbsolutePath().toString(); - return new RocksDbBasedFileSystemView(metaClient, timeline, FileSystemViewStorageConfig.newBuilder() + return new RocksDbBasedFileSystemView(metaClient, timeline, timeline.getWriteTimeline(), FileSystemViewStorageConfig.newBuilder() .withRocksDBPath(subdirPath).withIncrementalTimelineSync(true).build()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDbBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDbBasedFileSystemView.java index 7793178d102a9..8546845d06214 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDbBasedFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestRocksDbBasedFileSystemView.java @@ -31,7 +31,7 @@ public class TestRocksDbBasedFileSystemView extends TestHoodieTableFileSystemVie @Override protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) throws IOException { String subdirPath = Files.createTempDirectory(tempDir, null).toAbsolutePath().toString(); - return new 
RocksDbBasedFileSystemView(metaClient, timeline, + return new RocksDbBasedFileSystemView(metaClient, timeline, timeline.getWriteTimeline(), FileSystemViewStorageConfig.newBuilder().withRocksDBPath(subdirPath).build()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java index 8109249c19de8..f6c2423fbcc47 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java @@ -28,7 +28,7 @@ public class TestSpillableMapBasedFileSystemView extends TestHoodieTableFileSyst @Override protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { - return new SpillableMapBasedFileSystemView(metaClient, timeline, FileSystemViewStorageConfig.newBuilder() + return new SpillableMapBasedFileSystemView(metaClient, timeline, timeline.getWriteTimeline(), FileSystemViewStorageConfig.newBuilder() // pure disk base View .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).withMaxMemoryForView(0L).build(), HoodieCommonConfig.newBuilder().build()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java index c678dd2e48c12..2735520d732ec 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java @@ -29,7 +29,7 @@ public class TestSpillableMapBasedIncrementalFSViewSync extends TestIncrementalF @Override protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline) { - return new SpillableMapBasedFileSystemView(metaClient, timeline, + return new SpillableMapBasedFileSystemView(metaClient, timeline, timeline.getWriteTimeline(), FileSystemViewStorageConfig.newBuilder().withMaxMemoryForView(0L).withIncrementalTimelineSync(true).build(), HoodieCommonConfig.newBuilder().build()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 9a2dba04681a5..b14e3b7d92635 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -444,7 +444,7 @@ public static Map getBaseFileCountsForPaths(String basePath, FileS HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); for (String path : paths) { TableFileSystemView.BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, - metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new org.apache.hadoop.fs.Path(path))); + metaClient.getCommitsTimeline().filterCompletedInstants(), metaClient.getCommitsTimeline(), fs.globStatus(new org.apache.hadoop.fs.Path(path))); toReturn.put(path, fileSystemView.getLatestBaseFiles().count()); } return toReturn; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index f6e871b561c0e..0ce1d8ff0014a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -109,7 +109,7 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) thro } protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline, boolean enableIncrementalTimelineSync) { - return new HoodieTableFileSystemView(metaClient, timeline, enableIncrementalTimelineSync); + return new HoodieTableFileSystemView(metaClient, timeline, timeline, enableIncrementalTimelineSync); } protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient) throws IOException { @@ -124,7 +124,7 @@ protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaCli protected SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) { try { return new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline(), + metaClient.getActiveTimeline(), metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseAndLogFiles() ); } catch (IOException ioe) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 0a10c77c2215f..26dab52e9c70a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -134,8 +134,9 @@ public static Builder builder() { public Result inputSplits( HoodieTableMetaClient metaClient, org.apache.hadoop.conf.Configuration hadoopConf) { - HoodieTimeline commitTimeline = getReadTimeline(metaClient); - if (commitTimeline.empty()) { + HoodieTimeline completedReadTimeline = getCompletedReadTimeline(metaClient); + HoodieTimeline readTimeline = getReadTimeline(metaClient); + if (completedReadTimeline.empty()) { LOG.warn("No splits found for the table under path " + path); return Result.EMPTY; } @@ -143,8 +144,8 @@ public Result inputSplits( final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT); final String endCommit = this.conf.getString(FlinkOptions.READ_END_COMMIT); final boolean startFromEarliest = FlinkOptions.START_COMMIT_EARLIEST.equalsIgnoreCase(startCommit); - final boolean startOutOfRange = startCommit != null && commitTimeline.isBeforeTimelineStarts(startCommit); - final boolean endOutOfRange = endCommit != null && commitTimeline.isBeforeTimelineStarts(endCommit); + final boolean startOutOfRange = startCommit != null && completedReadTimeline.isBeforeTimelineStarts(startCommit); + final boolean endOutOfRange = endCommit != null && completedReadTimeline.isBeforeTimelineStarts(endCommit); boolean fullTableScan = startFromEarliest || startOutOfRange || endOutOfRange; // Step1: find out the files to read, tries to read the files from the commit metadata first, @@ -155,7 +156,7 @@ public Result inputSplits( // 4. the end commit is archived Set readPartitions; final FileStatus[] fileStatuses; - List instants = filterInstantsWithRange(commitTimeline, null); + List instants = filterInstantsWithRange(completedReadTimeline, null); if (fullTableScan) { // scans the partitions and files directly. 
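Illustrative aside (not part of the patch): in IncrementalInputSplits above, the single commitTimeline is split into completedReadTimeline (start/end range checks, commit metadata lookups) and readTimeline (handed on to the file-system view). A small sketch of the distinction, mirroring the getCompletedReadTimeline()/getReadTimeline() helpers added later in this file; names are illustrative.

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    class FlinkReadTimelineSketch {
      static void example(HoodieTableMetaClient metaClient) {
        // Completed (plus compaction) instants: drives which instants are readable.
        HoodieTimeline completedReadTimeline =
            metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
        // Unfiltered commits-and-compaction timeline: passed through to getInputSplits() and the view.
        HoodieTimeline readTimeline = metaClient.getCommitsAndCompactionTimeline();
      }
    }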
FileIndex fileIndex = getFileIndex(); @@ -172,7 +173,7 @@ public Result inputSplits( } String tableName = conf.getString(FlinkOptions.TABLE_NAME); List metadataList = instants.stream() - .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); + .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, completedReadTimeline)).collect(Collectors.toList()); readPartitions = getReadPartitions(metadataList); if (readPartitions.size() == 0) { LOG.warn("No partitions found for reading in user provided path."); @@ -223,10 +224,10 @@ public Result inputSplits( // Step3: decides the read end commit final String endInstant = fullTableScan - ? commitTimeline.lastInstant().get().getTimestamp() + ? completedReadTimeline.lastInstant().get().getTimestamp() : instants.get(instants.size() - 1).getTimestamp(); - List inputSplits = getInputSplits(metaClient, commitTimeline, + List inputSplits = getInputSplits(metaClient, completedReadTimeline, readTimeline, fileStatuses, readPartitions, endInstant, instantRange, false); return Result.instance(inputSplits, endInstant); @@ -246,12 +247,13 @@ public Result inputSplits( String issuedInstant, boolean cdcEnabled) { metaClient.reloadActiveTimeline(); - HoodieTimeline commitTimeline = getReadTimeline(metaClient); - if (commitTimeline.empty()) { + HoodieTimeline completedReadTimeline = getCompletedReadTimeline(metaClient); + HoodieTimeline readTimeline = getReadTimeline(metaClient); + if (completedReadTimeline.empty()) { LOG.warn("No splits found for the table under path " + path); return Result.EMPTY; } - List instants = filterInstantsWithRange(commitTimeline, issuedInstant); + List instants = filterInstantsWithRange(completedReadTimeline, issuedInstant); // get the latest instant that satisfies condition final HoodieInstant instantToIssue = instants.size() == 0 ? 
null : instants.get(instants.size() - 1); final InstantRange instantRange; @@ -283,7 +285,7 @@ public Result inputSplits( } final String endInstant = instantToIssue.getTimestamp(); - List inputSplits = getInputSplits(metaClient, commitTimeline, + List inputSplits = getInputSplits(metaClient, completedReadTimeline, readTimeline, fileStatuses, readPartitions, endInstant, null, false); return Result.instance(inputSplits, endInstant); @@ -311,8 +313,8 @@ public Result inputSplits( // case2: normal streaming read String tableName = conf.getString(FlinkOptions.TABLE_NAME); List activeMetadataList = instants.stream() - .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); - List archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName); + .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, completedReadTimeline)).collect(Collectors.toList()); + List archivedMetadataList = getArchivedMetadata(metaClient, instantRange, completedReadTimeline, tableName); if (archivedMetadataList.size() > 0) { LOG.warn("\n" + "--------------------------------------------------------------------------------\n" @@ -338,7 +340,7 @@ public Result inputSplits( } final String endInstant = instantToIssue.getTimestamp(); - List inputSplits = getInputSplits(metaClient, commitTimeline, + List inputSplits = getInputSplits(metaClient, completedReadTimeline, readTimeline, fileStatuses, readPartitions, endInstant, instantRange, skipCompaction); return Result.instance(inputSplits, endInstant); @@ -370,12 +372,13 @@ private InstantRange getInstantRange(String issuedInstant, String instantToIssue private List getInputSplits( HoodieTableMetaClient metaClient, HoodieTimeline commitTimeline, + HoodieTimeline writeTimeline, FileStatus[] fileStatuses, Set readPartitions, String endInstant, InstantRange instantRange, boolean skipBaseFiles) { - final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses); + final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, writeTimeline, fileStatuses); final AtomicInteger cnt = new AtomicInteger(0); final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); return readPartitions.stream() @@ -457,11 +460,16 @@ private List getArchivedMetadata( return Collections.emptyList(); } - private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) { + private HoodieTimeline getCompletedReadTimeline(HoodieTableMetaClient metaClient) { HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(); return filterInstantsByCondition(timeline); } + private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) { + HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline(); + return filterInstantsByCondition(timeline); + } + private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) { HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant); HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 90369889db8e9..c3efbb3821080 100644 ---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -317,7 +317,7 @@ private List buildFileIndex() { HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, // file-slice after pending compaction-requested instant-time is also considered valid - metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), fileStatuses); + metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), metaClient.getCommitsAndCompactionTimeline(), fileStatuses); String latestCommit = fsView.getLastInstant().get().getTimestamp(); final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); final AtomicInteger cnt = new AtomicInteger(0); @@ -486,7 +486,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat( } HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), fileStatuses); + metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), metaClient.getCommitsAndCompactionTimeline(), fileStatuses); Path[] paths = fsView.getLatestBaseFiles() .map(HoodieBaseFile::getFileStatus) .map(FileStatus::getPath).toArray(Path[]::new); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java index 140e7ff5b6330..6fa55d84157c1 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java @@ -45,6 +45,7 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; @@ -172,22 +173,22 @@ protected List listStatusForIncrementalMode(JobConf job, List inputPaths, String incrementalTable) throws IOException { Job jobContext = Job.getInstance(job); - Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); - if (!timeline.isPresent()) { + Pair, Option> timelines = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); + if (!timelines.getLeft().isPresent()) { return null; } - Option> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get()); + Option> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timelines.getLeft().get()); if (!commitsToCheck.isPresent()) { return null; } - Option incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths); + Option incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timelines.getKey().get(), inputPaths); // Mutate the JobConf to set the input paths to only partitions touched by incremental pull. 
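Illustrative aside (not part of the patch): listStatusForIncrementalMode above now receives a Pair of Options from HoodieInputFormatUtils.getFilteredCommitsTimeline (its new signature appears later in this diff): the left side is the completed commits timeline, the right side the unfiltered one. A hedged sketch of unpacking it; the wrapper class and variable names are illustrative.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

    class FilteredTimelinesSketch {
      static void example(Job jobContext, HoodieTableMetaClient tableMetaClient) {
        Pair<Option<HoodieTimeline>, Option<HoodieTimeline>> timelines =
            HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
        if (!timelines.getLeft().isPresent()) {
          return; // nothing committed yet
        }
        HoodieTimeline completedCommitsTimeline = timelines.getLeft().get();  // completed instants only
        HoodieTimeline commitsTimeline = timelines.getRight().get();          // same timeline, unfiltered
      }
    }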
if (!incrementalInputPaths.isPresent()) { return null; } setInputPaths(job, incrementalInputPaths.get()); FileStatus[] fileStatuses = doListStatus(job); - return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); + return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timelines.getLeft().get(), timelines.getRight().get(), fileStatuses, commitsToCheck.get()); } protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option virtualKeyInfoOpt) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index de1fd0055dc27..ea6ef896b5d2a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -191,7 +191,8 @@ public boolean accept(Path path) { // which contains old version files, if not specify this value, these files will be filtered. fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()), - metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key()))); + metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())), + metaClient.getActiveTimeline().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key()))); } else { fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf())); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java index 95a1a74b65b91..d0c130f241a10 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; @@ -143,11 +144,11 @@ protected List listStatusForIncrementalMode(JobConf job, Job jobContext = Job.getInstance(job); // step1 - Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); - if (!timeline.isPresent()) { + Pair, Option> timelines = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); + if (!timelines.getLeft().isPresent()) { return result; } - HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTableName, timeline.get()); + HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTableName, timelines.getLeft().get()); Option> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants()); if (!commitsToCheck.isPresent()) { return result; @@ -167,7 
+168,7 @@ protected List listStatusForIncrementalMode(JobConf job, List affectedFileStatus = Arrays.asList(HoodieInputFormatUtils .listAffectedFilesForCommits(job, new Path(tableMetaClient.getBasePath()), metadataList)); // step3 - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0])); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0])); // build fileGroup from fsView Path basePath = new Path(tableMetaClient.getBasePath()); // filter affectedPartition by inputPaths diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index f9c2c9ca29be8..098a4252b9fd4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -45,6 +45,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; @@ -264,7 +265,7 @@ public static Option getAffectedPartitions(List commitsTo * @param tableMetaClient * @return */ - public static Option getFilteredCommitsTimeline(JobContext job, HoodieTableMetaClient tableMetaClient) { + public static Pair, Option> getFilteredCommitsTimeline(JobContext job, HoodieTableMetaClient tableMetaClient) { String tableName = tableMetaClient.getTableConfig().getTableName(); HoodieDefaultTimeline baseTimeline; if (HoodieHiveUtils.stopAtCompaction(job, tableName)) { @@ -272,7 +273,7 @@ public static Option getFilteredCommitsTimeline(JobContext job, } else { baseTimeline = tableMetaClient.getActiveTimeline(); } - return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants()); + return Pair.of(Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants()), Option.of(baseTimeline.getCommitsTimeline())); } /** @@ -364,14 +365,14 @@ public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws IOExcepti * Filter a list of FileStatus based on commitsToCheck for incremental view. 
    * @param job
    * @param tableMetaClient
-   * @param timeline
+   * @param completedTimeline
    * @param fileStatuses
    * @param commitsToCheck
    * @return
    */
   public static List<FileStatus> filterIncrementalFileStatus(Job job, HoodieTableMetaClient tableMetaClient,
-      HoodieTimeline timeline, FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) throws IOException {
-    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
+      HoodieTimeline completedTimeline, HoodieTimeline timeline, FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) throws IOException {
+    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, completedTimeline, timeline, fileStatuses);
     List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
     List<FileStatus> returns = new ArrayList<>();
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 9f30db6158f88..360ca2cb656c5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -251,12 +251,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   val mandatoryFields: Seq[String]
 
-  protected def timeline: HoodieTimeline =
+  protected def completedTimeline: HoodieTimeline =
     // NOTE: We're including compaction here since it's not considering a "commit" operation
     metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
 
+  protected def writeTimeline: HoodieTimeline =
+    // NOTE: We're including compaction here since it's not considering a "commit" operation
+    metaClient.getCommitsAndCompactionTimeline
+
   protected def latestInstant: Option[HoodieInstant] =
-    toScalaOption(timeline.lastInstant())
+    toScalaOption(completedTimeline.lastInstant())
 
   protected def queryTimestamp: Option[String] =
     specifiedQueryTimestamp.orElse(latestInstant.map(_.getTimestamp))
@@ -402,7 +405,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
     val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
-    val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+    val fsView = new HoodieTableFileSystemView(metaClient, completedTimeline, writeTimeline, partitionDirs.flatMap(_.files).toArray)
 
     val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
@@ -416,7 +419,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
     latestInstant.map { _ =>
       val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
-      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+      val fsView = new HoodieTableFileSystemView(metaClient, completedTimeline, writeTimeline, partitionDirs.flatMap(_.files).toArray)
 
       val queryTimestamp = this.queryTimestamp.get
       fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
@@ -613,7 +616,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       val internalSchema = internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
       val querySchemaString = SerDeHelper.toJson(internalSchema)
       if (!isNullOrEmpty(querySchemaString)) {
-        val validCommits = timeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",")
+        val validCommits = completedTimeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",")
         conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
         conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 0dd54237ef582..10a13c1c7cf26 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -168,7 +168,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
     }
 
     val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline
-      .filterCompletedInstants, fileStatuses.toArray)
+      .filterCompletedInstants, metaClient.getActiveTimeline.getCommitsTimeline, fileStatuses.toArray)
     val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
 
     if (log.isDebugEnabled) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index cefd319780535..284003b99ae0c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -53,11 +53,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
   }
 
-  override protected def timeline: HoodieTimeline = {
+  override protected def completedTimeline: HoodieTimeline = {
     if (fullTableScan) {
-      super.timeline
+      super.completedTimeline
     } else {
-      super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+      super.completedTimeline.findInstantsInRange(startTimestamp, endTimestamp)
     }
   }
 
@@ -95,7 +95,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     } else {
       val latestCommit = includedCommits.last.getTimestamp
 
-      val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
+      val fsView = new HoodieTableFileSystemView(metaClient, completedTimeline, writeTimeline, affectedFilesInCommits)
 
       val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
 
@@ -130,10 +130,10 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   validate()
 
   protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-  protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
+  protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.completedTimeline.lastInstant().get.getTimestamp)
 
-  protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp)
-  protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp)
+  protected def startInstantArchived: Boolean = super.completedTimeline.isBeforeTimelineStarts(startTimestamp)
+  protected def endInstantArchived: Boolean = super.completedTimeline.isBeforeTimelineStarts(endTimestamp)
 
   // Fallback to full table scan if any of the following conditions matches:
   //   1. the start commit is archived
@@ -150,13 +150,13 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     if (!startInstantArchived || !endInstantArchived) {
       // If endTimestamp commit is not archived, will filter instants
       // before endTimestamp.
-      super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.asScala.toList
+      super.completedTimeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.asScala.toList
     } else {
-      super.timeline.getInstants.asScala.toList
+      super.completedTimeline.getInstants.asScala.toList
     }
   }
 
-  protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
+  protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.completedTimeline)).asJava
 
   protected lazy val affectedFilesInCommits: Array[FileStatus] = {
     listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
@@ -183,7 +183,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   }
 
   protected def validate(): Unit = {
-    if (super.timeline.empty()) {
+    if (super.completedTimeline.empty()) {
       throw new HoodieException("No instants to incrementally pull")
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
index e39d22aa05462..15131a79f31bb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -74,7 +74,7 @@ class DedupeSparkJob(basePath: String,
     val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
 
     val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
-    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), metadata.getActiveTimeline.getCommitTimeline, allFiles)
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
     val filteredStatuses = latestFiles.map(f => f.getPath)
     LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")
@@ -183,7 +183,7 @@ class DedupeSparkJob(basePath: String,
     val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
     val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
-    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), metadata.getActiveTimeline.getCommitTimeline, allFiles)
     val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
index 6f2aa2c918722..ec6377579ab8c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -121,7 +121,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     val filteredTimeline = new HoodieDefaultTimeline(
       new java.util.ArrayList[HoodieInstant](JavaConversions.asJavaCollection(instants.toList)).stream(), details)
-    new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](0)))
+    new HoodieTableFileSystemView(metaClient, filteredTimeline, filteredTimeline, statuses.toArray(new Array[FileStatus](0)))
   }
 
   private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 375abcebfeee5..4a61ca0864d92 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -547,7 +547,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase with ScalaAssertionSuppor
   private def getFileCountInPartitionPath(partitionPath: String): Int = {
     metaClient.reloadActiveTimeline()
     val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
-    val fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants)
+    val fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, metaClient.getActiveTimeline.getCommitsTimeline)
     fileSystemView.getAllBaseFiles(partitionPath).iterator().asScala.toSeq.length
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 3908801f11c93..1f1dddd45969b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -376,7 +376,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
   }
 
   private def getLatestFileGroupsFileId(partition: String):Array[String] = {
-    getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline,
+    getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, metaClient.getActiveTimeline,
       HoodieTestTable.of(metaClient).listAllBaseFiles())
     tableView.getLatestFileSlices(partition)
       .toArray().map(slice => slice.asInstanceOf[FileSlice].getFileGroupId.getFileId)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index 52570596e1737..7e227dc128a04 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -229,6 +229,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
     // get fs and check number of latest files
     val fsView = new HoodieTableFileSystemView(metaClient,
       metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      metaClient.getActiveTimeline.getCommitTimeline,
       metaClient.getFs.listStatus(new Path(duplicatedPartitionPath)))
     val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
     // there should be 3 files
@@ -289,7 +290,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
     // get fs and check number of latest files
     val fsView = new HoodieTableFileSystemView(metaClient,
       metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
-      metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpdates)))
+      metaClient.getActiveTimeline.getCommitTimeline, metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpdates)))
     val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
     // there should be 2 files
     assertResult(2) {
@@ -350,7 +351,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
     // get fs and check number of latest files
     val fsView = new HoodieTableFileSystemView(metaClient,
       metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
-      metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpserts)))
+      metaClient.getActiveTimeline.getCommitTimeline, metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpserts)))
     val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
     // there should be 3 files
     assertResult(3) {
@@ -411,7 +412,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
     // get fs and check number of latest files
     val fsView = new HoodieTableFileSystemView(metaClient,
       metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
-      metaClient.getFs.listStatus(new Path(duplicatedPartitionPath)))
+      metaClient.getActiveTimeline.getCommitTimeline, metaClient.getFs.listStatus(new Path(duplicatedPartitionPath)))
     val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
     // there should be 3 files
    assertResult(3) {
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
index 3ac238c895ad3..bdc9689526fd3 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -92,7 +92,7 @@ public static Stream fetchLatestBaseFilesForAllPartitions(HoodieTableMet
       Configuration hadoopConf = metaClient.getHadoopConf();
       HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
       HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
-          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getActiveTimeline().getCommitsTimeline(),
          HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build());
       return fsView.getLatestBaseFiles(p).map(HoodieBaseFile::getFileName);
     });
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 402b380a00e08..641836c8da67c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -83,7 +83,7 @@ public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDi
     final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
     final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir).build();
     final BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
-        tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
+        tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants(), tableMetadata.getActiveTimeline().getWriteTimeline());
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     // Get the latest commit
     Option<HoodieInstant> latestCommit =
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index ca0a495e31213..270413cc9201e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -275,7 +275,7 @@ private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
         .setBasePath(cfg.sourceBasePath)
         .build();
     return new HoodieTableFileSystemView(tableMetadata, tableMetadata
-        .getActiveTimeline().getWriteTimeline().filterCompletedInstants());
+        .getActiveTimeline().getWriteTimeline().filterCompletedInstants(), tableMetadata.getActiveTimeline().getWriteTimeline());
   }
 
   public static void main(String[] args) throws IOException {
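
For orientation only, a minimal sketch of the call-site pattern this patch applies across the files above, assuming the four-argument HoodieTableFileSystemView constructor introduced by this change; the wrapper class and method name below are illustrative and not part of the Hudi codebase or of this diff.

// Minimal sketch, assuming the new constructor
// HoodieTableFileSystemView(metaClient, completedTimeline, writeTimeline, fileStatuses)
// added by this change; helper names are illustrative only.
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

class FileSystemViewExample {
  static HoodieTableFileSystemView buildView(HoodieTableMetaClient metaClient, FileStatus[] fileStatuses) {
    // Completed instants only: this timeline determines which file slices the view exposes.
    HoodieTimeline completedTimeline =
        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    // The unfiltered write timeline (which may include inflight instants) is passed alongside it.
    HoodieTimeline writeTimeline = metaClient.getActiveTimeline().getCommitsTimeline();
    return new HoodieTableFileSystemView(metaClient, completedTimeline, writeTimeline, fileStatuses);
  }
}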