diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index c3bcf06b6e5f..02d88e9ffa43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -192,6 +193,36 @@ object FileFormat {
 
   // create a file metadata struct col
   def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+
+  // create an internal row given required metadata fields and file information
+  def createMetadataInternalRow(
+      fieldNames: Seq[String],
+      filePath: Path,
+      fileSize: Long,
+      fileModificationTime: Long): InternalRow =
+    updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames,
+      filePath, fileSize, fileModificationTime)
+
+  // update an internal row given required metadata fields and file information
+  def updateMetadataInternalRow(
+      row: InternalRow,
+      fieldNames: Seq[String],
+      filePath: Path,
+      fileSize: Long,
+      fileModificationTime: Long): InternalRow = {
+    fieldNames.zipWithIndex.foreach { case (name, i) =>
+      name match {
+        case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString))
+        case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName))
+        case FILE_SIZE => row.update(i, fileSize)
+        case FILE_MODIFICATION_TIME =>
+          // the modificationTime from the file is in milliseconds,
+          // while internally, the TimestampType `file_modification_time` is stored in microseconds
+          row.update(i, fileModificationTime * 1000L)
+      }
+    }
+    row
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 5baa59758255..b2c7931b661e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.datasources.FileFormat._
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types.{LongType, StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator
 
 /**
@@ -136,18 +135,8 @@ class FileScanRDD(
    */
   private def updateMetadataRow(): Unit = {
     if (metadataColumns.nonEmpty && currentFile != null) {
-      val path = new Path(currentFile.filePath)
-      metadataColumns.zipWithIndex.foreach { case (attr, i) =>
-        attr.name match {
-          case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
-          case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
-          case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
-          case FILE_MODIFICATION_TIME =>
-            // the modificationTime from the file is in millisecond,
-            // while internally, the TimestampType is stored in microsecond
-            metadataRow.update(i, currentFile.modificationTime * 1000L)
-        }
-      }
+      updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
+        new Path(currentFile.filePath), currentFile.fileSize, currentFile.modificationTime)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 5b0d0606da09..9b56bcf35365 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.FileFormat.createMetadataInternalRow
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -71,8 +72,37 @@ abstract class PartitioningAwareFileIndex(
     def isNonEmptyFile(f: FileStatus): Boolean = {
       isDataPath(f.getPath) && f.getLen > 0
     }
+
+    // retrieve the file metadata filters and reduce to a final filter expression
+    val fileMetadataFilterOpt = dataFilters.filter(_.references.forall {
+      case MetadataAttribute(_) => true
+      case _ => false
+    }).reduceOption(expressions.And)
+
+    // - create bound references for the filters: put the metadata struct at position 0 for each file
+    // - retrieve the final metadata struct (could be pruned) from the filters
+    val boundedFilterMetadataStructOpt = fileMetadataFilterOpt.map { fileMetadataFilter =>
+      val metadataStruct = fileMetadataFilter.references.head.dataType
+      val boundedFilter = Predicate.createInterpreted(fileMetadataFilter.transform {
+        case _: AttributeReference => BoundReference(0, metadataStruct, nullable = true)
+      })
+      (boundedFilter, metadataStruct)
+    }
+
+    def matchFileMetadataPredicate(f: FileStatus): Boolean = {
+      // use Option.forall, so if there is no filter or no metadata struct, return true
+      boundedFilterMetadataStructOpt.forall { case (boundedFilter, metadataStruct) =>
+        val row = InternalRow.fromSeq(Seq(
+          createMetadataInternalRow(metadataStruct.asInstanceOf[StructType].names,
+            f.getPath, f.getLen, f.getModificationTime)
+        ))
+        boundedFilter.eval(row)
+      }
+    }
+
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
+      PartitionDirectory(InternalRow.empty, allFiles()
+        .filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil
     } else {
       if (recursiveFileLookup) {
         throw new IllegalArgumentException(
@@ -83,7 +113,8 @@ abstract class PartitioningAwareFileIndex(
       val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
         case Some(existingDir) =>
           // Directory has children files in it, return them
-          existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
+          existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) &&
+            matchFileMetadataPredicate(f))
 
         case None =>
           // Directory does not exist, or has no children files
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 8bf5d6183925..0d391e0dcd5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -279,9 +279,21 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
   }
 
   metadataColumnsTest("filter", schema) { (df, f0, _) =>
+    val filteredDF = df.select("name", "age", METADATA_FILE_NAME)
+      .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME))
+
+    // check the filtered file
+    val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
+      case p: FileSourceScanExec => p.selectedPartitions
+    }.get
+
+    assert(partitions.length == 1) // 1 partition
+    assert(partitions.head.files.length == 1) // 1 file in that partition
+    assert(partitions.head.files.head.getPath.toString == f0(METADATA_FILE_PATH)) // the file is f0
+
+    // check result
     checkAnswer(
-      df.select("name", "age", METADATA_FILE_NAME)
-        .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME)),
+      filteredDF,
       Seq(
         // _file_name == f0's name, so we will only have 1 row
         Row("jack", 24, f0(METADATA_FILE_NAME))
@@ -289,6 +301,36 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     )
   }
 
+  metadataColumnsTest("filter on metadata and user data", schema) { (df, _, f1) =>
+
+    val filteredDF = df.select("name", "age", "info",
+      METADATA_FILE_NAME, METADATA_FILE_PATH,
+      METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
+      // mix metadata column + user column
+      .where(Column(METADATA_FILE_NAME) === f1(METADATA_FILE_NAME) and Column("name") === "lily")
+      // only metadata columns
+      .where(Column(METADATA_FILE_PATH) === f1(METADATA_FILE_PATH))
+      // only user column
+      .where("age == 31")
+
+    // check the filtered file
+    val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
+      case p: FileSourceScanExec => p.selectedPartitions
+    }.get
+
+    assert(partitions.length == 1) // 1 partition
+    assert(partitions.head.files.length == 1) // 1 file in that partition
+    assert(partitions.head.files.head.getPath.toString == f1(METADATA_FILE_PATH)) // the file is f1
+
+    // check result
+    checkAnswer(
+      filteredDF,
+      Seq(Row("lily", 31, Row(54321L, "ucb"),
+        f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
+        f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+    )
+  }
+
   Seq(true, false).foreach { caseSensitive =>
     metadataColumnsTest(s"upper/lower case when case " +
       s"sensitive is $caseSensitive", schemaWithNameConflicts) { (df, f0, f1) =>
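A minimal usage sketch of what this patch enables (not part of the diff; the SparkSession `spark`, the `/tmp/people` directory, and the file names are hypothetical): when a predicate references only the hidden `_metadata` columns, `PartitioningAwareFileIndex` now evaluates it per `FileStatus` during file listing, so non-matching files are pruned before any rows are scanned.

  // hypothetical directory containing f0.json and f1.json
  import org.apache.spark.sql.functions.col

  val df = spark.read.format("json").load("/tmp/people")

  // the filter touches only metadata columns, so the file index can skip f1.json
  // at listing time instead of discarding its rows after the scan
  df.select("name", "age", "_metadata.file_name")
    .where(col("_metadata.file_name") === "f0.json")
    .show()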