From 27fd4aa08c38389f5205ce266cdf3f852933d87b Mon Sep 17 00:00:00 2001 From: Rex An Date: Wed, 1 Jun 2022 12:17:19 +0800 Subject: [PATCH 1/5] FIx the error if the user read no base files hudi table by glob paths --- .../view/AbstractTableFileSystemView.java | 14 ++++ .../org/apache/hudi/HoodieBaseRelation.scala | 15 ++++ .../hudi/MergeOnReadSnapshotRelation.scala | 4 +- .../datasources/HoodieInMemoryFileIndex.scala | 71 +++++++++++++++++++ .../hudi/functional/TestMORDataSource.scala | 1 + 5 files changed, 103 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index c6e618ac4769a..a0fea0eca2456 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -470,6 +470,20 @@ public final Stream> getPendingCompactionOpera } } + public final List getPartitionPaths() { + try { + readLock.lock(); + return fetchAllStoredFileGroups() + .filter(fg -> !isFileGroupReplaced(fg)) + .map(HoodieFileGroup::getPartitionPath) + .distinct() + .map(name -> name.isEmpty() ? metaClient.getBasePathV2() : new Path(metaClient.getBasePathV2(), name)) + .collect(Collectors.toList()); + } finally { + readLock.unlock(); + } + } + @Override public final Stream getLatestBaseFiles(String partitionStr) { try { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 08f87816d7c35..52f6982c3d5da 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -340,6 +340,21 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, latestBaseFiles.groupBy(getPartitionPath) } + protected def getSelectedPartitionPaths( + globbedPaths: Seq[Path], + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[Path] = { + val partitionDirs = if (globbedPaths.isEmpty) { + fileIndex.listFiles(partitionFilters, dataFilters) + } else { + val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths) + inMemoryFileIndex.listFiles(partitionFilters, dataFilters) + } + + val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) + fsView.getPartitionPaths.asScala + } + protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = { val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 75bc96624e7b0..e1928ae9b38cd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -105,8 +105,8 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, buildSplits(fileSlices.values.flatten.toSeq) } else { // TODO refactor to avoid iterating over listed files multiple times - val 
partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters) - val partitionPaths = partitions.keys.toSeq + val partitionPaths = getSelectedPartitionPaths(globPaths, convertedPartitionFilters, dataFilters) + if (partitionPaths.isEmpty || latestInstant.isEmpty) { // If this an empty table OR it has no completed commits yet, return List.empty[HoodieMergeOnReadFileSplit] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala index 8ad78af7f3267..6a55f9190c433 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala @@ -23,6 +23,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.HoodieHadoopFSUtils import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.StructType @@ -36,6 +38,75 @@ class HoodieInMemoryFileIndex(sparkSession: SparkSession, fileStatusCache: FileStatusCache = NoopCache) extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) { + /** + * Returns all valid files grouped into partitions when the data is partitioned. If the data is unpartitioned, + * this will return a single partition with no partition values + * + * NOTE: This method replicates the one it overrides, however it uses custom method + * that accepts files starting with "." 
+ */ + override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { + PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil + } else { + prunePartitions(partitionFilters, partitionSpec()).map { + case PartitionPath(values, path) => + val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { + case Some(existingDir) => + // Directory has children files in it, return them + existingDir.filter(f => isDataPath(f.getPath)) + + case None => + // Directory does not exist, or has no children files + Nil + } + PartitionDirectory(values, files) + } + } + logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) + selectedPartitions + } + + private def isDataPath(path: Path): Boolean = { + val name = path.getName + !(name.startsWith("_") && !name.contains("=")) + } + + private def prunePartitions( + predicates: Seq[Expression], + partitionSpec: PartitionSpec): Seq[PartitionPath] = { + val PartitionSpec(partitionColumns, partitions) = partitionSpec + val partitionColumnNames = partitionColumns.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate.create(predicate.transform { + case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }) + + val selected = partitions.filter { + case PartitionPath(values, _) => boundPredicate.eval(values) + } + logInfo { + val total = partitions.length + val selectedSize = selected.length + val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 + s"Selected $selectedSize partitions out of $total, " + + s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions." + } + + selected + } else { + partitions + } + } + /** * List leaf files of given paths. This method will submit a Spark job to do parallel * listing whenever there is a path having more files than the parallel partition discovery threshold. diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index f9f14438933f3..d1edca1ef1f89 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -720,6 +720,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // There should no base file in the file list. assertTrue(DataSourceTestUtils.isLogFileOnly(basePath)) // Test read log only mor table. 
+ assertEquals(20, spark.read.format("hudi").load(basePath + "/*/*/*/*").count()) assertEquals(20, spark.read.format("hudi").load(basePath).count()) } From 407a00ddc394e5b5f73b410480b42f05ddd2750a Mon Sep 17 00:00:00 2001 From: Rex An Date: Wed, 1 Jun 2022 15:55:10 +0800 Subject: [PATCH 2/5] Fix compile issue --- .../scala/org/apache/spark/sql/hudi/SparkAdapter.scala | 7 ++++++- .../execution/datasources/HoodieInMemoryFileIndex.scala | 6 ++++-- .../scala/org/apache/spark/sql/adapter/Spark2Adapter.scala | 6 +++++- .../org/apache/spark/sql/adapter/BaseSpark3Adapter.scala | 6 +++++- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index a97743e62fac8..d986443de6d8e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias} @@ -180,4 +180,9 @@ trait SparkAdapter extends Serializable { * Create instance of [[ParquetFileFormat]] */ def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] + + /** + * + */ + def createInterpretedPredicate(e: Expression): InterpretedPredicate } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala index 6a55f9190c433..3e541bb09a0c3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala @@ -20,6 +20,7 @@ package org.apache.spark.execution.datasources import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} import org.apache.hadoop.mapred.{FileInputFormat, JobConf} +import org.apache.hudi.SparkAdapterSupport import org.apache.spark.HoodieHadoopFSUtils import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession @@ -36,7 +37,8 @@ class HoodieInMemoryFileIndex(sparkSession: SparkSession, parameters: Map[String, String], userSpecifiedSchema: Option[StructType], fileStatusCache: FileStatusCache = NoopCache) - extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) { + extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) + with SparkAdapterSupport { /** * Returns all valid files grouped into partitions when the data is partitioned. 
If the data is unpartitioned, @@ -84,7 +86,7 @@ class HoodieInMemoryFileIndex(sparkSession: SparkSession, if (partitionPruningPredicates.nonEmpty) { val predicate = partitionPruningPredicates.reduce(expressions.And) - val boundPredicate = InterpretedPredicate.create(predicate.transform { + val boundPredicate = sparkAdapter.createInterpretedPredicate(predicate.transform { case a: AttributeReference => val index = partitionColumns.indexWhere(a.name == _.name) BoundReference(index, partitionColumns(index).dataType, nullable = true) diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 0e74c997d7ee4..03268237ddd3b 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -24,7 +24,7 @@ import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark2_4AvroDeserializer, HoodieSpark2_4AvroSerializer, HoodieSparkAvroSchemaConverters} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} @@ -168,4 +168,8 @@ class Spark2Adapter extends SparkAdapter { override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { Some(new Spark24HoodieParquetFileFormat(appendPartitionValues)) } + + override def createInterpretedPredicate(e: Expression): InterpretedPredicate = { + InterpretedPredicate.create(e) + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index e5f4476cc5a98..7a6d70d12863a 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -24,7 +24,7 @@ import org.apache.hudi.spark3.internal.ReflectUtil import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} @@ -149,4 +149,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter { None } } + + override def createInterpretedPredicate(e: Expression): InterpretedPredicate = { + Predicate.createInterpreted(e) + } } From 3baf9d12d91a1375f37f679cd2f9b6205ff349cf 
Mon Sep 17 00:00:00 2001 From: Rex An Date: Wed, 1 Jun 2022 15:57:01 +0800 Subject: [PATCH 3/5] Add comments --- .../src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index d986443de6d8e..93e206a9b793e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -182,7 +182,7 @@ trait SparkAdapter extends Serializable { def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] /** - * + * Create instance of [[InterpretedPredicate]] */ def createInterpretedPredicate(e: Expression): InterpretedPredicate } From d67e7a8ddac236c3af0724a08c23e493d0d9e969 Mon Sep 17 00:00:00 2001 From: Rex An Date: Mon, 6 Jun 2022 10:37:08 +0800 Subject: [PATCH 4/5] Add comments for test --- .../scala/org/apache/hudi/functional/TestMORDataSource.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index d1edca1ef1f89..ccb98deca884d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -719,8 +719,9 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .save(basePath) // There should no base file in the file list. assertTrue(DataSourceTestUtils.isLogFileOnly(basePath)) - // Test read log only mor table. + // Test read logs only mor table with glob paths. assertEquals(20, spark.read.format("hudi").load(basePath + "/*/*/*/*").count()) + // Test read log only mor table. 
assertEquals(20, spark.read.format("hudi").load(basePath).count()) } From c6e3a4e36e60020c4e6601745c252536643180f2 Mon Sep 17 00:00:00 2001 From: Rex An Date: Wed, 8 Jun 2022 15:45:47 +0800 Subject: [PATCH 5/5] Fix review --- .../org/apache/hudi/HoodieBaseRelation.scala | 15 ---------- .../hudi/MergeOnReadSnapshotRelation.scala | 28 ++++++++----------- 2 files changed, 11 insertions(+), 32 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 52f6982c3d5da..08f87816d7c35 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -340,21 +340,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, latestBaseFiles.groupBy(getPartitionPath) } - protected def getSelectedPartitionPaths( - globbedPaths: Seq[Path], - partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): Seq[Path] = { - val partitionDirs = if (globbedPaths.isEmpty) { - fileIndex.listFiles(partitionFilters, dataFilters) - } else { - val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths) - inMemoryFileIndex.listFiles(partitionFilters, dataFilters) - } - - val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) - fsView.getPartitionPaths.asScala - } - protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = { val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index e1928ae9b38cd..afa43cee209fe 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -104,14 +104,22 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters) buildSplits(fileSlices.values.flatten.toSeq) } else { - // TODO refactor to avoid iterating over listed files multiple times - val partitionPaths = getSelectedPartitionPaths(globPaths, convertedPartitionFilters, dataFilters) + val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths) + val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters) + + val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) + val partitionPaths = fsView.getPartitionPaths.asScala if (partitionPaths.isEmpty || latestInstant.isEmpty) { // If this an empty table OR it has no completed commits yet, return List.empty[HoodieMergeOnReadFileSplit] } else { - val fileSlices = listFileSlices(partitionPaths) + val queryTimestamp = this.queryTimestamp.get + + val fileSlices = partitionPaths.flatMap { partitionPath => + val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath) + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq + } buildSplits(fileSlices) } } @@ -130,20 +138,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, 
HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles) }.toList } - - private def listFileSlices(partitionPaths: Seq[Path]): Seq[FileSlice] = { - // NOTE: It's critical for us to re-use [[InMemoryFileIndex]] to make sure we're leveraging - // [[FileStatusCache]] and avoid listing the whole table again - val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, partitionPaths) - val fsView = new HoodieTableFileSystemView(metaClient, timeline, inMemoryFileIndex.allFiles.toArray) - - val queryTimestamp = this.queryTimestamp.get - - partitionPaths.flatMap { partitionPath => - val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath) - fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq - } - } } object MergeOnReadSnapshotRelation {
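
Note (not part of the patches): the end-to-end behaviour asserted by the new test in TestMORDataSource can be reproduced from a spark-shell roughly as sketched below. The table location and the three-level partition layout behind the glob are illustrative assumptions, and `spark` is the session provided by spark-shell with the Hudi bundle on the classpath.

// Hedged sketch of the user-facing scenario this series fixes: a MOR table whose
// latest commits wrote only log files (no base files yet), read back through a
// glob path. `basePath` and the depth of the glob are assumptions for illustration.
val basePath = "/tmp/hudi_trips_mor" // hypothetical table location

// Reading through a glob path previously failed when the matched file slices
// contained no base files; after these patches both reads should return the
// same rows (the test asserts 20 for each).
val viaGlob = spark.read.format("hudi").load(basePath + "/*/*/*/*")
val viaRoot = spark.read.format("hudi").load(basePath)
assert(viaGlob.count() == viaRoot.count())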
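
Note (not part of the patches): a hedged sketch of how the new view API added in patch 1 is consumed by the final MergeOnReadSnapshotRelation change. The helper name and its parameters are illustrative; only the HoodieTableFileSystemView constructor and getPartitionPaths call come from the patches themselves.

import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.view.HoodieTableFileSystemView

// Illustrative helper: given the files matched by the glob listing, derive the
// partition paths that actually hold file groups.
def partitionPathsOf(metaClient: HoodieTableMetaClient,
                     timeline: HoodieTimeline,
                     listedFiles: Array[FileStatus]): Seq[Path] = {
  // Build a file-system view over only the listed files; log-only file groups are
  // included because the overridden HoodieInMemoryFileIndex.listFiles no longer
  // drops files whose names start with ".".
  val fsView = new HoodieTableFileSystemView(metaClient, timeline, listedFiles)
  // New in this series: distinct partition paths of all non-replaced file groups,
  // returned as the table base path itself for a non-partitioned table.
  fsView.getPartitionPaths.asScala
}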