@@ -30,6 +30,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String


/**
@@ -192,6 +193,36 @@ object FileFormat {

// create a file metadata struct col
def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)

// create an internal row given required metadata fields and file information
def createMetadataInternalRow(
fieldNames: Seq[String],
filePath: Path,
fileSize: Long,
fileModificationTime: Long): InternalRow =
updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames,
filePath, fileSize, fileModificationTime)

// update an internal row given required metadata fields and file information
def updateMetadataInternalRow(
row: InternalRow,
fieldNames: Seq[String],
filePath: Path,
fileSize: Long,
fileModificationTime: Long): InternalRow = {
fieldNames.zipWithIndex.foreach { case (name, i) =>
name match {
case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString))
case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName))
case FILE_SIZE => row.update(i, fileSize)
case FILE_MODIFICATION_TIME =>
// the modificationTime from the file is in milliseconds,
// while internally, the TimestampType `file_modification_time` is stored in microseconds
row.update(i, fileModificationTime * 1000L)
}
}
row
}
}

/**
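The two helpers above centralize how the hidden metadata struct is populated from a file's path, size, and modification time. A rough usage sketch, not part of the PR (it assumes FILE_NAME and FILE_SIZE are the field-name constants defined on object FileFormat; the path, size, and time values are made up):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.FileFormat

// Build a metadata row requesting only file_name and file_size for one file.
val row = FileFormat.createMetadataInternalRow(
  Seq(FileFormat.FILE_NAME, FileFormat.FILE_SIZE),
  new Path("/tmp/data/part-00000.parquet"), // illustrative path
  1024L,                                    // file size in bytes
  1650000000000L)                           // modification time in milliseconds

// row.getUTF8String(0) == "part-00000.parquet", row.getLong(1) == 1024L.
// Had FILE_MODIFICATION_TIME been requested, the millisecond value would be
// multiplied by 1000 so it matches TimestampType's microsecond encoding.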
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.datasources.FileFormat._
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator

/**
@@ -136,18 +135,8 @@ class FileScanRDD(
*/
private def updateMetadataRow(): Unit = {
if (metadataColumns.nonEmpty && currentFile != null) {
val path = new Path(currentFile.filePath)
metadataColumns.zipWithIndex.foreach { case (attr, i) =>
attr.name match {
case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
case FILE_MODIFICATION_TIME =>
// the modificationTime from the file is in millisecond,
// while internally, the TimestampType is stored in microsecond
metadataRow.update(i, currentFile.modificationTime * 1000L)
}
}
updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
new Path(currentFile.filePath), currentFile.fileSize, currentFile.modificationTime)
}
}

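The FileScanRDD change is a pure refactor: the per-file field population now goes through the shared helper, with the i-th name in metadataColumns deciding which slot of the reused metadataRow is written. A minimal sketch of the equivalent call (names come from the diff above; the example values in the comments are only illustrative):

// Roughly what updateMetadataRow() now does for the file currently being read:
FileFormat.updateMetadataInternalRow(
  metadataRow,                   // reused mutable row, one slot per requested metadata column
  metadataColumns.map(_.name),   // e.g. Seq("file_path", "file_size")
  new Path(currentFile.filePath),
  currentFile.fileSize,
  currentFile.modificationTime)  // milliseconds; scaled to microseconds inside the helper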
@@ -27,6 +27,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.FileFormat.createMetadataInternalRow
import org.apache.spark.sql.types.StructType

/**
@@ -71,8 +72,37 @@ abstract class PartitioningAwareFileIndex(
def isNonEmptyFile(f: FileStatus): Boolean = {
isDataPath(f.getPath) && f.getLen > 0
}

// retrieve the file metadata filters and reduce to a final filter expression
val fileMetadataFilterOpt = dataFilters.filter(_.references.forall {
case MetadataAttribute(_) => true
case _ => false
}).reduceOption(expressions.And)

// - create a bound reference for filters: put the metadata struct at position 0 for each file
// - retrieve the final metadata struct (could be pruned) from filters
val boundedFilterMetadataStructOpt = fileMetadataFilterOpt.map { fileMetadataFilter =>
val metadataStruct = fileMetadataFilter.references.head.dataType
val boundedFilter = Predicate.createInterpreted(fileMetadataFilter.transform {
case _: AttributeReference => BoundReference(0, metadataStruct, nullable = true)
})
(boundedFilter, metadataStruct)
}

def matchFileMetadataPredicate(f: FileStatus): Boolean = {
// use Option.forall: if there is no metadata filter (and thus no metadata struct), return true
boundedFilterMetadataStructOpt.forall { case (boundedFilter, metadataStruct) =>
val row = InternalRow.fromSeq(Seq(
createMetadataInternalRow(metadataStruct.asInstanceOf[StructType].names,
f.getPath, f.getLen, f.getModificationTime)
))
boundedFilter.eval(row)
}
}

val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
PartitionDirectory(InternalRow.empty, allFiles()
.filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil
} else {
if (recursiveFileLookup) {
throw new IllegalArgumentException(
@@ -83,7 +113,8 @@
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) &&
matchFileMetadataPredicate(f))

case None =>
// Directory does not exist, or has no children files
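The listing-time pruning boils down to: rewrite the metadata attribute in each metadata-only filter to a BoundReference at ordinal 0, then for every candidate file build a one-column row wrapping its metadata struct and evaluate the predicate. A hand-built sketch of that mechanism, not code from the PR (the filter, struct, and values below are invented for illustration):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, GetStructField, Literal, Predicate}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// A (possibly pruned) metadata struct containing only the fields the filter touches.
val metadataStruct = StructType(Seq(
  StructField("file_name", StringType),
  StructField("file_size", LongType)))

// Filter equivalent to `file_name = 'part-00001.parquet'`, with the metadata attribute
// already rewritten to a BoundReference at ordinal 0 of the row passed to eval().
val boundedFilter = Predicate.createInterpreted(
  EqualTo(
    GetStructField(BoundReference(0, metadataStruct, nullable = true), 0, Some("file_name")),
    Literal(UTF8String.fromString("part-00001.parquet"), StringType)))

// Per file: wrap its metadata struct as the single column at ordinal 0 and evaluate.
// In the PR, the inner row comes from createMetadataInternalRow(metadataStruct.names, ...).
val fileMetadataRow = InternalRow(UTF8String.fromString("part-00001.parquet"), 1024L)
val keep = boundedFilter.eval(InternalRow(fileMetadataRow)) // true -> the file survives pruning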
@@ -279,16 +279,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}

metadataColumnsTest("filter", schema) { (df, f0, _) =>
val filteredDF = df.select("name", "age", METADATA_FILE_NAME)
.where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME))

// check the filtered file
val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
case p: FileSourceScanExec => p.selectedPartitions
}.get

assert(partitions.length == 1) // 1 partition
assert(partitions.head.files.length == 1) // 1 file in that partition
assert(partitions.head.files.head.getPath.toString == f0(METADATA_FILE_PATH)) // the file is f0

// check result
checkAnswer(
df.select("name", "age", METADATA_FILE_NAME)
.where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME)),
filteredDF,
Seq(
// _file_name == f0's name, so we will only have 1 row
Row("jack", 24, f0(METADATA_FILE_NAME))
)
)
}

metadataColumnsTest("filter on metadata and user data", schema) { (df, _, f1) =>
Review comments on this test:

Contributor: Does this test fail before?

Contributor: I think we should also check the physical plan and make sure the final file list is pruned.

Author: No, it won't fail before. I added this test just to make sure we only get metadata filters in listFiles(...) and do the correct filtering here. Thanks for the suggestion! Updated tests to check files.


val filteredDF = df.select("name", "age", "info",
METADATA_FILE_NAME, METADATA_FILE_PATH,
METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
// mix metadata column + user column
.where(Column(METADATA_FILE_NAME) === f1(METADATA_FILE_NAME) and Column("name") === "lily")
// only metadata columns
.where(Column(METADATA_FILE_PATH) === f1(METADATA_FILE_PATH))
// only user column
.where("age == 31")

// check the filtered file
val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
case p: FileSourceScanExec => p.selectedPartitions
}.get

assert(partitions.length == 1) // 1 partition
assert(partitions.head.files.length == 1) // 1 file in that partition
assert(partitions.head.files.head.getPath.toString == f1(METADATA_FILE_PATH)) // the file is f1

// check result
checkAnswer(
filteredDF,
Seq(Row("lily", 31, Row(54321L, "ucb"),
f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
)
}

Seq(true, false).foreach { caseSensitive =>
metadataColumnsTest(s"upper/lower case when case " +
s"sensitive is $caseSensitive", schemaWithNameConflicts) { (df, f0, f1) =>
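End to end, the behavior these tests pin down is that a filter on `_metadata` columns can now prune files while the index lists them, instead of listing every file and only filtering rows during the scan. A rough DataFrame-level sketch, not taken from the PR (paths, data, and the local session setup are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Write two small files (illustrative location), then read them back with a metadata filter.
Seq(("jack", 24)).toDF("name", "age").write.mode("overwrite").json("/tmp/meta-demo/f0")
Seq(("lily", 31)).toDF("name", "age").write.mode("overwrite").json("/tmp/meta-demo/f1")

val df = spark.read.format("json")
  .load("/tmp/meta-demo/f0", "/tmp/meta-demo/f1")
  .where("_metadata.file_path like '%/f1/%' and name = 'lily'")
  .select("name", "age", "_metadata.file_name", "_metadata.file_size")

// Only conjuncts whose references are all metadata attributes (here, the file_path LIKE)
// are usable for pruning in PartitioningAwareFileIndex.listFiles; the user-column
// predicate on `name` is still evaluated against the rows that are actually read.
df.show()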