@@ -64,18 +64,28 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       // - filters that need to be evaluated again after the scan
       val filterSet = ExpressionSet(filters)
 
+      // The attribute name in a predicate may differ in case from the name in the
+      // schema when the analysis is case insensitive. Rewrite the names to match the
+      // schema so that we do not need to worry about case sensitivity anymore.
+      val normalizedFilters = filters.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(l.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
       val partitionColumns =
         l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
         l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
-      val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
+      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters
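The new `normalizedFilters` step is the heart of the change: every `AttributeReference` inside a pushed-down filter is renamed to the exact spelling used by the relation's output, so the later name-based comparisons (directory pruning, data-filter pushdown) no longer have to care about case. A minimal, self-contained sketch of that idea, using hypothetical stand-in types (`Attr`, `EqualTo`, `Lit`) rather than the Catalyst API:

```scala
// Sketch only: models the normalization with plain Scala, not Catalyst classes.
object NormalizationSketch extends App {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr
  case class Lit(value: Any) extends Expr

  // The authoritative spellings, as the schema would provide them.
  val schemaNames = Seq("p1", "c1")

  // Case-insensitive resolution, mirroring what the analyzer's resolver does.
  def normalize(e: Expr): Expr = e match {
    case Attr(n)       => Attr(schemaNames.find(_.equalsIgnoreCase(n)).getOrElse(n))
    case EqualTo(l, r) => EqualTo(normalize(l), normalize(r))
    case other         => other
  }

  // A user-written "P1 = 1" becomes "p1 = 1", matching the partition schema.
  println(normalize(EqualTo(Attr("P1"), Lit(1)))) // EqualTo(Attr(p1),Lit(1))
}
```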
@@ -593,10 +593,7 @@ class HDFSFileCatalog(
     }
 
     if (partitionPruningPredicates.nonEmpty) {
-      val predicate =
-        partitionPruningPredicates
-          .reduceOption(expressions.And)
-          .getOrElse(Literal(true))
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
 
       val boundPredicate = InterpretedPredicate.create(predicate.transform {
         case a: AttributeReference =>
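The `HDFSFileCatalog` change is a pure simplification: the `reduceOption(...).getOrElse(Literal(true))` dance is redundant inside a branch guarded by `nonEmpty`, because `reduce` on a non-empty sequence always succeeds and builds the same conjunction. A small sketch, with strings standing in for the predicate expressions, to make that equivalence concrete:

```scala
// Sketch only: demonstrates that reduce and reduceOption+getOrElse agree
// whenever the sequence is known to be non-empty.
object ReduceSketch extends App {
  val preds = Seq("a > 1", "b < 2") // stand-ins for partition-pruning predicates
  def and(l: String, r: String): String = s"($l AND $r)"

  if (preds.nonEmpty) {
    val viaReduce       = preds.reduce(and)
    val viaReduceOption = preds.reduceOption(and).getOrElse("TRUE")
    assert(viaReduce == viaReduceOption) // identical on any non-empty input
    println(viaReduce)                   // (a > 1 AND b < 2)
  }
}
```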
@@ -196,6 +196,34 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
     checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
   }
 
+  test("partitioned table - case insensitive") {
+    withSQLConf("spark.sql.caseSensitive" -> "false") {
+      val table =
+        createTable(
+          files = Seq(
+            "p1=1/file1" -> 10,
+            "p1=2/file2" -> 10))
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1")) { partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when checking files in partition 1")
+      }
+      // We don't need to reevaluate filters that reference only partition columns.
+      checkDataFilters(Set.empty)
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when checking files in partition 1")
+        assert(partitions.head.files.head.partitionValues.getInt(0) == 1,
+          "when checking partition values")
+      }
+      // Only the filters that do not reference the partition column should be pushed down.
+      checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
+    }
+  }
+
   test("partitioned table - after scan filters") {
     val table =
       createTable(
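For completeness, here is a hypothetical spark-shell session illustrating the user-visible behavior the new test pins down; the path, dataset layout, and column names are made up:

```scala
// Run in spark-shell, where `sqlContext` is predefined.
sqlContext.setConf("spark.sql.caseSensitive", "false")

// Hypothetical dataset, partitioned on disk by a column named `p1`.
val df = sqlContext.read.parquet("/tmp/events")

// With case-insensitive analysis, `P1` resolves to the partition column `p1`,
// so the scan should touch only the p1=1 directory.
df.where("P1 = 1").explain()
```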