diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 9961e46b50145..59d5e60f78962 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -111,6 +111,8 @@ case class HoodieFileIndex(spark: SparkSession,
    */
   @transient private lazy val functionalIndex = new FunctionalIndexSupport(spark, metadataConfig, metaClient)
 
+  private var currentTimelineHash: String = null
+
   override def rootPaths: Seq[Path] = getQueryPaths.asScala
 
   /**
@@ -153,6 +155,13 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    // Clear the old version's file index cache if there are new commits, to ensure the data from the new commits is visible.
+    if (currentTimelineHash == null || !currentTimelineHash.equals(metaClient.reloadActiveTimeline().getTimelineHash)) {
+      logInfo(s"Refreshing HoodieFileIndex for new instant ${metaClient.getActiveTimeline.lastInstant().get().toString}")
+      currentTimelineHash = metaClient.getActiveTimeline.getTimelineHash
+      refresh()
+      logInfo("Refreshing HoodieFileIndex completed")
+    }
     val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map {
       case (partitionOpt, fileSlices) =>
        if (shouldEmbedFileSlices) {
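
For readers unfamiliar with the pattern, the hunk above implements hash-based cache invalidation: a fingerprint of the active timeline is cached, the timeline is reloaded on each `listFiles` call, and the (expensive) index refresh runs only when the fingerprint has changed, i.e. when new commits are visible. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions; `TimelineHashCacheSketch`, `TimelineLike`, and `rebuildIndex` are hypothetical stand-ins for this example, not Hudi APIs.

```scala
// Minimal sketch of the invalidation pattern used in the diff: cache a
// fingerprint of the source-of-truth state and rebuild derived state only
// when it changes. All names here are hypothetical, not Hudi APIs.
object TimelineHashCacheSketch {

  // Stand-in for the active timeline: its hash changes whenever a commit lands.
  final case class TimelineLike(instants: Seq[String]) {
    def timelineHash: String = instants.mkString("|").hashCode.toString
  }

  private var cachedHash: String = null
  private var cachedListing: Seq[String] = Seq.empty

  // Stand-in for the expensive rebuild, analogous to HoodieFileIndex.refresh().
  private def rebuildIndex(timeline: TimelineLike): Seq[String] =
    timeline.instants.map(i => s"files-as-of-$i")

  // Analogous to listFiles(): reload the timeline, compare fingerprints, and
  // rebuild the cached listing only when new commits have become visible.
  def listFiles(reloadTimeline: () => TimelineLike): Seq[String] = {
    val fresh = reloadTimeline()
    if (cachedHash == null || cachedHash != fresh.timelineHash) {
      cachedHash = fresh.timelineHash
      cachedListing = rebuildIndex(fresh)
    }
    cachedListing
  }
}

// Usage: repeated calls with an unchanged timeline hit the cache;
// a new commit changes the hash and triggers a rebuild.
object TimelineHashCacheSketchDemo extends App {
  import TimelineHashCacheSketch._
  val v1 = TimelineLike(Seq("commit1"))
  val v2 = TimelineLike(Seq("commit1", "commit2"))
  println(listFiles(() => v1)) // first call: rebuilds
  println(listFiles(() => v1)) // same hash: served from cache
  println(listFiles(() => v2)) // hash changed: rebuilds
}
```

The trade-off mirrors the one in the diff: every listing pays for a timeline reload (a comparatively cheap metadata read) so that the costly full index refresh runs only when new commits are actually present.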