diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 118f41f9cd232..0c8666b72cace 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -149,10 +149,12 @@ object NestedColumnAliasing {
       case _ => false
     }
 
+    // Note that when we group by extractors with their references, we should remove
+    // cosmetic variations.
     val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
     val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
       .filter(!_.references.subsetOf(exclusiveAttrSet))
-      .groupBy(_.references.head)
+      .groupBy(_.references.head.canonicalized.asInstanceOf[Attribute])
       .flatMap { case (attr, nestedFields: Seq[ExtractValue]) =>
         // Remove redundant `ExtractValue`s if they share the same parent nest field.
         // For example, when `a.b` and `a.b.c` are in project list, we only need to alias `a.b`.
@@ -174,9 +176,12 @@ object NestedColumnAliasing {
 
         // If all nested fields of `attr` are used, we don't need to introduce new aliases.
         // By default, ColumnPruning rule uses `attr` already.
+        // Note that we need to remove cosmetic variations first, so we only count a
+        // nested field once.
         if (nestedFieldToAlias.nonEmpty &&
-          nestedFieldToAlias
-            .map { case (nestedField, _) => totalFieldNum(nestedField.dataType) }
+          dedupNestedFields.map(_.canonicalized)
+            .distinct
+            .map { nestedField => totalFieldNum(nestedField.dataType) }
             .sum < totalFieldNum(attr.dataType)) {
           Some(attr.exprId -> nestedFieldToAlias)
         } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 8b859e951b9b9..d51eafa5a8aed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -497,6 +497,26 @@ abstract class SchemaPruningSuite
       Row(Row("Janet", null, "Jones"), "Jones") ::Nil)
   }
 
+  testSchemaPruning("SPARK-32163: nested pruning should work even with cosmetic variations") {
+    withTempView("contact_alias") {
+      sql("select * from contacts")
+        .repartition(100, col("name.first"), col("name.last"))
+        .selectExpr("name").createOrReplaceTempView("contact_alias")
+
+      val query1 = sql("select name.first from contact_alias")
+      checkScan(query1, "struct<name:struct<first:string,last:string>>")
+      checkAnswer(query1, Row("Jane") :: Row("John") :: Row("Jim") :: Row("Janet") ::Nil)
+
+      sql("select * from contacts")
+        .select(explode(col("friends.first")), col("friends"))
+        .createOrReplaceTempView("contact_alias")
+
+      val query2 = sql("select friends.middle, col from contact_alias")
+      checkScan(query2, "struct<friends:array<struct<first:string,middle:string>>>")
+      checkAnswer(query2, Row(Array("Z."), "Susan") :: Nil)
+    }
+  }
+
   protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
     test(s"Spark vectorized reader - without partition data column - $testName") {
       withSQLConf(vectorizedReaderEnabledKey -> "true") {
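
Why canonicalization matters here: operators such as RepartitionByExpression can re-create attribute references that differ only cosmetically (qualifier, name casing, nullability) while sharing the same ExprId. Grouping extractors by the raw attribute then splits references to one column across several groups, and the "are all nested fields used?" count double-counts fields, which disables nested pruning. The sketch below illustrates the grouping-key difference the patch relies on; it is a minimal, self-contained example (the constructed attribute, its ExprId, and the nullability tweak are illustrative assumptions, not code from the patch):

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types._

    // One logical column, referenced twice with a cosmetic difference
    // (same ExprId, different nullability).
    val nameType = StructType(Seq(StructField("first", StringType)))
    val name = AttributeReference("name", nameType)(ExprId(1))
    val nameVariant = name.withNullability(false)

    val f1 = GetStructField(name, 0, Some("first"))
    val f2 = GetStructField(nameVariant, 0, Some("first"))

    // Grouping by the raw attribute treats the two references as distinct...
    Seq(f1, f2).groupBy(_.references.head).size                // 2
    // ...while canonicalizing first strips cosmetic variations, so both
    // extractors land in one group and each nested field is counted once.
    Seq(f1, f2).groupBy(_.references.head.canonicalized).size  // 1

The new tests exercise exactly this shape: repartitioning by name.first/name.last (and exploding friends.first) yields cosmetically different references to the same attribute, and with the fix the scans still prune down to the listed struct schemas.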