diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index a97743e62fac8..93e206a9b793e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
@@ -180,4 +180,9 @@ trait SparkAdapter extends Serializable {
    * Create instance of [[ParquetFileFormat]]
    */
   def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
+
+  /**
+   * Create instance of [[InterpretedPredicate]]
+   */
+  def createInterpretedPredicate(e: Expression): InterpretedPredicate
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index c6e618ac4769a..a0fea0eca2456 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -470,6 +470,20 @@ public final Stream<Pair<String, CompactionOperation>> getPendingCompactionOpera
     }
   }
 
+  public final List<Path> getPartitionPaths() {
+    try {
+      readLock.lock();
+      return fetchAllStoredFileGroups()
+          .filter(fg -> !isFileGroupReplaced(fg))
+          .map(HoodieFileGroup::getPartitionPath)
+          .distinct()
+          .map(name -> name.isEmpty() ? metaClient.getBasePathV2() : new Path(metaClient.getBasePathV2(), name))
+          .collect(Collectors.toList());
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public final Stream<HoodieBaseFile> getLatestBaseFiles(String partitionStr) {
     try {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 75bc96624e7b0..afa43cee209fe 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -104,14 +104,22 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
       buildSplits(fileSlices.values.flatten.toSeq)
     } else {
-      // TODO refactor to avoid iterating over listed files multiple times
-      val partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters)
-      val partitionPaths = partitions.keys.toSeq
+      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
+      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
+
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+      val partitionPaths = fsView.getPartitionPaths.asScala
+
       if (partitionPaths.isEmpty || latestInstant.isEmpty) {
         // If this an empty table OR it has no completed commits yet, return
         List.empty[HoodieMergeOnReadFileSplit]
       } else {
-        val fileSlices = listFileSlices(partitionPaths)
+        val queryTimestamp = this.queryTimestamp.get
+
+        val fileSlices = partitionPaths.flatMap { partitionPath =>
+          val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
+          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
+        }
         buildSplits(fileSlices)
       }
     }
@@ -130,20 +138,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
     }.toList
   }
-
-  private def listFileSlices(partitionPaths: Seq[Path]): Seq[FileSlice] = {
-    // NOTE: It's critical for us to re-use [[InMemoryFileIndex]] to make sure we're leveraging
-    //       [[FileStatusCache]] and avoid listing the whole table again
-    val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, partitionPaths)
-    val fsView = new HoodieTableFileSystemView(metaClient, timeline, inMemoryFileIndex.allFiles.toArray)
-
-    val queryTimestamp = this.queryTimestamp.get
-
-    partitionPaths.flatMap { partitionPath =>
-      val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
-      fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
-    }
-  }
 }
 
 object MergeOnReadSnapshotRelation {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala
index 8ad78af7f3267..3e541bb09a0c3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/execution/datasources/HoodieInMemoryFileIndex.scala
@@ -20,9 +20,12 @@ package org.apache.spark.execution.datasources
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.HoodieHadoopFSUtils
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.StructType
 
@@ -34,7 +37,77 @@ class HoodieInMemoryFileIndex(sparkSession: SparkSession,
                               parameters: Map[String, String],
                               userSpecifiedSchema: Option[StructType],
                               fileStatusCache: FileStatusCache = NoopCache)
-  extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) {
+  extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache)
+    with SparkAdapterSupport {
+
+  /**
+   * Returns all valid files grouped into partitions when the data is partitioned. If the data is unpartitioned,
+   * this will return a single partition with no partition values
+   *
+   * NOTE: This method replicates the one it overrides, however it uses custom method
+   * that accepts files starting with "."
+   */
+  override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
+      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+    } else {
+      prunePartitions(partitionFilters, partitionSpec()).map {
+        case PartitionPath(values, path) =>
+          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+            case Some(existingDir) =>
+              // Directory has children files in it, return them
+              existingDir.filter(f => isDataPath(f.getPath))
+
+            case None =>
+              // Directory does not exist, or has no children files
+              Nil
+          }
+          PartitionDirectory(values, files)
+      }
+    }
+    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
+    selectedPartitions
+  }
+
+  private def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !(name.startsWith("_") && !name.contains("="))
+  }
+
+  private def prunePartitions(
+      predicates: Seq[Expression],
+      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
+    val PartitionSpec(partitionColumns, partitions) = partitionSpec
+    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+      val boundPredicate = sparkAdapter.createInterpretedPredicate(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = true)
+      })
+
+      val selected = partitions.filter {
+        case PartitionPath(values, _) => boundPredicate.eval(values)
+      }
+      logInfo {
+        val total = partitions.length
+        val selectedSize = selected.length
+        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
+        s"Selected $selectedSize partitions out of $total, " +
+          s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions."
+      }
+
+      selected
+    } else {
+      partitions
+    }
+  }
 
   /**
    * List leaf files of given paths. This method will submit a Spark job to do parallel
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index f9f14438933f3..ccb98deca884d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -719,6 +719,8 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
       .save(basePath)
     // There should no base file in the file list.
     assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
+    // Test read logs only mor table with glob paths.
+    assertEquals(20, spark.read.format("hudi").load(basePath + "/*/*/*/*").count())
     // Test read log only mor table.
     assertEquals(20, spark.read.format("hudi").load(basePath).count())
   }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 0e74c997d7ee4..03268237ddd3b 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark2_4AvroDeserializer, HoodieSpark2_4AvroSerializer, HoodieSparkAvroSchemaConverters}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
+import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
@@ -168,4 +168,8 @@ class Spark2Adapter extends SparkAdapter {
   override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
     Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
   }
+
+  override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
+    InterpretedPredicate.create(e)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index e5f4476cc5a98..7a6d70d12863a 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.spark3.internal.ReflectUtil
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
+import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
@@ -149,4 +149,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
       None
     }
   }
+
+  override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
+    Predicate.createInterpreted(e)
+  }
 }
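Note on the adapter indirection above: the interpreted predicate is built through SparkAdapter.createInterpretedPredicate because the construction API differs between Spark lines (InterpretedPredicate.create in Spark 2, Predicate.createInterpreted in Spark 3). The sketch below is illustrative only and not part of the patch; the object name PartitionPruningSketch, the single partition column, and the "us" literal are invented for the example, while SparkAdapterSupport and sparkAdapter.createInterpretedPredicate come from the Hudi code touched above.

import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

object PartitionPruningSketch extends SparkAdapterSupport {
  // Evaluates the filter "partitionCol = 'us'" against a row of partition values,
  // with the (hypothetical) partition column bound at ordinal 0.
  def keepPartition(partitionValues: InternalRow): Boolean = {
    val filter = EqualTo(BoundReference(0, StringType, nullable = true), Literal("us"))
    // On Spark 2 the adapter resolves this to InterpretedPredicate.create(filter);
    // on Spark 3 it uses Predicate.createInterpreted(filter).
    val predicate = sparkAdapter.createInterpretedPredicate(filter)
    predicate.eval(partitionValues)
  }
}

This mirrors how HoodieInMemoryFileIndex.prunePartitions binds partition-column references and keeps only the PartitionPath entries whose values satisfy the pushed-down partition filters.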