diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index e697f385e0445..824a94abab4bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -428,6 +428,8 @@ private void doRefresh() {
     // Reset it to null to trigger re-loading of all partition paths
     this.cachedAllPartitionPaths = null;
+    // Reset to force reload of file slices inside partitions
+    this.cachedAllInputFileSlices = new HashMap<>();
 
     if (!shouldListLazily) {
       ensurePreloadedPartitions(getAllQueryPartitionPaths());
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index a88d263e9dc7c..803702addb489 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.engine.EngineType
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
@@ -240,6 +240,67 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
     assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
   }
 
+  @ParameterizedTest
+  @CsvSource(value = Array("lazy,true", "lazy,false",
+    "eager,true", "eager,false"))
+  def testIndexRefreshesFileSlices(listingModeOverride: String,
+                                   useMetadataTable: Boolean): Unit = {
+    def getDistinctCommitTimeFromAllFilesInIndex(files: Seq[PartitionDirectory]): Seq[String] = {
+      files.flatMap(_.files).map(fileStatus => new HoodieBaseFile(fileStatus.getPath.toString)).map(_.getCommitTime).distinct
+    }
+
+    val r = new Random(0xDEED)
+    // partition column values are in [0, 5)
+    val tuples = for (i <- 1 to 1000) yield (r.nextString(1000), r.nextInt(5), r.nextString(1000))
+
+    val writeOpts = commonOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString)
+    val _spark = spark
+    import _spark.implicits._
+    val inputDF = tuples.toDF("_row_key", "partition", "timestamp")
+    inputDF
+      .write
+      .format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val readOpts = queryOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString,
+      DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> listingModeOverride
+    )
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndexFirstWrite = HoodieFileIndex(spark, metaClient, None, readOpts)
+
+    val listFilesAfterFirstWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
+    val distinctListOfCommitTimesAfterFirstWrite = getDistinctCommitTimeFromAllFilesInIndex(listFilesAfterFirstWrite)
+    val firstWriteCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    assertEquals(1, distinctListOfCommitTimesAfterFirstWrite.size, "Should have only one commit")
+    assertEquals(firstWriteCommitTime, distinctListOfCommitTimesAfterFirstWrite.head, "All files should belong to the first existing commit")
+
+    val nextBatch = for (
+      i <- 0 to 4
+    ) yield (r.nextString(1000), i, r.nextString(1000))
+
+    nextBatch.toDF("_row_key", "partition", "timestamp")
+      .write
+      .format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    fileIndexFirstWrite.refresh()
+    val fileSlicesAfterSecondWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
+    val distinctListOfCommitTimesAfterSecondWrite = getDistinctCommitTimeFromAllFilesInIndex(fileSlicesAfterSecondWrite)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val lastCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    assertEquals(1, distinctListOfCommitTimesAfterSecondWrite.size, "All base files were affected, so all should share one commit time")
+    assertEquals(lastCommitTime, distinctListOfCommitTimesAfterSecondWrite.head, "All files should be of the second commit after index refresh")
+  }
+
   @ParameterizedTest
   @CsvSource(value = Array("lazy,true,true", "lazy,true,false", "lazy,false,true", "lazy,false,false",
     "eager,true,true", "eager,true,false", "eager,false,true", "eager,false,false"))
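
The gist of the fix, for context: doRefresh() already reset cachedAllPartitionPaths, but cachedAllInputFileSlices survived the refresh, so a refreshed index kept serving file slices memoized from the pre-refresh timeline. Below is a minimal, self-contained Java sketch of that stale-cache pattern and its remedy; every name in it is illustrative, not Hudi's actual API:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-in for the file index: it memoizes per-partition
    // listings and must drop them on refresh, mirroring the patch above.
    class PartitionListingCache {
      // Analogue of cachedAllInputFileSlices: partition path -> cached files.
      private Map<String, List<String>> cachedSlices = new HashMap<>();

      List<String> listSlices(String partition, Map<String, List<String>> storage) {
        // computeIfAbsent memoizes the first listing; commits that land
        // afterwards stay invisible until the cache is reset.
        return cachedSlices.computeIfAbsent(partition,
            p -> storage.getOrDefault(p, Collections.emptyList()));
      }

      void refresh() {
        // The remedy: swap in a fresh map so the next listSlices() call
        // re-reads current state instead of returning the memoized one.
        this.cachedSlices = new HashMap<>();
      }
    }

One design note: assigning a fresh HashMap, as the patch does, leaves any concurrently held reference to the old map intact, whereas clearing it would mutate the map in place under a reader that is still iterating it.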