Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_STRUCT
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
* An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
Expand Down Expand Up @@ -71,8 +73,34 @@ abstract class PartitioningAwareFileIndex(
def isNonEmptyFile(f: FileStatus): Boolean = {
  // A file qualifies only when `isDataPath` accepts its path and its length is non-zero.
  // The length is inspected only after the path check succeeds.
  val onDataPath = isDataPath(f.getPath)
  onDataPath && f.getLen > 0
}

// Collect the data filters that refer exclusively to file metadata attributes and AND them
// into one expression. `forall` is vacuously true on an empty reference set, so the
// `references.nonEmpty` guard keeps reference-free filters (e.g. constant predicates) from
// being misclassified as metadata filters and evaluated once per file.
val fileMetadataFilterOpt = dataFilters.filter { dataFilter =>
  dataFilter.references.nonEmpty && dataFilter.references.forall {
    case MetadataAttribute(_) => true
    case _ => false
  }
}.reduceOption(expressions.And)

// Bind the combined filter to a single-column row built per file: ordinal 0 carries the
// metadata struct, so every attribute reference in the filter is rewritten to that ordinal.
val boundedFilterOpt = fileMetadataFilterOpt.map { fileMetadataFilter =>
  Predicate.createInterpreted(fileMetadataFilter.transform {
    case _: AttributeReference => BoundReference(0, METADATA_STRUCT, nullable = true)
  })
}

def matchFileMetadataPredicate(f: FileStatus): Boolean = {
// use option.forall, so if there is no filter, return true
boundedFilterOpt.forall(_.eval(
InternalRow.fromSeq(Seq(InternalRow.fromSeq(Seq(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a util method that creates an InternalRow from a Path, length: Long, and modificationTime: Long, so the code can be shared with FileScanRDD?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add this util method and share the common code, perhaps along with the metadata schema pruning PR?

My thinking is that, after pruning, we no longer need to always create an InternalRow with all fields in both PartitioningAwareFileIndex.listFiles(...) and FileScanRDD. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

UTF8String.fromString(f.getPath.toString),
UTF8String.fromString(f.getPath.getName),
f.getLen,
f.getModificationTime * 1000L))))
))
}

val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
PartitionDirectory(InternalRow.empty, allFiles()
.filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil
} else {
if (recursiveFileLookup) {
throw new IllegalArgumentException(
Expand All @@ -83,7 +111,8 @@ abstract class PartitioningAwareFileIndex(
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) &&
matchFileMetadataPredicate(f))

case None =>
// Directory does not exist, or has no children files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,20 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
)
}

metadataColumnsTest("filter on metadata and user data", schema) { (df, _, f1) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this test fail before this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also check the physical plan and make sure the final file list is pruned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it would not have failed before. I added this test just to make sure we only pick up metadata filters in listFiles(...) and do the correct filtering here.

Thanks for the suggestion! I updated the tests to check the files.

checkAnswer(
df.select("name", "age", "info",
METADATA_FILE_NAME, METADATA_FILE_PATH,
METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
.where(Column(METADATA_FILE_NAME) === f1(METADATA_FILE_NAME) and Column("name") === "lily")
.where(Column(METADATA_FILE_PATH) === f1(METADATA_FILE_PATH))
.where("age == 31"),
Seq(Row("lily", 31, Row(54321L, "ucb"),
f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
)
}

Seq(true, false).foreach { caseSensitive =>
metadataColumnsTest(s"upper/lower case when case " +
s"sensitive is $caseSensitive", schemaWithNameConflicts) { (df, f0, f1) =>
Expand Down