@@ -428,6 +428,8 @@ private void doRefresh() {

// Reset to null to trigger re-loading of all partition paths
this.cachedAllPartitionPaths = null;
// Reset to force re-loading of the file slices inside each partition
this.cachedAllInputFileSlices = new HashMap<>();
if (!shouldListLazily) {
ensurePreloadedPartitions(getAllQueryPartitionPaths());
}
@@ -29,7 +29,7 @@ import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
import org.apache.hudi.common.engine.EngineType
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
@@ -240,6 +240,67 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
}

@ParameterizedTest
@CsvSource(value = Array("lazy,true", "lazy,false",
"eager,true", "eager,false"))
def testIndexRefreshesFileSlices(listingModeOverride: String,
useMetadataTable: Boolean): Unit = {
def getDistinctCommitTimeFromAllFilesInIndex(files: Seq[PartitionDirectory]): Seq[String] = {
files.flatMap(_.files).map(fileStatus => new HoodieBaseFile(fileStatus.getPath.toString)).map(_.getCommitTime).distinct
}

val r = new Random(0xDEED)
// partition column values are [0, 5)
val tuples = for (i <- 1 to 1000) yield (r.nextString(1000), r.nextInt(5), r.nextString(1000))

val writeOpts = commonOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString)
val _spark = spark
import _spark.implicits._
val inputDF = tuples.toDF("_row_key", "partition", "timestamp")
inputDF
.write
.format("hudi")
.options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

val readOpts = queryOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString,
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> listingModeOverride
)

metaClient = HoodieTableMetaClient.reload(metaClient)
val fileIndexFirstWrite = HoodieFileIndex(spark, metaClient, None, readOpts)

val listFilesAfterFirstWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
val distinctListOfCommitTimesAfterFirstWrite = getDistinctCommitTimeFromAllFilesInIndex(listFilesAfterFirstWrite)
val firstWriteCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
assertEquals(1, distinctListOfCommitTimesAfterFirstWrite.size, "Should have only one commit")
assertEquals(firstWriteCommitTime, distinctListOfCommitTimesAfterFirstWrite.head, "All files should belong to the first existing commit")

val nextBatch = for (i <- 0 to 4) yield (r.nextString(1000), i, r.nextString(1000))

nextBatch.toDF("_row_key", "partition", "timestamp")
.write
.format("hudi")
.options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)

fileIndexFirstWrite.refresh()
val listFilesAfterSecondWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
val distinctListOfCommitTimesAfterSecondWrite = getDistinctCommitTimeFromAllFilesInIndex(listFilesAfterSecondWrite)
metaClient = HoodieTableMetaClient.reload(metaClient)
val lastCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

assertEquals(1, distinctListOfCommitTimesAfterSecondWrite.size, "All base files were rewritten, so they should share one commit time")
assertEquals(lastCommitTime, distinctListOfCommitTimesAfterSecondWrite.head, "After refresh, all files should belong to the second commit")
}
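A side note on the `getDistinctCommitTimeFromAllFilesInIndex` helper at the top of the test: it leans on Hudi's base file naming, `<fileId>_<writeToken>_<instantTime>.<ext>`, from which `HoodieBaseFile.getCommitTime` recovers the instant that wrote the file. A simplified, hypothetical sketch of that parse (the real classes handle log files and other cases):

```scala
// Hypothetical, simplified commit-time extraction from a base file name;
// HoodieBaseFile and FSUtils do the real parsing in Hudi.
def commitTimeOf(fileName: String): String = {
  val base = fileName.takeWhile(_ != '.') // drop the ".parquet" extension
  base.split('_').last                    // instant time is the last segment
}

// e.g. a file written by instant 20230101123045678:
assert(commitTimeOf("a1b2c3-0_1-0-1_20230101123045678.parquet") == "20230101123045678")
```

Because every record in the second batch hits one of the five existing partitions, the upsert rewrites every base file; distinct commit times collapsing to the latest instant is therefore a reliable signal that the refreshed index re-listed its file slices.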

@ParameterizedTest
@CsvSource(value = Array("lazy,true,true", "lazy,true,false", "lazy,false,true", "lazy,false,false",
"eager,true,true", "eager,true,false", "eager,false,true", "eager,false,false"))