Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ object NestedColumnAliasing {
case _ => false
}

// Note that when we group by extractors with their references, we should remove
// cosmetic variations.
val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
.filter(!_.references.subsetOf(exclusiveAttrSet))
.groupBy(_.references.head)
.groupBy(_.references.head.canonicalized.asInstanceOf[Attribute])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, @viirya @frankyin-factual !

.flatMap { case (attr, nestedFields: Seq[ExtractValue]) =>
// Remove redundant `ExtractValue`s if they share the same parent nest field.
// For example, when `a.b` and `a.b.c` are in project list, we only need to alias `a.b`.
Expand All @@ -174,9 +176,12 @@ object NestedColumnAliasing {

// If all nested fields of `attr` are used, we don't need to introduce new aliases.
// By default, ColumnPruning rule uses `attr` already.
// Note that we need to remove cosmetic variations first, so we only count a
// nested field once.
if (nestedFieldToAlias.nonEmpty &&
nestedFieldToAlias
.map { case (nestedField, _) => totalFieldNum(nestedField.dataType) }
dedupNestedFields.map(_.canonicalized)
.distinct
.map { nestedField => totalFieldNum(nestedField.dataType) }
.sum < totalFieldNum(attr.dataType)) {
Some(attr.exprId -> nestedFieldToAlias)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,26 @@ abstract class SchemaPruningSuite
Row(Row("Janet", null, "Jones"), "Jones") ::Nil)
}

testSchemaPruning("SPARK-32163: nested pruning should work even with cosmetic variations") {
withTempView("contact_alias") {
sql("select * from contacts")
.repartition(100, col("name.first"), col("name.last"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue cannot happen in branch-3.0?

private def canProjectPushThrough(plan: LogicalPlan) = plan match {
case _: GlobalLimit => true
case _: LocalLimit => true
case _: Repartition => true
case _: Sample => true
case _ => false

Copy link
Member Author

@viirya viirya Jul 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case is only for master branch. However this issue can happen in branch-3.0 too. I added another new test here, which is for branch-3.0.

But when we backport this to branch-3.0, we need to remove first test case as it will fail on checkScan(query, "struct<name:struct<first:string,last:string>>"), because branch-3.0 doesn't prune for repartition by expression.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, thanks for adding the test.

.selectExpr("name").createOrReplaceTempView("contact_alias")

val query1 = sql("select name.first from contact_alias")
checkScan(query1, "struct<name:struct<first:string,last:string>>")
checkAnswer(query1, Row("Jane") :: Row("John") :: Row("Jim") :: Row("Janet") ::Nil)

sql("select * from contacts")
.select(explode(col("friends.first")), col("friends"))
.createOrReplaceTempView("contact_alias")

val query2 = sql("select friends.middle, col from contact_alias")
checkScan(query2, "struct<friends:array<struct<first:string,middle:string>>>")
checkAnswer(query2, Row(Array("Z."), "Susan") :: Nil)
Comment on lines +514 to +516
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu This test is for branch-3.0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the test case.

}
}

protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
test(s"Spark vectorized reader - without partition data column - $testName") {
withSQLConf(vectorizedReaderEnabledKey -> "true") {
Expand Down